# Spark
# Data analysis with Python and PySpark
# Installation
On macOS, install with Homebrew:
brew install apache-spark
# Getting started
To start the PySpark shell, run in the terminal:
pyspark
Create a new spark session:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("foo").getOrCreate()
Set the log level
spark.sparkContext.setLogLevel("FATAL")
# PySpark Read and Process Data
There are two ways to read data into Spark: as an RDD (Resilient Distributed Dataset) or as a DataFrame. Reading a text file as a DataFrame:
book = spark.read.text("foo.txt")
book
# DataFrame[value: string]
book holds a DataFrame for a table with a single column, value, of type string:

| value: string |
| --- |
| The |
| null |
| This book ... |
Each line of the text file becomes one row of the table.
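For comparison, the same file can also be read as an RDD of strings through the SparkContext (a minimal sketch; the rest of these notes stick to the DataFrame API):
book_rdd = spark.sparkContext.textFile("foo.txt")  # RDD with one string element per line
book_rdd.take(5)  # first five lines as plain Python strings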
To show the schema
book.printSchema()
To show the first few (<=20) rows
book.show()
Split each line into a list of words
from pyspark.sql.functions import split
lines = book.select(split(book.value, " ").alias("line")) # line will be the new column name.
lines.show()
Select a specific column (these four forms are equivalent)
from pyspark.sql.functions import col
book.select(book.value)
book.select(book["value"])
book.select(col("value"))
book.select("value")
Rename a column
lines = book.select(book.value.alias("line"))
# Or rename after the fact
lines = book.select(split(book.value, " "))
lines = lines.withColumnRenamed("split(value, ,-1)", "line")  # the name generated by split; confirm it with lines.printSchema()
Explode column of arrays into rows of elements
from pyspark.sql.functions import explode, col
words = lines.select(explode(col("line")).alias("word"))
words.show()
Convert all words to lower case and remove punctuation
from pyspark.sql.functions import lower, regexp_extract
words_lower = words.select(lower(col("word")).alias("word_lower"))
words_clean = words_lower.select(regexp_extract(col("word_lower"), "[a-z]+", 0).alias("word"))
words_nonull = words_clean.filter(col("word") != "")  # filter() and where() are aliases
Groupby and count
results = words_nonull.groupby(col("word")).count()
results.orderBy("count", ascending=False).show()
# coalesce(1) merges into only one partition.
results.coalesce(1).write.csv('./countresult.csv')
Simplify by chaining (the same steps as above, filled in)
import pyspark.sql.functions as F
results = (
    spark.read.text("foo.txt")
    .select(F.split(F.col("value"), " ").alias("line"))
    .select(F.explode(F.col("line")).alias("word"))
    .select(F.lower(F.col("word")).alias("word"))
    .select(F.regexp_extract(F.col("word"), "[a-z]+", 0).alias("word"))
    .where(F.col("word") != "")
    .groupby("word")
    .count()
)
Run in batch mode:
spark-submit my_program.py
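For reference, my_program.py could package the whole word-count pipeline as a self-contained script (a sketch: the app name, input file, and show(10) are placeholders, and it assumes foo.txt is reachable from where you submit):
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == "__main__":
    # spark-submit provides the runtime; the script builds (or reuses) a session.
    spark = SparkSession.builder.appName("word_count").getOrCreate()
    results = (
        spark.read.text("foo.txt")
        .select(F.explode(F.split(F.col("value"), " ")).alias("word"))
        .select(F.regexp_extract(F.lower(F.col("word")), "[a-z]+", 0).alias("word"))
        .where(F.col("word") != "")
        .groupby("word")
        .count()
    )
    results.orderBy("count", ascending=False).show(10)
    spark.stop()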
# Tabular data
Creating tabular data from a Python list
my_list = [
["Banana", 2, 1.99],
["Apple", 4, 2.99],
]
df = spark.createDataFrame(
my_list, ["item", "quantity", "price"]
)
For exploratory data analysis (EDA), a small DataFrame can be converted to a pandas DataFrame with toPandas().
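For example (a sketch using the df created above; describe() is ordinary pandas, and this is only sensible when the data fits in driver memory):
pdf = df.toPandas()  # collect the Spark DataFrame to the driver as a pandas DataFrame
pdf.describe()       # pandas summary statistics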
# Spark GraphX
# Quick start
To get the data (cit-HepTh.txt), download it from this link.
Run the Spark shell.
> spark-shell
Load the edge list into Spark and find the vertex with the largest in-degree
import org.apache.spark.graphx._
val graph = GraphLoader.edgeListFile(sc, "cit-HepTh.txt")
graph.inDegrees.reduce((a,b) => if (a._2 > b._2) a else b) // (vertexId, inDegree) with the highest in-degree
# Vertices and edges
Build a graph
import org.apache.spark.graphx._
val myVertices = sc.makeRDD(Array((1L, "Ann"), (2L, "Bill"),
(3L, "Charles"), (4L, "Diane"), (5L, "Went to gym this morning")))
val myEdges = sc.makeRDD(Array(Edge(1L, 2L, "is-friends-with"),
Edge(2L, 3L, "is-friends-with"), Edge(3L, 4L, "is-friends-with"),
Edge(4L, 5L, "Likes-status"), Edge(3L, 5L, "Wrote-status")))
val myGraph = Graph(myVertices, myEdges)
myGraph.vertices.collect
myGraph.edges.collect
// show the edges with expanded nodes
myGraph.triplets.collect
EdgeTriplet fields

| Field | Description |
| --- | --- |
| attr | edge attribute |
| srcId | source vertex ID |
| srcAttr | source vertex attribute |
| dstId | destination vertex ID |
| dstAttr | destination vertex attribute |
A simple triplet mapping.
// map to another triplets of ((expanded src), (expanded dst), (edgeAttr, boolean))
myGraph.mapTriplets(t => (t.attr, t.attr=="is-friends-with" &&
t.srcAttr.toLowerCase.contains("a"))).triplets.collect
res4: Array[org.apache.spark.graphx.EdgeTriplet[String,(String, Boolean)]]
= Array(((1,Ann),(2,Bill),(is-friends-with,true)),
((2,Bill),(3,Charles),(is-friends-with,false)),
((3,Charles),(4,Diane),(is-friends-with,true)),
((3,Charles),(5,Went to gym this morning),(Wrote-status,false)),
((4,Diane),(5,Went to gym this morning),(Likes-status,false)))
Map/reduce with aggregateMessages to calculate the out-degree of each node.
myGraph.aggregateMessages[Int](_.sendToSrc(1), _+_).collect
res5: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,1), (1,1),
(3,2), (2,1))
The signature of aggregateMessages:
def aggregateMessages[Msg](
sendMsg: EdgeContext[VD, ED, Msg] => Unit, // Unit == void
mergeMsg: (Msg, Msg) => Msg)
: VertexRDD[Msg]
sendMsg takes an EdgeContext as its parameter and returns nothing. EdgeContext has two methods for sending a message: sendToSrc and sendToDst (the same pattern with sendToDst would compute in-degrees).
Join with vertex attribute
myGraph.aggregateMessages[Int](_.sendToSrc(1),
_ + _).join(myGraph.vertices).collect
res6: Array[(org.apache.spark.graphx.VertexId, (Int, String))] =
Array((4,(1,Diane)), (1,(1,Ann)), (3,(2,Charles)), (2,(1,Bill)))
Drop the vertex IDs and swap the remaining (count, name) pairs so the names come first.
myGraph.aggregateMessages[Int](_.sendToSrc(1),
_ + _).join(myGraph.vertices).map(_._2.swap).collect
res7: Array[(String, Int)] = Array((Diane,1), (Ann,1), (Charles,2),
(Bill,1))
Get the missing vertex back. The missing vertex is the one that never received a message (with sendToSrc, a vertex with no outgoing edges gets nothing).
myGraph.aggregateMessages[Int](_.sendToSrc(1),
_ + _).rightOuterJoin(myGraph.vertices).map(_._2.swap).collect
res8: Array[(String, Option[Int])] = Array((Diane,Some(1)), (Ann,Some(1)),
(Charles,Some(2)), (Went to gym this morning,None), (Bill,Some(1)))
Default None to 0.
myGraph.aggregateMessages[Int](_.sendToSrc(1),
_ + _).rightOuterJoin(myGraph.vertices).map(
x => (x._2._2, x._2._1.getOrElse(0))).collect
res9: Array[(String, Int)] = Array((Diane,1), (Ann,1), (Charles,2), (Went to
gym this morning,0), (Bill,1))
# Serialization / deserialization
myGraph.vertices.saveAsObjectFile("myGraphVertices")
myGraph.edges.saveAsObjectFile("myGraphEdges")
val myGraph2 = Graph(
sc.objectFile[Tuple2[VertexId,String]]("myGraphVertices"),
sc.objectFile[Edge[String]]("myGraphEdges")
)
This writes the vertices and edges to separate output directories, with one part file per partition. To save to a single file, coalesce to one partition first.
myGraph.vertices.coalesce(1,true).saveAsObjectFile("myGraphVertices")
# Algorithms
# Shortest paths to vertex 3
lib.ShortestPaths.run(myGraph,Array(3)).vertices.collect
# Connected components
val g = Graph(sc.makeRDD((1L to 7L).map((_,""))),
sc.makeRDD(Array(Edge(2L,5L,""), Edge(5L,3L,""), Edge(3L,2L,""),
Edge(4L,5L,""), Edge(6L,7L,"")))).cache
g.connectedComponents.vertices.map(_.swap).groupByKey.map(_._2).collect // group vertex IDs by their component ID