Big Data Fundamentals with PySpark
Notes from Upendra Devisetty's DataCamp Course
Intro
- Big Data refers to data sets that are too complex for traditional data-processing software
3 Vs of Big Data
- Volume: Size of the data
- Variety: Different sources and formats
- Velocity: Speed of the data
Concepts and Terminology
- Clustered computing: Collection of resources of multiple machines
- Parallel computing: Simultaneous computation
- Distributed computing: Collection of nodes (networked computers) that run in parallel
- Batch processing: Breaking the job into small pieces and running them on individual machines
- Real-time processing: Immediate processing of data
Processing systems
- Hadoop/MapReduce: Scalable and fault tolerant framework written in Java
- Open source
- Batch processing
- Apache Spark: General purpose and lightning fast cluster computing system
- Open source
- Both batch and real-time data processing
- Features
- Distributed cluster computing framework
- Efficient in-memory computations for large data sets
- Lightning fast data processing framework
- Provides support for Java, Scala, Python, R, and SQL
- Components
- Apache Spark Core (RDD API)
- Spark SQL
- MLlib (Machine Learning)
- GraphX
- Spark Streaming
- Modes of Deployment
- Local Mode: Single machine such as laptop
- convenient for testing, debugging and demonstration
- Cluster Mode: Set of predefined machines
- good for production
- Workflow: Local -> Cluster
- No code change necessary
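A minimal sketch of why no code change is needed when moving from local to cluster mode, assuming the master URL is supplied at submission time (for example via spark-submit) rather than hard-coded; "my_app" is a hypothetical application name:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("my_app") # no master is set in the code itself
sc = SparkContext(conf=conf) # the deploy target (local or cluster) is chosen when the job is submitted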
PySpark
Spark with Python!
Overview
- Apache Spark is written in Scala
- PySpark was designed to support Python with Spark
- Computation speed and power similar to Scala
- PySpark APIs are similar to Pandas and Scikit-learn
Spark shell
- interactive environment for running Spark jobs
- helpful for fast interactive prototyping
- Spark’s shells allow interacting with data on disk or in memory
- 3 different Spark shells:
- Spark-shell for Scala
- PySpark-shell for Python
- SparkR for R
PySpark shell
- PySpark shell is the Python-based command-line tool
- PySpark shell allows data scientists to interface with Spark data structures
- PySpark shell supports connecting to a cluster
Understanding SparkContext
- SparkContext is an entry point into the world of Spark
- An entry point is a way of connecting to a Spark cluster
- An entry point is like a key to the house
- PySpark has a default SparkContext called sc
Inspecting SparkContext
sc.version # retrieve SparkContext version
sc.pythonVer # retrieve Python version of SparkContext
sc.master # URL of the cluster, or the string "local" when running in local mode
Loading data in PySpark
rdd = sc.parallelize([1,2,3,4,5]) # create an RDD from a Python list
rdd2 = sc.textFile("test.txt") # create an RDD from an external text file
Anonymous functions in Python
- Lambda functions are anonymous functions in Python
- a lambda expression returns a function without a name (i.e. anonymous)
- Quite efficient with map() and filter()
- Lambda functions create functions to be called later
- Inline a function definition or to defer execution of some code
Lambda function syntax
lambda arguments: expression # the general form of a lambda function
double = lambda x: x * 2 # example of a lambda function
print(double(3))
Lambda function in map()
map(): takes a function and a list; returns a new list which contains items returned by that function for each item
map(function, list) # general syntax of map()
items = [1,2,3,4]
list(map(lambda x: x + 2 , items)) # example of map()
items = [1,2,3,4]
list(map(lambda x, y: x + y , items, items)) # example of map() with two arguments
Lambda function in filter()
filter(): takes a function and a list; returns a new list with the items for which the function evaluates as true
filter(function, list) # general syntax of filter()
items = [1,2,3,4]
list(filter(lambda x: (x%2 != 0), items)) # example of filter()
[1, 3]
PySpark RDD
RDD: Resilient Distributed Datasets
- Resilient: Ability to withstand failures
- Distributed: Spanning across multiple machines
- Datasets: Collection of partitioned data e.g. Arrays, Tables, Tuples etc.
General Structure
- Data File on disk
- Spark driver creates the RDD and distributes it among the nodes
- Cluster
- Node 1: RDD Partition 1
- Node 2: RDD Partition 2
- Node 3: RDD Partition 3
- …
- Node n: RDD Partition n
Creating RDDs
Options
- Parallelizing an existing collection of objects
- From external datasets (for example):
- Files in HDFS
- Objects in Amazon S3 bucket
- Lines in a text file
- From existing RDDs
Parallelizing an existing collection of objects
numRDD = sc.parallelize([1,2,3,4]) # parallelize() for creating RDDs from python lists
helloRDD = sc.parallelize("Hello world")
type(helloRDD)
<class 'pyspark.rdd.PipelinedRDD'>
From external datasets
fileRDD = sc.textFile("README.md") # textFile() for creating RDDs from external datasets
type(fileRDD)
<class 'pyspark.rdd.PipelinedRDD'>
Understanding Partitioning in PySpark
- A partition is a logical division of a large distributed data set
numRDD = sc.parallelize(range(10), numSlices=6) # numSlices sets the number of partitions
fileRDD = sc.textFile("README.md", minPartitions = 6)
getNumPartitions(): find the number of partitions in an RDD
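A quick usage sketch for the two RDDs created above:
numRDD.getNumPartitions() # 6, the number of slices requested in parallelize()
fileRDD.getNumPartitions() # at least the minPartitions hint passed to textFile()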
RDD operations in PySpark
Spark Operations = Transformations + Actions
Transformations: create new RDDs
Actions: perform computations on the RDDs
RDD transformations
Transformations follow lazy evaluation:
Storage -> (reading data from stable storage) RDD1 -> (transformation) RDD2 -> (transformation) RDD3 -> (action) result
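A minimal sketch of lazy evaluation, assuming sc is available: the transformation is only recorded, and nothing executes until an action is called.
RDD = sc.parallelize([1,2,3,4])
squared = RDD.map(lambda x: x * x) # transformation: recorded, not executed yet
squared.collect() # action: triggers the actual computation
[1, 4, 9, 16]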
- Basic RDD transformations: map(), filter(), flatMap(), union()
map()
- applies a function to all elements in the RDD
RDD = sc.parallelize([1,2,3,4])
RDD_map = RDD.map(lambda x: x * x)
filter()
- returns a new RDD with only the elements that pass the condition
RDD = sc.parallelize([1,2,3,4])
RDD_filter = RDD.filter(lambda x: x > 2)
flatMap()
- returns multiple values for each element in the original RDD
RDD = sc.parallelize(["hello world", "how are you"])
RDD_flatmap = RDD.flatMap(lambda x: x.split(" ")) # returns ["hello", "world", "how", "are", "you"]
union()
- returns the union of one RDD with another RDD
inputRDD = sc.textFile("logs.txt")
errorRDD = inputRDD.filter(lambda x: "error" in x.split())
warningsRDD = inputRDD.filter(lambda x: "warnings" in x.split())
combinedRDD = errorRDD.union(warningsRDD)
RDD actions
- Operation returning a value after running a computation on the RDD
- Basic RDD actions:
collect(), take(N), first(), count()
collect()
- return all the elements of the dataset as an array
RDD_map.collect()
[1, 4, 9, 16]
take()
- return an array with the first N elements of the dataset
RDD_map.take(2)
[1, 4]
first()
- returns the first element of the RDD
RDD_map.first()
1
count()
- return the number of elements in the RDD
RDD_flatmap.count()
5
Pair RDDs in PySpark
- Real life datasets are usually key/value pairs
- Each row is a key and maps to one or more values
- Pair RDD: a special data structure to work with this kind of datasets
Pair RDD: Key is the identifier and value is data
Creating pair RDDs
Two common ways to create pair RDDs
From a list of key-value tuples
my_tuple = [('Sam', 23), ('Mary', 34), ('Peter', 25)]
pairRDD_tuple = sc.parallelize(my_tuple)
From a regular RDD
my_list = ['Sam 23', 'Mary 34', 'Peter 25']
regularRDD = sc.parallelize(my_list)
pairRDD_RDD = regularRDD.map(lambda x: (x.split(" ")[0], x.split(" ")[1]))
Transformations on pair RDDs
- All regular transformations work on pair RDDs
- They require functions that operate on key-value pairs rather than on individual elements
- Examples of paired RDD transformations:
- reduceByKey(func): Combine values with the same key
- groupByKey(): Group values with the same key
- sortByKey(): Return an RDD sorted by the key
- join(): Join two pair RDDs based on their key
reduceByKey()
- Transformation that combines values with the same key
- Runs parallel operations for each key in the dataset
regularRDD = sc.parallelize([("Messi", 23), ("Ronaldo", 34), ("Neymar", 22), ("Messi", 24)])
pairRDD_reducebykey = regularRDD.reduceByKey(lambda x,y : x + y)
pairRDD_reducebykey.collect()
[('Neymar', 22), ('Ronaldo', 34), ('Messi', 47)]
sortByKey()
- Transformation that orders pair RDD by key
- Returns an RDD sorted by key in ascending or descending order
pairRDD_reducebykey_rev = pairRDD_reducebykey.map(lambda x: (x[1], x[0]))
pairRDD_reducebykey_rev.sortByKey(ascending=False).collect()
[(47, 'Messi'), (34, 'Ronaldo'), (22, 'Neymar')]
groupByKey()
- transformation that groups all the values with the same key in the pair RDD
airports = [("US", "JFK"),("UK", "LHR"),("FR", "CDG"),("US", "SFO")]
regularRDD = sc.parallelize(airports)
pairRDD_group = regularRDD.groupByKey().collect()
for cont, air in pairRDD_group:
    print(cont, list(air))
FR ['CDG']
US ['JFK', 'SFO']
UK ['LHR']
join()
- transformation that joins two pair RDDs based on their key
RDD1 = sc.parallelize([("Messi", 34), ("Ronaldo", 32), ("Neymar", 24)])
RDD2 = sc.parallelize([("Ronaldo", 80),("Neymar", 120),("Messi", 100)])
RDD1.join(RDD2).collect()
[('Neymar', (24, 120)), ('Ronaldo', (32, 80)), ('Messi', (34, 100))]
More actions
Regular RDD
reduce()
reduce(): aggregate the elements of a regular RDD
- the function should be commutative (i.e. changing the order of the operands does not change the result) and associative
x = [1,3,4,6]
RDD = sc.parallelize(x)
RDD.reduce(lambda x, y: x + y)
14
saveAsTextFile()
saveAsTextFile(): saves RDD into a text file inside a directory with each partition as a separate file
RDD.saveAsTextFile("tempFile")
coalesce(): can be used to save RDD as a single text file
RDD.coalesce(1).saveAsTextFile("tempFile")
Pair RDD
countByKey()
countByKey(): counts the number of elements for each key
- only available for RDDs of type (K, V)
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
for kee, val in rdd.countByKey().items():
    print(kee, val)
a 2
b 1
collectAsMap()
collectAsMap(): return the key-value pairs in the RDD as a dictionary
sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
{1: 2, 3: 4}
PySpark DataFrame
PySpark SQL: Spark library for structured data
PySpark DataFrame: an immutable distributed collection of data with named columns
- designed for processing both structured and semi-structured data
- DataFrame API is available in Python, R, Scala, and Java
- DataFrames in PySpark support both SQL queries (SELECT * FROM table) and expression methods (df.select())
SparkSession
SparkContext is the main entry point for creating RDDs
SparkSession provides a single point of entry to interact with Spark DataFrames
- used to create DataFrames, register DataFrames, and execute SQL queries
- available in the PySpark shell as spark
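Outside the shell there is no predefined spark, so a SparkSession is created explicitly; a minimal sketch ("demo" is a hypothetical application name):
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate() # returns an existing session if one is already running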
Creating DataFrames
- Two different methods of creating DataFrames in PySpark:
- From an existing RDD using SparkSession’s createDataFrame() method
- From various data sources (CSV, JSON, TXT) using SparkSession’s read methods
Schema
- controls the data and helps DataFrames to optimize queries
- provides information about column name, type of data in the column, empty values, etc.
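A minimal sketch of defining a schema explicitly instead of relying on inference, assuming spark is available; the column names and types are hypothetical:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("name", StringType(), True), # column name, data type, nullable
    StructField("age", IntegerType(), True)
])
people_df = spark.createDataFrame([("Sam", 23), ("Mary", 34)], schema=schema)
people_df.printSchema() # prints the column names and types defined above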
From RDD
iphones_RDD = sc.parallelize([
("XS", 2018, 5.65, 2.79, 6.24),
("XR", 2018, 5.94, 2.98, 6.84),
("X10", 2017, 5.65, 2.79, 6.13),
("8Plus", 2017, 6.23, 3.07, 7.12)
])
names = ['Model', 'Year', 'Height', 'Width', 'Weight']
iphones_df = spark.createDataFrame(iphones_RDD, schema=names)
type(iphones_df)
pyspark.sql.dataframe.DataFrame
From reading CSV/JSON/TXT
df_csv = spark.read.csv("people.csv", header=True, inferSchema=True)
df_json = spark.read.json("people.json") # JSON reader infers the schema from the data
df_txt = spark.read.text("people.txt") # text reader loads each line into a single string column
DataFrame Operators in PySpark
- DataFrame operations: Transformations and Actions
- DataFrame Transformations: select(), filter(), groupby(), orderBy(), dropDuplicates(), and withColumnRenamed()
- DataFrame Actions: head(), show(), count(), and describe()
- General methods for Spark datasets/DataFrames: printSchema()
Transformations
select()
select() subsets the columns in the DataFrame
df_id_age = test_df.select('Age')
show()
show() prints the first 20 rows in the DataFrame
df_id_age.show(3) # shows top 3 rows
filter()
filter() filters the rows based on a condition, keeping only the rows that satisfy it
new_df_age21 = new_df.filter(new_df.Age > 21)
groupby() and count()
groupby() can be used to group a variable
test_df_age_group = test_df.groupby('Age')
test_df_age_group.count().show(3)
orderBy()
orderBy() sorts the DataFrame based on one or more columns
test_df_age_group.count().orderBy('Age').show(3)
dropDuplicates()
dropDuplicates() removes the duplicate rows of a DataFrame
test_df_no_dup = test_df.select('User_ID','Gender', 'Age').dropDuplicates()
test_df_no_dup.count()
5892
withColumnRenamed()
withColumnRenamed() renames a column in the DataFrame
test_df_sex = test_df.withColumnRenamed('Gender','Sex')
test_df_sex.show()
+-------+---+---+
|User_ID|Sex|Age|
+-------+---+---+
|1000001| F| 17|
|1000001| F| 17|
|1000001| F| 17|
+-------+---+---+
Actions
columns
columns returns the names of the columns of a DataFrame
test_df.columns
describe()
describe() computes summary statistics of numerical columns in the DataFrame
test_df.describe().show()
printSchema()
printSchema() prints the types of columns in the DataFrame
test_df.printSchema()
SparkSQL
SparkSQL allows for the use of SQL via the DataFrame API
DataFrame API provides a programmatic domain-specific language (DSL) for data
- DataFrame transformations and actions are easier to construct programmatically
- The operations on DataFrames can also be done using SQL queries
SQL queries
sql(): SparkSession method to execute SQL queries
- takes a SQL statement as an argument and returns the result as a DataFrame
df.createOrReplaceTempView("table1")
df2 = spark.sql("SELECT field1, field2 FROM table1")
df2.collect()
[Row(f1=1, f2='row1'), Row(f1=2, f2='row2'), Row(f1=3, f2='row3')]
Extract data
test_df.createOrReplaceTempView("test_table")
query = '''SELECT Product_ID FROM test_table'''
test_product_df = spark.sql(query)
test_product_df.show(5)
Summarizing and grouping data
test_df.createOrReplaceTempView("test_table")
query = '''SELECT Age, max(Purchase) FROM test_table GROUP BY Age'''
spark.sql(query).show(5)
Filtering columns
test_df.createOrReplaceTempView("test_table")
query = '''SELECT Age, Purchase, Gender FROM test_table WHERE Purchase > 20000 AND Gender == "F"'''
spark.sql(query).show(5)
Data Visualization in PySpark
- Open source plotting tools to aid visualization in Python: Matplotlib, Seaborn, Bokeh, Plotly, …
- Plotting graphs using PySpark DataFrames is done using three methods: the pyspark_dist_explore library, toPandas(), and the HandySpark library
pyspark_dist_explore
pyspark_dist_explore provides quick insights into DataFrames
- Currently three functions are available: hist(), distplot(), and pandas_histogram()
from pyspark_dist_explore import hist # hist() comes from the pyspark_dist_explore package
test_df = spark.read.csv("test.csv", header=True, inferSchema=True)
test_df_age = test_df.select('Age')
hist(test_df_age, bins=20, color="red")
Plotting with Pandas
test_df = spark.read.csv("test.csv", header=True, inferSchema=True)
test_df_sample_pandas = test_df.toPandas()
test_df_sample_pandas.hist('Age')
| Pandas DataFrame | PySpark DataFrame |
|---|---|
| in-memory, single-server based structures | distributed, runs in parallel on multiple nodes |
| eager evaluation | lazy evaluation |
| mutable | immutable |
| supports more operations | supports fewer operations |
HandySpark
HandySpark: a package designed to improve the PySpark user experience
test_df = spark.read.csv('test.csv', header=True, inferSchema=True)
hdf = test_df.toHandy()
hdf.cols["Age"].hist()