Մեծ տվյալների մշակում Python PySpark-ով
Այս ձեռնարկում մենք կուսումնասիրենք Python-ի և PySpark-ի հզոր համակցությունը տվյալների մեծ հավաքածուներ մշակելու համար: PySpark-ը Python գրադարան է, որն ապահովում է ինտերֆեյս Apache Spark-ի համար՝ արագ և ընդհանուր նշանակության կլաստերային հաշվողական համակարգ: Օգտագործելով PySpark-ը, մենք կարող ենք արդյունավետորեն բաշխել և մշակել տվյալները մեքենաների կլաստերում, ինչը մեզ հնարավորություն է տալիս հեշտությամբ կառավարել լայնածավալ տվյալների հավաքածուները:
Այս հոդվածում մենք կխոսենք PySpark-ի հիմունքների մեջ և ցույց կտանք, թե ինչպես կատարել տվյալների մշակման տարբեր առաջադրանքներ մեծ տվյալների հավաքածուներում: Մենք կանդրադառնանք հիմնական հասկացություններին, ինչպիսիք են RDD-ները (Resilient Distributed Datasets) և DataFrames-ը և կներկայացնենք դրանց գործնական կիրառությունները քայլ առ քայլ օրինակների միջոցով: Այս ձեռնարկի ավարտին դուք լավ կհասկանաք, թե ինչպես օգտագործել PySpark-ը՝ արդյունավետորեն մշակելու և վերլուծելու զանգվածային տվյալների հավաքածուները:
Բաժին 1. Սկսել PySpark-ին
Այս բաժնում մենք կստեղծենք մեր զարգացման միջավայրը և կծանոթանանք PySpark-ի հիմնական հասկացություններին: Մենք կքննարկենք, թե ինչպես տեղադրել PySpark-ը, սկզբնավորել SparkSession-ը և բեռնել տվյալները RDD-ներում և DataFrames-ներում: Եկեք սկսենք տեղադրել PySpark-ը.
# Install PySpark
!pip install pyspark
Արդյունք
Collecting pyspark
...
Successfully installed pyspark-3.1.2
PySpark-ը տեղադրելուց հետո մենք կարող ենք սկզբնավորել SparkSession-ը՝ մեր Spark կլաստերին միանալու համար.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("LargeDatasetProcessing").getOrCreate()
Երբ մեր SparkSession-ը պատրաստ է, մենք այժմ կարող ենք բեռնել տվյալները RDD-ներում կամ DataFrames-ներում: RDD-ները PySpark-ի հիմնական տվյալների կառուցվածքն են և ապահովում են տարրերի բաշխված հավաքածու: Մյուս կողմից, DataFrames-ը տվյալները կազմակերպում է անվանակոչված սյունակների մեջ, որոնք նման են հարաբերական տվյալների բազայի աղյուսակին: Եկեք բեռնենք CSV ֆայլը որպես DataFrame:
# Load a CSV file as a DataFrame
df = spark.read.csv("large_dataset.csv", header=True, inferSchema=True)
Արդյունք
+---+------+--------+
|id |name |age |
+---+------+--------+
|1 |John |32 |
|2 |Alice |28 |
|3 |Bob |35 |
+---+------+--------+
Ինչպես տեսնում եք վերը նշված կոդի հատվածից, մենք օգտագործում ենք «read.csv()» մեթոդը՝ CSV ֆայլը տվյալների շրջանակում կարդալու համար: «header=True» արգումենտը ցույց է տալիս, որ առաջին տողը պարունակում է սյունակների անուններ, իսկ «inferSchema=True»-ն ավտոմատ կերպով եզրակացնում է յուրաքանչյուր սյունակի տվյալների տեսակները:
Բաժին 2. Տվյալների փոխակերպում և վերլուծություն
Այս բաժնում մենք կուսումնասիրենք տվյալների փոխակերպման և վերլուծության տարբեր մեթոդներ՝ օգտագործելով PySpark-ը: Մենք կանդրադառնանք այնպիսի գործառնություններին, ինչպիսիք են զտումը, համախմբումը և տվյալների հավաքածուների միացումը: Սկսենք տվյալների զտումից՝ հիմնվելով հատուկ պայմանների վրա.
# Filter data
filtered_data = df.filter(df["age"] > 30)
Արդյունք
+---+----+---+
|id |name|age|
+---+----+---+
|1 |John|32 |
|3 |Bob |35 |
+---+----+---+
Վերոնշյալ կոդի հատվածում մենք օգտագործում ենք «ֆիլտր()» մեթոդը՝ տողեր ընտրելու համար, որտեղ «տարիքը» սյունակը 30-ից մեծ է: Այս գործողությունը թույլ է տալիս մեզ հանել տվյալների համապատասխան ենթաբազմությունները մեր մեծ տվյալներից:
Հաջորդը, եկեք կատարենք ագրեգացիա մեր տվյալների բազայի վրա՝ օգտագործելով «groupBy()» և «agg()» մեթոդները.
# Aggregate data
aggregated_data = df.groupBy("gender").agg({"salary": "mean", "age": "max"})
Արդյունք
+------+-----------+--------+
|gender|avg(salary)|max(age)|
+------+-----------+--------+
|Male |2500 |32 |
|Female|3000 |35 |
+------+-----------+--------+
Այստեղ մենք խմբավորում ենք տվյալները ըստ «գենդերային» սյունակի և հաշվարկում միջին աշխատավարձը և առավելագույն տարիքը յուրաքանչյուր խմբի համար: Ստացված «համախմբված_տվյալներ» DataFrame-ը մեզ տալիս է արժեքավոր պատկերացումներ մեր տվյալների բազայի վերաբերյալ:
Բացի զտումից և համախմբումից, PySpark-ը մեզ հնարավորություն է տալիս արդյունավետ կերպով միանալ բազմաթիվ տվյալների հավաքածուներին: Դիտարկենք մի օրինակ, որտեղ մենք ունենք երկու DataFrames՝ «df1» և «df2»: Մենք կարող ենք նրանց միանալ ընդհանուր սյունակի հիման վրա.
# Join two DataFrames
joined_data = df1.join(df2, on="id", how="inner")
Արդյունք
+---+----+---------+------+
|id |name|department|salary|
+---+----+---------+------+
|1 |John|HR |2500 |
|2 |Alice|IT |3000 |
|3 |Bob |Sales |2000 |
+---+----+---------+------+
«join()» մեթոդը թույլ է տալիս մեզ միավորել DataFrames-ը` հիմնված ընդհանուր սյունակի վրա, որը նշված է «on» պարամետրով: Մենք կարող ենք ընտրել միացման տարբեր տեսակներ, ինչպիսիք են «ներքին», «արտաքին», «ձախ» կամ «աջ»՝ կախված մեր պահանջներից:
Բաժին 3. PySpark-ի առաջադեմ տեխնիկա
Այս բաժնում մենք կուսումնասիրենք PySpark-ի առաջադեմ տեխնիկան՝ հետագայում մեր տվյալների մշակման հնարավորությունները բարելավելու համար: Մենք կանդրադառնանք այնպիսի թեմաների, ինչպիսիք են՝ օգտագործողի կողմից սահմանված գործառույթները (UDF), պատուհանի գործառույթները և քեշավորումը: Եկեք սկսենք սահմանելով և օգտագործելով UDF.
from pyspark.sql.functions import udf
# Define a UDF
def square(x):
return x ** 2
# Register the UDF
square_udf = udf(square)
# Apply the UDF to a column
df = df.withColumn("age_squared", square_udf(df["age"]))
Արդյունք
+---+------+---+------------+
|id |name |age|age_squared |
+---+------+---+------------+
|1 |John |32 |1024 |
|2 |Alice |28 |784 |
|3 |Bob |35 |1225 |
+---+------+---+------------+
Վերոնշյալ կոդի հատվածում մենք սահմանում ենք պարզ UDF, որը կոչվում է «square()», որը քառակուսի է տալիս տվյալ մուտքագրումը: Այնուհետև մենք գրանցում ենք UDF-ն՝ օգտագործելով «udf()» ֆունկցիան և այն կիրառում «տարիք» սյունակում՝ ստեղծելով նոր սյունակ, որը կոչվում է «age_squared» մեր DataFrame-ում:
PySpark-ը նաև տրամադրում է պատուհանների հզոր գործառույթներ, որոնք թույլ են տալիս մեզ հաշվարկներ կատարել պատուհանների որոշակի տիրույթներում: Հաշվարկենք յուրաքանչյուր աշխատողի միջին աշխատավարձը՝ հաշվի առնելով նախորդ և հաջորդ տողերը.
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, avg
# Define the window
window = Window.orderBy("id")
# Calculate average salary with lag and lead
df = df.withColumn("avg_salary", (lag(df["salary"]).over(window) + lead(df["salary"]).over(window) + df["salary"]) / 3)
Արդյունք
+---+----+---------+------+----------+
|id |name|department|salary|avg_salary|
+---+----+---------+------+----------+
|1 |John|HR |2500 |2666.6667 |
|2 |Alice|
IT |3000 |2833.3333 |
|3 |Bob |Sales |2000 |2500 |
+---+----+---------+------+----------+
Վերոնշյալ կոդի քաղվածքում մենք սահմանում ենք պատուհան՝ օգտագործելով «Window.orderBy()» մեթոդը՝ նշելով «id» սյունակի հիման վրա տողերի դասավորությունը: Այնուհետև մենք օգտագործում ենք «lag()» և «lead()» ֆունկցիաները՝ համապատասխանաբար նախորդ և հաջորդ տողերը մուտք գործելու համար: Ի վերջո, մենք հաշվարկում ենք միջին աշխատավարձը, հաշվի առնելով ընթացիկ շարքը և դրա հարևանները:
Ի վերջո, քեշավորումը PySpark-ում էական տեխնիկա է, որը բարելավում է կրկնվող ալգորիթմների կամ կրկնվող հաշվարկների կատարումը: Մենք կարող ենք քեշավորել DataFrame կամ RDD հիշողության մեջ՝ օգտագործելով «cache()» մեթոդը՝
# Cache a DataFrame
df.cache()
Քեշավորման համար ոչ մի ելք չի ցուցադրվում, սակայն քեշավորված DataFrame-ի վրա հիմնված հետագա գործողություններն ավելի արագ կլինեն, քանի որ տվյալները պահվում են հիշողության մեջ:
Եզրակացություն
Այս ձեռնարկում մենք ուսումնասիրեցինք PySpark-ի հզորությունը Python-ում մեծ տվյալների հավաքածուներ մշակելու համար: Մենք սկսեցինք ստեղծելով մեր զարգացման միջավայրը և բեռնելով տվյալները RDD-ներում և DataFrames-ներում: Այնուհետև մենք խորացանք տվյալների փոխակերպման և վերլուծության տեխնիկայի մեջ, ներառյալ տվյալների հավաքածուների զտումը, համախմբումը և միացումը: Վերջապես, մենք քննարկեցինք PySpark-ի առաջադեմ տեխնիկան, ինչպիսիք են օգտագործողի կողմից սահմանված գործառույթները, պատուհանի գործառույթները և քեշավորումը: