SparkSQL: Quickstart Jupyter Template

I was interested in this experiment, which queried 9 million unique records distributed across three HDFS files (1.4GB in total) using Spark RDDs, Spark DataFrames and SparkSQL to compare their performance.

The results for the two different types of queries from the experiment are shown below:

[Figure: Spark benchmarking results for the two query types across RDDs, DataFrames and SparkSQL]

Based on my own experience, I've found that while RDDs are the most performant, they are tedious when it comes to data manipulation. SparkSQL is definitely the most versatile and maintainable option, with a slight performance edge over DataFrames when grouping and ordering large datasets.

The below is an attempt to create a reusable template that can be easily modified to repurpose SparkSQL queries:

from pyspark.sql import SparkSession

# Get an existing Spark session or create a new one
sparkSession = SparkSession.builder.appName("reading csv").getOrCreate()

# Read a CSV file and populate a Spark DataFrame
df = sparkSession.read.csv("YOURFILE.csv", header=True, sep=",").cache()

# Inspect the column names and data types of the Spark DataFrame
df.dtypes

# Display the full schema of the Spark DataFrame
df.schema

# Create a Table from the Spark DataFrame before running SparkSQL
df.createOrReplaceTempView("SOMETABLE")

# Run SparkSQL
sqlDF = sparkSession.sql('''
        SELECT Company, Rating, count(*) as Number_of_Ratings
        FROM SOMETABLE
        GROUP BY Company, Rating
        ORDER BY Company
        ''')

# Display up to 200 rows of results without truncating column values
sqlDF.show(200, False)
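
For comparison, the same aggregation can also be expressed through the DataFrame API rather than SQL, and the (aggregated, so much smaller) result can be pulled into pandas for further work in the notebook. The sketch below assumes the same df and column names as the template above:

# DataFrame API equivalent of the SparkSQL query above
aggDF = df.groupBy("Company", "Rating") \
          .count() \
          .withColumnRenamed("count", "Number_of_Ratings") \
          .orderBy("Company")
aggDF.show(200, False)

# Pull the aggregated result into pandas for plotting or export
pandasDF = sqlDF.toPandas()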

Orange 3 Natural Language Processing Reusable Template

The Natural Language Processing workflow below can be used to generate Topic Models from a monolingual corpus, along with their associated Word Clouds.

Latent Semantic Indexing, Latent Dirichlet Allocation and Hierarchical Dirichlet Process are the three techniques available for Topic Modelling in the Orange 3 toolkit.

From my experience, the most useful and relevant Topics were produced by Latent Semantic Indexing (LSI), because of its ability to correlate semantically related terms that are latent in a collection of text. LSI employs a mathematical technique called singular value decomposition (SVD) to identify patterns in the relationships between the terms and concepts contained in an unstructured collection of text.

[Screenshot: Orange 3 Topic Modelling workflow with Word Cloud output]
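
Outside of Orange 3, the same idea can be sketched programmatically. The snippet below is a minimal, illustrative example using gensim's LsiModel; gensim is not part of the Orange workflow, and the tiny corpus and num_topics value are made up purely for demonstration:

from gensim import corpora, models

# A tiny, made-up corpus of tokenised documents
documents = [
    ["data", "science", "machine", "learning"],
    ["spark", "big", "data", "processing"],
    ["machine", "learning", "models", "training"],
]

# Build a dictionary and a bag-of-words representation of the corpus
dictionary = corpora.Dictionary(documents)
bow_corpus = [dictionary.doc2bow(doc) for doc in documents]

# Fit an LSI model (an SVD over the term-document matrix) with 2 topics
lsi = models.LsiModel(bow_corpus, id2word=dictionary, num_topics=2)

# Print each discovered topic as a weighted combination of terms
for topic in lsi.print_topics():
    print(topic)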

Parquet File Viewer for Jupyter Notebook

The following code decodes the binary contents of a Parquet file and displays them as a table in a Jupyter notebook:

import pyarrow.parquet as pq
import pandas as pd

# Read the Parquet file into a pyarrow Table
table = pq.read_table('SOME_PARQUET_TEST_FILE.parquet')

# Convert the Table into a dictionary of column name -> list of values
table_dict = table.to_pydict()

items = table_dict.items()

# Separate the column names from the column values
keys = [item[0] for item in items]
values = [item[1] for item in items]

# Pivot the column-oriented values into row-oriented tuples
pivoted_values = zip(*values)

# Build one dictionary per row, keyed by column name
table_dictionary_array = []

for record in pivoted_values:
    table_dictionary_array.append(dict(zip(keys, record)))

# Load the row dictionaries into a pandas DataFrame and display it
df = pd.DataFrame.from_dict(table_dictionary_array)
df
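
As a shorter alternative, pyarrow can hand the Table straight to pandas; the one-liner below should produce the same DataFrame:

df = pq.read_table('SOME_PARQUET_TEST_FILE.parquet').to_pandas()
df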

AVRO File Viewer for Jupyter Notebook

The following code decodes the binary contents of an AVRO file and displays them as a table in a Jupyter notebook:


import pandas as pd
from avro.datafile import DataFileReader
from avro.io import DatumReader
from tabulate import tabulate

# Open the AVRO file and deserialise every record with DatumReader
reader = DataFileReader(open("SOME_AVRO_FILE.avro", "rb"), DatumReader())
records = list(reader)
reader.close()

# Load the records into a pandas DataFrame and render it as a text grid
df = pd.DataFrame(records)
print(tabulate(df, headers="keys", tablefmt="grid"))

 

Reading in Large Datasets in R

R can appear to run slowly when processing larger datasets. Some tips for dealing with preprocessing Big Data in R are listed below:

TIP 1 – Set the colClasses Argument
While reading in large datasets using read.table(), R can seem to lag as it scans the dataset in the background to infer the class of each column. This can slow down reading in the dataset considerably, especially with Big Data.

The colClasses argument in read.table() should be set explicitly for large datasets. Use the code below to work out the classes of your columns; it reads only the first 100 rows to infer each column's class:

# Read only the first 100 rows and infer the class of each column
initial <- read.table("datatable.txt", nrows=100)
classes <- sapply(initial, class)

# Re-read the full file with the column classes supplied explicitly
tabAll <- read.table("datatable.txt", colClasses=classes)


TIP 2 – Set the nrows Argument
Set the nrows argument in read.table(). If you're on a Mac or Linux, use the command-line utility wc (e.g. wc -l datatable.txt) to get an estimate of the number of lines/rows in your input file. This doesn't have to be exact, just close enough.

TIP 3 – Set the comment.char Argument
If there are no comment lines in the file, set the comment.char argument to an empty string, i.e. comment.char = "".

TIP 4 – System Contingency
When processing large datasets, ensure there is no resource contention on the machine running the R client, or across the distributed environment if you've chosen to use RStudio Server or Microsoft R Server.

TIP 5 – Making Decisions about RAM Restrictions
If the size of the dataset is larger than the memory available on your computer, then you will not be able to process the data effectively in R. A 64-bit system can obviously address far more memory than a 32-bit one.

There is a rough formula for estimating the amount of RAM R needs to hold a dataset in memory. The MAJOR assumption in the examples below is that all columns are of the numeric class and therefore use 8 bytes per value. I use this as a "gut feel" when planning the pre-processing of large datasets.

amount_of_RAM_in_GB <- (no_of_rows * no_of_columns * 8)/2^20/1024

Please note that the total memory the machine needs to run effectively is roughly double the amount R requires to hold the dataset itself.

Rough Calculations (a worked check of the first entry follows the list):

  • 4GB RAM for 2.69M Rows, 200 Columns
  • 16GB RAM for 4.3M Rows, 500 Columns
  • 75GB RAM for 10M Rows, 1000 Columns
  • 15TB RAM for 1B Rows, 2000 Columns
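
As a quick sanity check of the formula, the first entry in the list can be reproduced directly. The snippet below is sketched in Python purely for illustration (the arithmetic is identical in R) and assumes every column is numeric, i.e. 8 bytes per value:

# Worked check of the 4GB / 2.69M rows / 200 columns entry
rows, cols = 2690000, 200
ram_gb = rows * cols * 8 / 2**20 / 1024   # bytes -> MiB -> GiB
print(round(ram_gb, 1))                   # roughly 4.0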

TIP 6 – Remove NA values
Based on your use case, you may decide to exclude missing values before developing machine learning algorithms. I've followed an approach of extracting the bad data (rows containing "NA" values) into a separate dataset that I may choose to use for analysis at a later stage.

See the example below for extracting the bad data and retaining the clean data. This approach should also work on a larger dataset.

# Create dummy data for data frame
a <- c(1, 2, 3, 4, NA)
b <- c(6, 7, 8, NA, 10)
c <- c(11, 12, NA, 14, 15)
d <- c(16, NA, 18, 19, 20)
e <- c(21, 22, 23, 24, 25)

# Combine vectors to form a larger data frame
df <- data.frame(a, b, c, d, e)

# Append dataframes with row bind
rdf <- rbind(df, df, df) 

# Create a new data frame with only the clean data, i.e. rows with no NA values
rdf_only_good_data <- na.omit(rdf)

# Create a new data frame with only the bad data, i.e. rows containing NA values
rdf_only_bad_data <- rdf[!complete.cases(rdf),]

Output:

> rdf
    a  b  c  d  e
1   1  6 11 16 21
2   2  7 12 NA 22
3   3  8 NA 18 23
4   4 NA 14 19 24
5  NA 10 15 20 25
6   1  6 11 16 21
7   2  7 12 NA 22
8   3  8 NA 18 23
9   4 NA 14 19 24
10 NA 10 15 20 25
11  1  6 11 16 21
12  2  7 12 NA 22
13  3  8 NA 18 23
14  4 NA 14 19 24
15 NA 10 15 20 25

> rdf_only_good_data
   a b  c  d  e
1  1 6 11 16 21
6  1 6 11 16 21
11 1 6 11 16 21

> rdf_only_bad_data
    a  b  c  d  e
2   2  7 12 NA 22
3   3  8 NA 18 23
4   4 NA 14 19 24
5  NA 10 15 20 25
7   2  7 12 NA 22
8   3  8 NA 18 23
9   4 NA 14 19 24
10 NA 10 15 20 25
12  2  7 12 NA 22
13  3  8 NA 18 23
14  4 NA 14 19 24
15 NA 10 15 20 25