# Spark

# Data Analysis with Python and PySpark

GitHub repo

# Installation

On macOS, just run brew install apache-spark. Link

# Getting started

To run the PySpark shell, just type the following in the terminal:

pyspark

Create a new Spark session:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("foo").getOrCreate()

Set the log level to reduce verbosity:

spark.sparkContext.setLogLevel("FATAL")

# Reading and processing data with PySpark

Spark offers two main data abstractions: the RDD (Resilient Distributed Dataset) and the DataFrame. These notes use the DataFrame API; a quick RDD sketch follows for contrast.
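
A minimal sketch of the RDD route, assuming the same foo.txt used in the DataFrame example below:

# low-level API: an RDD whose elements are the lines of the file
lines_rdd = spark.sparkContext.textFile("foo.txt")
lines_rdd.take(5)  # first five lines as plain Python strings

The DataFrame route, used in the rest of these notes: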

book = spark.read.text("foo.txt")

book
# DataFrame[value: string]

The book variable holds a DataFrame: a table with a single column named value.

| value: string |
| --- |
| The |
| null |
| This book ... |

Each line in the text file becomes one row in the table.

To show the schema:

book.printSchema()

To show the first few rows (up to 20 by default):

book.show()

Split each line into a list of words:

from pyspark.sql.functions import split

lines = book.select(split(book.value, " ").alias("line")) # line will be the new column name.

lines.show()

Select a specific column (the following are equivalent):

from pyspark.sql.functions import col

book.select(book.value)
book.select(book["value"])
book.select(col("value"))
book.select("value")

Rename a column

lines = book.select(book.value.alias("line"))

# Or

lines = book.select(split(book.value, " "))
# the auto-generated column name comes from the expression; check lines.columns if it differs
lines = lines.withColumnRenamed("split(value,  , -1)", "line")

Explode the column of arrays into one row per element:


from pyspark.sql.functions import explode, col

words = lines.select(explode(col("line")).alias("word"))
words.show()

Convert everything to lower case and strip punctuation:

from pyspark.sql.functions import lower, regexp_extract

words_lower = words.select(lower(col("word")).alias("word_lower"))

words_clean = words_lower.select(regexp_extract(col("word_lower"), "[a-z]+", 0).alias("word"))

words_nonull = words_clean.filter(col("word") != "")  # filter() and where() are aliases

Group by and count:

results = words_nonull.groupby(col("word")).count()

results.orderBy("count", ascending=False).show()

# coalesce(1) merges the output into a single partition.
results.coalesce(1).write.csv('./countresult.csv')

Simplify by chaining the steps:

import pyspark.sql.functions as F

results = (
    spark.read.text("foo.txt")
    .select(F.split(F.col("value"), " ").alias("line"))
    .select(F.explode(F.col("line")).alias("word"))
    .select(F.lower(F.col("word")).alias("word"))
    .select(F.regexp_extract(F.col("word"), "[a-z]+", 0).alias("word"))
    .where(F.col("word") != "")
    .groupby("word")
    .count()
)

Run in batch mode:

spark-submit my_program.py
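
For reference, a minimal sketch of what a script like my_program.py could contain (the script name comes from the command above; the appName and input file inside are assumptions, reusing the word-count pipeline from earlier):

# my_program.py -- sketch of a batch version of the word count above
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("word_count_batch").getOrCreate()

    results = (
        spark.read.text("foo.txt")
        .select(F.explode(F.split(F.col("value"), " ")).alias("word"))
        .select(F.lower(F.col("word")).alias("word"))
        .select(F.regexp_extract(F.col("word"), "[a-z]+", 0).alias("word"))
        .where(F.col("word") != "")
        .groupby("word")
        .count()
    )

    results.orderBy("count", ascending=False).show(10)
    spark.stop()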

# Tabular data

Creating Tabular data

my_list = [
    ["Banana", 2, 1.99],
    ["Apple", 4, 2.99],
]

df = spark.createDataFrame(
    my_list, ["item", "quantity", "price"]
)

For exploratory data analysis (EDA) on a DataFrame small enough to fit in driver memory, we can convert it to pandas with toPandas().
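
A minimal sketch, assuming the small df created above and that pandas is installed on the driver:

pdf = df.toPandas()   # collects the whole DataFrame to the driver, so keep it small
pdf.describe()        # summary statistics via pandas
pdf.head()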

# Spark GraphX

# Quick start

To get the data (the cit-HepTh.txt citation graph), download it from this link.

Start the Spark shell:

> spark-shell

Load the data into Spark and find the vertex with the highest in-degree:

import org.apache.spark.graphx._

val graph = GraphLoader.edgeListFile(sc, "cit-HepTh.txt")
graph.inDegrees.reduce((a,b) => if (a._2 > b._2) a else b)

# Vertices and edges

Build a graph

import org.apache.spark.graphx._

val myVertices = sc.makeRDD(Array((1L, "Ann"), (2L, "Bill"),
 (3L, "Charles"), (4L, "Diane"), (5L, "Went to gym this morning")))

val myEdges = sc.makeRDD(Array(Edge(1L, 2L, "is-friends-with"),
 Edge(2L, 3L, "is-friends-with"), Edge(3L, 4L, "is-friends-with"),
 Edge(4L, 5L, "Likes-status"), Edge(3L, 5L, "Wrote-status")))

val myGraph = Graph(myVertices, myEdges)

myGraph.vertices.collect
myGraph.edges.collect

// show the edges with expanded nodes
myGraph.triplets.collect

Triplet fields

| Field | Description |
| --- | --- |
| attr | edge attribute |
| srcId | source vertex ID |
| srcAttr | source vertex attribute |
| dstId | destination vertex ID |
| dstAttr | destination vertex attribute |

Simple triplet mapping.

// map to new triplets of ((srcId, srcAttr), (dstId, dstAttr), (original attr, boolean))
myGraph.mapTriplets(t => (t.attr, t.attr=="is-friends-with" &&
 t.srcAttr.toLowerCase.contains("a"))).triplets.collect

res4: Array[org.apache.spark.graphx.EdgeTriplet[String,(String, Boolean)]]
 = Array(((1,Ann),(2,Bill),(is-friends-with,true)),
((2,Bill),(3,Charles),(is-friends-with,false)),
((3,Charles),(4,Diane),(is-friends-with,true)),
((3,Charles),(5,Went to gym this morning),(Wrote-status,false)),
((4,Diane),(5,Went to gym this morning),(Likes-status,false)))

Use aggregateMessages (a map/reduce over the edges) to calculate the out-degree of each vertex.

myGraph.aggregateMessages[Int](_.sendToSrc(1), _+_).collect

res5: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,1), (1,1),
(3,2), (2,1))

The signature of aggregateMessages:

def aggregateMessages[Msg](
    sendMsg: EdgeContext[VD, ED, Msg] => Unit,  // Unit == void
    mergeMsg: (Msg, Msg) => Msg)
  : VertexRDD[Msg]

sendMsg takes an EdgeContext as its parameter and returns nothing. EdgeContext has two methods for sending messages: sendToSrc and sendToDst.

Join with vertex attribute

myGraph.aggregateMessages[Int](_.sendToSrc(1),
 _ + _).join(myGraph.vertices).collect

res6: Array[(org.apache.spark.graphx.VertexId, (Int, String))] =
Array((4,(1,Diane)), (1,(1,Ann)), (3,(2,Charles)), (2,(1,Bill)))

Drop the vertex IDs and swap each (count, name) pair into (name, count).

myGraph.aggregateMessages[Int](_.sendToSrc(1),
 _ + _).join(myGraph.vertices).map(_._2.swap).collect

res7: Array[(String, Int)] = Array((Diane,1), (Ann,1), (Charles,2),
(Bill,1))

Get the missing vertex back. The missing vertex is the one that never received a message (here, the vertex with no outgoing edges).

myGraph.aggregateMessages[Int](_.sendToSrc(1),
 _ + _).rightOuterJoin(myGraph.vertices).map(_._2.swap).collect

res8: Array[(String, Option[Int])] = Array((Diane,Some(1)), (Ann,Some(1)),
 (Charles,Some(2)), (Went to gym this morning,None), (Bill,Some(1)))

Default None to 0.

myGraph.aggregateMessages[Int](_.sendToSrc(1),
 _ + _).rightOuterJoin(myGraph.vertices).map(
 x => (x._2._2, x._2._1.getOrElse(0))).collect

res9: Array[(String, Int)] = Array((Diane,1), (Ann,1), (Charles,2), (Went to
     gym this morning,0), (Bill,1))

# Serialization / deserialization

myGraph.vertices.saveAsObjectFile("myGraphVertices")
myGraph.edges.saveAsObjectFile("myGraphEdges")

val myGraph2 = Graph(
    sc.objectFile[Tuple2[VertexId,String]]("myGraphVertices"),
    sc.objectFile[Edge[String]]("myGraphEdges")
)

This saves the vertices and edges to separate directories, with one file per partition. To write everything into a single file per RDD, coalesce to one partition first:

myGraph.vertices.coalesce(1,true).saveAsObjectFile("myGraphVertices")

# Algorithms

# Shortest paths to vertex 3

lib.ShortestPaths.run(myGraph,Array(3)).vertices.collect

# Connected components

val g = Graph(sc.makeRDD((1L to 7L).map((_,""))),
    sc.makeRDD(Array(Edge(2L,5L,""), Edge(5L,3L,""), Edge(3L,2L,""),
                     Edge(4L,5L,""), Edge(6L,7L,"")))).cache
g.connectedComponents.vertices.map(_.swap).groupByKey.map(_._2).collect

# Predict social circles

# Development environment

# References