Your datalake needs structure

By | August 10, 2021

A typical datalake contains raw data from many sources. The data can be in various forms: unstructured, semi-structured or structured.

Unstructured

  • TXT
  • CSV

Semi-Structured

  • XML
  • JSON

Structured

  • MySQL
  • Parquet

This post is about improving your datalake by using Parquet files for better downstream processing.

Why use Parquet files?

Structured files are preferred for data analysis. This is because unstructured and semi-structured data have a heavy parsing burden and are not easily searchable.

Structured files give you efficient searching capabilities. Databases might be the first thing you think of; however, they require a lot of compute power and do not scale well for large data tasks.

Apache Parquet is one structured file type that fits the bill for big data. It is an efficient and performant storage format compared to a row-based file like CSV.

Plus, it works with the Hadoop and Spark ecosystem, which can run on AWS EMR (Elastic MapReduce) or other clustered environments.

EMR also lets you transform and move large amounts of data into AWS S3 and AWS DynamoDB.

Quick lesson on different storage methodologies

Typical data files store data in either rows or columns. Either approach can be inefficient, depending on how you access the data.

A row-based data structure stores the data one row after another, with each row's column values side by side. It is difficult to return or insert a column of data, because every row has to be scanned to perform that operation.

A column-based data structure stores the data one column after another. This presents the opposite problem: if you want to insert or return a row, every column has to be scanned.

Parquet solves this problem by using a hybrid model. Rows are saved in row groups, and within each group the values are stored column by column, giving both horizontal and vertical partitioning. So if you are only interested in a small subset of rows or columns, only a portion of the data needs to be read.
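
As a rough sketch of what this buys you (the table path and column names below are hypothetical), Spark can push column and row selections down to the Parquet reader, so only the matching column chunks and row groups are read from disk:

%spark
// Hypothetical table: only the userId and eventDate column chunks are read
// (column pruning), and row groups whose statistics rule out the date are
// skipped entirely (predicate pushdown).
val events = spark.read.parquet("/some/path/events")
events.select("userId", "eventDate").where("eventDate = '2021-08-10'").show()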

Let’s get to it!

The following examples walk through creating a datalake and putting it to use, following these steps:

  • Extract raw data from a CSV file.
  • Create a datalake file in Parquet format, being careful not to change any data.
  • Use this datalake file for downstream processing.

Note: We will use a small dataset for our example, but the same concepts apply to large datasets.

Clone our repo to get the dataset and the Zeppelin notebook here: https://github.com/JimmySoftLLC/parquet-example

Then navigate to your cloned folder and run the following Docker command to start the Zeppelin notebook. Once the container starts, navigate to http://localhost:8080/

docker run -p 8080:8080 --rm --name zeppelin -v $(pwd):/opt/zeppelin/notebook apache/zeppelin:0.9.0

The following example uses a Zeppelin notebook. You can learn more about Zeppelin notebooks in my previous post here: https://www.mysoftwarejourney.com/2021/07/28/zepellin-notebooks/

Create a schema

First, a schema needs to be defined for our datalake file. This can be done using several methods:

  • infer it from the CSV file
  • apply a schema to the CSV file when reading
  • apply a schema in code after reading the CSV file

We can infer the schema using the code below. Be aware that inferring can be inefficient, because the process requires two passes through the file: one pass to work out consistent types and a second to read in the data using the inferred schema.

%spark
spark.read.option("header",true).option("inferSchema",true).csv("/opt/zeppelin/notebook/pizzasize.csv").printSchema()
val dfToOutput = spark.read.option("header",true).option("inferSchema",true).csv("/opt/zeppelin/notebook/pizzasize.csv")

We can instead apply a predefined schema when reading. Not only is this more efficient, it also gives us more control: we can specify the exact name and type for every column. This is important for dates, since they get inferred as strings if you don't specify a type.

%spark
import org.apache.spark.sql.types.{DoubleType,StringType,StructType,StructField}
val schema = StructType(Array(
    StructField("ID",StringType,true),
    StructField("Store",StringType,true),
    StructField("CrustDescription",StringType,true),
    StructField("Topping", StringType, true),
    StructField("Diameter", DoubleType, true)
  ))
spark.read.option("header", "true").schema(schema).csv("/opt/zeppelin/notebook/pizzasize.csv").printSchema()
val dfToOutput = spark.read.option("header", "true").schema(schema).csv("/opt/zeppelin/notebook/pizzasize.csv")
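
Dates are a good illustration. As a sketch (the OrderDate column and orders.csv file below are hypothetical, not part of this dataset), declaring a DateType column together with the dateFormat option parses the values as real dates instead of strings:

%spark
import org.apache.spark.sql.types.{DateType,StringType,StructType,StructField}
// Hypothetical schema: OrderDate does not exist in pizzasize.csv.
val schemaWithDate = StructType(Array(
    StructField("ID",StringType,true),
    StructField("OrderDate",DateType,true)
  ))
val dfDates = spark.read.option("header", "true").option("dateFormat", "yyyy-MM-dd").schema(schemaWithDate).csv("/some/path/orders.csv")
dfDates.printSchema()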

And finally, we can apply the schema in code after reading the CSV file. This is not recommended, since the DataFrame is immutable and it takes extra work to build a new DataFrame with the corrected types. It's best to apply the schema when reading.
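
If you do end up going that route, a minimal sketch (reusing the Diameter column from our dataset) is to cast the inferred string columns to the types you want, which produces a new DataFrame rather than modifying the original:

%spark
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType
// Read everything as strings, then cast Diameter to a double in a new DataFrame.
val dfRaw = spark.read.option("header", "true").csv("/opt/zeppelin/notebook/pizzasize.csv")
val dfTyped = dfRaw.withColumn("Diameter", col("Diameter").cast(DoubleType))
dfTyped.printSchema()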

Output the file

Once the file is read with an appropriate schema, it can be written in Parquet format as follows.

%spark
dfToOutput.write.mode("overwrite").parquet("/opt/zeppelin/notebook/pizzasizes")

This will output a folder with one or more files in your working directory. This seems strange at first, but it is how Parquet partitions the data.
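
If the number of files matters to you, or you want the data laid out by a column, you can control the output. Here is a sketch; coalescing to a single file and partitioning by Store are just illustrations, not something this example requires:

%spark
// Coalesce to a single output file, or partition the output folders by Store.
dfToOutput.coalesce(1).write.mode("overwrite").parquet("/opt/zeppelin/notebook/pizzasizes_single")
dfToOutput.write.mode("overwrite").partitionBy("Store").parquet("/opt/zeppelin/notebook/pizzasizes_by_store")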

The datalake is created; let’s do some analysis

Next, import the Parquet file you just created. You will notice that the schema persisted. How cool is that!

%spark
spark.read.parquet("/opt/zeppelin/notebook/pizzasizes").printSchema()
val df = spark.read.parquet("/opt/zeppelin/notebook/pizzasizes")
val dfView = df.select("*")
dfView.createOrReplaceTempView("df")
println("Created View for df")

Let’s do some simple analysis on the data by filtering on the “DeepPan” sizes. After running the paragraph in your notebook, click on the bar graph to see the results.

%spark.sql
select Store, CrustDescription, Diameter
from df
where CrustDescription="DeepPan"

Oops! Not quite what we expected. You will need to specify the inputs to the bar chart. Do this by clicking the settings dropdown and setting them as shown.

Run the paragraph again and you will see the bar chart results. Interesting, it looks like Eagle Boys pizza has the largest diameter :).
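
If you would rather see a number than eyeball the chart, a quick aggregate over the same view (a sketch, not part of the original notebook) gives the average diameter per store and crust:

%spark.sql
select Store, CrustDescription, avg(Diameter) as AvgDiameter
from df
group by Store, CrustDescription
order by AvgDiameter desc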

Next, compare the thin crust pizzas. The SQL is slightly more complicated because the two brands describe their crusts differently, so we need to change the where clause to include both “ThinNCrispy” and “ThinCrust”.

%spark.sql
select Store, CrustDescription, Diameter
from df
where CrustDescription="ThinNCrispy" or CrustDescription="ThinCrust"

And finally, check out the standard crusts using “ClassicCrust” and “MidCrust”.

%spark.sql
select Store, CrustDescription, Diameter
from df
where CrustDescription="ClassicCrust" or CrustDescription="MidCrust"

Conclusion

This simple example demonstrates how easy it is to build a structured datalake using Parquet files. For more information on Parquet files, check out the video below.