ignite Spark tables

Distributed Warehouse Analysis… Go!


  1. Create a Spark Session
  2. Name of the Spark application instance
  3. Driver TCP port
  4. Number of Join and DataFrame Partitions
    1. Partition SQL Table by Column values
  5. Create Spark DataFrames and (SQL-like) Tables
    1. Convert Pandas DF to Spark DF
    2. Create Temporary Spark DataFrame from CSV
    3. Create DataFrame with Schema
    4. DataType overview
    5. View DataFrame schema
    6. Save DataFrame to File
  6. DataFrames and SQL-like Tables
    1. Register Spark DataFrame in SQL Table Catalog
    2. SQL query against Spark table
    3. Query SQL Table and save to Pandas DF
    4. Describe an SQL Table
    5. List SQL Table Catalog
  7. Column and Value Manipulation
    1. Fill Missing Values in a Column with .fillna()
    2. Categorization/binning of Column Values .withColumn() and .when()
    3. Creating new Columns / Computation with .withColumn()
    4. Select Columns with .select()
    5. Drop Columns with .drop()
    6. Select Columns and use SQL statements with .selectExpr()
    7. Rename Columns I: .alias()
    8. Rename Columns II: .withColumnRenamed()
    9. Aggregation with .groupBy() and .min() .max() .avg()
    10. Keep Rows with Specified Values with .filter()
  8. Special Functions F
    1. Add an ID column
    2. User-Defined Functions udf()
    3. If/When statements with inline methods
  9. Caching
    1. Caching DataFrames with .cache()
    2. Checking and Removing DataFrames from Cache
  10. Broadcasting in Joins
  11. Machine Learning
    1. Data Set splitting
    2. Training Logistic Regression
    3. Model Evaluation
    4. Predicting
  12. Pipelines
    1. Casting DataFrame columns
    2. Joining Columns .join()
    3. One-Hot-Encoding (StringIndexer, OneHotEncoder)
    4. VectorAssembler
    5. ML Pipeline stages with Pipeline()
      1. Fit and Transform Pipeline
    6. Train-Test Split

PySpark can do Pandas-like DataFrame operations, transformations, and ML modeling.

The advantage of Spark is that the instructions get sent out to a cluster that divides the DataFrames into segments, processes them in parallel, and then merges the result back together.

A Spark cluster (made up of nodes) can have much more memory (RAM) than a single machine, which enables processing of vastly larger data sets.

Spark DataFrames are data objects that can be partitioned for submission to the different nodes of a Spark cluster.

The number of partitions can be set when creating a DataFrame.

Spark uses ‘Lazy Processing’. This means:

  • Entering lazy commands is akin to writing a script that gets executed later.
  • Because Spark sees all instructions before executing them, it can plan the processing most efficiently (see the sketch below).
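
A minimal sketch of lazy evaluation, assuming a hypothetical flights_df DataFrame purely for illustration: transformations only extend the execution plan, and an action triggers the actual work.

# 'flights_df' is a hypothetical DataFrame, used only for illustration
# Transformations such as filter() and select() are lazy: they only extend the plan
planned = flights_df.filter(flights_df.distance > 1000).select("origin", "dest")

# Nothing has been computed yet; an action such as show() or count()
# triggers the distributed execution of the plan
planned.show(5)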

A good starting point is the official PySpark “Getting Started” guide.

Create a Spark Session

The SparkContext is the connection to the cluster. The SparkSession is the interface with that connection.

from pyspark.sql import SparkSession

# Create spark session, avoid building a second session
my_spark_session = SparkSession.builder.getOrCreate()
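
If direct access to the connection is needed, the SparkContext is available as an attribute of the session; a short sketch:

# The SparkContext backing the session (the connection to the cluster)
sc = my_spark_session.sparkContext
print(sc.version)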

Name of the Spark application instance

app_name = spark.conf.get('spark.app.name')
print("Name: %s" % app_name)

Name: pyspark-shell

Driver TCP port

The cluster consists of one driver node and one or more worker nodes.

driver_tcp_port = spark.conf.get('spark.driver.port')
print("Driver TCP port: %s" % driver_tcp_port)

Driver TCP port: 41309

Number of Join and DataFrame Partitions

The default number of shuffle partitions (spark.sql.shuffle.partitions) is 200.

# Get settings, number of join partitions
spark.conf.get('spark.sql.shuffle.partitions')

# Get numbers of partitions of DataFrame
num_partitions_bef = workers_df.rdd.getNumPartitions()
print("Number of DF partitions before: %s" % num_partitions_bef)

# Change settings, number of join partitions
spark.conf.set('spark.sql.shuffle.partitions', 400)

# Reload DataFrame from file
workers_df = spark.read.csv('departures.txt.gz').distinct()

# Get numbers of partitions of DF again
num_partitions_aft = workers_df.rdd.getNumPartitions()
print("Number of DF partitions after: %s" % num_partitions_aft)

Number of DF partitions before: 200

Number of DF partitions after: 400

Partition SQL Table by Column values

Partition table by ‘part’ column values:

query = """
SELECT
    part,
    LAG(word, 2) OVER(PARTITION BY part ORDER BY id) AS w1,
    LAG(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
    word AS w3,
    LEAD(word, 1) OVER(PARTITION BY part ORDER BY id) AS w4,
    LEAD(word, 2) OVER(PARTITION BY part ORDER BY id) AS w5
FROM 
    text
"""
spark.sql(query).where("part = 12").show(10)

The ‘chapter’ column has 12 integer categories.

Repartition DataFrame into 12 partitions on ‘chapter’ column.

repart_df = text_df.repartition(12,'chapter')

# Prove that repart_df has 12 partitions
repart_df.rdd.getNumPartitions()

12

Create Spark DataFrames and (SQL-like) Tables

All DataFrames are temporary at this point.

Convert Pandas DF to Spark DF

import numpy as np
import pandas as pd

# Create a pandas DF with random values
df_temp = pd.DataFrame(np.random.random(10))

# Feed the pandas DF into spark DF (SparkSession object is called "spark")
spark_temp = spark.createDataFrame(df_temp)

# The permanent Table catalog will be empty, the DF is only temporary
print(spark.catalog.listTables())

Create Temporary Spark DataFrame from CSV

Shortcut version

file_path = "/usr/local/share/datasets/airports.csv"

# Read in the airports data
airports = spark.read.csv(file_path, header=True)

# Show DataFrame
airports.show()

Long version with extra options

import pyspark.sql.functions as F

# Load the CSV file
aa_dfw_df = spark.read.format('csv').options(header=True).load('AA_DFW_2018.csv.gz')

# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))

# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])

# Show the DataFrame
aa_dfw_df.show()

Create DataFrame with Schema

When reading without a schema, the data types get inferred, which can be incorrect and takes extra time. For optimal speed and correctness, define a schema.

Pass options to remove invalid rows: .options(mode=’DROPMALFORMED’)

from pyspark.sql.types import *

# Define a new schema using the StructType method
schema = StructType([StructField("brand", StringType(), nullable=False),
	StructField("model", StringType(), nullable=False),
	StructField("absorption_rate", ByteType(), nullable=True),
	StructField("comfort", ByteType(), nullable=True)
])
df = spark.read.options(header="true", mode="DROPMALFORMED").schema(schema).csv("/home/repl/workspace/mnt/lake/inbound/ratings.csv")

pprint(df.dtypes)

[(‘brand’, ‘string’), (‘model’, ‘string’), (‘absorption_rate’, ‘tinyint’), (‘comfort’, ‘tinyint’)]

from pyspark.sql.types import *

# Define a new schema using the StructType method
people_schema = StructType([
	# Define a StructField for each field
	StructField('name', StringType(), nullable=False),
	StructField('age', IntegerType(), nullable=False),
	StructField('city',StringType(),nullable=False)
])

DataType overview

| DataType | Value Type in Python |
| ByteType() | Numbers -128 to 127 |
| ShortType() | Numbers -32768 to 32767 |
| IntegerType() | Numbers -2147483648 to 2147483647 |
| FloatType() | Floating point number |
| StringType() | String |
| BooleanType() | bool (True, False, 0, 1) |
| DateType() | datetime.date |

View DataFrame schema

Show the columns in a DataFrame and their data types.

split_df.printSchema()

Save DataFrame to File

Shortcut version

df.write.parquet('filename.parquet')

Long version with extra options

df.write.format('parquet').save('filename.parquet')
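
A hedged sketch of further write options; the write mode and partitioning by a column are standard DataFrameWriter features, while the 'year' column is only an assumption for illustration:

# Overwrite existing output and partition the files by a column
# ('year' is a hypothetical column used only for illustration)
df.write.mode('overwrite').partitionBy('year').parquet('filename.parquet')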

DataFrames and SQL-like Tables

Register Spark DataFrame in SQL Table Catalog

After registering the table, we can run .sql() methods with queries against it.

# Register temp_df in the catalog as a temporary view
temp_df.createOrReplaceTempView("my_sqltable")

SQL query against Spark table

# write a query string
query = "FROM flights SELECT * LIMIT 10"

# apply query string with .sql() method to the session object
flights10 = my_spark_session.sql(query)

# print results table in console
flights10.show()

| year | month | day | dep_time | dep_delay | arr_time | arr_delay | carrier | tailnum | flight | origin | dest | air_time | distance | hour | minute |
| 2014 | 12 | 8 | 658 | -7 | 935 | -5 | VX | N846VA | 1780 | SEA | LAX | 132 | 954 | 6 | 58 |
| 2014 | 1 | 22 | 1040 | 5 | 1505 | 5 | AS | N559AS | 851 | SEA | HNL | 360 | 2677 | 10 | 40 |
| 2014 | 3 | 9 | 1443 | -2 | 1652 | 2 | VX | N847VA | 755 | SEA | SFO | 111 | 679 | 14 | 43 |

Query SQL Table and save to Pandas DF

# write a query string
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"

# apply query string with .sql() method to the session object
flight_counts = spark.sql(query)

# convert the results to a pandas DataFrame
pd_counts = flight_counts.toPandas()

# print results pandas DF
pd_counts.head()
Another query against the registered temporary view:

long_films_df = spark.sql('SELECT * FROM my_sqltable WHERE film_duration > 100')

Load file to DataFrame, register table, then query the table.

# Read the Parquet file into films_df
films_df = spark.read.parquet('films.parquet')

# Register the temp table
films_df.createOrReplaceTempView('films')

# Run a SQL query of the average film duration
avg_duration = spark.sql('SELECT AVG(film_duration) from films').collect()[0]
print('The average film time is: %d' % avg_duration)

The average film time is: 88

Describe an SQL Table

spark.sql("DESCRIBE schedule").show()

| col_name | data_type | comment |
| train_id | string | null |
| station | string | null |
| time | string | null |

List SQL Table Catalog

print(spark.catalog.listTables())

[Table(name=’my_sqltable’, database=None, description=None, tableType=’TEMPORARY’, isTemporary=True)]

Column and Value Manipulation

Fill Missing Values in a Column with .fillna()

All null values in the ‘comfort’ column will be filled with the default value 4.

ratings = ratings.fillna(4, subset=["comfort"])

Categorization/binning of Column Values .withColumn() and .when()

from pyspark.sql.functions import col, when

categorized_ratings = ratings.withColumn("comfort", when(ratings.comfort > 3, "sufficient").otherwise("insufficient"))

categorized_ratings.show()

Creating new Columns / Computation with .withColumn()

Spark DataFrames are immutable, so we overwrite the old DataFrame with a new one that adds a column computed from an existing column.

# Create the DataFrame flights
flights = spark.table("flights")

# Show prints 20 rows
flights.show()

# Add new column ‘duration_hrs’ using method .withColumn()
flights = flights.withColumn("duration_hrs", flights.air_time / 60)

Select Columns with .select()

Keep specified columns only.

# string style
selected1 = flights.select("tailnum","origin","dest")

# column object style
selected2 = flights.select(flights.origin, flights.dest, flights.carrier)

Drop Columns with .drop()

flights = flights.drop("superfluous_column")

Select Columns and use SQL statements with .selectExpr()

The SQL expression is passed as string.

speed_df = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")

Rename Columns I: .alias()

flights = flights.select((flights.air_time/60).alias("duration_hrs"))

Rename Columns II: .withColumnRenamed()

Overwrite the original DF: worker_df.withColumnRenamed("old_name", "new_name")

airports = airports.withColumnRenamed("faa", "dest")

Aggregation with .groupBy() and .min() .max() .avg()

Group by month and destination, calculate average departure delay of the groups.

flights.groupBy("month", "dest").avg("dep_delay").show()

| month | dest | avg(dep_delay) |
| 11 | TUS | -2.3333333333333335 |
| 11 | ANC | 7.529411764705882 |
| 1 | BUR | -1.45 |
| 1 | PDX | -5.6923076923076925 |
| 6 | SBA | -2.5 |

Keep Rows with Specified Values with .filter()

Passing the condition as a string or as a boolean Column produces the same result. .filter() is similar to SQL’s WHERE clause.

# passing a string (weird)
long_flights1 = flights.filter("distance > 1000")
long_flights1.show()

# passing a column of boolean values
long_flights2 = flights.filter(flights.distance > 1000)
long_flights2.show()

# remove missing (Null, NaN) values in multiple columns
long_flights3 = long_flights.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")

# filter for strings starting with specified letter (regex-like)
long_flights4 = long_flights.filter(long_flights.airline.like('A%'))

Filter for flights with origin PDX and select minimum distance

flights.filter(flights.origin == "PDX").groupBy().min('distance').show()
min(distance)
106

Filter for all flights with origin SEA and select maximum air time

flights.filter(flights.origin == "SEA").groupBy().max('air_time').show()
max(air_time)
409

Filter carriers for Delta (DL) and origin SEA then calculate average air time (flight duration)

flights.filter(flights.carrier == "DL").filter(flights.origin == "SEA").groupBy().avg("air_time").show()
avg(air_time)
188.20689655172413

Add a ‘duration_hrs’ column (air_time divided by 60) and sum the air_time column over all rows.

# Total hours in the air
flights.withColumn("duration_hrs", flights.air_time/60).groupBy().sum("air_time").show()
sum(air_time)
1517376

Aggregate three columns of the purchased df and alias two of them.

from pyspark.sql.functions import col, avg, stddev_samp, max as sfmax

aggregated_df = (purchased.groupBy(col('Country'))
                 .agg(avg('Salary').alias('average_salary'),
                      stddev_samp('Salary'),
                      sfmax('Salary').alias('highest_salary')))

aggregated_df.show()

| Country | average_salary | stddev_samp(Salary) | highest_salary |
| Germany | 63000.0 | NaN | 63000 |
| France | 48000.0 | NaN | 48000 |
| Spain | 62000.0 | 12727.922061357855 | 71000 |

Special Functions F

String to upper case

import pyspark.sql.functions as F

film_df = film_df.withColumn('upper', F.upper('name'))

Split Strings

import pyspark.sql.functions as F

film_df.withColumn('split_string', F.split('film_name', ' '))

.agg() and F.stddev

# Standard deviation of departure delay
by_month_dest.agg(F.stddev("dep_delay")).show()

| month | dest | stddev_samp(dep_delay) |
| 11 | TUS | 3.0550504633038935 |
| 11 | ANC | 18.604716401245316 |
| 1 | BUR | 15.22627576540667 |

Add an ID column

The IDs are generated as 64-bit integers and contain gaps (they are not sequential), but the generation stays fully parallelized across the cluster.

Important Note: DataFrames with many partitions get much larger ID values (more digits), because the partition number is encoded in the upper bits of each ID.

import pyspark.sql.functions as F

worker_df = worker_df.withColumn('ROW_ID', F.monotonically_increasing_id())
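
A small sketch illustrating why the IDs jump between partitions: each partition starts its IDs at a different large offset.

# Repartition first, then add IDs: values are unique but not consecutive
more_parts_df = worker_df.repartition(8)
more_parts_df = more_parts_df.withColumn('ROW_ID', F.monotonically_increasing_id())
more_parts_df.select('ROW_ID').show(10)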

User-Defined Functions udf()

We can wrap our own Python function in udf() and apply it to a column of a Spark DataFrame.

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

# Define a python function
def reverseName(name):
  return name[::-1]

# Store function as a UDF -> python function and data type definition
udfReverseName = F.udf(reverseName, StringType())

# Create a new column using the UDF
worker_df = worker_df.withColumn('worker_name_reversed', udfReverseName(worker_df.worker_name))

worker_df.show()

| date | job_title | worker_name | worker_name_reversed |
| 02/08/2019 | engineer | Jen M. Gates | setaG .M neJ |
| 02/08/2019 | engineer | Pippa F. Kingston | notsgniK .F appiP |
| 02/08/2019 | director | Mike S. Rawlings | sgnilwaR .S ekiM |

If/When statements with inline methods

Add a column with random numbers for workers with the title engineer using .when()

worker_df = worker_df.withColumn('random_val',
				F.when(worker_df.job_title == 'engineer', F.rand())
				)

worker_df.show()

| date | job_title | worker_name | random_val |
| 02/08/2019 | engineer | Jen M. Gates | 0.4420037036681319 |
| 02/08/2019 | engineer | Pippa F. Kingston | 0.9169317912957646 |
| 02/08/2019 | director | Mike S. Rawlings | null |

Add a column with multiple chained .when() clauses. The role of ‘else’ in Python is taken by .otherwise().

worker_df = worker_df.withColumn('random_val',
				F.when(worker_df.job_title == 'engineer', F.rand())
				.when(worker_df.job_title == 'director', 2)
				.otherwise(0)
				)

worker_df.show()

| date | job_title | worker_name | random_val |
| 02/08/2019 | engineer | Jen M. Gates | 0.0473898980592975 |
| 02/08/2019 | engineer | Pippa F. Kingston | 0.1810667508308076 |
| 02/08/2019 | director | Mike S. Rawlings | 2.0 |
| 02/08/2019 | engineer | Alvaro Verano | 0.4672294882395198 |

Caching

Caching DataFrames with .cache()

Caching DataFrames that are used repeatedly can help improve speed.

# Set start time
start_time1 = time.time()

# Add caching to df
workers_df = workers_df.distinct().cache()

# First count: the data is not cached yet, so this takes longer
print("Counting %d rows took %f seconds" % (workers_df.count(), time.time() - start_time1))

# Count again: this time the cached data is used, so it is faster
start_time2 = time.time()
print("Counting %d rows again took %f seconds" % (workers_df.count(), time.time() - start_time2))

Counting 319532 rows took 2.255696 seconds

Counting 319532 rows again took 0.404590 seconds

Checking and Removing DataFrames from Cache

# Check status
print("Is workers_df cached?: %s" % workers_df.is_cached)

# Remove caching
workers_df.unpersist()

# Check status again
print("Is workers_df cached?: %s" % workers_df.is_cached)

Is workers_df cached?: True

Is workers_df cached?: False

Broadcasting in Joins

Broadcasting sends a full copy of the selected DataFrame to every worker. This can speed up joins when one DataFrame is much smaller than the other.

from pyspark.sql.functions import broadcast

# Join the train_rides_df and stations_df DataFrames using broadcasting
broadcast_df = train_rides_df.join(broadcast(stations_df), \
    train_rides_df["Arrival Station"] == stations_df["Station_Name"] )

# Show the query plan and compare against the original
broadcast_df.explain()

Machine Learning

Data Set splitting

# Data splitting in 0.8 train, 0.2 test
df_trainset, df_testset = df_examples.randomSplit((0.8, 0.2), 42)

# Number of train samples
print("Number training: ", df_trainset.count())

# Number of test samples
print("Number test: ", df_testset.count())

Number training: 4602

Number test: 1128

Training Logistic Regression

from pyspark.ml.classification import LogisticRegression

# Create a Logistic Regression object with settings
logistic = LogisticRegression(maxIter=100, regParam=0.4, elasticNetParam=0.0)

# Train (fit) on the train set to create a model
model = logistic.fit(df_trainset)

# How many iterations were needed
print("Training iterations: ", model.summary.totalIterations)

Training iterations: 25

Model Evaluation

model.evaluate() returns a summary object for the given data set.

model_statistics = model.evaluate(df_testset)

print(model_statistics.areaUnderROC)

0.8783
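
The summary object exposes further metrics as well. A hedged sketch, assuming a binary classification problem and a recent Spark version (2.3+):

# Additional metrics available on the evaluation summary
print(model_statistics.accuracy)        # overall accuracy on df_testset
print(model_statistics.weightedRecall)  # recall weighted by label frequency
model_statistics.roc.show(5)            # ROC curve points (FPR, TPR) as a DataFrame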

Predicting

The prediction creates a ‘prediction’ column and a ‘probability’ column (probability of a negative prediction, probability of a positive prediction).

By default, the probability threshold is 0.5 to make a prediction.

# Predict the test set, returns a DataFrame
prediction = model.transform(df_testset)
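
A short sketch for inspecting the added columns and computing the AUC with a standard evaluator; the column names 'label', 'probability', and 'prediction' are the Spark ML defaults and assumed here:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Each row gets a 'prediction' (0.0/1.0) and a 'probability' vector
# [P(label=0), P(label=1)]; 'label' is the default target column name
prediction.select('label', 'probability', 'prediction').show(5, truncate=False)

# areaUnderROC is the evaluator's default metric
evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
print(evaluator.evaluate(prediction))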

Pipelines

Creating a pipeline simplifies and formalizes the preprocessing of training data and the training of the ML model. Using pipelines also helps avoid mistakes when the same steps are repeated.

https://spark.apache.org/docs/3.1.2/ml-pipeline.html#pipeline-components

Create a pipeline from a workflow such as this:

departures_df = spark.read.csv('2015-departures.csv.gz', header=True)

# get column id from printSchema()
departures_df.printSchema()

# Remove rows where the duration column (column index 3) is null
departures_df = departures_df.filter(departures_df[departures_df.columns[3]].isNotNull())

# Add an ID column
departures_df = departures_df.withColumn('ID', F.monotonically_increasing_id())

# Write the file
departures_df.write.json('output_file.json', mode='overwrite')

Casting DataFrame columns

  • ML modeling requires exclusively numeric input.
  • All string columns need to be cast to integer or double (see the casting sketch below).
  • Continuous values may need to be binned into categories.

spark_df = spark_df.withColumn(col_name, spark_df[col_name].cast('integer'))

‘integer’ = integer, ‘double’ = double precision float
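
A minimal sketch casting several columns in a loop; the column names here follow the flights examples above and are only assumptions:

# Cast hypothetical string columns to numeric types
for col_name in ['month', 'air_time', 'plane_year']:
    spark_df = spark_df.withColumn(col_name, spark_df[col_name].cast('integer'))

spark_df = spark_df.withColumn('arr_delay', spark_df['arr_delay'].cast('double'))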

Joining Columns .join()

joined_df = first_df.join(second_df, on='key_column', how='inner')  # how: 'inner', 'left', 'right', 'outer', ...
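
A concrete sketch, reusing the earlier rename of the airports ‘faa’ column to ‘dest’ so that the join keys share a name (DataFrame and column names follow the flights examples above):

# Rename the key so both DataFrames share the column name 'dest'
airports = airports.withColumnRenamed('faa', 'dest')

# Left join: keep every flight, attach airport attributes where available
flights_with_airports = flights.join(airports, on='dest', how='leftouter')
flights_with_airports.show(5)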

One-Hot-Encoding (StringIndexer, OneHotEncoder)

StringIndexer() takes a column of strings and converts it to categorical numeric indices 0.0, 1.0, 2.0, …

from pyspark.ml.feature import StringIndexer

carr_indexer = StringIndexer(inputCol="Carrier", outputCol="carrier_index")

https://spark.apache.org/docs/latest/ml-features#stringindexer

OneHotEncoder() takes the categorical indices and expands them into a one-hot vector column of 0/1 entries.

from pyspark.ml.feature import OneHotEncoder

carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

https://spark.apache.org/docs/latest/ml-features#onehotencoder
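
A hedged sketch of applying both steps; note that in Spark 3.x OneHotEncoder is an Estimator, so it needs a fit() step as well (DataFrame and column names follow the flights examples above):

from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Map carrier strings to numeric indices (0.0, 1.0, ...)
carr_indexer = StringIndexer(inputCol="Carrier", outputCol="carrier_index")
indexed_df = carr_indexer.fit(flights).transform(flights)

# Expand the indices into sparse one-hot vectors
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")
encoded_df = carr_encoder.fit(indexed_df).transform(indexed_df)
encoded_df.select("Carrier", "carrier_index", "carrier_fact").show(5)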

VectorAssembler

Before running ML training, all feature columns need to be converted to a single vector column.

from pyspark.ml.feature import VectorAssembler

vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"],
				outputCol="features"
				)

https://spark.apache.org/docs/latest/ml-features#vectorassembler

ML Pipeline stages with Pipeline()

The Pipeline “stages” argument holds a list of all the stages the data goes through. All stage objects have to be defined beforehand and are then listed in processing order.

from pyspark.ml import Pipeline

flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

https://spark.apache.org/docs/3.1.2/ml-pipeline.html#pipeline-components

Fit and Transform Pipeline

The pipeline now gets applied with fit and transform. The DataFrame gets passed as argument to both.

piped_data = flights_pipe.fit(model_data).transform(model_data)

Train-Test Split

IMPORTANT: split after the StringIndexer has run, otherwise the train and test sets may end up with different codes for the same category. Train: 60%, Test: 40%

training, test = piped_data.randomSplit([.6, .4])


