Մեծ տվյալների մշակում Python PySpark-ով


Այս ձեռնարկում մենք կուսումնասիրենք Python-ի և PySpark-ի հզոր համակցությունը տվյալների մեծ հավաքածուներ մշակելու համար: PySpark-ը Python գրադարան է, որն ապահովում է ինտերֆեյս Apache Spark-ի համար՝ արագ և ընդհանուր նշանակության կլաստերային հաշվողական համակարգ: Օգտագործելով PySpark-ը, մենք կարող ենք արդյունավետորեն բաշխել և մշակել տվյալները մեքենաների կլաստերում, ինչը մեզ հնարավորություն է տալիս հեշտությամբ կառավարել լայնածավալ տվյալների հավաքածուները:

Այս հոդվածում մենք կխոսենք PySpark-ի հիմունքների մեջ և ցույց կտանք, թե ինչպես կատարել տվյալների մշակման տարբեր առաջադրանքներ մեծ տվյալների հավաքածուներում: Մենք կանդրադառնանք հիմնական հասկացություններին, ինչպիսիք են RDD-ները (Resilient Distributed Datasets) և DataFrames-ը և կներկայացնենք դրանց գործնական կիրառությունները քայլ առ քայլ օրինակների միջոցով: Այս ձեռնարկի ավարտին դուք լավ կհասկանաք, թե ինչպես օգտագործել PySpark-ը՝ արդյունավետորեն մշակելու և վերլուծելու զանգվածային տվյալների հավաքածուները:

Բաժին 1. Սկսել PySpark-ին

Այս բաժնում մենք կստեղծենք մեր զարգացման միջավայրը և կծանոթանանք PySpark-ի հիմնական հասկացություններին: Մենք կքննարկենք, թե ինչպես տեղադրել PySpark-ը, սկզբնավորել SparkSession-ը և բեռնել տվյալները RDD-ներում և DataFrames-ներում: Եկեք սկսենք տեղադրել PySpark-ը.

# Install PySpark
!pip install pyspark

Արդյունք

Collecting pyspark
...
Successfully installed pyspark-3.1.2

PySpark-ը տեղադրելուց հետո մենք կարող ենք սկզբնավորել SparkSession-ը՝ մեր Spark կլաստերին միանալու համար.

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("LargeDatasetProcessing").getOrCreate()

Երբ մեր SparkSession-ը պատրաստ է, մենք այժմ կարող ենք բեռնել տվյալները RDD-ներում կամ DataFrames-ներում: RDD-ները PySpark-ի հիմնական տվյալների կառուցվածքն են և ապահովում են տարրերի բաշխված հավաքածու: Մյուս կողմից, DataFrames-ը տվյալները կազմակերպում է անվանակոչված սյունակների մեջ, որոնք նման են հարաբերական տվյալների բազայի աղյուսակին: Եկեք բեռնենք CSV ֆայլը որպես DataFrame:

# Load a CSV file as a DataFrame
df = spark.read.csv("large_dataset.csv", header=True, inferSchema=True)

Արդյունք

+---+------+--------+
|id |name  |age     |
+---+------+--------+
|1  |John  |32      |
|2  |Alice |28      |
|3  |Bob   |35      |
+---+------+--------+

Ինչպես տեսնում եք վերը նշված կոդի հատվածից, մենք օգտագործում ենք «read.csv()» մեթոդը՝ CSV ֆայլը տվյալների շրջանակում կարդալու համար: «header=True» արգումենտը ցույց է տալիս, որ առաջին տողը պարունակում է սյունակների անուններ, իսկ «inferSchema=True»-ն ավտոմատ կերպով եզրակացնում է յուրաքանչյուր սյունակի տվյալների տեսակները:

Բաժին 2. Տվյալների փոխակերպում և վերլուծություն

Այս բաժնում մենք կուսումնասիրենք տվյալների փոխակերպման և վերլուծության տարբեր մեթոդներ՝ օգտագործելով PySpark-ը: Մենք կանդրադառնանք այնպիսի գործառնություններին, ինչպիսիք են զտումը, համախմբումը և տվյալների հավաքածուների միացումը: Սկսենք տվյալների զտումից՝ հիմնվելով հատուկ պայմանների վրա.

# Filter data
filtered_data = df.filter(df["age"] > 30)

Արդյունք

+---+----+---+
|id |name|age|
+---+----+---+
|1  |John|32 |
|3  |Bob |35 |
+---+----+---+

Վերոնշյալ կոդի հատվածում մենք օգտագործում ենք «ֆիլտր()» մեթոդը՝ տողեր ընտրելու համար, որտեղ «տարիքը» սյունակը 30-ից մեծ է: Այս գործողությունը թույլ է տալիս մեզ հանել տվյալների համապատասխան ենթաբազմությունները մեր մեծ տվյալներից:

Հաջորդը, եկեք կատարենք ագրեգացիա մեր տվյալների բազայի վրա՝ օգտագործելով «groupBy()» և «agg()» մեթոդները.

# Aggregate data
aggregated_data = df.groupBy("gender").agg({"salary": "mean", "age": "max"})

Արդյունք

+------+-----------+--------+
|gender|avg(salary)|max(age)|
+------+-----------+--------+
|Male  |2500       |32      |
|Female|3000       |35      |
+------+-----------+--------+

Այստեղ մենք խմբավորում ենք տվյալները ըստ «գենդերային» սյունակի և հաշվարկում միջին աշխատավարձը և առավելագույն տարիքը յուրաքանչյուր խմբի համար: Ստացված «համախմբված_տվյալներ» DataFrame-ը մեզ տալիս է արժեքավոր պատկերացումներ մեր տվյալների բազայի վերաբերյալ:

Բացի զտումից և համախմբումից, PySpark-ը մեզ հնարավորություն է տալիս արդյունավետ կերպով միանալ բազմաթիվ տվյալների հավաքածուներին: Դիտարկենք մի օրինակ, որտեղ մենք ունենք երկու DataFrames՝ «df1» և «df2»: Մենք կարող ենք նրանց միանալ ընդհանուր սյունակի հիման վրա.

# Join two DataFrames
joined_data = df1.join(df2, on="id", how="inner")

Արդյունք

+---+----+---------+------+
|id |name|department|salary|
+---+----+---------+------+
|1  |John|HR       |2500  |
|2  |Alice|IT      |3000  |
|3  |Bob |Sales    |2000  |
+---+----+---------+------+

«join()» մեթոդը թույլ է տալիս մեզ միավորել DataFrames-ը` հիմնված ընդհանուր սյունակի վրա, որը նշված է «on» պարամետրով: Մենք կարող ենք ընտրել միացման տարբեր տեսակներ, ինչպիսիք են «ներքին», «արտաքին», «ձախ» կամ «աջ»՝ կախված մեր պահանջներից:

Բաժին 3. PySpark-ի առաջադեմ տեխնիկա

Այս բաժնում մենք կուսումնասիրենք PySpark-ի առաջադեմ տեխնիկան՝ հետագայում մեր տվյալների մշակման հնարավորությունները բարելավելու համար: Մենք կանդրադառնանք այնպիսի թեմաների, ինչպիսիք են՝ օգտագործողի կողմից սահմանված գործառույթները (UDF), պատուհանի գործառույթները և քեշավորումը: Եկեք սկսենք սահմանելով և օգտագործելով UDF.

from pyspark.sql.functions import udf

# Define a UDF
def square(x):
    return x ** 2

# Register the UDF
square_udf = udf(square)

# Apply the UDF to a column
df = df.withColumn("age_squared", square_udf(df["age"]))

Արդյունք

+---+------+---+------------+
|id |name  |age|age_squared |
+---+------+---+------------+
|1  |John  |32 |1024        |
|2  |Alice |28 |784         |
|3  |Bob   |35 |1225        |
+---+------+---+------------+

Վերոնշյալ կոդի հատվածում մենք սահմանում ենք պարզ UDF, որը կոչվում է «square()», որը քառակուսի է տալիս տվյալ մուտքագրումը: Այնուհետև մենք գրանցում ենք UDF-ն՝ օգտագործելով «udf()» ֆունկցիան և այն կիրառում «տարիք» սյունակում՝ ստեղծելով նոր սյունակ, որը կոչվում է «age_squared» մեր DataFrame-ում:

PySpark-ը նաև տրամադրում է պատուհանների հզոր գործառույթներ, որոնք թույլ են տալիս մեզ հաշվարկներ կատարել պատուհանների որոշակի տիրույթներում: Հաշվարկենք յուրաքանչյուր աշխատողի միջին աշխատավարձը՝ հաշվի առնելով նախորդ և հաջորդ տողերը.

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, avg

# Define the window
window = Window.orderBy("id")

# Calculate average salary with lag and lead
df = df.withColumn("avg_salary", (lag(df["salary"]).over(window) + lead(df["salary"]).over(window) + df["salary"]) / 3)

Արդյունք

+---+----+---------+------+----------+
|id |name|department|salary|avg_salary|
+---+----+---------+------+----------+
|1  |John|HR       |2500  |2666.6667 |
|2  |Alice|

IT      |3000  |2833.3333 |
|3  |Bob |Sales    |2000  |2500      |
+---+----+---------+------+----------+

Վերոնշյալ կոդի քաղվածքում մենք սահմանում ենք պատուհան՝ օգտագործելով «Window.orderBy()» մեթոդը՝ նշելով «id» սյունակի հիման վրա տողերի դասավորությունը: Այնուհետև մենք օգտագործում ենք «lag()» և «lead()» ֆունկցիաները՝ համապատասխանաբար նախորդ և հաջորդ տողերը մուտք գործելու համար: Ի վերջո, մենք հաշվարկում ենք միջին աշխատավարձը, հաշվի առնելով ընթացիկ շարքը և դրա հարևանները:

Ի վերջո, քեշավորումը PySpark-ում էական տեխնիկա է, որը բարելավում է կրկնվող ալգորիթմների կամ կրկնվող հաշվարկների կատարումը: Մենք կարող ենք քեշավորել DataFrame կամ RDD հիշողության մեջ՝ օգտագործելով «cache()» մեթոդը՝

# Cache a DataFrame
df.cache()

Քեշավորման համար ոչ մի ելք չի ցուցադրվում, սակայն քեշավորված DataFrame-ի վրա հիմնված հետագա գործողություններն ավելի արագ կլինեն, քանի որ տվյալները պահվում են հիշողության մեջ:

Եզրակացություն

Այս ձեռնարկում մենք ուսումնասիրեցինք PySpark-ի հզորությունը Python-ում մեծ տվյալների հավաքածուներ մշակելու համար: Մենք սկսեցինք ստեղծելով մեր զարգացման միջավայրը և բեռնելով տվյալները RDD-ներում և DataFrames-ներում: Այնուհետև մենք խորացանք տվյալների փոխակերպման և վերլուծության տեխնիկայի մեջ, ներառյալ տվյալների հավաքածուների զտումը, համախմբումը և միացումը: Վերջապես, մենք քննարկեցինք PySpark-ի առաջադեմ տեխնիկան, ինչպիսիք են օգտագործողի կողմից սահմանված գործառույթները, պատուհանի գործառույթները և քեշավորումը: