In a previous post we used a Jupyter notebook to play with Spark. In this post we will use a Zeppelin notebook. Both can be used for big data, and which one you use is a matter of personal preference.
Create a directory and create a csv file
First create a directory on your machine using mkdir ~/Developer/zeppelin-example and navigate to it using cd ~/Developer/zeppelin-example.
Next create some data using Google Sheets. Navigate to https://www.google.com/sheets/about/ and choose Personal. Select a blank worksheet, enter some data like below, and name the file test1.csv.
Download the file as a csv to the directory you just created. We will use this file later in this tutorial.
Install Docker on your Mac
You need Docker Desktop for this tutorial. You can install it using the instructions at https://docs.docker.com/docker-for-mac/install/
Is it running?
Type in docker version to see if Docker is running properly. You should see both the client and the server listed with no errors.
Then run the following container to start Zeppelin. It mounts the current folder we just set up, so the notebooks in that folder will persist on your local computer as long as you keep launching the container with this command. How cool is that!

docker run -p 8080:8080 --rm --name zeppelin -v $(pwd):/opt/zeppelin/notebook apache/zeppelin:0.9.0
Then navigate to http://localhost:8080 and the following will be displayed.
Press “Create new note” and the following will be displayed. Name your notebook as you would like and press “Create”.
Notes consist of paragraphs, which are a little different from Jupyter cells. You enter your code in a paragraph and then press run to execute it. Enter the following code in the paragraph and press run.
%pyspark
mytext = 'Hello World'
print(mytext)
If you get something like below you are all set!
Let's try some SQL
Now let's play around with the csv file we created earlier. In the next paragraph in the notebook, type in the following and the contents of the csv file will be output as shown.
%pyspark
import pandas as pd
pd.read_csv('/opt/zeppelin/notebook/test1.csv')
Next we will use pyspark.sql to create a data frame with the header specified. Entering df will show the data frame's schema, and df.show() will show the data in the table.
Data frames are similar to tables in a relational database, but they have the power of traditional RDDs.
%pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Python Spark SQL basic example").getOrCreate()
df = spark.read.option('header', 'true').csv('/opt/zeppelin/notebook/test1.csv')
df.show()
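Since this note is about SQL, here is a minimal sketch (not from the original post) of inspecting the schema and querying the data frame with Spark SQL. It reuses the df and spark variables from the paragraph above; the view name test1 is just a label chosen for this example.

%pyspark
# Show the column names and types of the data frame.
df.printSchema()

# Register the data frame as a temporary view so it can be queried with SQL.
# The view name "test1" is arbitrary.
df.createOrReplaceTempView("test1")
spark.sql("SELECT COUNT(*) AS row_count FROM test1").show()
spark.sql("SELECT * FROM test1").show()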
RDDs and huge datasets
Big data processing power comes when you use an RDD, a Resilient Distributed Dataset. RDDs are immutable and fault tolerant. But the best part is that they run on clusters, so your data can be huge!
There are two types of operations that can be performed on an RDD.
- Action: applied to an RDD and returns a value; examples are count and reduce.
- Transformation: returns a new RDD; examples are filter and map.
Let's create one as follows and perform the count() action on it. The result will be 8.
%pyspark
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
words = sc.parallelize(["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"])
counts = words.count()
print(counts)
Next let's perform a collect() action on the words. Collect returns all the elements of the RDD. This is commonly done after a filter or map transformation so you can print out the result.
%pyspark
sc = SparkContext.getOrCreate()
words = sc.parallelize(["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"])
coll = words.collect()
print(coll)
Next let's perform a filter() transformation as follows. Filter takes a lambda function. If you want to learn more about lambda functions in Python, check out https://www.w3schools.com/python/python_lambda.asp
%pyspark
sc = SparkContext.getOrCreate()
words = sc.parallelize(["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"])
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print(filtered)
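If lambdas are new to you, here is a tiny plain-Python aside (not part of the Spark pipeline above) showing what the lambda passed to filter does on its own; the name contains_spark is just made up for this illustration.

%pyspark
# A lambda is an anonymous function; this one is the same predicate used in filter above.
contains_spark = lambda x: 'spark' in x
print(contains_spark('pyspark'))   # True
print(contains_spark('hadoop'))    # False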
Next let's perform a map() transformation, which applies a function to every element.
%pyspark
sc = SparkContext.getOrCreate()
words = sc.parallelize(["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"])
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print(mapping)
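As an aside that is not in the original post, the reason map often emits (word, 1) pairs is so a later step can add the ones up per key. A minimal sketch of that pattern using reduceByKey, a transformation that merges the values for each key, might look like this (the sample list is made up for the illustration):

%pyspark
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
# A small sample with repeated words.
sample = sc.parallelize(["spark", "hadoop", "spark", "pyspark", "spark"])
pairs = sample.map(lambda x: (x, 1))
word_counts = pairs.reduceByKey(lambda a, b: a + b)
print(word_counts.collect())   # e.g. [('spark', 3), ('hadoop', 1), ('pyspark', 1)]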
And finally, the reduce action applies a function to pairs of elements until the RDD is reduced to a single value. So in the case of the add operator, it adds all the values together.
%pyspark
from operator import add

sc = SparkContext.getOrCreate()
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print(adding)
A big data example with 1 billion numbers :).
%pyspark
big_list = range(1000000000)
rdd = sc.parallelize(big_list, 2)
odds = rdd.filter(lambda x: x % 2 != 0)
evens = rdd.filter(lambda x: x % 2 == 0)
my_odds = odds.take(5)
my_evens = evens.take(5)
print("Odds")
for p in my_odds:
    print(p)
print("Evens")
for p in my_evens:
    print(p)
The power continues with dataframes and datasets
The RDD was the primary user-facing API in Spark since its inception, but there are now data frames and datasets as well. A discussion of these will continue in future blog posts.
Conclusion
These are very simple examples, but imagine if you had terabytes of data. These operations would be impractical in a typical database. Have fun with Spark.