
Game Theory With Apache Spark, Part 1


In general terms, Game Theory is concerned with decision making in an environment that involves multiple entities with possibly conflicting interests (Myerson, Roger B. (1997). Game Theory: Analysis of Conflict, Harvard University Press, Wikipedia). Game theory is particularly useful in Prescriptive Analytics where techniques from game theory can be employed to find optimal solutions to complicated decision making problems (for example, see this discussion).

In this article, we will formulate a general resource allocation problem that can be solved using a particular game theory algorithm based on economist Ausubel’s Efficient Dynamic Auction Method. We will then implement the algorithm in Apache Spark Machine Learning Library using Java and demonstrate how it works via examples.

Original Link

Data Science and Engineering Platform in HDP 3: Hybrid, Secure, Scalable

What Is a Data Science and Engineering Platform

Apache Spark is one of our most popular workloads, both on-premises and in the cloud. As we recently announced HDP 3.0.0 (followed by a hardened HDP 3.0.1), we want to introduce the Data Science and Engineering Platform powered by Apache Spark.

As noted in the marketecture above, our Data Science and Engineering Platform is powered by Apache Spark with Apache Zeppelin notebooks to enable Batch, Machine Learning, and Streaming use cases, by personas such as Data Engineers, Data Scientists and Business Analysts. We recently introduced Apache TensorFlow 1.8 as a tech preview feature in HDP 3.0.x to enable the deep learning use cases – while this is intended for proof of concept deployments, we also support BYO dockerized TensorFlow in production environments. Some of the reasons our customers choose our Data Science and Engineering Platform on HDP 3.0 are:

Original Link

What Is a DBA Anyway?

Some time ago, we started a new series here called Database Fundamentals. The very first post in that series asked what a database is.

One of the major announcements at Ignite last month was that of SQL Server 2019 preview and major improvements to Azure SQL Database, Microsoft’s on-premises and cloud-based relational database systems, respectively.

Original Link

Spark SQL: An Introductory Guide

1. Objective

Apache Spark SQL is a Spark module that simplifies working with structured data using the DataFrame and DataSet abstractions in Python, Java, and Scala. These abstractions are distributed collections of data organized into named columns. Spark SQL also provides a powerful optimization layer. Using Spark SQL, we can query data both from inside a Spark program and from external tools that connect to Spark SQL through standard database connectors (JDBC/ODBC).

This tutorial covers the components of the Spark SQL architecture, like DataSets and DataFrames, and the Spark SQL Catalyst optimizer. We will also learn about the uses of Spark SQL in Apache Spark, along with its advantages and disadvantages.

Original Link

Kalman Filters With Apache Spark Structured Streaming and Kafka


In simple terms, a Kalman filter is a theoretical model to predict the state of a dynamic system under measurement noise. Originally developed in the 1960s, the Kalman filter has found applications in many different fields of technology including vehicle guidance and control, signal processing, transportation, analysis of economic data, and human health state monitoring, to name a few (see the Kalman filter Wikipedia page for a detailed discussion). A particular application area for the Kalman filter is signal estimation as part of time series analysis. Apache Spark provides a great framework to facilitate time series stream processing. As such, it would be useful to discuss how the Kalman filter can be combined with Apache Spark.
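Before bringing in Spark, the core predict/update cycle is easy to see in isolation. Below is a minimal one-dimensional Kalman filter in plain Python that estimates a constant true value from noisy measurements; the constant-state model and the noise variances q and r are illustrative assumptions, not the dynamic model used in the article itself.

```python
import random

def kalman_1d(measurements, q=1e-4, r=0.25):
    """Scalar Kalman filter for a state assumed constant over time.

    q: process-noise variance, r: measurement-noise variance
    (both values here are illustrative choices).
    """
    x, p = 0.0, 1.0          # initial state estimate and its variance
    estimates = []
    for z in measurements:
        p += q               # predict: variance grows by the process noise
        k = p / (p + r)      # Kalman gain: how much to trust the measurement
        x += k * (z - x)     # update the estimate toward the measurement
        p *= (1 - k)         # the updated estimate's variance shrinks
        estimates.append(x)
    return estimates

random.seed(0)
true_value = 5.0
noisy = [true_value + random.gauss(0, 0.5) for _ in range(200)]
est = kalman_1d(noisy)
print(round(est[-1], 2))  # settles close to 5.0
```

With the measurement-noise variance matched to the simulated noise, the estimate converges near the true value after a few dozen samples, which is the behavior the streaming implementation below exploits on a per-event basis.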

In this article, we will implement a Kalman filter for a simple dynamic model using the Apache Spark Structured Streaming engine and an Apache Kafka data source. We will use Apache Spark version 2.3.1 (latest, as of writing this article), Java version 1.8, and Kafka version 2.0.0. The article is organized as follows: the next section gives an overview of the dynamic model and the corresponding Kalman filter; the following section will discuss the application architecture and the corresponding deployment model, and in that section we will also review the Java code comprising different modules of the application; then, we will show graphically how the Kalman filter performs by comparing the predicted variables to measured variables under random measurement noise; we’ll wrap up the article by giving concluding remarks.

Original Link

Talend and Apache Spark: Debugging and Logging Best Practices

So far, our journey on using Apache Spark with Talend has been a fun and exciting one. The first three posts on my series provided an overview of how Talend works with Apache Spark, some similarities between Talend and Spark Submit, the configuration options available for Spark jobs in Talend and how to tune Spark jobs for performance. If you haven’t already read them you should do so before getting started here. Start with: "Talend & Apache Spark: A Technical Primer"; "Talend vs. Spark Submit Configuration: What’s the Difference?"; "Apache Spark and Talend: Performance and Tuning."

To finish this series, we're going to talk about logging and debugging. When starting your journey with Talend and Apache Spark, you may have run into an error like the one below printed in your console log:

Original Link

Talend and Apache Spark: A Technical Primer and Overview

In my years at Talend as a Support Engineer, before I moved into the Customer Success Architect team, customers often asked about Talend’s capabilities with Apache Spark. When we talk about Spark, the first thing that always comes to mind is the spark-submit command that we use to submit our Spark jobs. So, the question of how a Talend Spark job equates to a regular Spark submit naturally comes up. In this blog, we are going to cover the different Apache Spark modes offered, the ones used by Talend, and how Talend works with Apache Spark.

An Intro to Apache Spark Jobs

Apache Spark has two different types of jobs that you can submit. One of them is Spark Batch and the other is Spark Streaming. Spark Batch operates under a batch processing model, where a data set is collected over a period of time, then gets sent to a Spark engine for processing.

Original Link

Data Quality and Validation

Big data and machine learning deal with data. So, it's important to keep the data correct in the system. If data is not accurate, it not only reduces the efficiency of the system, but also leads to unfavourable insights. One of the big steps toward ensuring the correctness of data is data quality validation. With an increasing volume of data, and the noise that goes along with it, new methods and checks are being added every day to ensure this data's quality. Since the amount of data is huge, one more thing which needs to be considered here is how to ensure fast processing of all of these checks and validations; i.e., a system which can go through each and every ingested record in a highly distributed way. This post talks about some examples of data quality and validation checks and how easy it is to programmatically ensure data quality with the help of Apache Spark and Scala.

Data accuracy refers to the closeness of observed values to the true values, or to values accepted as being true. Some basic checks include:

  • Null Value: records that contain a null value. For example: male/female/null.
  • Specific Value: records that must carry a specific value, such as a company ID.

Schema Validation: Every batch of data should follow the same column name and data type.

 for (elem <- sampledataframe.schema) {
   if (elem.dataType.typeName != "ExpectedDataType") {
     // Print Error
   }
 }

Column Value Duplicates (like duplicate email in records)

 val dataframe1 = sampledataframe.groupBy("columnname").count()
 val dataframe2 = dataframe1.filter("count = 1")
 println("No of duplicate records : " + (dataframe1.count() - dataframe2.count()).toString())

Uniqueness Check: Records are unique with respect to a given column.

This is similar to the duplicate check.

val dataframe1 = sampledataframe.groupBy("columnname").count()
dataframe1.filter("count = 1").count() // this will give unique count.

Accuracy Check: Regular expressions can be used. For example, we can look for email IDs that contain an @:

 sampledataframe.where(sampledataframe.col("columnname").rlike("f*l*e")).count()

Data currency: How up-to-date is your data? Here the assumption is that data is coming in on a daily basis and is then checked and timestamped.

This list can go on and on, but the good thing about this approach based on Spark and Scala is that, with less code, a lot can be achieved on a huge amount of data.

Sometimes, a system may have some specific requirements related to who is consuming the data and in what form, and the consumer may have assumptions about the data.

Data usability: Consumer applications may apply certain expectations like:

  • column1.value should not be equal to column2.value
  • column3.value should always be column1.value + column2.value
  • No value in column x should appear more than x% of the time
 var arr = Array("ColumnName1", "ColumnName2", "ColumnName3")
 var freq = sampledataframe.stat.freqItems(arr, 0.4)
 freq.collect()
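The first two expectations above are row-level comparisons rather than frequency checks. All three can be sketched in plain Python (column positions, rows, and the 40% threshold are hypothetical, chosen to match the freqItems call above):

```python
from collections import Counter

rows = [  # hypothetical records: (column1, column2, column3)
    (1, 2, 3),
    (4, 4, 8),   # violates column1 != column2
    (2, 3, 6),   # violates column3 == column1 + column2
]

def usability_violations(rows, max_freq=0.4):
    """Return rows breaking each expectation, plus any column1 value
    appearing in more than max_freq of the rows."""
    not_equal = [r for r in rows if r[0] == r[1]]
    bad_sum = [r for r in rows if r[2] != r[0] + r[1]]
    counts = Counter(r[0] for r in rows)
    too_frequent = [v for v, c in counts.items() if c / len(rows) > max_freq]
    return not_equal, bad_sum, too_frequent

ne, bs, tf = usability_violations(rows)
print(ne, bs, tf)  # [(4, 4, 8)] [(2, 3, 6)] []
```

In Spark the same expectations would be plain filter conditions over the DataFrame, so they distribute naturally across the cluster.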

While these are considered basic validations, we also have some advanced level checks to ensure data quality, like:

  • Anomaly Detection: This includes two major points:
    • When a dimension is given, such as time. This means that within any timeframe (slice period), the number of records should not be more than x% above the average. To achieve this with Spark:
      • Let’s assume the slice period is 1 minute.
      • First, the timestamp column needs to be filtered/formatted such that the unit representation of the timestamp is a minute. This will produce duplicates, but that should not be an issue.
      • Next, use groupBy, like so: sampledataframe.groupBy("timestamp").count().
      • Get the average of that count and also find the slice period (if it exists), which has x% more records than the average. 
  • Ordering
    • The record should follow a certain order. For example, within a day the records for a particular consumer should start with impressions, clicks, landing page, cart, and end with purchases. There may be partial records, but it should follow a general order. To check this with Spark:
      •  groupBy("ID").
      • Run the order check for the group.
  • Circular dependency: Let me explain this with an example.
    • If two columns are taken up where column A => Column B, and the records are like:
    ID | Name  | Father's Name
    1  | Alpha | Bravo
    2  | Bravo | Gamma
    3  | Gamma | Alpha


    • If the consuming application tries to print the family hierarchy, it may fall into a loop.
  • Failure Trend
    • Consider that data is coming into the system every day. Let’s assume it's behavioral/touchpoint data. For simplicity, let’s call each day’s data a ‘batch.’ If, in every batch, we are getting exactly the same set of failures, then there may be a failure trend going on across batches.
    • If the failure is coming for the same set of email_ids (email_id is one column), then it might be a symptom of bot behaviour.
  • Data Bias: This means a consistent shift in the data. For example:
    • If 30 minutes is getting added to the timestamp, then all the records will always have this 30 minute implicit bias. So, if the prediction algorithm is going to use this data, this bias will impact its results.
    • If the algorithm producing this data has a learning bias, then for one set of data it produces more default values than for another. For example, based on buying behaviour, it may predict the wrong gender.

Bot Behaviour: Usually, a bot’s behaviour is something like:

  • It generates records with the same set of unique identifiers, like the same set of email_ids.
  • It generates spikes of website traffic at particular times: a time-based anomaly.
  • It generates records in a defined order: ordering checks across data batches.
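The slice-period steps described under Anomaly Detection above (truncate timestamps to the slice unit, group and count, compare each slice against the average) can be sketched in plain Python; the 50% threshold and the toy epoch-second timestamps are illustrative:

```python
from collections import Counter

def minute_anomalies(timestamps, threshold_pct=50):
    """Group epoch-second timestamps into 1-minute slices and flag any
    slice holding more than threshold_pct% records above the average."""
    slices = Counter(ts // 60 for ts in timestamps)  # truncate to the minute
    avg = sum(slices.values()) / len(slices)
    limit = avg * (1 + threshold_pct / 100)
    return [minute for minute, n in slices.items() if n > limit]

# three quiet minutes with 2 records each, then a burst minute with 10
ts = [0, 30, 60, 90, 120, 150] + [180 + i for i in range(10)]
print(minute_anomalies(ts))  # only the burst minute is flagged
```

In Spark, the Counter step is exactly the groupBy("timestamp").count() mentioned above, followed by a comparison against the computed average.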

Original Link

PySpark Tutorial: Learn Apache Spark Using Python

In a world where data is being generated at such an alarming rate, the correct analysis of that data at the correct time is very useful. One of the most amazing frameworks to handle big data in real-time and perform analyses is Apache Spark. And if we talk about the programming languages being used nowadays for handling complex data analysis and data munging tasks, I’m sure Python will top the chart. So in this PySpark tutorial, I’ll discuss the following topics:

  • What is PySpark?
  • PySpark in the Industry
  • Why Go for Python?
  • Spark RDDs
  • Machine Learning with PySpark

PySpark Tutorial: What Is PySpark?

Apache Spark is a fast cluster computing framework which is used for processing, querying and analyzing big data. Being based on in-memory computation, it has an advantage over several other big data frameworks.

Originally written in the Scala programming language, the open source community has developed an amazing tool to support Python for Apache Spark. PySpark helps data scientists interface with RDDs in Apache Spark and Python through its library Py4j. There are many features that make PySpark a better framework than others:

  • Speed: It is 100x faster than traditional large-scale data processing frameworks.
  • Powerful Caching: Simple programming layer provides powerful caching and disk persistence capabilities.
  • Deployment: Can be deployed through Mesos, Hadoop via Yarn, or Spark’s own cluster manager.
  • Real Time: Real-time computation and low latency because of in-memory computation.
  • Polyglot: Supports programming in Scala, Java, Python, and R.


PySpark in the Industry

Let’s move ahead with our PySpark tutorial and see where Spark is used in the industry.

Every industry revolves around big data and where there’s big data, there’s analysis involved. So let’s have a look at the various industries where Apache Spark is used.

Media is one of the biggest industries growing towards online streaming. Netflix uses Apache Spark for real-time stream processing to provide personalized online recommendations to its customers. It processes 450 billion events per day which flow to server-side applications.

Finance is another sector where Apache Spark’s real-time processing plays an important role. Banks are using Spark to access and analyze social media profiles to gain insights which can help them make the right business decisions for credit risk assessment, targeted ads, and customer segmentation. Customer churn is also reduced using Spark. Fraud detection is one of the most widely used areas of machine learning where Spark is involved.

Healthcare providers are using Apache Spark to analyze patient records along with past clinical data to identify which patients are likely to face health issues after being discharged from the clinic. Apache Spark is used in genomic sequencing to reduce the time needed to process genome data.

Retail and e-commerce is an industry where one can’t imagine it running without the use of analysis and targeted advertising. One of the largest e-commerce platforms today, Alibaba, runs some of the largest Spark jobs in the world in order to analyze petabytes of data. Alibaba performs feature extraction on image data. eBay uses Apache Spark to provide targeted offers, enhance customer experience, and optimize overall performance.

Travel industries also use Apache Spark. TripAdvisor, a leading travel website that helps users plan a perfect trip, is using Apache Spark to speed up its personalized customer recommendations. TripAdvisor uses Apache Spark to provide advice to millions of travelers by comparing hundreds of websites to find the best hotel prices for its customers.

An important aspect of this PySpark tutorial is to understand why we need to use Python. Why not Java, Scala or R?

Easy to Learn: For programmers, Python is comparatively easier to learn because of its syntax and standard libraries. Moreover, it’s a dynamically typed language, which means RDDs can hold objects of multiple types.

A vast set of libraries: Scala does not have sufficient data science tools and libraries like Python for machine learning and natural language processing. Moreover, Scala lacks good visualization and local data transformations.

Huge Community Support: Python has a global community with millions of developers that interact online and offline in thousands of virtual and physical locations.

One of the most important topics in this PySpark tutorial is the use of RDDs. Let’s understand what RDDs are. 

Spark RDDs

When it comes to iterative distributed computing, i.e. processing data over multiple jobs, we need to reuse or share data among those jobs. Earlier frameworks like Hadoop had problems when dealing with multiple operations/jobs, such as:

  • Storing data in intermediate storage such as HDFS.
  • Multiple I/O jobs make the computations slow.
  • Replication and serialization, which in turn make the process even slower.

RDDs try to solve all of these problems by enabling fault-tolerant, distributed, in-memory computation. RDD is short for Resilient Distributed Dataset. An RDD is a distributed memory abstraction which lets programmers perform in-memory computations on large clusters in a fault-tolerant manner. RDDs are read-only collections of objects partitioned across a set of machines that can be rebuilt if a partition is lost. There are several operations performed on RDDs:

  • Transformations: Transformations create a new dataset from an existing one. They are lazily evaluated.
  • Actions: Spark forces the calculations for execution only when actions are invoked on the RDDs.

Let’s understand a few Transformations, Actions, and Functions.

Reading a File and Displaying Top n Elements:

rdd = sc.textFile("file:///home/edureka/Desktop/Sample")
rdd.take(5)
[u'Deforestation is arising as the main environmental and social issue which has now taken the form of more than a powerful demon. ', u'We must know about the causes, effects and ways to solve the problems arisen because of the deforestation. ', u'We have provided many paragraphs, long and short essay on deforestation in order to help your kids and children to get aware about the problem as well as get participated in the essay writing competition in the school or outside the school. ', u'You can select any deforestation essay given below according to the class standard. ', u'Deforestation is arising as the major global problem to the society and environment.']

Converting to Lowercase and Splitting (Lower and Split):

def Func(lines):
    lines = lines.lower()
    lines = lines.split()
    return lines

rdd1 =
rdd1.take(5)
[[u'deforestation', u'is', u'arising', u'as', u'the', u'main', u'environmental', u'and', u'social', u'issue', u'which', u'has', u'now', u'taken',

Removing StopWords (Filter):

stop_words = ['a','all','the','as','is','am','an','and','be','been','from','had','I','I'd','why','with']
rdd2 = rdd1.flatMap(lambda x: x).filter(lambda z: z not in stop_words)
rdd2.take(10)
[u'deforestation', u'arising', u'main', u'environmental', u'social', u'issue', u'which', u'has', u'now', u'taken']

Sum of Numbers from 1 to 500 (Reduce):

sum_rdd = sc.parallelize(range(1, 501))  # range end is exclusive, so 501 includes 500
sum_rdd.reduce(lambda x,y: x+y)

Machine Learning With PySpark

Continuing our PySpark tutorial, let’s analyze some basketball data and make some predictions. So, here we are going to use data of all the players in NBA since 1980 [year of introduction of 3 Pointers].

df ='header', 'true').csv('nba_players.csv')  # file name is illustrative
df.columns
['_c0', 'player', 'pos', 'age', 'team_id', 'g', 'gs', 'mp', 'fg', 'fga', 'fg_pct', 'fg3', 'fg3a', 'fg3_pct', 'fg2', 'fg2a', 'fg2_pct', 'efg_pct', 'ft', 'fta', 'ft_pct', 'orb', 'drb', 'trb', 'ast', 'stl', 'blk', 'tov', 'pf', 'pts', 'yr']

Sorting Players (OrderBy) and toPandas:

Here we are sorting players based on points scored in a season.

df.orderBy('pts',ascending = False).limit(10).toPandas()[['yr','player','age','pts','fg3']]

Using DSL and matplotlib:

Here we are analyzing the average number of 3-point attempts for each season, normalized to 36 minutes [an interval corresponding to an approximate full NBA game with adequate rest]. We compute this metric using the number of 3-point field goal attempts (fg3a) and minutes played (mp), and then plot the result using matplotlib.

from pyspark.sql.functions import col
fga_py = df.groupBy('yr')\
    .agg({'mp' : 'sum', 'fg3a' : 'sum'})\
    .select(col('yr'), (36*col('sum(fg3a)')/col('sum(mp)')).alias('fg3a_p36m'))\
    .orderBy('yr')

from matplotlib import pyplot as plt
import seaborn as sns'fivethirtyeight')
_df = fga_py.toPandas()
plt.plot(_df.yr, _df.fg3a_p36m, color='#CD5C5C')
_ = plt.title('Player average 3-point attempts (per 36 minutes)')
plt.annotate('3 pointer introduced', xy=(1980, .5), xytext=(1981, 1.1), fontsize=9,
             arrowprops=dict(facecolor='grey', shrink=0, linewidth=2))
plt.annotate('NBA moved in 3-point line', xy=(1996, 2.4), xytext=(1991.5, 2.7), fontsize=9,
             arrowprops=dict(facecolor='grey', shrink=0, linewidth=2))
plt.annotate('NBA moved back\n3-point line', xy=(1998, 2.), xytext=(1998.5, 2.4), fontsize=9,
             arrowprops=dict(facecolor='grey', shrink=0, linewidth=2))

Linear Regression and VectorAssembler:

We can fit a linear regression model to this curve to model the number of shot attempts for the next 5 years. First, we have to transform our data using the VectorAssembler function into a single feature column. This is a requirement for the linear regression API in MLlib.

from import VectorAssembler
t = VectorAssembler(inputCols=['yr'], outputCol='features')
training = t.transform(fga_py)

We then build our linear regression model object using our transformed data.

from import LinearRegression
lr = LinearRegression(maxIter=10)
model =

Applying Trained Model to Dataset:

We apply our trained model object model to our original training set along with 5 years of future data:

from pyspark.sql.types import Row

# apply model for the 1979-80 season thru 2020-21 season
training_yrs ='yr') x: x[0]).collect()
training_y ='fg3a_p36m') x: x[0]).collect()
prediction_yrs = [2017, 2018, 2019, 2020, 2021]
all_yrs = training_yrs + prediction_yrs

# build testing DataFrame
test_rdd = sc.parallelize(all_yrs)
row = Row('yr')
all_years_features = t.transform( x: row(x)).toDF())

# apply linear regression model
df_results = model.transform(all_years_features).toPandas()

Plotting the Final Prediction:

We can then plot our results and save the graph in a specified location.

plt.plot(df_results.yr,df_results.prediction, linewidth = 2, linestyle = '--',color = '#224df7', label = 'L2 Fit')
plt.plot(training_yrs, training_y, color = '#f08080', label = None)
plt.ylabel('Number of attempts')
plt.legend(loc = 4)
_=plt.title('Player average 3-point attempts (per 36 minutes)')

And, with this graph, we come to the end of this PySpark tutorial.

So this is it, guys!

I hope you guys got an idea of what PySpark is, why Python is best suited for Spark, RDDs, and a glimpse of machine learning with PySpark. Congratulations, you are no longer a newbie to PySpark.

Original Link

Introduction to Spark With Python: PySpark for Beginners

Apache Spark is one of the most widely used frameworks when it comes to handling and working with Big Data, and Python is one of the most widely used programming languages for Data Analysis, Machine Learning, and much more. So, why not use them together? This is where Spark with Python, also known as PySpark, comes into the picture.

With an average salary of $110,000 per annum for an Apache Spark Developer, there’s no doubt that Spark is used in the industry a lot. Because of its rich library set, Python is used by the majority of Data Scientists and Analytics experts today. Integrating Python with Spark was a major gift to the community. Spark was developed in the Scala language, which is very much similar to Java. It compiles the program code into bytecode for the JVM for Spark big data processing. To support Spark with Python, the Apache Spark community released PySpark. In this Spark with Python blog, I’ll discuss the following topics.

  • Introduction to Apache Spark and its features
  • Why go for Python?
  • Setting up Spark with Python (PySpark)
  • Spark in Industry
  • PySpark SparkContext and Data Flow
  • PySpark KDD Use Case

Apache Spark is an open-source cluster-computing framework for real-time processing developed by the Apache Software Foundation. Spark provides an interface for programming entire clusters with implicit data parallelism and fault-tolerance.

Below are some of the features of Apache Spark which gives it an edge over other frameworks:

  • Speed: It is 100x faster than traditional large-scale data processing frameworks.
  • Powerful Caching: Simple programming layer provides powerful caching and disk persistence capabilities.
  • Deployment: Can be deployed through Mesos, Hadoop via Yarn, or Spark’s own cluster manager.
  • Real Time: Real-time computation and low latency because of in-memory computation.
  • Polyglot: It is one of the most important features of this framework as it can be programmed in Scala, Java, Python, and R.

Although Spark was designed in Scala, and Scala can be almost 10 times faster than Python, Scala is faster only when the number of cores being used is small. As most of the analyses and processes nowadays require a large number of cores, the performance advantage of Scala is not that great.

For programmers, Python is comparatively easier to learn because of its syntax and standard libraries. Moreover, it’s a dynamically typed language, which means RDDs can hold objects of multiple types.

Although Scala has Spark MLlib, it doesn’t have enough libraries and tools for Machine Learning and NLP purposes. Moreover, Scala lacks data visualization libraries.

Setting Up Spark With Python (PySpark)

I hope you guys know how to download and install Spark. So, once you’ve unzipped the Spark file, installed it, and added its path to the .bashrc file, you need to type source .bashrc

export SPARK_HOME=/usr/lib/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH=$PATH:/usr/lib/hadoop/spark-2.1.0-bin-hadoop2.7/bin

To open PySpark shell, you need to type in the command ./bin/pyspark

Apache Spark, because of its amazing features like in-memory processing, polyglot support, and fast processing, is being used by many companies all around the globe for various purposes in various industries:

Yahoo! uses Apache Spark for its Machine Learning capabilities to personalize its news and web pages and also for targeted advertising. They use Spark with Python to find out what kind of news users are interested in reading and to categorize the news stories to find out what kind of users would be interested in reading each category of news.

TripAdvisor uses Apache Spark to provide advice to millions of travelers by comparing hundreds of websites to find the best hotel prices for its customers. Reading and processing hotel reviews into a readable format is also done with the help of Apache Spark.

One of the world’s largest e-commerce platforms, Alibaba, runs some of the largest Apache Spark jobs in the world in order to analyze hundreds of petabytes of data on its e-commerce platform.

PySpark SparkContext and Data Flow

Talking about Spark with Python, working with RDDs is made possible by the library Py4j. PySpark Shell links the Python API to Spark Core and initializes the Spark Context. Spark Context is at the heart of any Spark application.

  1. Spark Context sets up internal services and establishes a connection to a Spark execution environment.
  2. The Spark Context object in the driver program coordinates all the distributed processes and allows for resource allocation.
  3. Cluster Managers provide Executors, which are JVM processes with logic.
  4. Spark Context objects send the application to executors.
  5. Spark Context executes tasks in each executor.


PySpark KDD Use Case

Now let’s have a look at a use case: KDD’99 Cup (International Knowledge Discovery and Data Mining Tools Competition). Here we will take a fraction of the dataset because the original dataset is too big.

import urllib
f = urllib.urlretrieve ("", "kddcup.data_10_percent.gz")

Creating RDD:

Now we can use this file to create our RDD.

data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)


Suppose we want to count how many normal interactions we have in our dataset. We can filter our raw_data RDD as follows.

normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)


Now we can count how many elements we have in the new RDD.

from time import time
t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
print "There are {} 'normal' interactions".format(normal_count)
print "Count completed in {} seconds".format(round(tt,3))


There are 97278 'normal' interactions
Count completed in 5.951 seconds


In this case, we want to read our data file as a CSV formatted one. We can do this by applying a lambda function to each element in the RDD as follows. Here we will use the map() transformation and the take() action.

from pprint import pprint
csv_data = x: x.split(","))
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print "Parse completed in {} seconds".format(round(tt,3))
pprint(head_rows[0])


Parse completed in 1.715 seconds
[u'0', u'tcp', u'http', u'SF', u'181', u'5450', u'0', u'0',
. u'normal.']


Now we want to have each element in the RDD as a key-value pair where the key is the tag (e.g. normal) and the value is the whole list of elements that represents the row in the CSV formatted file. We could proceed as follows. Here we use line.split() and map().

def parse_interaction(line):
    elems = line.split(",")
    tag = elems[41]
    return (tag, elems)

key_csv_data =
head_rows = key_csv_data.take(5)
pprint(head_rows[0])


(u'normal.', [u'0', u'tcp', u'http', u'SF', u'181', u'5450', u'0', u'0', u'0.00', u'1.00',
. u'normal.'])

The Collect Action:

Here we are going to use the collect() action. It will get all the elements of the RDD into memory. For this reason, it has to be used with care when working with large RDDs.

t0 = time()
all_raw_data = raw_data.collect()
tt = time() - t0
print "Data collected in {} seconds".format(round(tt,3))


Data collected in 17.927 seconds

That took longer than any other action we used before, of course. Every Spark worker node that has a fragment of the RDD has to be coordinated in order to retrieve its part and then reduce everything together.

As a final example that will combine all the previous ones, we want to collect all the normal interactions as key-value pairs.

# get data from file
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

# parse into key-value pairs
key_csv_data =

# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

# collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() - t0
normal_count = len(all_normal)
print "Data collected in {} seconds".format(round(tt,3))
print "There are {} 'normal' interactions".format(normal_count)


Data collected in 12.485 seconds
There are 97278 normal interactions

So this is it, guys!

I hope you enjoyed this Spark with Python article. If you are reading this, congratulations! You are no longer a newbie to PySpark. Try out this simple example on your systems now.

Original Link

Calculating TF-IDF With Apache Spark

Term Frequency-Inverse Document Frequency (TF-IDF) is a widely known technique in text processing. This technique allows one to assign each term in a document a weight. Terms with high frequency within a document have high weights. In addition, terms frequently appearing in all documents of the document corpus have lower weights.


TF-IDF is used in a large variety of applications. Typical use cases include:

  • Document search.
  • Document tagging.
  • Text preprocessing and feature vector engineering for Machine Learning algorithms.

There is a vast number of resources on the web explaining the concept itself and the calculation algorithm. This article does not repeat the information available in those resources; it simply illustrates the TF-IDF calculation with the help of Apache Spark. Emml Asimadi, in his excellent article Understanding TF-IDF, shares an approach based on the old Spark RDD API and the Python language. This article, on the other hand, uses the modern Spark SQL API and the Scala language.

Although Spark MLlib has an API to calculate TF-IDF, this API is not convenient for learning the concept. MLlib tools are intended to generate feature vectors for ML algorithms, so there is no way to extract the weight of a particular term in a particular document. Well, let's build it from scratch; this will sharpen our skills.

First, we will implement the TF-IDF calculation algorithm. Then we will plug our implementation into a simple document search engine.

Calculating TF-IDF

1. Term Frequency – TF

TF is the number of times a term occurs within a document. This means that a term has different TF values for different documents of the corpus.

But what is a document? In our case, we suppose that each document in our document corpus is already preprocessed to a bag of words. For the sake of brevity, we omit preprocessing steps like tokenization, stop words removal, punctuation removal, other types of cleanup. Let’s assume that we have a data set of documents (Spark DataFrame) like below:

|document |
|[one, flesh, one, bone, one, true, religion]|
|[all, flesh, is, grass] |
|[one, is, all, all, is, one] |

First, we need unique identifiers for all documents in our document corpus. Let’s make it:

val docsWithId = documents.withColumn("doc_id", monotonically_increasing_id())
|document |doc_id|
|[one, flesh, one, bone, one, true, religion]|0 |
|[all, flesh, is, grass] |1 |
|[one, is, all, all, is, one] |2 |

Now, remember: we want to count terms in documents. We need to make a step from a set of documents to a set of tokens belonging to documents. In other words, “unfold” each document:

val columns = + (explode(col("document")) as "token")
val unfoldedDocs = _*)
|document |doc_id|token |
|[one, flesh, one, bone, one, true, religion]|0 |one |
|[one, flesh, one, bone, one, true, religion]|0 |flesh |
|[one, flesh, one, bone, one, true, religion]|0 |one |
|[one, flesh, one, bone, one, true, religion]|0 |bone |
|[one, flesh, one, bone, one, true, religion]|0 |one |
|[one, flesh, one, bone, one, true, religion]|0 |true |
|[one, flesh, one, bone, one, true, religion]|0 |religion|
|[all, flesh, is, grass] |1 |all |
|[all, flesh, is, grass] |1 |flesh |
|[all, flesh, is, grass] |1 |is |
|[all, flesh, is, grass] |1 |grass |
|[one, is, all, all, is, one] |2 |one |
|[one, is, all, all, is, one] |2 |is |
|[one, is, all, all, is, one] |2 |all |
|[one, is, all, all, is, one] |2 |all |
|[one, is, all, all, is, one] |2 |is |
|[one, is, all, all, is, one] |2 |one |

The explode function from the Spark SQL API does the job: it “explodes” an array of tokens so each token comes in a separate row.

Now we are ready to calculate term frequencies – just count them for each document:

unfoldedDocs.groupBy("doc_id", "token") .agg(count("document") as "tf")
|doc_id|token |tf |
|0 |one |3 |
|1 |is |1 |
|1 |all |1 |
|0 |true |1 |
|2 |all |2 |
|1 |grass |1 |
|0 |religion|1 |
|1 |flesh |1 |
|2 |one |2 |
|0 |bone |1 |
|2 |is |2 |
|0 |flesh |1 |

The result is as expected: for example, the term “one” occurs three times in the first document and twice in the third document. Now we are putting this result aside for a while and advancing to the next topic.
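Before moving on, these term frequencies can be sanity-checked in plain Python, independently of Spark. A small sketch over the same toy corpus:

```python
from collections import Counter

# The same toy corpus as above, keyed by doc_id
docs = {
    0: ["one", "flesh", "one", "bone", "one", "true", "religion"],
    1: ["all", "flesh", "is", "grass"],
    2: ["one", "is", "all", "all", "is", "one"],
}

# TF: per-document token counts
tf = {doc_id: Counter(tokens) for doc_id, tokens in docs.items()}

print(tf[0]["one"], tf[2]["one"])  # 3 2
```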

2. Document Frequency – DF

DF of a term is the number of documents having this term. We can count documents for each term in the unfoldedDocs data set that we have calculated in the previous step:

unfoldedDocs.groupBy("token") .agg(countDistinct("doc_id") as "df")
|token |df |
|bone |1 |
|religion|1 |
|one |2 |
|grass |1 |
|flesh |2 |
|is |2 |
|all |2 |
|true |1 |

We use the countDistinct function from the Spark SQL API to count distinct documents for each term.

3. Inverse Document Frequency – IDF

Thanks to IDF, we can suppress commonly used words like “is” or “one” from our sample document corpus. Such words, despite having high TF, do not convey information about relevance or irrelevance of a particular document. IDF efficiently down-weights “background noise” terms.

In this article, we use the same formula to calculate IDF that is included in Spark MLlib:

IDF(t,D) = log[ (|D| + 1) / (DF(t,D) + 1) ]

where:

  • IDF(t, D) is the IDF of the term t in the document corpus D.
  • |D| is the total number of documents in the corpus D.
  • DF(t, D) is the DF of the term t in the document corpus D.
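The formula translates directly into code. A minimal Python illustration (Spark MLlib uses the natural logarithm, which we follow here):

```python
import math

def calc_idf(doc_count, df):
    # IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1))
    return math.log((doc_count + 1) / (df + 1))

# Sample corpus: |D| = 3 documents
print(calc_idf(3, 1))  # 0.6931471805599453 (terms in one document)
print(calc_idf(3, 2))  # 0.28768207245178085 (terms in two documents)
```

These are exactly the two IDF values appearing in the table below.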

Sounds clear? Let’s code it. We use the Swiss army knife of the Spark SQL API – user-defined functions (UDF) – to calculate IDF for all rows in the DF data set from the previous step:

val calcIdfUdf = udf { df: Long => calcIdf(docCount, df) }
tokensWithDf.withColumn("idf", calcIdfUdf(col("df")))
|token |df |idf |
|bone |1 |0.6931471805599453 |
|religion|1 |0.6931471805599453 |
|one |2 |0.28768207245178085|
|grass |1 |0.6931471805599453 |
|flesh |2 |0.28768207245178085|
|is |2 |0.28768207245178085|
|all |2 |0.28768207245178085|
|true |1 |0.6931471805599453 |

In the code snippet above, calcIdf() is a function that literally implements the formula to calculate IDF.

We are almost done. Having TF and IDF at hands, we are ready to make the final step – calculate TF-IDF.


4. Term Frequency-Inverse Document Frequency – TF-IDF

TF-IDF of a term is the product of the term's TF and IDF. Thus, a term has different TF-IDF values for different documents in the corpus.

So far we have two data sets:

  • tokensWithTf – term frequencies calculated at step 1.
  • tokensWithIdf – inverse document frequencies obtained at step 3.

We have to join them in order to calculate TF-IDF for all terms on a per-document basis:

tokensWithTf
  .join(tokensWithIdf, Seq("token"), "left")
  .withColumn("tf_idf", col("tf") * col("idf"))
|token |doc_id|tf |df |idf |tf_idf |
|one |0 |3 |2 |0.28768207245178085|0.8630462173553426 |
|is |1 |1 |2 |0.28768207245178085|0.28768207245178085|
|all |1 |1 |2 |0.28768207245178085|0.28768207245178085|
|true |0 |1 |1 |0.6931471805599453 |0.6931471805599453 |
|all |2 |2 |2 |0.28768207245178085|0.5753641449035617 |
|grass |1 |1 |1 |0.6931471805599453 |0.6931471805599453 |
|religion|0 |1 |1 |0.6931471805599453 |0.6931471805599453 |
|flesh |1 |1 |2 |0.28768207245178085|0.28768207245178085|
|one |2 |2 |2 |0.28768207245178085|0.5753641449035617 |
|bone |0 |1 |1 |0.6931471805599453 |0.6931471805599453 |
|is |2 |2 |2 |0.28768207245178085|0.5753641449035617 |
|flesh |0 |1 |2 |0.28768207245178085|0.28768207245178085|

Optionally, we can join this resulting data frame with the original data frame of documents in order to retain the info about documents. We can also omit intermediate results:

|doc_id|document |token |tf_idf |
|0 |[one, flesh, one, bone, one, true, religion]|one |0.8630462173553426 |
|0 |[one, flesh, one, bone, one, true, religion]|religion|0.6931471805599453 |
|0 |[one, flesh, one, bone, one, true, religion]|bone |0.6931471805599453 |
|0 |[one, flesh, one, bone, one, true, religion]|true |0.6931471805599453 |
|0 |[one, flesh, one, bone, one, true, religion]|flesh |0.28768207245178085|
|1 |[all, flesh, is, grass] |grass |0.6931471805599453 |
|1 |[all, flesh, is, grass] |all |0.28768207245178085|
|1 |[all, flesh, is, grass] |is |0.28768207245178085|
|1 |[all, flesh, is, grass] |flesh |0.28768207245178085|
|2 |[one, is, all, all, is, one] |all |0.5753641449035617 |
|2 |[one, is, all, all, is, one] |is |0.5753641449035617 |
|2 |[one, is, all, all, is, one] |one |0.5753641449035617 |

That’s it! We have calculated TF-IDF weights of each term within all documents of the document corpus.
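The whole computation is small enough to cross-check outside Spark. The following plain-Python sketch reproduces the TF-IDF weights in the tables above for the toy corpus:

```python
import math
from collections import Counter

docs = {
    0: ["one", "flesh", "one", "bone", "one", "true", "religion"],
    1: ["all", "flesh", "is", "grass"],
    2: ["one", "is", "all", "all", "is", "one"],
}

doc_count = len(docs)
# TF per document; DF counts each token once per document
tf = {d: Counter(tokens) for d, tokens in docs.items()}
df = Counter(token for tokens in docs.values() for token in set(tokens))

def tf_idf(token, doc_id):
    idf = math.log((doc_count + 1) / (df[token] + 1))
    return tf[doc_id][token] * idf

print(tf_idf("one", 0))   # 0.8630462173553426
print(tf_idf("bone", 0))  # 0.6931471805599453
```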


Using TF-IDF to Search Documents

Now we have quite a powerful weapon in our hands, so let’s test it in the field. The rest of this article describes a simple (by no means competing with Google) document search engine based on TF-IDF ranking.

The source code is available on GitHub: spark-sql-tfidf. You can find the build and running instructions on the project page.

The search engine accepts a set of user-specified keywords. Then it ranks all documents in the database against the user’s set of keywords. The ranking formula is trivial:

Rank(d, keywords) = TF-IDF(keyword1, d) + ... + TF-IDF(keywordN, d)

In other words, the rank of a document is a sum of TF-IDF weights for all the user’s keywords within this document. The search engine picks the top 5 documents and outputs them in the order of relevance (highest rank first).
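As an illustration of this formula (the names and the tiny weight table below are made up for the example, not taken from the project):

```python
def rank(doc_tfidf, keywords):
    # Rank(d, keywords) = sum of the TF-IDF weights of the keywords in d
    return sum(doc_tfidf.get(k, 0.0) for k in keywords)

# Hypothetical precomputed TF-IDF weights per document
corpus = {
    "love_me_tender.txt": {"love": 0.9, "tender": 0.7, "forever": 0.3},
    "red_river_valley.txt": {"love": 0.2, "valley": 0.8},
}

query = ["love", "forever"]
top = sorted(corpus, key=lambda d: rank(corpus[d], query), reverse=True)[:5]
print(top)  # ['love_me_tender.txt', 'red_river_valley.txt']
```

A keyword missing from a document simply contributes zero to that document's rank.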

The project has a dummy database of song lyrics, so you can try to look for songs matching your mood:

Enter keywords separated by spaces (CTRL-C for exit):
love forever
Found:
1. love_me_tender.txt
2. red_river_valley.txt

Happy text mining!

Original Link

Consensus Clustering Via Apache Spark


In this article, we will discuss a technique called Consensus Clustering to assess the stability of clusters generated by a clustering algorithm with respect to small perturbations in the data set. We will review a sample application built using the Apache Spark machine learning library to show how consensus clustering can be used with K-means, Bisecting K-means, and Gaussian Mixture, three distinct clustering algorithms.

Cluster analysis [1] in machine learning aims to partition data into separate, nonoverlapping sets based on a similarity measure between the data points. The data points in the same cluster must be as close (similar) to each other as possible and the data points in different clusters must be as distant (dissimilar) as possible. Cluster analysis has many applications in various scientific disciplines including biology, bioinformatics, medicine, business, computer science, and social sciences [1]. Below are some examples.

  • Clustering can be used for pixel classification of medical images in order to aid in medical diagnosis [2].

  • Cancer patient data can be clustered into groups based on certain attributes, e.g. ‘regional nodes positive’ and ‘stage group,’ in order to analyze how life expectancy of patients varies depending on which cluster they belong to [3].

  • Gene expressions of cancer cells can be profiled via clustering techniques in order to analyze cell structure and predict survival [4].

Many different techniques and algorithms are available for clustering analysis for which [5] and [6] provide excellent reviews.

Problem Statement

Different clustering algorithms could create different clusters for the same data set. At different executions, even the same algorithm could produce different clusters, e.g. because of a random start. As an example, we downloaded a data set from the National Cancer Institute GDC Data Portal regarding Glioblastoma Multiforme (GBM) and Low-Grade Glioma (LGG) patients and partitioned them into three clusters using the K-means clustering algorithm.

The results are shown in Figures 1.a, 1.b for two different executions.

Figure 1.a. A set of clusters obtained by K-means algorithm for GBM and LGG patients.

Figure 1.b. Another set of clusters obtained by K-means algorithm for GBM and LGG patients.

In each diagram, there are three clusters depicted by black, dark blue, and light blue colors. The horizontal axes represent variant classification (a particular characteristic of the variant allele associated with the mutation of TP53 gene observed in the patient) and tumor grade. The vertical axis represents the number of chemotherapy treatments administered to the patient.

Notice that the cluster sets are not consistent. If an algorithm produces significantly different clusters for the same data set during different executions, can we trust that algorithm? How can we choose one algorithm over the other in order to feel most confident about the structure of clusters?

In this article, we will discuss an analytical technique called Consensus Clustering to represent consensus across multiple runs of a clustering algorithm and analyze the stability of discovered clusters with respect to sampling variability [7].

Organization of Article

In the next section, we will discuss basics of consensus clustering. The following section is concerned with the code review of a sample application developed with the Apache Spark MLLib machine learning Library [8] to implement consensus clustering with three different clustering algorithms, K-means, Bisecting K-means, and Gaussian Mixture [5], [6]. The last section gives conclusions.

Consensus Clustering

Consensus clustering is based on the assumption that a reliable clustering algorithm should create similar clusters each time it is executed on a slightly different version of a data set. In other words, slight perturbations of a data set should not change the resulting clusters significantly.

In order to implement consensus clustering, the original data set is randomly subsampled m times to create m different data sets of slightly smaller size than the original one. Then the clustering algorithm is repeated m times (iterations), once for each subsampled data set.

Next, for each pair of data points in the original dataset, it is determined how many times the pair appears in the same cluster in a particular iteration of the clustering algorithm. The idea is that if the pair of data points are ‘similar’ they should ideally be placed in the same cluster in every iteration when the subsampled data set corresponding to the iteration contains both. Similarly, if the data points are ‘dissimilar’ they should be placed in distinct clusters in every iteration when the subsampled data set corresponding to the iteration contains both.

In order to quantify the consensus, a so-called ‘consensus matrix’ is constructed. Let the original data set consist of N data points. Then, the consensus matrix is a symmetric, N x N matrix where the element (i,j) is equal to the number of times data points ai and aj assigned into the same cluster divided by the number of times data points ai and aj were selected together. The diagonal elements of the consensus matrix are always 1 and any non-diagonal element is between 0 and 1, inclusive.
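The subsampling-and-counting scheme can be sketched in a few lines of Python. Here the clustering function is a deterministic stand-in for a real algorithm, and all names are illustrative:

```python
import random
from collections import defaultdict
from itertools import combinations

def consensus_matrix(points, cluster_fn, m=20, frac=0.9):
    # Returns {(i, j): consensus index} for pairs of point indices i < j
    same_cluster = defaultdict(int)  # times a pair landed in one cluster
    same_sample = defaultdict(int)   # times a pair appeared in a subsample
    n = len(points)
    for _ in range(m):
        sample = random.sample(range(n), int(frac * n))
        labels = cluster_fn([points[i] for i in sample])
        label_of = dict(zip(sample, labels))
        for i, j in combinations(sorted(sample), 2):
            same_sample[(i, j)] += 1
            if label_of[i] == label_of[j]:
                same_cluster[(i, j)] += 1
    return {p: same_cluster[p] / same_sample[p] for p in same_sample}

random.seed(0)  # for reproducibility
points = [(-2.0, 1.0), (-1.5, 0.5), (2.0, 1.0), (2.5, 0.8), (3.0, 1.2)]
# Toy "clustering": split on the sign of the x coordinate
cm = consensus_matrix(points, lambda ps: [0 if p[0] < 0 else 1 for p in ps])
print(cm[(0, 1)], cm[(0, 2)])  # 1.0 0.0: perfectly stable pairs
```

Because the stand-in clustering is deterministic, every co-sampled pair gets a consensus index of exactly 0 or 1; a real algorithm with a random start produces the intermediate values discussed below.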

As a very simple example, consider a dataset consisting of 5 data points {a1, a2, a3, a4, a5} in 2-dimensional space.

Figure 2. An example of 5 data points in 2-dimensional space.

A consensus matrix for those data points could look like this:

     a1    a2    a3    a4    a5
a1   1     0.8   0.9   0.1   0
a2   0.8   1     0.7   0.05  0.01
a3   0.9   0.7   1     0.15  0.1
a4   0.1   0.05  0.15  1     0.9
a5   0     0.01  0.1   0.9   1

Consider the element (1,3), which corresponds to the pair of data points a1, a3. The value of that element implies that:

(# of times a1 and a3 are assigned to the same cluster) /
(# of times a1 and a3 selected together) = 0.9

Hence, 90% of the time when the points a1 and a3 were selected together in a data sample, the algorithm placed them in the same cluster. On the other hand, the consensus matrix implies that only 10% of the time the algorithm placed points a1 and a4 in the same cluster when they were selected together (element (1,4)).

A satisfactory consensus would imply that any non-diagonal element of the consensus matrix is either very close to 1, because the particular pair of data points are similar and therefore the algorithm assigned them to the same cluster in most iterations, or very close to 0, because the particular pair of data points are dissimilar and therefore the algorithm assigned them to distinct clusters in most iterations. On the other hand, entries that are away from both 0 and 1 imply poor consensus. For example, an entry with the value 0.5 implies that the algorithm has placed the corresponding pair of data points in the same cluster in 50% of the iterations and in separate clusters in the remaining 50% of the iterations.

The rows and columns of a consensus matrix could be permuted to place the closest data points as adjacent to each other as possible, producing a symmetric heat map, which then helps decide consensus via visual inspection [9]. See [10] for a review of algorithms to produce a heat map from consensus matrix.

For visual inspection, an alternative approach to a heat map is to develop a histogram related to consensus matrix, called histogram of consensus indices [7]. The horizontal axis of the histogram represents the ratio of how many times a pair of two distinct data points appeared in the same cluster to how many times that pair was in the same subsample across all iterations. The vertical axis of the histogram shows how many times the particular ratio is attained. In the ideal case the histogram will trend towards only two bins, near 1 and 0, where the bin near 1 represents the pair of distinct data points that were in the same cluster and the bin near 0 represents the pair of distinct data points that were in separate clusters most of the time across all iterations.
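Binning the off-diagonal entries of a consensus matrix into such a histogram is straightforward. A minimal Python sketch, applied to the upper-triangle entries of the 5-point example matrix above:

```python
def consensus_histogram(values, bins=10):
    # Bins: [0, 0.1), [0.1, 0.2), ..., [0.9, 1.0]
    counts = [0] * bins
    for v in values:
        counts[min(int(v * bins), bins - 1)] += 1
    return counts

# Upper-triangle entries of the sample consensus matrix
indices = [0.8, 0.9, 0.1, 0.0, 0.7, 0.05, 0.01, 0.15, 0.1, 0.9]
print(consensus_histogram(indices))  # [3, 3, 0, 0, 0, 0, 0, 1, 1, 2]
```

Most entries fall in the extreme bins near 0 and 1, which is the signature of good consensus; the lone entry from the 0.7 value is the kind of "poor consensus" mass discussed above.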

For each of the K-means, Bisecting K-means, and Gaussian Mixture algorithms, we applied a consensus clustering technique to our data set to create three clusters with m=20 iterations. In each iteration, the random subsample consisted of ~90% of the original dataset. The histograms for each algorithm are shown below (the source code of the application that generated those diagrams will be reviewed in the following section).

Figure 3. Histogram for K-means algorithm.

Figure 4. Histogram for Bisecting K-means algorithm.

Figure 5. Histogram for Gaussian Mixture algorithm.

For Bisecting K-means and Gaussian Mixture algorithms, the data mostly accumulates in the bins near 0 and 1, specifically in the intervals [0,0.2] and [0.8,1.0]. On the other hand, for the K-means algorithm, the bins between 0 and 1 contain far too many data points, particularly in the interval [0.4,0.7]. Therefore, we conclude that the K-means algorithm does not achieve a consensus as satisfactory as Bisecting K-means or Gaussian Mixture for the particular data set.

Code Review

We implemented a sample Java application to demonstrate consensus clustering. The application uses the Apache Spark machine learning library APIs for K-means, Bisecting K-means, and Gaussian Mixture (Spark version 2.3.0). An abstract class named ConsensusCluster stores all the logic for calculating the histogram. Three concrete classes, KMeansClustering, BisectingKMeansClustering, and GaussianMixtureClustering, each representing a particular clustering algorithm, create the actual clusters.

Figure 6. Class diagram for the demo application.

The sample application uses a data set, as previously mentioned in ‘Problem Statement,’ downloaded from National Cancer Institute GDC Data Portal regarding Glioblastoma Multiforme (GBM) and Low-Grade Glioma (LGG) patients [11]. It consists of 76 distinct data points. For consensus calculations, we ran 20 iterations. In each iteration, data was randomly subsampled with each subsample containing ~90% of the original (full) data set.

With each algorithm, we created three clusters in every iteration and generated the histogram data after all iterations were completed. The plots of the histograms for each algorithm are given in Figures 3, 4, and 5.


ConsensusCluster

This parent class performs most of the work for calculating the histogram of consensus indices. The only work delegated to the concrete child classes is the creation of clusters.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

public abstract class ConsensusCluster {

    // Number of clusters we would like to construct
    protected static int numClusters = 3;

    // Stores how many times two data points were assigned to the same cluster
    // across all iterations. A data point is represented as a String of its
    // coordinates. Key to the HashMap is a data point (outer point) and the
    // corresponding value is a HashMap that maps a data point (inner point)
    // to an integer. Hence, to find how many times two data points were
    // assigned to the same cluster across all iterations, pass one of those
    // data points (outer point) to countSameCluster to get the corresponding
    // HashMap. Then, pass the second data point (inner point) to that HashMap
    // to get the integer value, which is how many times those data points
    // were assigned to the same cluster.
    protected HashMap<String, HashMap<String, Integer>> countSameCluster =
            new HashMap<String, HashMap<String, Integer>>();

    // Stores how many times two data points were assigned to the same data
    // sample across all iterations. Structured the same way as
    // countSameCluster: outer point -> (inner point -> count).
    protected HashMap<String, HashMap<String, Integer>> countSameSample =
            new HashMap<String, HashMap<String, Integer>>();

Those data structures are initialized by the following method.

/**
 * Initialize 'countSameCluster' & 'countSameSample' from the full data set.
 *
 * @param fullData full data set
 */
protected void initializeCounts(JavaRDD<String> fullData) {
    fullData.cache();
    List<String> pointsOuter = fullData.collect();
    List<String> pointsInner = fullData.collect();
    for (String pointOuter : pointsOuter) {
        HashMap<String, Integer> map = new HashMap<String, Integer>();
        for (String pointInner : pointsInner) {
            map.put(pointInner, 0);
        }
        countSameCluster.put(pointOuter, map);
    }
    for (String pointOuter : pointsOuter) {
        HashMap<String, Integer> map = new HashMap<String, Integer>();
        for (String pointInner : pointsInner) {
            map.put(pointInner, 0);
        }
        countSameSample.put(pointOuter, map);
    }
}

The following method updates the ‘countSameSample‘ data structure from a given subsample at a particular iteration.

/**
 * Given a subsample, update the 'countSameSample' data structure.
 *
 * @param collectedSampledData data points in a subsample
 */
protected void updateSameSampleCounts(List<String> collectedSampledData) {
    ArrayList<String> clonedData = new ArrayList<String>(collectedSampledData);
    for (String s : collectedSampledData) {
        // Get all the points in 'countSameSample' for the particular
        // data point s in the subsample
        HashMap<String, Integer> allPoints = countSameSample.get(s);
        for (String c : clonedData) {
            if (s.equals(c)) {
                continue; // ignore self
            }
            // Increment # of times c & s were together in a subsample
            int currentVal = allPoints.get(c);
            allPoints.put(c, currentVal + 1);
        }
    }
}

The following method updates the ‘countSameCluster‘ data structure from data points in a cluster at a particular iteration.

/**
 * Given clusters, update the 'countSameCluster' data structure from the
 * data points in each cluster.
 *
 * @param clusterPoints data points in each cluster, keyed by cluster index
 */
protected void updateSameClusterCounts(HashMap<Integer, Set<String>> clusterPoints) {
    // Each element in keys corresponds to a particular cluster: 0, 1, 2
    Set<Integer> keys = clusterPoints.keySet();
    for (int i : keys) {
        // Obtain points in that cluster
        Set<String> pointsOuter = clusterPoints.get(i);
        Set<String> pointsInner = clusterPoints.get(i);
        for (String pointOuter : pointsOuter) {
            // Get all the points in 'countSameCluster' for the particular
            // data point pointOuter in the cluster
            HashMap<String, Integer> allPoints = countSameCluster.get(pointOuter);
            for (String pointInner : pointsInner) {
                if (pointOuter.equals(pointInner)) {
                    continue; // ignore self
                }
                // Increment # of times pointInner & pointOuter were together
                // in a cluster
                int currentVal = allPoints.get(pointInner);
                allPoints.put(pointInner, currentVal + 1);
            }
        }
    }
}

The following method utilizes the previously introduced methods to update ‘countSameSample‘ and ‘countSameCluster‘ at the end of each iteration.

/**
 * At the end of an iteration, update the global data structures based on the
 * subsample used in the iteration and the data points in the clusters
 * created in that iteration.
 *
 * @param collectedSampledData    an ordered collection of data points where
 *                                the i-th element belongs to the cluster
 *                                defined in the i-th element of
 *                                collectedClusterIndexes
 * @param collectedClusterIndexes an ordered collection where each element
 *                                represents a cluster: 0, 1, 2
 * @param clusterCenters          coordinates of each cluster center
 */
protected void updateHistogramData(List<String> collectedSampledData,
        List<Integer> collectedClusterIndexes, Vector[] clusterCenters) {
    // Update the 'countSameSample' data structure
    updateSameSampleCounts(collectedSampledData);

    // Key to 'clusterPoints' is a cluster identifier, e.g. 0, 1, 2.
    // The value is a Set where each element is a data point in the
    // corresponding cluster; a data point is represented by its coordinates,
    // a string of whitespace-separated numbers.
    HashMap<Integer, Set<String>> clusterPoints = new HashMap<Integer, Set<String>>();
    int j = 0;
    for (Integer i : collectedClusterIndexes) {
        Set<String> points = clusterPoints.get(i);
        if (points == null) {
            points = new HashSet<String>();
            clusterPoints.put(i, points);
        }
        points.add(collectedSampledData.get(j++));
    }

    // Update the 'countSameCluster' data structure
    updateSameClusterCounts(clusterPoints);
}

After all the iterations are completed, this method will calculate and print out the histogram data. Then, the histogram can be plotted using a graphical tool such as Microsoft Excel.

/**
 * Calculate and print out the histogram data. Data for 10 bins is printed:
 *   0.0 <count in bin [0.0, 0.1)>
 *   0.1 <count in bin [0.1, 0.2)>
 *   ...
 *   0.9 <count in bin [0.9, 1.0]>
 */
protected void generateHistogram() {
    // Bin width: 0.1
    HashMap<Integer, Integer> histogram = new HashMap<Integer, Integer>();
    // Initialize with all 0's
    for (int i = 0; i < 10; i++) {
        histogram.put(i, 0);
    }
    for (String sOuter : countSameCluster.keySet()) {
        // sOuter is a particular data point.
        // inSameCluster stores how many times each data point was in the
        // same cluster as sOuter; inSameSample stores how many times each
        // data point was in the same subsample as sOuter.
        HashMap<String, Integer> inSameCluster = countSameCluster.get(sOuter);
        HashMap<String, Integer> inSameSample = countSameSample.get(sOuter);
        for (String sInner : inSameCluster.keySet()) {
            if (sOuter.equals(sInner)) {
                continue; // ignore self
            }
            // How many times sInner and sOuter were in the same cluster,
            // and how many times they were in the same subsample
            int numTimesInSameCluster = inSameCluster.get(sInner);
            int numTimesInSameSample = inSameSample.get(sInner);
            // Calculate the ratio and place it into the corresponding bin
            float ratio = (numTimesInSameCluster == 0) ? 0f
                    : (float) numTimesInSameCluster / numTimesInSameSample;
            int bin = Math.min((int) (ratio * 10), 9);
            histogram.put(bin, histogram.get(bin) + 1);
        }
    }
    // Display histogram data
    System.out.println("HISTOGRAM");
    for (int i : histogram.keySet()) {
        System.out.println(String.format("%.1f", i * 0.1) + " " + histogram.get(i));
    }
}

Below is a helper function that converts a data point, expressed as a string consisting of 3 numbers, into a 3-D vector of doubles.

/**
 * This is a helper function that converts a data point, expressed as a
 * string consisting of 3 numbers, into a 3-D vector of doubles.
 */
static Function<String, Vector> mapFunction = new Function<String, Vector>() {
    public Vector call(String s) {
        String[] sarray = s.split(" ");
        double[] values = new double[sarray.length];
        for (int i = 0; i < sarray.length; i++) {
            values[i] = Double.parseDouble(sarray[i]);
        }
        return Vectors.dense(values);
    }
};

This is the abstract method to be implemented by child classes. It partitions subsampled data into clusters.

/**
 * Partition subsampled data into clusters. Implemented by a concrete class
 * based on a specific algorithm.
 *
 * @param data          subsampled data
 * @param numClusters   # of clusters to create
 * @param numIterations # of iterations to perform
 * @return a pair whose first element has the same number of elements as the
 *         input parameter data, each indicating which cluster the
 *         corresponding element of data belongs to (e.g. 0, 1, 2), and whose
 *         second element holds the cluster centers
 */
protected abstract Tuple2<JavaRDD<Integer>, Vector[]> dataCenters(JavaRDD<Vector> data,
        int numClusters, int numIterations);

This is the main method. It starts Spark configuration and context, initializes the data structures, and starts iterations. In each iteration, it creates clusters via a concrete subclass and updates the data structures from that iteration. Finally, it calculates and prints out the histogram data.

/** * This is the main method that performs all the tasks. */
protected void iterations() {
    // Set application name
    String appName = "Consensus Clustering Demo";
    // # iterations to determine clusters
    int numIterations = 125;
    // Initialize Spark configuration & context
    SparkConf sparkConf = new SparkConf().setAppName(appName)
        .setMaster("local[1]").set("spark.executor.memory", "1g");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    // Read data file from file system.
    String path = "resources/TP53-VarClsf.txt";
    // Read the data file and return it as JavaRDD of strings
    JavaRDD<String> nonUniqueData = sc.textFile(path);
    System.out.println(nonUniqueData.count());
    // Remove any duplicates
    JavaRDD<String> fullData = nonUniqueData.distinct();
    System.out.println(fullData.count());
    // Initialize global data structures
    initializeCounts(fullData);
    // Number of iterations
    int MAX_ITER = 20;
    // Each execution of this loop corresponds to an iteration
    for (int iteration = 0; iteration <= MAX_ITER; iteration++) {
        // Obtain a random subsample, consisting of 90% of the original data set
        JavaRDD<String> sampledData = fullData.sample(false, 0.9, new Random().nextLong());
        // Convert each data point, expressed as a string consisting of 3 numbers,
        // into a 3-D vector of doubles (parsing logic reconstructed from this comment)
        JavaRDD<Vector> data = -> {
            String[] tokens = line.trim().split("\\s+");
            double[] coordinates = new double[3];
            for (int i = 0; i < 3; i++) {
                coordinates[i] = Double.parseDouble(tokens[i]);
            }
            return Vectors.dense(coordinates);
        });
        data.cache();
        // Rely on concrete subclasses for this method. This is where clusters are
        // created by a specific algorithm
        Tuple2<JavaRDD<Integer>, Vector[]> pair = dataCenters(data, numClusters, numIterations);
        // Each element in clusterIndexes represents a cluster, 0, 1, 2
        JavaRDD<Integer> clusterIndexes = pair._1();
        // Each element in clusterCenters gives coordinates of a particular cluster, 0, 1, 2
        Vector[] clusterCenters = pair._2();
        // Bring all data to driver node for displaying results.
        // 'collectedSampledData' and 'collectedClusterIndexes' are ordered
        // collections with the same size: the i-th element in 'collectedSampledData'
        // is a data point that belongs to the cluster defined in the i-th element
        // in 'collectedClusterIndexes'
        List<String> collectedSampledData = sampledData.collect();
        List<Integer> collectedClusterIndexes = clusterIndexes.collect();
        System.out.println(collectedSampledData.size() + " " + collectedClusterIndexes.size());
        // Update global data structures based on cluster data
        updateHistogramData(collectedSampledData, collectedClusterIndexes, clusterCenters);
    }
    // Generate histogram
    generateHistogram();
    // Stop Spark
    sc.stop();
    sc.close();
}
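The global bookkeeping done by initializeCounts() and updateHistogramData() is not listed here. Conceptually, consensus clustering maintains, for each pair of data points, how many iterations sampled both points and how many of those placed them in the same cluster; their ratio is the consensus index whose histogram the application plots [7]. Below is a minimal, Spark-free sketch of that calculation (the class name, method name, and toy data are illustrative, not from the original source):

```java
public class ConsensusIndexSketch {

    /**
     * Consensus matrix: entry (p, q) is the fraction of co-sampled
     * iterations in which points p and q fell into the same cluster.
     * cluster[it][p] is the cluster index of point p in iteration it,
     * or -1 if point p was not in that iteration's subsample.
     */
    static double[][] consensusMatrix(int[][] cluster, int numPoints) {
        double[][] consensus = new double[numPoints][numPoints];
        for (int p = 0; p < numPoints; p++) {
            for (int q = p + 1; q < numPoints; q++) {
                int bothSampled = 0, together = 0;
                for (int[] assignment : cluster) {
                    if (assignment[p] >= 0 && assignment[q] >= 0) {
                        bothSampled++;
                        if (assignment[p] == assignment[q]) together++;
                    }
                }
                consensus[p][q] = bothSampled == 0 ? 0.0 : (double) together / bothSampled;
            }
        }
        return consensus;
    }

    public static void main(String[] args) {
        // Three toy iterations over four points
        int[][] cluster = {
            {0, 0, 1, -1},
            {0, 0, -1, 1},
            {0, 1, 1, 1}
        };
        // Points 0 and 1 were clustered together in 2 of 3 co-sampled iterations
        System.out.println(consensusMatrix(cluster, 4)[0][1]); // 0.6666666666666666
    }
}
```

A perfectly stable pair has consensus index 1 (always together) or 0 (never together); the histograms in the article measure how concentrated these indices are near 0 and 1.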

Now, we will review the child classes. Each child class will implement protected abstract JavaRDD<Integer> dataCenters() to partition subsampled data into clusters based on a specific algorithm.


In its main method, KMeansClustering triggers iterations() implemented by the parent class. It also implements the dataCenters() method to create clusters based on the K-means algorithm using KMeans and KMeansModel in the org.apache.spark.mllib.clustering package.

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;

public class KMeansClustering extends ConsensusCluster {

    /**
     * The main method. It triggers iterations() implemented by the parent class.
     */
    public static void main(String[] args) throws Exception {
        Logger.getRootLogger().setLevel(Level.WARN);
        (new KMeansClustering()).iterations();
    }

    /**
     * In ConsensusCluster, this method was abstract; implementation was left to
     * child classes. Here we first create a KMeans object and then call its run()
     * method on data to obtain a KMeansModel object. Then, we call the predict()
     * method on the KMeansModel object to obtain a data structure, clusterIndexes,
     * that gives the cluster indexes for the input parameter data. clusterIndexes
     * and data have the same number of elements and are aligned: the i-th element
     * of clusterIndexes defines which cluster the i-th element of data belongs to,
     * one of 0, 1, 2, etc.
     *
     * @param data          - Data points to partition into clusters
     * @param numClusters   - number of clusters desired
     * @param numIterations - maximum # of iterations to perform during clustering
     */
    public JavaRDD<Integer> dataCenters(JavaRDD<Vector> data, int numClusters,
            int numIterations) {
        KMeans km = new KMeans().setK(numClusters).setMaxIterations(numIterations);
        KMeansModel clusters =;
        JavaRDD<Integer> clusterIndexes = clusters.predict(data);
        return clusterIndexes;
    }
}


In its main method, BisectingKMeansClustering triggers iterations() implemented by the parent class. It also implements the dataCenters() method to create clusters based on the Bisecting K-means algorithm using BisectingKMeans and BisectingKMeansModel in the org.apache.spark.mllib.clustering package.

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.mllib.clustering.BisectingKMeans;
import org.apache.spark.mllib.clustering.BisectingKMeansModel;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;

public class BisectingKMeansClustering extends ConsensusCluster {

    /**
     * The main method. It triggers iterations() implemented by the parent class.
     */
    public static void main(String[] args) throws Exception {
        Logger.getRootLogger().setLevel(Level.WARN);
        (new BisectingKMeansClustering()).iterations();
    }

    /**
     * In ConsensusCluster, this method was abstract; implementation was left to
     * child classes. Here we first create a BisectingKMeans object and then call
     * its run() method on data to obtain a BisectingKMeansModel object. Then, we
     * call the predict() method on the BisectingKMeansModel object to obtain a
     * data structure, clusterIndexes, that gives the cluster indexes for the input
     * parameter data. clusterIndexes and data have the same number of elements and
     * are aligned: the i-th element of clusterIndexes defines which cluster the
     * i-th element of data belongs to, one of 0, 1, 2, etc.
     *
     * @param data          - Data points to partition into clusters
     * @param numClusters   - number of clusters desired
     * @param numIterations - maximum # of iterations to perform during clustering
     */
    public JavaRDD<Integer> dataCenters(JavaRDD<Vector> data, int numClusters,
            int numIterations) {
        BisectingKMeans bkm = new BisectingKMeans().setK(numClusters).setMaxIterations(numIterations);
        BisectingKMeansModel clusters =;
        JavaRDD<Integer> clusterIndexes = clusters.predict(data);
        return clusterIndexes;
    }
}


In its main method, GaussianMixtureClustering triggers iterations() implemented by the parent class. It also implements the dataCenters() method to assign data points to clusters based on Gaussian Mixture algorithm using GaussianMixture and GaussianMixtureModel in the org.apache.spark.mllib.clustering package.

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;

public class GaussianMixtureClustering extends ConsensusCluster {

    /**
     * The main method. It triggers iterations() implemented by the parent class.
     */
    public static void main(String[] args) throws Exception {
        Logger.getRootLogger().setLevel(Level.WARN);
        (new GaussianMixtureClustering()).iterations();
    }

    /**
     * In ConsensusCluster, this method was abstract; implementation was left to
     * child classes. Here we first create a GaussianMixture object and then call
     * its run() method on data to obtain a GaussianMixtureModel object. Then, we
     * call the predict() method on the GaussianMixtureModel object to obtain a
     * data structure, clusterIndexes, that gives the cluster indexes for the input
     * parameter data. clusterIndexes and data have the same number of elements and
     * are aligned: the i-th element of clusterIndexes defines which cluster the
     * i-th element of data belongs to, one of 0, 1, 2, etc.
     *
     * @param data          - Data points to partition into clusters
     * @param numClusters   - number of clusters desired
     * @param numIterations - maximum # of iterations to perform during clustering
     */
    protected JavaRDD<Integer> dataCenters(JavaRDD<Vector> data, int numClusters,
            int numIterations) {
        GaussianMixture gm = new GaussianMixture().setK(numClusters).setMaxIterations(numIterations);
        GaussianMixtureModel clusters =;
        JavaRDD<Integer> clusterIndexes = clusters.predict(data);
        return clusterIndexes;
    }
}


In this article, we discussed consensus clustering, a technique to represent consensus across multiple runs of a clustering algorithm and analyze the stability of discovered clusters with respect to sampling variability. We reviewed a sample application built with the Apache Spark machine learning library to show how consensus clustering can be used with K-means, Bisecting K-means, and Gaussian Mixture, three distinct clustering algorithms. For the example data set considered in the application, we calculated and plotted the histograms of consensus indices. The histograms indicated that Bisecting K-means and Gaussian Mixture algorithms delivered more stable clusters than the K-means algorithm. Some concluding remarks are as follows.

  • Consensus clustering can be applied to any clustering algorithm and is useful to compare different algorithms on the same dataset. However, consensus clustering does not aim at deciding which clustering algorithm is more stable than other algorithms in general. The results are dependent on a particular data set and the nature of the relationship between data points. For example, a hierarchical clustering algorithm may fail to achieve satisfactory consensus for a data set that is not hierarchical in nature, or vice versa.

  • Consensus clustering can additionally be used to determine the optimal number of clusters for a data set and clustering algorithm. A consensus matrix could also be used as a similarity measure, an alternative to other measures, e.g. Euclidean distance [7].

  • The 3-dimensional graphs of sample clusters in Figures 1.a and 1.b were obtained with the Java-based jzy3d framework [12], and the histograms in Figures 3–5 were plotted with Microsoft Excel.

  • Complete source code for the sample application can be found in [13].

[1] Cluster Analysis, Wikipedia.

[2] Automatic symmetry based cluster approach for anomalous brain identification in PET scan image – An Analysis, A. Meena and K. Raja, Cornell University Library.

[3] Data Clustering Using Apache Spark, K. Unyelioglu, DZone / Big Data Zone.

[4] Classification and Prediction of Survival in Hepatocellular Carcinoma by Gene Expression Profiling, J-S Lee et. al., Hepatology, 2004.

[5] The Elements of Statistical Learning, T. Hastie et. al., Springer Series in Statistics, 2009.

[6] Pattern Classification, R.O. Duda et. al., John Wiley & Sons, 2000.

[7] Consensus Clustering: A Resampling-Based Method for Class Discovery and Visualization of Gene Expression Microarray Data, S. Monti et. al., Machine Learning, 52, 91–118, 2003.

[8] Apache Spark,

[9] Heat Map, Wikipedia.

[10] The History of the Cluster Heat Map, L. Wilkinson and M. Friendly, The American Statistician Volume 63 Issue 2, 2009.

[11] National Cancer Institute Genomic Data Commons Project,

[12] Open Source API for 3D Charts,

[13] Consensus-clustering,

Original Link

Spark Stream-Stream Join

Spark 2.3 added support for stream-stream joins, i.e., we can join two streaming Datasets/DataFrames. In this post, we are going to see how beautifully Spark now supports joining two streaming data frames.

In this example, I am going to use:

Apache Spark 2.3.0
Apache Kafka
Scala 2.11.8

In build.sbt, our dependencies look like the following:

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.3.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0",
  "org.apache.kafka" % "kafka-clients" % ""
)

To create the two streaming data frames, I am going to send the data to Kafka with some regular time interval in two separate topics, here I’ve named them ‘dataTopic1’ and ‘dataTopic2.’

For sending the data, first I simply make the list of integers and send these integers from the list to the Kafka topic with some regular time intervals, as follows.

val records1 = (1 to 10).toList
records1.foreach { record =>
  val producerRecord = new ProducerRecord[String, String]("dataTopic1", record.toString)
  producer.send(producerRecord)
  Thread.sleep(200)
}

In the same way, I send the data to the second Kafka topic called “dataTopic2.”

val records2 = (5 to 15).toList
records2.foreach(record => {
  val producerRecord = new ProducerRecord[String, String]("dataTopic2", record.toString)
  producer.send(producerRecord)
  Thread.sleep(200)
})

After sending the data to Kafka, I start reading the data frames from the topics. I read dataFrame1 from the Kafka topic "dataTopic1" as follows:

val dataFrame1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootStrapServers)
  .option("subscribe", "dataTopic1")
  .option("includeTimestamp", value = true)
  .load()

val streamingDf1 = dataFrame1
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(col("value").cast("Integer").as("data"), col("timestamp").as("timestamp1"))
  .select("data", "timestamp1")

In the same way, I read dataFrame2 from the Kafka topic "dataTopic2." After that, I apply the join on the data frame column called "data" as follows:

val streamingDfAfterJoin: DataFrame = streamingDf1.join(streamingDf2, "data")

This will join these two streaming data frames into one, whose schema is as follows:

root
 |-- data: integer (nullable = true)
 |-- timestamp1: timestamp (nullable = true)
 |-- timestamp2: timestamp (nullable = true)

Since it found the same value of "data" in both data frames, it gives us both timestamps, one from each side of the join. The final output is as follows:
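Conceptually, the inner join matches rows whose "data" values are equal across the two streams and pairs up their timestamps. A plain-Java sketch of that matching logic follows (the names and toy data are illustrative; Spark's actual implementation additionally manages streaming state and late-arriving rows):

```java
import java.util.Map;
import java.util.TreeMap;

public class StreamJoinSketch {

    /** Inner-join two (data -> timestamp) maps on their keys. */
    static Map<Integer, long[]> innerJoin(Map<Integer, Long> left, Map<Integer, Long> right) {
        Map<Integer, long[]> joined = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : left.entrySet()) {
            Long ts2 = right.get(e.getKey());
            if (ts2 != null) {
                // Key present on both sides: emit (timestamp1, timestamp2)
                joined.put(e.getKey(), new long[]{e.getValue(), ts2});
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<Integer, Long> topic1 = Map.of(4, 100L, 5, 120L, 6, 140L);
        Map<Integer, Long> topic2 = Map.of(5, 500L, 6, 520L, 7, 540L);
        // Only keys appearing in both "streams" survive the inner join
        System.out.println(innerJoin(topic1, topic2).keySet()); // [5, 6]
    }
}
```

In the article's example, the lists 1..10 and 5..15 overlap on 5..10, so only those values of "data" appear in the joined output.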


If you want to run this code on your own, you can find this example here – GitHub repo. Before running this example, make sure your Kafka server is already running.

Original Link

Real-Time Streaming ETL With SnappyData

ETL has always been a big part of prepping data for analytics. The origins of this go back to the development of relational databases and how they were "normal-form" optimized for changes and reads for transactional systems. Once that data rolled off the transactional system, it had to be decomposed into a form that was more suitable for read-only analytics. The ETL process became a popular concept in the 1970s. Data extraction is where data is extracted from the source data store; data transformation is all about converting the data to the format or structure for the purposes of querying and analysis; data loading is the phase where the transformed data is loaded into its final analytics destination store, e.g. an operational data store, data mart, or data warehouse. These processes were batch-oriented and complex. Chewing gum and baling wire were very much the order of the day. Latency was never a consideration because these processes were monolithic and offline in 100% of cases.

As database products proliferated, the list of products that serviced that market grew considerably. Depending on whether you are an Oracle, IBM, or Microsoft shop, you had a choice of ETL products. But the product stack remained roughly the same and the cost, complexity, and latency associated with doing ETL remained largely constant. Vendors like Talend and Informatica strived to provide connectors to a wide variety of sources. All of these products essentially run batch jobs and are not suitable for real-time ETL. These products were born in an era where distributed computing was not the norm; they were not built from the ground up to be horizontally scalable. The expectation was that these were batch processes and the focus was on getting the job done and specialized knowledge was considered a prerequisite for using these tools.

The diagram below shows how the ETL process, as executed today, is inherently a batch process that is complex, is high-latency, and involves multiple user kernel crossings.

There is clearly a need to reduce the complexity of operating ETL processes. Additionally, monitoring them and ultimately getting the data to its analytic destination in a timely manner without errors or data loss is paramount.

With the advent of big data streaming products like Spark, Samza, and Flink, there has been renewed interest in doing ETL, this time without the complexity or latency and also making it real time. This is where customers asked us to craft a solution that was less messy, used standard interfaces, cut down latency, and allowed real-time transformation of data, thereby enabling quicker insights on the business closer to where the events occurred.

Introducing SnappyData Agile ETL:

Doing real-time streaming ETL from operational stores requires a system that has 3 important capabilities.

  • The ability to read data from relational and non-relational data sources without impacting the throughput of those systems. This can typically be done using change data capture mechanisms. Given the myriad number of data sources from which data can be read, the ability to capture changes at the source is critical.
  • The ability to stream the data into a streaming engine and apply rich transformations on it so as to make it consumable for analytics in the target system.
  • The ability to write transformed data out to target systems using standard connectors which can do bulk writes. Messaging providers can also be used to route the data to their final destination.

In addition to this, the system needs to have a metadata repository that is updateable using standard interfaces like SQL (or Spark).

SnappyData offers a robust in-memory database that is accessible via ODBC, JDBC, and REST interfaces. Data stored in SnappyData can be persisted in the cluster or persisted on S3, Azure Storage, etc. SnappyData offers in-database streaming which allows streaming applications to read data from sources like SQLServer, Oracle, MySQL, etc. in a highly parallel manner. Data sources can be HDFS or anything that writes to Kafka since SnappyData treats Kafka as a streaming source. The streaming application performs rich transformations on the stream to convert the data to the format in which it can be consumed by the target system. The application uses the SnappyData repository to store metadata needed to complete these transforms. Once completed, it uses Spark connectors to write data out to any target that has a Spark connector defined. These writes can also be partitioned out in the SnappyData cluster, leading to high throughput and low end-to-end latency for the ETL. Most importantly, the system is scalable end-to-end: you can easily add new sources by configuring new streaming applications to run in the cluster, ensuring source scalability; the writes to the target system are horizontally scalable, ensuring sink scalability; and the cluster metadata itself can grow without bounds, without increasing the cost of doing stream transformations.

With SnappyData, real-time streaming ETL becomes a robust, easy to use system which leverages all the work that has been done in Spark.

Lower Complexity:

In most enterprises, DBAs would not want to install any process on the database machines beyond turning on CDC or turning on supplemental logging, which allows transaction log reader approaches to CDC. Alternative approaches that require CDC updates to be written out to a message bus are more intrusive and require adding more infrastructure to mission-critical database machines. As an example, a leading ETL product that works with SAP Oracle databases requires log reader and listener processes on the database machine to read the database logs, which then puts additional requirements on monitoring those components, and consumes CPU, memory, and disk (all precious resources) on the database machine. Our approach primarily reads CDC data off the machine and directly into the stream, leading to fewer moving parts overall in the ETL process.


For most data sources, SnappyData integrates into the existing security mechanisms in the database. This uses existing security measures and does not open up a new access point in the database.

No Sink Connector Processes:

The stream application transforms the data and microbatches the changes out to the target system directly using Spark connectors. The sink system does not need to pull in the changes and have additional machinery installed on it, which leads to lower operational complexity and better performance. The transformed payload can be emitted to messaging providers as well for consumption by multiple endpoints.

Scalable Metadata Storage:

The SnappyData ETL engine is built into an in-memory datastore which offers a scalable metadata repository which is persisted and loaded upon startup. SnappyData offers robust monitoring capabilities that allow for streaming applications to be managed easily.


Eliminating additional processes eliminates unnecessary user kernel crossings, which cuts down CPU costs, improves throughput, and improves system availability. The use of partitions for processing streams allows for a massive degree of parallelism which improves performance significantly.

Studio Driven Configuration (Coming Soon):

SnappyData offers the Snappy Agile Studio which allows end users to configure sources and targets and provides them with the ability to store metadata needed to complete the transformations.

This article introduced the rationale for real-time streaming ETL and the advantages of the SnappyData approach to real-time streaming ETL. In future articles, we will explore the nuts and bolts of how we autoscale the solution, how we make the "Transform" layer pluggable, and how checkpointing and resumability are built into the solution.

Original Link

Vectorized Algorithms in Java

There has been a Cambrian explosion of JVM data technologies in recent years. It’s all very exciting, but is the JVM really competitive with C in this area? I would argue that there is a reason Apache Arrow is polyglot, and it’s not just interoperability with Python. To pick on one project impressive enough to be thriving after seven years, if you’ve actually used Apache Spark, you will be aware that it looks fastest next to its predecessor, MapReduce. Big data is a lot like teenage sex: everybody talks about it, nobody really knows how to do it, and everyone keeps their embarrassing stories to themselves. In games of incomplete information, it’s possible to overestimate the competence of others: nobody opens up about how slow their Spark jobs really are because there’s a risk of looking stupid.

If it can be accepted that Spark is inefficient, the question becomes: Is Spark fundamentally inefficient? Flare provides a drop-in replacement for Spark's backend, but replaces JIT-compiled code with highly efficient native code, yielding order of magnitude improvements in job throughput. Some of Flare's gains come from generating specialized code, but the rest comes from just generating better native code than C2 does. If Flare validates Spark's execution model, perhaps it raises questions about the suitability of the JVM for high-throughput data processing.

I think this will change radically in the coming years. I think the most important reason is the advent of explicit support for SIMD provided by the vector API, which is currently incubating in Project Panama. Once the vector API is complete, I conjecture that projects like Spark will be able to profit enormously from it. This post takes a look at the API in its current state and ignores performance.

Why Vectorization?

Assuming a flat processor frequency, throughput is improved by a combination of executing many instructions per cycle (pipelining) and processing multiple data items per instruction (SIMD). SIMD instruction sets are provided by Intel as the various generations of SSE and AVX. If throughput is the only goal, maximizing SIMD may even be worth reducing the frequency, which can happen on Intel chips when using AVX. Vectorization allows throughput to be increased by the use of SIMD instructions.

Analytical workloads are particularly suitable for vectorization, especially over columnar data, because they typically involve operations consuming the entire range of a few numerical attributes of a dataset. Vectorized analytical processing with filters is explicitly supported by vector masks, and vectorization is also profitable for operations on indices typically performed for filtering prior to calculations. I don’t actually need to make a strong case for the impact of vectorization on analytical workloads: just read the work of top researchers like Daniel Abadi and Daniel Lemire.

Vectorization in the JVM

C2 provides quite a lot of auto-vectorization, which works very well sometimes, but the support is limited and brittle. I have written about this several times. Because AVX can reduce the processor frequency, it’s not always profitable to vectorize, so compilers employ cost models to decide when they should do so. Such cost models require platform specific calibration, and sometimes C2 can get it wrong. Sometimes, specifically in the case of floating point operations, using SIMD conflicts with the JLS, and the code C2 generates can be quite inefficient. In general, data parallel code can be better optimized by C compilers such as GCC than C2 because there are fewer constraints, and there is a larger budget for analysis at compile time. This all makes having intrinsics very appealing, and as a user, I would like to be able to:

  1. Bypass JLS floating point constraints.
  2. Bypass cost model-based decisions.
  3. Avoid JNI at all costs.
  4. Use a modern “object-functional” style. SIMD intrinsics in C are painful.

There is another attempt to provide SIMD intrinsics to JVM users via LMS, a framework for writing programs which write programs, designed by Tiark Rompf (who is also behind Flare). This work is very promising (I have written about it before), but it uses JNI. It’s only at the prototype stage, but currently, the intrinsics are auto-generated from XML definitions, which leads to a one-to-one mapping to the intrinsics in immintrin.h, yielding a similar programming experience. This could likely be improved a lot, but the reliance on JNI is fundamental, albeit with minimal boundary crossing.

I am quite excited by the vector API in Project Panama because it looks like it will meet all of these requirements, at least to some extent. It remains to be seen quite how far the implementors will go in the direction of associative floating point arithmetic, but it has to opt out of JLS floating point semantics to some extent, which I think is progressive.

The Vector API

Disclaimer: Everything below is based on my experience with a recent build of the experimental code in the Project Panama fork of OpenJDK. I am not affiliated with the design or implementation of this API, may not be using it properly, and it may change according to its designers’ will before it is released!

To understand the vector API, you need to know that there are different register widths and different SIMD instruction sets. Because of my area of work, and because 99% of the server market is Intel, I am only interested in AVX, but ARM has its own implementations with different maximum register sizes, which presumably need to be handled by a JVM vector API. On Intel CPUs, SSE instruction sets use up to 128-bit registers (xmm, four ints), AVX and AVX2 use up to 256-bit registers (ymm, eight ints), and AVX-512 uses up to 512-bit registers (zmm, sixteen ints).
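The lane counts quoted above follow directly from dividing the register width by the element width; a quick sanity check:

```java
public class LaneCount {

    /** Number of elements (lanes) that fit in one SIMD register. */
    static int lanes(int registerBits, int elementBits) {
        return registerBits / elementBits;
    }

    public static void main(String[] args) {
        // 32-bit ints per register for SSE (xmm), AVX/AVX2 (ymm), AVX-512 (zmm)
        System.out.println(lanes(128, 32)); // 4
        System.out.println(lanes(256, 32)); // 8
        System.out.println(lanes(512, 32)); // 16
    }
}
```

The same arithmetic gives, for instance, eight floats or four doubles per ymm register, which is why the matrix multiplication code later in the post steps through float arrays eight elements at a time per accumulator.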

The instruction sets are typed, and instructions designed to operate on packed doubles can’t operate on packed ints without explicit casting. This is modeled by the interface Vector<Shape>, parameterized by the Shape interface, which models the register width.

The types of the vector elements are modeled by abstract element type specific classes such as IntVector. At the leaves of the hierarchy are the concrete classes specialized both to element type and register width, such as IntVector256, which extends IntVector<Shapes.S256Bit>.

Since EJB, the word factory has been a dirty word, which might be why the word species is used in this API. To create an IntVector<Shapes.S256Bit>, you can create the factory/species as follows:

public static final IntVector.IntSpecies<Shapes.S256Bit> YMM_INT =
    (IntVector.IntSpecies<Shapes.S256Bit>) Vector.species(int.class, Shapes.S_256_BIT);

There are now various ways to create a vector from the species, which all have their use cases. First, you can load vectors from arrays: Imagine you want to calculate the bitwise intersection of two int[]s. This can be written quite cleanly without any shape/register information.

public static int[] intersect(int[] left, int[] right) {
    assert left.length == right.length;
    int[] result = new int[left.length];
    for (int i = 0; i < left.length; i += YMM_INT.length()) {
        YMM_INT.fromArray(left, i)
               .and(YMM_INT.fromArray(right, i))
               .intoArray(result, i);
    }
    return result;
}

A common pattern in vectorized code is to broadcast a variable into a vector, for instance, to facilitate the multiplication of a scalar by a vector.

IntVector<Shapes.S256Bit> multiplier = YMM_INT.broadcast(x);

Or to create a vector from some scalars; for instance, in a lookup table.

IntVector<Shapes.S256Bit> vector = YMM_INT.scalars(0, 1, 2, 3, 4, 5, 6, 7);

A zero vector can be created from a species:

IntVector<Shapes.S256Bit> zero =;

The big split in the class hierarchy is between integral and floating point types. Integral types have meaningful bitwise operations (I am looking forward to trying to write a vectorized population count algorithm), which are absent from FloatVector and DoubleVector, and there is no concept of fused-multiply-add for integral types, so there is obviously no IntVector.fma. The common subset of operations is arithmetic, casting, and loading/storing operations.
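As a scalar baseline for the population count mentioned above, counting set bits across an array of words looks like the following; a vectorized version would process several words per instruction (this sketch is mine, not part of the vector API):

```java
public class PopCount {

    /** Total number of set bits across the array, one 64-bit word at a time. */
    static long popCount(long[] words) {
        long total = 0;
        for (long word : words) {
            total += Long.bitCount(word); // scalar popcount per word
        }
        return total;
    }

    public static void main(String[] args) {
        // 0b1011 contributes 3 set bits, -1L (all ones) contributes 64
        System.out.println(popCount(new long[]{0b1011L, -1L})); // 67
    }
}
```

Long.bitCount already compiles to a single POPCNT instruction on modern x86, so the win from vectorizing comes from applying the bitwise counting trick across whole registers of words at once.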

I generally like the API a lot: It feels familiar to programming with streams but, on the other hand, it isn’t too far removed from traditional intrinsics. Below is an implementation of a fast matrix multiplication written in C, and below it is the same code written with the vector API:

static void mmul_tiled_avx_unrolled(const int n, const float *left, const float *right, float *result) {
  const int block_width = n >= 256 ? 512 : 256;
  const int block_height = n >= 512 ? 8 : n >= 256 ? 16 : 32;
  for (int column_offset = 0; column_offset < n; column_offset += block_width) {
    for (int row_offset = 0; row_offset < n; row_offset += block_height) {
      for (int i = 0; i < n; ++i) {
        for (int j = column_offset; j < column_offset + block_width && j < n; j += 64) {
          __m256 sum1 = _mm256_load_ps(result + i * n + j);
          __m256 sum2 = _mm256_load_ps(result + i * n + j + 8);
          __m256 sum3 = _mm256_load_ps(result + i * n + j + 16);
          __m256 sum4 = _mm256_load_ps(result + i * n + j + 24);
          __m256 sum5 = _mm256_load_ps(result + i * n + j + 32);
          __m256 sum6 = _mm256_load_ps(result + i * n + j + 40);
          __m256 sum7 = _mm256_load_ps(result + i * n + j + 48);
          __m256 sum8 = _mm256_load_ps(result + i * n + j + 56);
          for (int k = row_offset; k < row_offset + block_height && k < n; ++k) {
            __m256 multiplier = _mm256_set1_ps(left[i * n + k]);
            sum1 = _mm256_fmadd_ps(multiplier, _mm256_load_ps(right + k * n + j), sum1);
            sum2 = _mm256_fmadd_ps(multiplier, _mm256_load_ps(right + k * n + j + 8), sum2);
            sum3 = _mm256_fmadd_ps(multiplier, _mm256_load_ps(right + k * n + j + 16), sum3);
            sum4 = _mm256_fmadd_ps(multiplier, _mm256_load_ps(right + k * n + j + 24), sum4);
            sum5 = _mm256_fmadd_ps(multiplier, _mm256_load_ps(right + k * n + j + 32), sum5);
            sum6 = _mm256_fmadd_ps(multiplier, _mm256_load_ps(right + k * n + j + 40), sum6);
            sum7 = _mm256_fmadd_ps(multiplier, _mm256_load_ps(right + k * n + j + 48), sum7);
            sum8 = _mm256_fmadd_ps(multiplier, _mm256_load_ps(right + k * n + j + 56), sum8);
          }
          _mm256_store_ps(result + i * n + j, sum1);
          _mm256_store_ps(result + i * n + j + 8, sum2);
          _mm256_store_ps(result + i * n + j + 16, sum3);
          _mm256_store_ps(result + i * n + j + 24, sum4);
          _mm256_store_ps(result + i * n + j + 32, sum5);
          _mm256_store_ps(result + i * n + j + 40, sum6);
          _mm256_store_ps(result + i * n + j + 48, sum7);
          _mm256_store_ps(result + i * n + j + 56, sum8);
        }
      }
    }
  }
}
private static void mmul(int n, float[] left, float[] right, float[] result) {
  int blockWidth = n >= 256 ? 512 : 256;
  int blockHeight = n >= 512 ? 8 : n >= 256 ? 16 : 32;
  for (int columnOffset = 0; columnOffset < n; columnOffset += blockWidth) {
    for (int rowOffset = 0; rowOffset < n; rowOffset += blockHeight) {
      for (int i = 0; i < n; ++i) {
        for (int j = columnOffset; j < columnOffset + blockWidth && j < n; j += 64) {
          var sum1 = YMM_FLOAT.fromArray(result, i * n + j);
          var sum2 = YMM_FLOAT.fromArray(result, i * n + j + 8);
          var sum3 = YMM_FLOAT.fromArray(result, i * n + j + 16);
          var sum4 = YMM_FLOAT.fromArray(result, i * n + j + 24);
          var sum5 = YMM_FLOAT.fromArray(result, i * n + j + 32);
          var sum6 = YMM_FLOAT.fromArray(result, i * n + j + 40);
          var sum7 = YMM_FLOAT.fromArray(result, i * n + j + 48);
          var sum8 = YMM_FLOAT.fromArray(result, i * n + j + 56);
          for (int k = rowOffset; k < rowOffset + blockHeight && k < n; ++k) {
            var multiplier = YMM_FLOAT.broadcast(left[i * n + k]);
            sum1 = sum1.fma(multiplier, YMM_FLOAT.fromArray(right, k * n + j));
            sum2 = sum2.fma(multiplier, YMM_FLOAT.fromArray(right, k * n + j + 8));
            sum3 = sum3.fma(multiplier, YMM_FLOAT.fromArray(right, k * n + j + 16));
            sum4 = sum4.fma(multiplier, YMM_FLOAT.fromArray(right, k * n + j + 24));
            sum5 = sum5.fma(multiplier, YMM_FLOAT.fromArray(right, k * n + j + 32));
            sum6 = sum6.fma(multiplier, YMM_FLOAT.fromArray(right, k * n + j + 40));
            sum7 = sum7.fma(multiplier, YMM_FLOAT.fromArray(right, k * n + j + 48));
            sum8 = sum8.fma(multiplier, YMM_FLOAT.fromArray(right, k * n + j + 56));
          }
          sum1.intoArray(result, i * n + j);
          sum2.intoArray(result, i * n + j + 8);
          sum3.intoArray(result, i * n + j + 16);
          sum4.intoArray(result, i * n + j + 24);
          sum5.intoArray(result, i * n + j + 32);
          sum6.intoArray(result, i * n + j + 40);
          sum7.intoArray(result, i * n + j + 48);
          sum8.intoArray(result, i * n + j + 56);
        }
      }
    }
  }
}

They just aren’t that different, and it’s easy to translate between the two. I wouldn’t expect it to be fast yet, though. I have no idea what the scope of work involved in implementing all of the C2 intrinsics to make this possible is, but I assume it’s vast. The class jdk.incubator.vector.VectorIntrinsics seems to contain all of the intrinsics implemented so far, and it doesn’t contain every operation used in my array multiplication code. There is also the question of value types and vector box elimination. I will probably look at this again in the future when more of the JIT compiler work has been done, but I’m starting to get very excited about the possibility of much faster JVM-based data processing.

Original Link

How Spark Internally Executes a Program

Hello, everyone! In my previous article, I explained the difference between RDD, DF, and DS. You can find this article here.

In this article, I will try to explain how Spark works internally and what the components of execution are: jobs, tasks, and stages.

As we all know, Spark gives us two kinds of operations for solving any problem: transformations and actions.

When we do a transformation on any RDD, it gives us a new RDD. But it does not start the execution of those transformations. The execution is performed only when an action is performed on the new RDD and gives us a final result.

So, once you perform an action on an RDD, the Spark context hands your program over to the driver.

The driver creates the DAG (directed acyclic graph) or execution plan (job) for your program. Once the DAG is created, the driver divides this DAG into a number of stages. These stages are then divided into smaller tasks and all the tasks are given to the executors for execution.

The Spark driver is responsible for converting a user program into units of physical execution called tasks. At a high level, all Spark programs follow the same structure. They create RDDs from some input, derive new RDDs from those using transformations, and perform actions to collect or save data. A Spark program implicitly creates a logical directed acyclic graph (DAG) of operations.

When the driver runs, it converts this logical graph into a physical execution plan.

So, let’s take an example of word count for better understanding:

val rdd = sc.textFile("address of your file")
rdd.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).collect

Here you can see that collect is an action that will collect all data and give a final result. As explained above, when I perform the collect action, the Spark driver creates a DAG.

In the image above, you can see that one job is created and executed successfully. Now, let’s have a look at DAG and its stages.

Here, you can see that Spark created the DAG for the program written above and divided the DAG into two stages.

In this DAG, you can see a clear picture of the program. First, the text file is read. Then, the transformations flatMap and map are applied. Finally, reduceByKey is executed.

But why did Spark divide this program into two stages? Why not more or fewer than two? Basically, it comes down to shuffling: whenever you perform a transformation that requires Spark to shuffle data by communicating with other partitions, Spark creates a new stage for it. A transformation that does not require shuffling stays within a single stage.

Now, let’s have a look at how many tasks have been created by Spark:

As I mentioned earlier, the Spark driver divides DAG stages into tasks. Here, you can see that each stage is divided into two tasks.

But why did Spark divide only two tasks for each stage? It depends on your number of partitions.

In this program, we have only two partitions, so each stage is divided into two tasks. And a single task runs on a single partition. The number of tasks for a job is:

(number of stages * number of partitions)
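The arithmetic above can be sketched in a few lines of plain Scala. This is a simplified model, assuming (as in this example) that every stage runs exactly one task per partition; the object and method names are illustrative:

```scala
object TaskCount {
  // Simplified model from the text: each stage spawns one task per partition,
  // so the job's total task count is stages * partitions.
  def tasksForJob(stages: Int, partitionsPerStage: Int): Int =
    stages * partitionsPerStage

  // The word-count job above: 2 stages x 2 partitions = 4 tasks.
  // TaskCount.tasksForJob(2, 2) == 4
}
```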

Now, I think you may have a clear picture of how Spark works internally.

Original Link

Scala Spark Integration With Apache NiFi

Learn how to execute Scala Apache Spark code in JARs from Apache NiFi — because you don’t want all of your Scala code in a continuous block like Apache Zeppelin. We’re going to use Apache NiFi, Apache Livy, Apache Spark, and Scala.


Here’s the inline Scala code:

Apache Zeppelin running the same Scala job (you have to add the JAR to the Spark interpreter and restart):

Grafana Charts of Apache NiFi Run:

Log search helps you find errors:

Run code for your Spark class:

Setting up your ExecuteSparkInteractive processor:

Setting up your Spark service for Scala:

Tracking the job in Livy UI:

Tracking the job in Spark UI:

I was looking at pulling code from Git and putting it into a NiFi attribute and running directly. For bigger projects, you’ll have many classes and dependencies that may require a full IDE and SBT build cycle. Once I build a Scala JAR, I want to run against that.

Example code:

package com.dataflowdeveloper.example
import org.apache.spark.sql.SparkSession
class Example() {
  def run(spark: SparkSession) {
    try {
      println("Started")
      val shdf ="hdfs://")
      shdf.createOrReplaceTempView("smartplug")
      val stuffdf = spark.sql("SELECT * FROM smartplug")
      println("Complete.")
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}

Run that with:

import com.dataflowdeveloper.example.Example
println("Before run")
val job = new Example()
println("After run")

After the run:

{"text\/plain":"After run"}

Here’s the GitHub link.

Original Link

An Intro to Apache Spark Partitioning: What You Need to Know

Apache Spark’s resilient distributed datasets (RDDs) are collections of data so large that they cannot fit on a single node and must be partitioned across multiple nodes. Apache Spark automatically partitions RDDs and distributes the partitions across different nodes. RDDs are evaluated lazily (execution does not start until an action is triggered, which improves manageability, saves computation, and thus increases optimization and speed), and the transformations are stored as directed acyclic graphs (DAGs). So, every action on an RDD makes Apache Spark recompute the DAG.

It’s important to understand the characteristics of partitions in Apache Spark to guide you in achieving better performance, debugging, and error handling.

Here are some of the basics of partitioning:

  • Every node in a Spark cluster contains one or more partitions.
  • The number of partitions used in Spark is configurable. Having too few (causing less concurrency, data skewing, and improper resource utilization) or too many (causing task scheduling to take more time than actual execution time) partitions is not good. By default, it is set to the total number of cores on all the executor nodes.
  • Partitions in Spark do not span multiple machines.
  • Tuples in the same partition are guaranteed to be on the same machine.
  • Spark assigns one task per partition and each worker can process one task at a time.

Hash Partitioning vs. Range Partitioning in Apache Spark

Apache Spark supports two types of partitioning: hash partitioning and range partitioning. Knowing what keys in your data are distributed or sequenced, as well as the action you want to perform on your data, can help you select the appropriate techniques. There are many factors which affect partitioning choices:

  • Available resources: Number of cores on which the task can run.
  • External data sources: The size of local collections, Cassandra table, or HDFS file determines the number of partitions.
  • Transformations used to derive RDD: There are a number of rules to determine the number of partitions when an RDD is derived from another RDD.

As you can see, there are multiple aspects you’ll need to keep in mind when working with Apache Spark. In this blog, I want to highlight the importance of being completely aware of your business data (its keys and distribution), of the physical resources available to Spark processing, and, most importantly, of network, CPU, and memory.

Let’s look at some common pitfalls when working with Apache Spark partitioning.

Skewed Data and Shuffle Blocks

Processing with Apache Spark’s default partitioning might cause data to be skewed, which, in turn, can cause problems related to shuffles during aggregation operations or single executors not having sufficient memory.

Here, we see that “key-a” has a larger amount of data in its partition, so tasks on Exec-5 will take much longer to complete than the other five tasks. Another important thing to remember is that Spark shuffle blocks can be no greater than 2 GB (internally, because the ByteBuffer abstraction has a MAX_SIZE of 2 GB). For example, if you are running operations such as aggregations, joins, or cache operations, a Spark shuffle will occur, and having a small number of partitions or skewed data can produce oversized shuffle blocks. Hence, if you start seeing errors about breaching the MAX_SIZE limit during shuffles, you know why it’s happening: it may be tied to skewed data.
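One way to spot the “key-a” situation before it bites is to sample per-key record counts and compare them against a uniform share. A minimal sketch in plain Scala, on a local collection rather than an RDD; the object name and the 2x threshold are illustrative assumptions:

```scala
object SkewCheck {
  // Flags keys whose record count far exceeds a uniform share of the data --
  // the "key-a" situation described above. `factor` is an arbitrary threshold.
  def skewedKeys[K](records: Seq[K], factor: Double = 2.0): Set[K] = {
    val counts: Map[K, Double] =
      records.groupBy(identity).map { case (k, vs) => k -> vs.size.toDouble }
    val uniformShare = records.size.toDouble / counts.size
    counts.collect { case (k, c) if c > factor * uniformShare => k }.toSet
  }
}
```

Running this on a sample of your join keys (rather than the full dataset) is usually enough to decide whether repartitioning or salting is worth the effort.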

Partition Wisely

How do you avoid skewed data and shuffle blocks? Partitioning wisely. It’s critical to partition wisely in order to manage memory pressure as well as to ensure complete resource utilization on executors’ nodes. You must always know your data size, types, and how it’s distributed. A couple of best practices to remember are:

  • Understanding and selecting the right operators for actions like reduceByKey or aggregateByKey so that your driver is not put under pressure and the tasks are properly executed on executors.
  • If your data arrives in a few large unsplittable files, the partitioning dictated by the InputFormat might place large numbers of records in each partition, while not generating enough partitions to take advantage of all the available cores. In this case, invoking repartition with a high number of partitions after loading the data will allow the operations that come after it to leverage more of the cluster’s CPU.
  • Also, if data is skewed, then repartitioning using an appropriate key that can spread the load evenly is also recommended.

How Do You Get the Right Number of Partitions?

Apache Spark can run only a single concurrent task for every partition of an RDD, up to the number of cores in your cluster (and probably 2-3x that). Hence, when choosing a "good" number of partitions, you generally want at least as many as the number of executors for parallelism. You can get this computed value by calling sc.defaultParallelism. The maximum size of a partition is ultimately limited by the available memory of an executor.
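As a back-of-envelope helper for that guideline (a sketch only: the 2-3x multiplier is the rule of thumb from the text, and in a real job you would start from sc.defaultParallelism instead of a hard-coded core count):

```scala
object PartitionHeuristic {
  // Rule of thumb: at least one partition per available core, and often a
  // small multiple (2-3x) so stragglers don't leave cores idle.
  def suggestedPartitions(totalCores: Int, tasksPerCore: Int = 3): Int = {
    require(totalCores > 0 && tasksPerCore > 0)
    totalCores * tasksPerCore
  }
  // e.g. suggestedPartitions(16) == 48
}
```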

There are also cases in which it’s not possible to determine a proper repartitioning key for even data distribution. In those cases, methods like salting can be used, which involve adding a new “fake” key and using it alongside the current key for better distribution of the data. Here’s an example:

  • Add a salt to the small RDD using a Cartesian product (1-N) to increase the number of entries and create a new join key.
  • Join the RDDs on the new join key, which will now be distributed better due to the random seeding.
  • Remove the random fake key from the join key to get the final result of the join.

In the example above, the fake key in the lookup dataset will be a Cartesian product (1-N), and for the main dataset, it will be a random key (1-N) applied to each row of the source dataset, N being the level of distribution.
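The three salting steps can be sketched on plain Scala collections standing in for the RDDs; the function and variable names are illustrative, not a Spark API:

```scala
import scala.util.Random

object SaltedJoin {
  // Salted join of a large, skewed left side against a small lookup side.
  // n is the level of distribution (the 1-N range from the text).
  def saltedJoin[K, A, B](large: Seq[(K, A)],
                          small: Seq[(K, B)],
                          n: Int,
                          rnd: Random = new Random(42)): Seq[(K, (A, B))] = {
    // Step 1: Cartesian product -- replicate the small side once per salt 1..n.
    val smallSalted: Map[(K, Int), B] =
      (for { (k, b) <- small; salt <- 1 to n } yield ((k, salt), b)).toMap
    // Step 2: tag each large-side row with a random salt and join on
    // (key, salt), spreading one hot key over n distinct join keys.
    // Step 3: drop the salt so the result matches an ordinary join.
    large.flatMap { case (k, a) =>
      val salt = rnd.nextInt(n) + 1
      smallSalted.get((k, salt)).map(b => (k, (a, b)))
    }
  }
}
```

Because the small side carries every salt value, each large-side row still finds its match no matter which random salt it drew; only the distribution of work changes.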

Original Link

Apache Spark: 3 Reasons Why You Should Not Use RDDs

Apache Spark: whenever we hear these two words, the first thing that comes to mind is RDDs (resilient distributed datasets). Now, it has been more than five years since Apache Spark came into existence, and after its arrival, a lot of things changed in the big data industry. The major change was the dethroning of Hadoop MapReduce. Spark literally replaced MapReduce thanks to its easy-to-use API, lower operational cost due to efficient use of resources, compatibility with many existing technologies like YARN and Mesos, fault tolerance, and security.

Due to these reasons, a lot of organizations have migrated their big data applications to Spark and the first thing they learn is how to use RDDs. This makes sense, as RDD is the building block of Spark and the whole idea of Spark is based on RDD. Also, it is the perfect replacement for MapReduce. So, whoever wants to learn Spark should know about RDDs.

But when it comes to building an enterprise-grade application based on Spark, RDDs aren’t a good choice. Why? You’ll see when you read the reasons given below. If RDDs are not a good choice, then what should we use? The obvious answer is DataFrames/Datasets.

Now, let’s come to reasons for not using RDDs:

1. Outdated

Yes! You read that right: RDDs are outdated. The reason is that as Spark matured, it started adding features demanded by fields like data warehousing, big data analytics, and data science.

In order to fulfill the needs of these fields, Spark came up with a solution that could work like a silver bullet: one fit for all of these use cases.

To do that, it introduced DataFrames and Datasets, distributed collections of data with the benefits of Spark SQL’s optimized execution engine. We’ll get to what Spark SQL’s optimized execution is later on, but for now, we know that Spark has come up with two new types of data structures that have more benefits than RDD.

2. Hard to Use

The next reason not to use RDDs is the API they provide. Most operations, like counting and grouping, are pretty straightforward, with easy-to-use built-in API functions. But when it comes to operations like aggregation or finding averages, it becomes really hard to code using RDDs.

For example, say we have a text file and we want to find out the average frequency of all the words in it.

First, let’s code it using RDD:

val linesRDD = sparkContext.textFile("file.txt")
val wordCountRDD = linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
val (word, (sum, n)) = wordCountRDD
  .map { case (word, count) => (word, (count, 1)) }
  .reduce { case ((word1, (count1, n1)), (word2, (count2, n2))) =>
    ("", (count1 + count2, n1 + n2))
  }
val average = sum.toDouble / n

Now, let’s try to solve the same problem using DataFrames:

val linesDF = sparkContext.textFile("file.txt").toDF("line")
val wordsDF = linesDF.explode("line", "word")((line: String) => line.split(" "))
val wordCountDF = wordsDF.groupBy("word").count()
val average = wordCountDF.agg(avg("count"))

The difference between the two solutions is clear. The first will definitely take you some time to understand what the developer is trying to do. But the second one is pretty straightforward and anyone who knows SQL will understand it in one go.

So, we saw that RDDs can sometimes be tough to use if the problem at hand is like the one above.

3. Slow Speed

Last, but not least, a reason to not use RDD is its performance, which can be a major issue for some applications. Since this is an important reason, we will take a closer look at it.

For example, say we have 100M numbers to be counted. Now, 100M doesn’t seem like a big number when we talk about big data, but the important thing to notice here is the difference in speed between the DataFrame/Dataset and the RDD. Let’s see the example:

scala> val ds = spark.range(100000000)
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]

When I ran ds.count, it gave a result of, of course, 100,000,000, in about 0.2s (on a 4 core/8 GB machine). Also, the DAG created is as follows:

On the other hand, when I ran ds.rdd.count, which first converts the Dataset into an RDD and then runs a count on it, it gave me the result in about 4s (on the same machine). Also, the DAG it creates is different:

Looking at the results and DAGs above, two questions will definitely arise in your mind:

  1. Why does ds.rdd.count create only one stage, whereas ds.count creates two?
  2. Why is ds.rdd.count slower than ds.count even though it has only one stage to execute?

The answer to these questions is as follows:

  1. Both counts are effectively two-step operations. The difference is that with ds.count, the final aggregation is performed by one of the executors, while ds.rdd.count aggregates the final result on the driver, so that step is not reflected in the DAG.
  2. ds.rdd.count has to initialize (and later garbage collect) 100 million row objects, which is a costly operation and accounts for the majority of the time difference between the two operations.

So, in conclusion, avoid RDDs wherever you can and use DataFrames/Datasets instead. But this doesn’t mean that we shouldn’t learn RDDs at all. After all, they are the building blocks of Spark and they can’t be ignored while learning Spark. But we should avoid them in our applications.

I hope you found this post interesting and now, you will be able to make others believe they should not use RDDs anymore.

This article was first published on the Knoldus blog.

Original Link

Some Lessons of Spark and Memory Issues on EMR

In the last few days, we went through several performance issues with Spark as our data grew dramatically. The easiest workaround might be increasing the instance sizes. However, as scaling up is not a scalable strategy, we looked for alternative ways to get back on track when one of our Spark/Scala-based pipelines started to crash.

Some Details About Our Process

We run a Scala (2.1)-based job on a Spark 2.2.0/EMR 5.9.0 cluster with 64 r3.xlarge nodes. The job analyzes several data sources, each of a few hundred GB (and growing), using the DataFrame API, and outputs data to S3 in ORC format.

Analyzing the logs of the crashed cluster resulted in the following error:

WARN TaskSetManager: Lost task 49.2 in stage 6.0 (TID xxx,
ExecutorLostFailure (executor 16 exited caused by one of the running tasks) Reason:
Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

How Did We Recover?

Setting spark.yarn.executor.memoryOverhead to 2,500 (the maximum on the instance type we used, r3.xlarge) did not make a major difference.

spark-submit --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=2500 ...

We raised the bar by disabling the virtual and physical memory checks and increasing the virtual to physical memory ratio to 4.

[
  {
    "classification": "spark",
    "properties": {
      "maximizeResourceAllocation": "true"
    }
  },
  {
    "classification": "yarn-site",
    "properties": {
      "yarn.nodemanager.vmem-pmem-ratio": "4",
      "yarn.nodemanager.pmem-check-enabled": "false",
      "yarn.nodemanager.vmem-check-enabled": "false"
    }
  }
]

However, this only worked until we hit the next limit (Spark tasks were probably killed when they tried to exceed physical memory), with the following error:

ExecutorLostFailure (executor exited caused by one of the running tasks) Reason: Container marked as failed: container_ on host:. Exit status: -100. Diagnostics: Container released on a *lost* node

This one was solved by increasing the number of DataFrame partitions (in this case, from 1,024 to 2,048). That reduced the needed memory per partition.
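The arithmetic behind that fix is simple: per-partition memory pressure scales inversely with the partition count. A sketch of the calculation, with an illustrative total data size:

```scala
object PartitionMemory {
  // Rough per-partition footprint: total data evenly divided across partitions.
  def mbPerPartition(totalDataMb: Long, partitions: Int): Long = {
    require(partitions > 0)
    totalDataMb / partitions
  }
  // Doubling partitions from 1024 to 2048 halves what each task must hold.
}
```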

Right now, we run at full steam ahead. When we hit the next limit, it may be worth an update.

As Spark heavily utilizes cluster RAM as an effective way to maximize speed, it is highly important to monitor it and verify that your cluster settings and partitioning strategy meet your growing data needs.

Original Link

Apache Spark Submit vs. Talend Spark Jobs: What’s the Difference?

In my previous post, Talend and Apache Spark: A Technical Primer, I walked you through how Talend Spark jobs equate to Spark Submit. In this post, I want to continue evaluating Talend Spark configurations with Apache Spark Submit. First, we are going to look at how you can map the options in the Apache Spark Configuration tab in the Talend Spark Job to what you can pass as arguments to Spark Submit and discuss their usage. Let’s get started.

Command Differences

When running an Apache Spark job (like one of the Apache Spark examples offered by default on the Hadoop cluster used to verify that Spark is working as expected) in your environment, you use the following commands:

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode client \
  --executor-memory 5G \
  --num-executors 10 \
  /path/to/examples.jar 1000

Before issuing the Spark submit command, we set the directory from which it will read the cluster configuration files (via the HADOOP_CONF_DIR environment variable, discussed below). The Spark submit command then runs Spark on a YARN cluster in client mode, using ten executors with 5G of memory each to run our Spark example job.

Now, let’s take a look at how this same Spark example job runs in Talend. When we run a Spark example job (like the one above) in Talend, all the Spark configuration information is entered in the following tab within the Run tab:

This raises a few questions. How does information we enter in Talend map to what we enter on the terminal to run a Spark job? How do we know how many executors and memory we requested? What about troubleshooting? We will answer all of these questions in this article.

Before we proceed any further, I want to introduce some Spark submit options that will be used throughout this article. According to Apache Spark documentation, these are some of the most common options that you can pass to a Spark submit script:

  • --class: The main entry point for your Spark application.
  • --master: Here you specify whether your Spark Master is a standalone Spark cluster or you are going to use Spark on YARN.
  • --deploy-mode: As mentioned in my previous blog, this relates to the two different YARN modes available and determines how your Spark driver is deployed.
  • --conf: Here you pass additional Spark configurations that you want your job to use, for example, spark.executor.userClassPathFirst=true.
  • application-jar: The path to your compiled Spark code that Apache Spark is going to execute.
  • application-arguments: Any arguments that are specific to your Spark code.

Let’s now turn to how the above options are used within a Talend Spark job. You will notice that in the Spark Configuration tab under the Run tab, the different options you can set are logically categorized into the following categories:

  1. Cluster version
  2. Configuration
  3. Authentication
  4. Tuning
  5. Spark history

Cluster Version

Let’s start with one of the first options that you have within the Talend job in the Cluster Version category. This is the Spark Mode option.

In this option, you specify whether your Spark Master is on YARN or you are going to use standalone Spark. This option maps to the --master and --deploy-mode Spark submit options described previously. For example, if you select the YARN Client Spark Mode in Talend, this equates to specifying --master yarn --deploy-mode client in Spark submit. If Standalone mode is selected in that drop-down box, Talend will prompt you to enter your Spark Master URL, as you would on the Spark submit end. This maps to passing --master spark:// followed by your master's host and port to Spark submit.


Configuration

In Talend, we have the Configuration category, which asks for the following information:

In the first set of checkboxes, Talend asks us to enter the resource manager address, the resource manager scheduler address, the job history address, and the staging directory.

When using Spark submit, all of this information is injected into our Spark job through HADOOP_CONF_DIR. We can either set this as an environment variable before running our Spark submit script or set it permanently in /etc/environment or /etc/profile. As a note, all of these environment variables are also set in an environment shell script ( that is sourced by Spark jobs when run through Spark submit; it is always located under the /etc/spark/conf directory on the Spark hosts. Here is an example of how this config file looks in the cluster:

The next checkbox asks if you want to define the Hadoop home directory, which some Spark jobs need. In Spark submit jobs, this information is passed in the same manner, but through the HADOOP_HOME environment variable. In a Talend Spark job, these checkboxes do what the file does for the Spark submit script: they source those values at runtime of your Spark job.

Finishing the configuration category in the Spark configuration within Talend, the last option you have defines the hostname or IP address of the Spark driver. This is a useful option when the system that the Spark Job runs from uses internal and external IPs or there are issues with the hostname resolution that could cause issues when the Spark Master and Executors try to connect back to the Spark Driver.

By default, if this option is not specified, the job will try to use the local hostname and resolve its IP address. As mentioned in the previous article, Talend currently uses YARN-client mode, so the Spark driver always runs on the system the Spark job is started from. Mapping this to the options provided by Spark submit, it would be specified by using the --conf option and providing a key/value pair for the property. This concludes the mapping of the options under the Configuration category in the Spark Configuration tab.


Authentication

In the authentication category, we are given the option to select the authentication method used by our Hadoop cluster:

If we don’t check anything in this category, our job will assume that simple authentication is used by the cluster, and it will try to connect to our Hadoop cluster using the username that we specify there. In a Spark submit case, this information would be entered in the Spark configuration of the application we are submitting.

Now, if we go ahead and check the option to Use Kerberos authentication, it will prompt us to add the following information:

The first two fields are the service principal names used by the resource manager and the job history service. If the option to use a keytab is not checked, then when the job runs, it will look in the Kerberos ticket cache on the system it is running on, specifically the cache of the user that started the job, for valid Kerberos tickets to use.

If the keytab option is checked, you will need to specify the keytab to use along with the principal name it was issued for. This way, when the job starts, it generates a Kerberos ticket from that keytab for the principal the job will use. In the case of Spark submit, you would set Kerberos authentication in the Spark configuration of your application. Before running Spark submit, you would run the kinit Kerberos command to generate a ticket if you are not using a keytab; if a keytab is used, you can either run kinit with the flags needed to generate a ticket from a keytab or specify logging in from a keytab within your Spark application code.


Tuning

Let’s move on to the Tuning category in Talend, which provides the Set tuning properties option, unchecked by default. When Set tuning properties is checked, we are automatically greeted with the following options:

So, let’s go and see how all those options match to Spark submit.

The first option here is to Set application master tuning properties, which allows a user to set the amount of memory and number of cores that the YARN Application Master should utilize.

The purpose of the YARN Application Master instance is to negotiate resources with the Resource Manager and then communicate with the Node Managers to monitor resource utilization and execute containers. If this option is not set, the YARN Application Master is allocated 512m and one core by default. Mapping this to how we would pass it as an option to Spark submit, we would use the --conf option and pass the following two key/value pairs: spark.yarn.am.memory=512m, spark.yarn.am.cores=1.

We can also set a number of additional settings including the number of executors, the amount of memory on each executor, cores per executor, and also set the amount of overhead memory that can be allocated per executor in the next options.

The default values are 1g of memory per executor, 1 core per executor, an executor memory overhead of 10 percent of the executor memory (with a minimum of 384m), and two executors requested. Mapping this back to how it would be passed in Spark submit, we have two ways. One is to use, as in the example Spark submit command above, --executor-memory 5G --num-executors 10; the other is to pass them using the --conf option with the following key/value pairs:

spark.executor.instances=2, spark.executor.cores=1, spark.executor.memory=2g, spark.yarn.executor.memoryOverhead=384m.
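To see what these settings cost at the cluster level, here is a sketch of the arithmetic, using the 10 percent / 384 MB overhead rule quoted above; the object and method names are illustrative:

```scala
object YarnFootprint {
  // Overhead defaults to 10% of executor memory with a 384 MB floor.
  def overheadMb(executorMemMb: Int): Int =
    math.max(executorMemMb / 10, 384)

  // Total memory YARN must grant: each executor container needs its heap
  // plus the overhead.
  def totalRequestMb(numExecutors: Int, executorMemMb: Int): Int =
    numExecutors * (executorMemMb + overheadMb(executorMemMb))

  // e.g. the 10-executor, 5G example: totalRequestMb(10, 5120) == 56320
}
```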

The next available option asks us about YARN resource allocation:

The options we have here are Auto, Fixed, and Dynamic, but what do they mean? Spark gives us the ability to select how we want executors to be allocated.

If it is left on Auto, we notice that the option to set the number of executors mentioned above disappears, as the job will use the default allocated by YARN which, as we mentioned, is two executors. If it is set to Fixed, we are given the option to set the number of executors our job will request. The last option is Dynamic, which lets us use the mechanism Spark provides to dynamically adjust the executors allocated to our Spark job based on what is needed at runtime. This means that our application, while running, can ask for more executors as needed and release them back to YARN when they are not used. When this option is selected, we are provided with the following configuration:

We can now select how many executors we will initially request from YARN, and then specify the minimum and maximum number of executors the job can have, depending on the workload of our job when executed by Spark. In order to pass the dynamic option in Spark submit, you will use the --conf option with the following key/value pairs: spark.dynamicAllocation.enabled=true, spark.shuffle.service.enabled=true. Per the Spark documentation, those two properties are required in order to use this feature.
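Assembled as spark-submit arguments, the dynamic-allocation settings look like this. A sketch only: the two required keys come from the text, the min/max/initial executor keys are the standard Spark property names, and the helper itself is illustrative:

```scala
object DynamicAllocArgs {
  // Builds the repeated "--conf key=value" argument list for spark-submit.
  def confArgs(initial: Int, min: Int, max: Int): Seq[String] =
      "spark.dynamicAllocation.enabled=true",   // required for the feature
      "spark.shuffle.service.enabled=true",     // required for the feature
    ).flatMap(kv => Seq("--conf", kv))
}
```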

Moving on in the Tuning category within the Spark Configuration tab in Talend, the next checkbox is Set Web UI port. When selected, it gives you the option to specify a port, the default being 4040. When your Spark application is running, the Spark driver starts a Web UI that can be used to monitor the running job and inspect its execution. If this option is not selected, Spark starts with the default port mentioned above and keeps increasing the port number until an open one is found. You would usually use this option if you know that port 4040 is not available on the system you are running your Spark job from, and you want to specify a certain port instead of letting the Spark application search for an open one. To set this option in Spark submit, you would use the --conf option with a key/value pair such as spark.ui.port=4041.

Now the next option we have available to select is Broadcast Factory and we notice that for this one we are given a few different options.

So, what does the broadcast factory do? Broadcast in Spark applications is responsible for distributing broadcast variables across the executors in your cluster, so that a variable can be distributed quickly and efficiently instead of a single node doing all the work. We are offered three options to select from. The first is Auto, which, when selected, lets the defaults be used. The second and third options allow you to select between Torrent and HTTP as the broadcast factory. In Spark submit, you would pass this with the --conf option and a key/value pair such as spark.broadcast.factory=org.apache.spark.broadcast.TorrentBroadcastFactory; the default is usually the Torrent one.

The last option that we are offered in the Tuning category is to customize the Spark serializer to be used:

As described in the Spark documentation, serialization is used in Spark to serialize data between executors, and an efficient serializer improves performance in a distributed environment. If this option is not selected, Talend sets the serializer to Kryo serialization, which is considered the most efficient one. To use the same option in Spark submit, pass the --conf option with the key/value pair spark.serializer=org.apache.spark.serializer.KryoSerializer. If this property is not specified in Spark submit, the default Java serializer is used, except that the Spark SQL Thrift Server defaults to Kryo.

Spark History

Now let’s move to the last category, Spark History. When we enable Spark logging, we notice that we are provided with the following options:

When event logging is enabled, you are given the option to specify a directory in HDFS where the job history files can be read by the Spark History Server, and to specify the address of the History Server. To enable and set this up in Spark submit, pass the following key/value pairs with the --conf option:

spark.eventLog.enabled=true, spark.eventLog.dir=hdfs://namenode_host:namenode_port/user/spark/applicationHistory, spark.yarn.historyServer.address=http://spark_history_server:history_port

Additional Configuration

Now that we are done with the different categories in the Spark Configuration tab, we notice that we have three more options left. The first one is Spark "scratch" directory. This option specifies the scratch directory to be used on the local disk of the system from which the Spark job is started while the application is running. Using Spark submit, we would pass --conf spark.local.dir=/tmp. If we don't specify anything, the /tmp directory is used by default.

The next option activates Spark checkpointing, which gives our Spark job the ability to recover from a specific point in time in case of failure. When activated, we can specify a directory, either on the local filesystem or in HDFS, where checkpoint data is saved as the job progresses. As the Spark documentation points out, enabling this with Spark submit has to be done within our Spark code; an example is provided in the Spark documentation.
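The idea behind checkpointing, persisting progress so a restarted job resumes where it left off instead of starting over, can be illustrated with a minimal, Spark-free sketch. The directory layout and file name here are invented for illustration; real Spark checkpointing is configured in the job code as described above.

```python
import json, os, tempfile

# Minimal illustration of checkpoint-and-recover: persist progress to a
# directory, then restore it after a simulated restart. This models the
# concept only; it is not Spark's checkpoint format.
def save_checkpoint(directory, state):
    with open(os.path.join(directory, "checkpoint.json"), "w") as f:
        json.dump(state, f)

def load_checkpoint(directory):
    path = os.path.join(directory, "checkpoint.json")
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    return {"last_processed": 0}  # no checkpoint yet: start from scratch

ckpt_dir = tempfile.mkdtemp()
save_checkpoint(ckpt_dir, {"last_processed": 42})
# ... the job fails and is restarted ...
state = load_checkpoint(ckpt_dir)
print(state["last_processed"])  # → 42
```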

The last option is Advanced Properties. Here we can add any Spark properties we want to pass to our application as key/value pairs. This is the same as what you would do with Spark submit, where you would pass them with the --conf option.

As a note, if you take a closer look at your Hadoop cluster on one of the Spark gateway nodes, you will notice that many of the defaults mentioned above are already specified in a file named spark-defaults.conf, which is used when you run Spark submit. This file is located under /etc/spark/conf; if you open it, you will find most of the properties mentioned here. You can still override them, as described above, by passing them as options to your Spark submit. Here is an example:
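To illustrate the precedence involved: entries in spark-defaults.conf act as defaults, and any --conf pair passed to Spark submit overrides them. A small sketch, where the file contents and override values are hypothetical:

```python
# Sketch: properties in spark-defaults.conf act as defaults that any
# --conf pair passed to spark-submit overrides. Contents are hypothetical.
defaults_file = """\
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.ui.port           4040
spark.local.dir         /tmp
"""

def parse_defaults(text):
    conf = {}
    for line in text.splitlines():
        if line.strip() and not line.startswith("#"):
            key, value = line.split(None, 1)
            conf[key] = value.strip()
    return conf

def effective_conf(defaults, cli_overrides):
    merged = dict(defaults)
    merged.update(cli_overrides)  # command-line --conf wins
    return merged

conf = effective_conf(parse_defaults(defaults_file), {"spark.ui.port": "4041"})
print(conf["spark.ui.port"])  # → 4041
```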


Talend provides all the different options that you can use to configure your Spark application and makes it easy with the checkboxes and drop-down selections to specify the options that you want to utilize and which defaults are going to be used. I invite you to go through all those different settings that you can use in Talend Spark Jobs, and experience how easy it is to configure them and optimize them for your environment.

Original Link

Apache Spark Structured Streaming Integration With Apache NiFi 1.5: Scala Edition

Apache Spark 2.2.0 with Scala 2.11.8 with Java 1.8.0_112 on HDP 2.6.4 called from HDF 3.1 with Apache NiFi 1.5:

This is a follow up to this article.

We are using the same Apache NiFi flow to send messages to Apache Kafka. What is nice is that you could have the structured streaming version, non-structured version, and others listening to the same topic and same messages sent by Apache NiFi.

When we start, we have no data.

We quickly get a ton of data:

By default, Kafka assumes a 3-node cluster, for which a replication factor of 3 is a good fit. I have one node, so I had to change this; there were tons of warnings in the /usr/hdf/current/kafka-broker/logs directory.

The simplest Apache Spark client is one run in the shell:

/usr/hdp/current/spark2-client/bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0

The code is a simple fork from this excellent, highly recommended tutorial. If you are submitting this job and not running in a shell, add //. At the end, block until the streaming query terminates with sq.awaitTermination.

val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "smartPlug2").
  option("kafka.bootstrap.servers", "mykafkabroker:6667").load

records.printSchema

val result = records.
  select(
    $"key" cast "string",
    $"value" cast "string",
    $"topic",
    $"partition",
    $"offset")

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

val sq = result.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Append).
  queryName("scalastrstrclient").
  start

sq.status

Example run:

Spark context Web UI available at http://myipiscool:4045
Spark context available as 'sc' (master = local[*], app id = local-1519248053841).
Spark session available as 'spark'.
Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information. scala> val records = spark. | readStream. | format("kafka"). | option("subscribe", "smartPlug2"). | option("kafka.bootstrap.servers", "server:6667").load
records: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields] scala> records.printSchema
root |-- key: binary (nullable = true) |-- value: binary (nullable = true) |-- topic: string (nullable = true) |-- partition: integer (nullable = true) |-- offset: long (nullable = true) |-- timestamp: timestamp (nullable = true) |-- timestampType: integer (nullable = true) scala> val result = records. | select( | $"key" cast "string", | $"value" cast "string", | $"topic", | $"partition", | $"offset")
result: org.apache.spark.sql.DataFrame = [key: string, value: string ... 3 more fields] scala> import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.streaming.{OutputMode, Trigger} scala> import scala.concurrent.duration._
import scala.concurrent.duration._ scala> val sq = result. | writeStream. | format("console"). | option("truncate", false). | trigger(Trigger.ProcessingTime(10.seconds)). | outputMode(OutputMode.Append). | queryName("scalastrstrclient"). | start
sq: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3638a852 scala> sq.status
res1: org.apache.spark.sql.streaming.StreamingQueryStatus =
{ "message" : "Getting offsets from KafkaSource[Subscribe[smartPlug2]]", "isDataAvailable" : false, "isTriggerActive" : true
} scala> -------------------------------------------
Batch: 0
+---+-----+-----+---------+------+ -------------------------------------------
Batch: 1
|key |value |topic |partition|offset|
|02/21/2018 16:22:00|{"day1":1.204,"day2":1.006,"day3":1.257,"day4":1.053,"day5":1.597,"day6":1.642,"day7":1.439,"day8":1.178,"day9":1.259,"day10":0.995,"day11":0.569,"day12":1.287,"day13":1.371,"day14":1.404,"day15":1.588,"day16":1.426,"day17":1.707,"day18":1.153,"day19":1.155,"day20":1.732,"day21":1.333,"day22":1.497,"day23":1.151,"day24":1.227,"day25":1.387,"day26":1.138,"day27":1.204,"day28":1.401,"day29":1.288,"day30":1.439,"day31":0.126,"sw_ver":"1.1.1 Build 160725 Rel.164033","hw_ver":"1.0","mac":"50:C7:BF:B1:95:D5","type":"IOT.SMARTPLUGSWITCH","hwId":"7777","fwId":"777","oemId":"FFF22CFF774A0B89F7624BFC6F50D5DE","dev_name":"Wi-Fi Smart Plug With Energy Monitoring","model":"HS110(US)","deviceId":"777","alias":"Tim Spann's MiniFi Controller SmartPlug - Desk1","icon_hash":"","relay_state":1,"on_time":452287,"active_mode":"schedule","feature":"TIM:ENE","updating":0,"rssi":-33,"led_off":0,"latitude":41,"longitude":-77,"index":18,"zone_str":"(UTC-05:00) Eastern Daylight Time (US & Canada)","tz_str":"EST5EDT,M3.2.0,M11.1.0","dst_offset":60,"month12":null,"current":0.888908,"voltage":118.880856,"power":103.141828,"total":8.19,"time":"02/21/2018 16:22:00","ledon":true,"systemtime":"02/21/2018 16:22:00"}|smartPlug2|0 |14 |

Example JSON data:

|02/21/2018 16:23:58|{"day1":1.204,"day2":1.006,"day3":1.257,"day4":1.053,"day5":1.597,"day6":1.642,"day7":1.439,"day8":1.178,"day9":1.259,"day10":0.995,"day11":0.569,"day12":1.287,"day13":1.371,"day14":1.404,"day15":1.588,"day16":1.426,"day17":1.707,"day18":1.153,"day19":1.155,"day20":1.732,"day21":1.337,"day22":1.497,"day23":1.151,"day24":1.227,"day25":1.387,"day26":1.138,"day27":1.204,"day28":1.401,"day29":1.288,"day30":1.439,"day31":0.126,"sw_ver":"1.1.1 Build 160725 Rel.164033","hw_ver":"1.0","mac":"50:C7:88:95:D5","type":"IOT.SMARTPLUGSWITCH","hwId":"8888","fwId":"6767","oemId":"6767","dev_name":"Wi-Fi Smart Plug With Energy Monitoring","model":"HS110(US)","deviceId":"7676","alias":"Tim Spann's MiniFi Controller SmartPlug - Desk1","icon_hash":"","relay_state":1,"on_time":452404,"active_mode":"schedule","feature":"TIM:ENE","updating":0,"rssi":-33,"led_off":0,"latitude":41.3241234,"longitude":-74.1234234,"index":18,"zone_str":"(UTC-05:00) Eastern Daylight Time (US & Canada)","tz_str":"EST5EDT,M3.2.0,M11.1.0","dst_offset":60,"month12":null,"current":0.932932,"voltage":118.890282,"power":107.826982,"total":8.194,"time":"02/21/2018 16:23:58","ledon":true,"systemtime":"02/21/2018 16:23:58"}|smartPlug2|0 |24

You can learn more here.

Original Link

Apache Spark and Apache NiFi Integration (Part 2 of 2)

We’ll briefly start by going over our use case: ingesting energy data and running an Apache Spark job as part of the flow.

We will be using the new (in Apache NiFi 1.5/HDF 3.1) ExecuteSparkInteractive processor with the LivyController to accomplish that integration. As we mentioned in the first part of the article, it’s pretty easy to set this up.

Since this is a modern Apache NiFi project, we use version control on our code:

On a local machine, I am talking to an electricity sensor over Wi-Fi in a Python script. This code is processed, cleaned, and sent to a cloud-hosted Apache NiFi instance via S2S over HTTP.

In the cloud, we receive the pushed messages.

Once we open the Spark It Up processor group, we have a flow to process the data.

Flow Overview

  • QueryRecord: Determines how to route records based on a query against the streaming data and converts JSON to Apache Avro.

Path for all files:

  • UpdateAttribute: Set a schema
  • MergeContent: Do an Apache AVRO merge on our data to make bigger files.
  • ConvertAvroToORC: Build an Apache ORC file from merged Apache AVRO file.
  • PutHDFS: Store our Apache ORC file in an HDFS directory on our HDP 2.6.4 cluster.

Path for large voltage:

  • ExecuteSparkInteractive: Call our PySpark job
  • PutHDFS: Store the results to HDFS.

We could take all the metadata attributes and send them somewhere or store them as a JSON file.

We tested our PySpark program in Apache Zeppelin and then copied it to our processor.

Our ExecuteSparkInteractive Processor:

In our QueryProcessor, we send messages with large voltages to the Apache Spark executor to run a PySpark job to do some more processing.

Once we have submitted a job via Apache Livy, we are now able to see the job during and after execution with detailed Apache Livy UI screens and Spark screens. In the Apache Livy UI screen below, we can see the PySpark code executed and its output.

Apache Livy UI:

Apache Spark Jobs UI: Jobs

Apache Spark Jobs UI: SQL

Apache Spark Jobs UI: Executors

Apache Zeppelin SQL search of the data:

Hive/Spark SQL table DDL generated automagically by Apache NiFi:

Below is the source code related to this article:

PySpark code:

shdf = spark.read.json("hdfs://yourhdp264server:8020/spark2-history")
shdf.printSchema()
shdf.createOrReplaceTempView("sparklogs")
stuffdf = spark.sql("SELECT * FROM sparklogs")
stuffdf.count()

This is a pretty simple PySpark application: it reads the JSON results of the Spark2 history, prints a schema inferred from it, and then does a simple SELECT and count. We could very easily do Spark machine learning or other processing in there. You can run Python 2.x or 3.x with PySpark. I am running this on Apache Spark 2.2.0 hosted on an HDP 2.6.4 cluster running CentOS 7. The fun part is that every time I run this Spark job, it produces more results for it to read. I should probably just read that log in Apache NiFi, but it was a fun little example. Clearly, you can run any kind of job here; my next article will be about running Apache MXNet and Spark MLlib jobs through Apache Livy and Apache NiFi.

For a quick side note, you have a lot of options for working with schemas now:

Schema for energy data:

{ "type" : "record", "name" : "smartPlug", "fields" : [ { "name" : "day19", "type" : "double", "doc" : "Type inferred from '2.035'" }, { "name" : "day20", "type" : "double", "doc" : "Type inferred from '1.191'" }, { "name" : "day21", "type" : "double", "doc" : "Type inferred from '0.637'" }, { "name" : "day22", "type" : "double", "doc" : "Type inferred from '1.497'" }, { "name" : "day23", "type" : "double", "doc" : "Type inferred from '1.151'" }, { "name" : "day24", "type" : "double", "doc" : "Type inferred from '1.227'" }, { "name" : "day25", "type" : "double", "doc" : "Type inferred from '1.387'" }, { "name" : "day26", "type" : "double", "doc" : "Type inferred from '1.138'" }, { "name" : "day27", "type" : "double", "doc" : "Type inferred from '1.204'" }, { "name" : "day28", "type" : "double", "doc" : "Type inferred from '1.401'" }, { "name" : "day29", "type" : "double", "doc" : "Type inferred from '1.288'" }, { "name" : "day30", "type" : "double", "doc" : "Type inferred from '1.439'" }, { "name" : "day31", "type" : "double", "doc" : "Type inferred from '0.126'" }, { "name" : "day1", "type" : "double", "doc" : "Type inferred from '1.204'" }, { "name" : "day2", "type" : "double", "doc" : "Type inferred from '1.006'" }, { "name" : "day3", "type" : "double", "doc" : "Type inferred from '1.257'" }, { "name" : "day4", "type" : "double", "doc" : "Type inferred from '1.053'" }, { "name" : "day5", "type" : "double", "doc" : "Type inferred from '1.597'" }, { "name" : "day6", "type" : "double", "doc" : "Type inferred from '1.642'" }, { "name" : "day7", "type" : "double", "doc" : "Type inferred from '0.443'" }, { "name" : "day8", "type" : "double", "doc" : "Type inferred from '0.01'" }, { "name" : "day9", "type" : "double", "doc" : "Type inferred from '0.009'" }, { "name" : "day10", "type" : "double", "doc" : "Type inferred from '0.009'" }, { "name" : "day11", "type" : "double", "doc" : "Type inferred from '0.075'" }, { "name" : "day12", "type" : "double", "doc" : "Type 
inferred from '1.149'" }, { "name" : "day13", "type" : "double", "doc" : "Type inferred from '1.014'" }, { "name" : "day14", "type" : "double", "doc" : "Type inferred from '0.851'" }, { "name" : "day15", "type" : "double", "doc" : "Type inferred from '1.134'" }, { "name" : "day16", "type" : "double", "doc" : "Type inferred from '1.54'" }, { "name" : "day17", "type" : "double", "doc" : "Type inferred from '1.438'" }, { "name" : "day18", "type" : "double", "doc" : "Type inferred from '1.056'" }, { "name" : "sw_ver", "type" : "string", "doc" : "Type inferred from '\"1.1.1 Build 160725 Rel.164033\"'" }, { "name" : "hw_ver", "type" : "string", "doc" : "Type inferred from '\"1.0\"'" }, { "name" : "mac", "type" : "string", "doc" : "Type inferred from '\"50:C7:BF:B1:95:D5\"'" }, { "name" : "type", "type" : "string", "doc" : "Type inferred from '\"IOT.SMARTPLUGSWITCH\"'" }, { "name" : "hwId", "type" : "string", "doc" : "Type inferred from '\"60FF6B258734EA6880E186F8C96DDC61\"'" }, { "name" : "fwId", "type" : "string", "doc" : "Type inferred from '\"060BFEA28A8CD1E67146EB5B2B599CC8\"'" }, { "name" : "oemId", "type" : "string", "doc" : "Type inferred from '\"FFF22CFF774A0B89F7624BFC6F50D5DE\"'" }, { "name" : "dev_name", "type" : "string", "doc" : "Type inferred from '\"Wi-Fi Smart Plug With Energy Monitoring\"'" }, { "name" : "model", "type" : "string", "doc" : "Type inferred from '\"HS110(US)\"'" }, { "name" : "deviceId", "type" : "string", "doc" : "Type inferred from '\"8006ECB1D454C4428953CB2B34D9292D18A6DB0E\"'" }, { "name" : "alias", "type" : "string", "doc" : "Type inferred from '\"Tim Spann's MiniFi Controller SmartPlug - Desk1\"'" }, { "name" : "icon_hash", "type" : "string", "doc" : "Type inferred from '\"\"'" }, { "name" : "relay_state", "type" : "int", "doc" : "Type inferred from '1'" }, { "name" : "on_time", "type" : "int", "doc" : "Type inferred from '1995745'" }, { "name" : "active_mode", "type" : "string", "doc" : "Type inferred from '\"schedule\"'" }, { "name" 
: "feature", "type" : "string", "doc" : "Type inferred from '\"TIM:ENE\"'" }, { "name" : "updating", "type" : "int", "doc" : "Type inferred from '0'" }, { "name" : "rssi", "type" : "int", "doc" : "Type inferred from '-34'" }, { "name" : "led_off", "type" : "int", "doc" : "Type inferred from '0'" }, { "name" : "latitude", "type" : "double", "doc" : "Type inferred from '40.268216'" }, { "name" : "longitude", "type" : "double", "doc" : "Type inferred from '-74.529088'" }, { "name" : "index", "type" : "int", "doc" : "Type inferred from '18'" }, { "name" : "zone_str", "type" : "string", "doc" : "Type inferred from '\"(UTC-05:00) Eastern Daylight Time (US & Canada)\"'" }, { "name" : "tz_str", "type" : "string", "doc" : "Type inferred from '\"EST5EDT,M3.2.0,M11.1.0\"'" }, { "name" : "dst_offset", "type" : "int", "doc" : "Type inferred from '60'" }, { "name" : "month1", "type" : "double", "doc" : "Type inferred from '32.674'" }, { "name" : "month2", "type" : "double", "doc" : "Type inferred from '8.202'" }, { "name" : "current", "type" : "double", "doc" : "Type inferred from '0.772548'" }, { "name" : "voltage", "type" : "double", "doc" : "Type inferred from '121.740428'" }, { "name" : "power", "type" : "double", "doc" : "Type inferred from '91.380606'" }, { "name" : "total", "type" : "double", "doc" : "Type inferred from '48.264'" }, { "name" : "time", "type" : "string", "doc" : "Type inferred from '\"02/07/2018 11:17:30\"'" }, { "name" : "ledon", "type" : "boolean", "doc" : "Type inferred from 'true'" }, { "name" : "systemtime", "type" : "string", "doc" : "Type inferred from '\"02/07/2018 11:17:30\"'" } ] }

Python source (updated to include 31 days):

from pyHS100 import SmartPlug, SmartBulb
#from pprint import pformat as pf
import json
import datetime

plug = SmartPlug("")
row = {}
emeterdaily = plug.get_emeter_daily(year=2017, month=12)
for k, v in emeterdaily.items():
    row["day%s" % k] = v
emeterdaily = plug.get_emeter_daily(year=2018, month=1)
for k, v in emeterdaily.items():
    row["day%s" % k] = v
emeterdaily = plug.get_emeter_daily(year=2018, month=2)
for k, v in emeterdaily.items():
    row["day%s" % k] = v
hwinfo = plug.hw_info
for k, v in hwinfo.items():
    row["%s" % k] = v
sysinfo = plug.get_sysinfo()
for k, v in sysinfo.items():
    row["%s" % k] = v
timezone = plug.timezone
for k, v in timezone.items():
    row["%s" % k] = v
emetermonthly = plug.get_emeter_monthly(year=2018)
for k, v in emetermonthly.items():
    row["month%s" % k] = v
realtime = plug.get_emeter_realtime()
for k, v in realtime.items():
    row["%s" % k] = v
row['alias'] = plug.alias
row['time'] = plug.time.strftime('%m/%d/%Y %H:%M:%S')
row['ledon'] = plug.led
row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
json_string = json.dumps(row)

Example output:

{"text\/plain":"root\n |-- App Attempt ID: string (nullable = true)\n |-- App ID: string (nullable = true)\n |-- App Name: string (nullable = true)\n |-- Block Manager ID: struct (nullable = true)\n | |-- Executor ID: string (nullable = true)\n | |-- Host: string (nullable = true)\n | |-- Port: long (nullable = true)\n |-- Classpath Entries: struct (nullable = true)\n | |-- \/etc\/hadoop\/conf\/: string (nullable = true)\n | |-- \/etc\/hadoop\/conf\/secure: string (nullable = true)\n | |-- \/etc\/zeppelin\/conf\/external-dependency-conf\/: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_conf__: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/JavaEWAH-0.3.2.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/RoaringBitmap-0.5.11.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/ST4-4.0.4.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/activation-1.1.1.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/aircompressor-0.8.jar: string (nullable = true)\n | |-- 
\/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr-2.7.7.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr-runtime-3.4.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr4-runtime-4.5.3.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/

Shell tip: Apache MXNet may send some warnings to STDERR. I don't want these, so I send them to /dev/null:

python3 -W ignore 2>/dev/null
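The behavior behind this tip can be demonstrated in miniature: warnings are written to stderr, so suppressing them (here with Python's -W ignore; redirecting with 2>/dev/null works the same way) leaves stdout untouched. This sketch is generic and not tied to Apache MXNet:

```python
import subprocess, sys

# Demonstrates the tip above: warnings go to stderr, so "-W ignore"
# (or redirecting stderr) silences them without touching stdout.
snippet = "import warnings; warnings.warn('noisy'); print('ok')"

noisy = subprocess.run([sys.executable, "-c", snippet],
                       capture_output=True, text=True)
quiet = subprocess.run([sys.executable, "-W", "ignore", "-c", snippet],
                       capture_output=True, text=True)

print(noisy.stdout.strip())        # → ok (stdout is unaffected)
print("noisy" in noisy.stderr)     # → True
print(quiet.stderr.strip() == "")  # → True
```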


  • PySpark
  • Python
  • Apache NiFi
  • Apache Spark
  • HDF 3.1
  • HDP 2.6.4
  • Apache Hive
  • Apache Avro
  • Apache ORC
  • Apache Ambari
  • Apache Zeppelin

Original Link

Scala vs. Python for Apache Spark

Apache Spark is a great choice for cluster computing and includes language APIs for Scala, Java, Python, and R. Apache Spark includes libraries for SQL, streaming, machine learning, and graph processing. This broad set of functionality leads many developers to start developing against Apache Spark for distributed applications.   

The first big decision you have is where to run it. For most, that's a no-brainer: run it on Apache Hadoop YARN on your existing cluster. After that tough decision, the harder one for developers and enterprises is what language to develop in. Do you have to let users pick their own and support multiple languages? That results in code and tool sprawl, and the R interface is not quite as rich. For most enterprises, the verbosity and relative rarity of the Java interface leads them down the path to either Python or Scala. I am here to tear apart both options, rebuild them, and see which is left standing.

Scala has a major advantage in that it is the language the Apache Spark platform is written in. Scala on the JVM is a very powerful language, cleaner than Java but just as capable. Using the JVM, your applications can scale to a massive size. For most applications, this is a big deal, but with Apache Spark already being distributed via Akka and YARN, it's not necessary: you merely set a few parameters and your Apache Spark application is distributed for you, regardless of your language. So this is not an advantage anymore.

Python has become a first-class citizen in the Spark World. It is also a very easy language to start with and many schools are teaching it to children. There is a wealth of example code, books, articles, libraries, documentation, and help available for this language.

PySpark is the default place to be in Spark. With Apache Zeppelin's strong PySpark support, and Jupyter and IBM DSX using Python as a first-class language, you have many notebooks available to develop code, test it, run queries, build visualizations, and collaborate with others. Python is becoming the lingua franca for data scientists, data engineers, and streaming developers. Python is also well integrated with Apache NiFi.

Python has the advantage of a very rich set of machine learning, processing, NLP, and deep learning libraries available. You also don’t need to compile your code first and worry about complex JVM packaging. Using Anaconda or Pip is pretty standard and Apache Hadoop and Apache Spark clusters already have Python and their libraries installed for other purposes like Apache Ambari.

Some of the amazing libraries available for Python include NLTK, TensorFlow, Apache MXNet, TextBlob, spaCy, and NumPy.

Python Pros

  • PySpark is listed in all the examples and is no longer an afterthought.   

  • Most libraries come out with Python APIs first.

  • Python is a mature language.

  • Python usage continues to grow.

  • Deep learning libraries are including Python.

  • Included in all notebooks.

  • Ease of use.

Python Cons

  • Sometimes Python 2 and sometimes Python 3

  • Not as fast as Scala (though Cython fixes it)

  • Some of the libraries are tricky to build

Scala Pros

  • JVM

  • Strong IDEs and unit testing

  • Great serialization formats

  • Reuse Java libraries

  • Fast

  • AKKA

  • Spark Shell

Scala Cons

  • Not as widespread in use, with a smaller knowledge base

  • It’s a little odd for Java people to move to

  • Has to be compiled for Apache Spark jobs


Original Link

Schema and Spark Fun With HDF 3.1

I want to easily integrate Apache Spark jobs with my Apache NiFi flows. Fortunately, with the release of HDF 3.1, I can do that via Apache NiFi’s ExecuteSparkInteractive processor.

For the first step, let me set up a Centos 7 cluster with HDF 3.1. Follow the well-written guide here.

With the magic of time-lapse photography, instantly, we have a new cluster of goodness:

It is important to note the new NiFi Registry for doing version control and more. We also get the new Kafka 1.0, updated SAM, and the ever-important updated Schema Registry.

The star of the show today is Apache NiFi 1.5 here.

My first step is to add a Controller Service (LivySessionController).

Then, we add the Apache Livy Server. You can find this in your Ambari UI. It is by default port 8999. For my session, I am doing Python, so I picked pyspark. You can also pick pyspark3 for Python 3 code, spark for Scala, and sparkr for R.

To execute my Python job, you can pass the code in from a previous processor to the ExecuteSparkInteractive processor or put the code inline. I put the code inline.

Two new features of Schema Registry that I have to mention are the version comparison:

You click the COMPARE VERSIONS link and now, you have a nice comparison UI.

And the amazing new Swagger documentation for interactive documentation and testing of the Schema Registry APIs.

Not only do you get all the parameters for input and output, the full URL, and a curl example — you get to run the code live on your server.

I will be adding an article on how to use Apache NiFi to grab schemas from data using InferAvroSchema and publish these new schemas to the Schema Registry via REST API automatically.

Part two of this article will focus on the details of using Apache Livy, Apache NiFi, and Apache Spark with the new processor to call jobs.

Original Link

Smart Resource Utilization With Spark Dynamic Allocation

When a Spark application is submitted to a cluster, the cluster allocates the resources requested by the application for the entire duration of the application lifecycle. These resources cannot be shared with other applications, as they are dedicated to that application.

This paradigm is suitable for batch processing applications: the application is submitted, handles huge amounts of data, and when it is done (the main program exits and the driver JVM terminates), the cluster reclaims the resources, making them available to other applications. A batch application usually does need its resources for most of its lifecycle. But what if the application is not a batch job? What if it is a server that serves users on request, or a streaming application that handles data under a variable load?

On such applications, the demand for high resources is only needed during peak time, but during idle time, it is a waste of resources to allocate high capacity that is not used. Such an application use case can be:

  • A streaming application that handles varying loads of data using the new Spark Structured Streaming.

  • A REST server that serves SQL queries on data using Spark SQL.

  • A Notebook IDE such as Apache Zeppelin that allows interactive analytics using Spark over a cluster of nodes.

All those applications need Spark to run perpetually but also need high capacity resources only part of the time. For this, Spark comes to our aid with Spark Dynamic Allocation. The main idea is this: the Spark application will request minimal (or even no) resources during idle time, but when there are tasks to be performed, it will request more resources to complete those tasks. When the load is done, Spark will release those resources back to the cluster. In this way, we can utilize our cluster’s resources in an efficient way.

How It Works

The minimal unit of resource that a Spark application can request and dismiss is an Executor. This is a single JVM that can handle one or many concurrent tasks according to its configuration. We can set the number of cores per executor in the configuration key spark.executor.cores or in spark-submit’s parameter --executor-cores. If, for instance, it is set to 2, this Executor can handle up to two concurrent tasks. The RAM of each executor can also be set using the spark.executor.memory key or the --executor-memory parameter; for instance, 2GB per executor.

From the dynamic allocation point of view, in this case, the application can request resources in units of 2 cores and 2GB RAM each time. The application will first request one such unit, but if the load increases, subsequent requests grow exponentially in powers of 2: 2, 4, 8, etc.

The application measures its load by the number of tasks waiting to be performed. If the queue of waiting tasks contains more tasks than the number of cores the application already has, it will try to request more cores. These requests are granted up to the cluster’s limit or to a limit that can be configured in the application’s context.

When the number of waiting tasks becomes zero, the application will dismiss the idle executors until it reaches the minimum number of executors it is configured to use. By default, this number is zero.
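The scale-up and scale-down policy described above can be sketched as a toy simulation. The function names and numbers here are illustrative, not Spark's actual internals: requests grow exponentially (1, 2, 4, ...) while the backlog of pending tasks exceeds the cores already held, and executors are released down to the minimum when the queue drains.

```python
def executors_needed(pending_tasks, current_executors, cores_per_executor=1,
                     max_executors=8):
    """Request exponentially larger batches of executors (1, 2, 4, ...)
    while pending tasks exceed the cores we already have."""
    requested = current_executors
    batch = 1
    while pending_tasks > requested * cores_per_executor and requested < max_executors:
        requested = min(requested + batch, max_executors)
        batch *= 2  # the next request is twice as big
    return requested

def scale_down(current_executors, pending_tasks, min_executors=0):
    """When no tasks are waiting, release idle executors to the minimum."""
    return current_executors if pending_tasks > 0 else min_executors

# A burst of 10 pending tasks grows the executor count in steps of 1, 2, 4...
print(executors_needed(pending_tasks=10, current_executors=0))  # hits the cap of 8

# When the queue drains, executors are removed down to the configured minimum.
print(scale_down(current_executors=8, pending_tasks=0))  # 0
```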


In order to support dynamic allocation, the cluster must be configured with an external shuffle service. This is needed to retain shuffle data after an Executor is removed. All cluster managers used by Spark support an external shuffle service. Here, I will talk about the Spark standalone cluster manager; for details on configuring Mesos or YARN, see Spark’s dynamic allocation configuration. To configure the external shuffle service on Spark standalone, start each worker with the key spark.shuffle.service.enabled set to true.

In addition, the Spark application must be started with the key spark.dynamicAllocation.enabled set to true. This can be done, for instance, through parameters to the spark-submit program, as follows:

spark-submit --master spark://<spark_master>:7077 --class com.haimcohen.spark.SparkJavaStreamTest --executor-cores 1 --executor-memory 1G --conf spark.dynamicAllocation.enabled=true spark-app.jar

Please note: Since a single Executor is the smallest unit an application can request and remove, it is wise to set a small amount of resources per executor. In the case above, each executor will utilize 1 core and 1GB RAM. This allows the application to increment its resources by 1 core and 1GB at a time.

Additional Configuration

Limit Resources

Each application can set the minimum and maximum resources the cluster should allocate to it. This is done by limiting the number of executors via the configuration keys spark.dynamicAllocation.minExecutors (default value: zero) and spark.dynamicAllocation.maxExecutors (default value: infinity).

Resource Removal Policy

When no tasks are to be executed, an executor becomes idle. By default, an executor that has been idle for 60 seconds will be removed. This timeout can be controlled through the key spark.dynamicAllocation.executorIdleTimeout.


When an application caches a dataset or RDD in memory or on disk, that cached data is lost when the executor is removed. As a default policy, dynamic allocation will not remove Executors that hold cached data. I found this a bit harsh for my applications, as I needed them to cache a dataset only for a limited amount of time. Luckily, it is possible to change that policy by setting spark.dynamicAllocation.cachedExecutorIdleTimeout, the number of seconds after which an executor may be removed even if it holds cached data. In future versions of Spark, it is planned to manage caches in an external service, much in the same way shuffle management is done.
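Putting the configuration keys discussed above together, a spark-defaults.conf for a long-running, dynamically allocated application might look like this (the specific values are illustrative, not recommendations):

```properties
# Enable dynamic allocation and the external shuffle service
spark.dynamicAllocation.enabled                      true
spark.shuffle.service.enabled                        true

# Bounds on the number of executors the application may hold
spark.dynamicAllocation.minExecutors                 0
spark.dynamicAllocation.maxExecutors                 10

# Remove executors idle for 60s; cached executors after 10 minutes
spark.dynamicAllocation.executorIdleTimeout          60s
spark.dynamicAllocation.cachedExecutorIdleTimeout    600s
```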


In order to test dynamic allocation, I started two long-running applications with dynamic allocation enabled. Each application configured to use 1 core and 1GB RAM per executor. The applications I used for testing are Apache Zeppelin and Spark Structured Streaming. I used a cluster of a single node with 8 cores and 15GB RAM.

When the applications were in an idle state, the Spark Standalone UI (port 8080 on the master node) looked like this:

Idle applications

You can see that the worker has zero used cores and memory. When I ran some code in Zeppelin and added some files to the streaming application, I could see both applications running:

Running Applications

You can see that each application uses 2 cores (1 core per executor), so the total used on our worker is 4 cores. The used memory likewise totals 4GB.

When the applications went back to idle, you could see that the executors were removed and no cores and memory were used:

Back to idle


For a long-running Spark application with a substantial amount of idle time, it is more efficient to use dynamic allocation and free cluster resources for other needs during idle periods, while still allowing the application to utilize high capacity at peak time. Configuring your applications wisely will provide a good balance between efficient allocation and performance.

Original Link

Reading Data From Oracle Database With Apache Spark

In this article, I will connect Apache Spark to Oracle DB, read the data directly, and write it in a DataFrame.

Following the rapid increase in the amount of data we produce in daily life, big data technology has entered our lives very quickly. Instead of traditional solutions, we now use tools with the capacity to solve our business problems quickly and efficiently, and Apache Spark is a common technology that can fulfill these needs.

Apache Spark is a framework that can process data very quickly and in a distributed manner. I will not describe Apache Spark in detail in this article, so those who are interested in the details should check out the Apache Spark documentation.

The usual method of processing the data stored in our RDBMS databases with Apache Spark is to migrate the data to Hadoop (HDFS) first, read it from HDFS in a distributed fashion, and process it with Apache Spark. As those with Hadoop ecosystem experience know, data is exchanged between the Hadoop ecosystem and other systems (RDBMS, NoSQL) with tools such as Sqoop, an easy-to-use, common, and efficient data transfer tool.

There is, however, a cost involved in moving the data from the RDBMS into the Hadoop environment before processing it with Apache Spark: data moved to HDFS but never used again wastes storage space, and the extra hop increases processing time. Instead of this method, Apache Spark can read from the RDBMS directly, without first staging the data in HDFS.

Let’s see how to do this.

The technologies and versions I used are as follows:

  • Hadoop: Hadoop 2.7.1

  • Apache Spark: Apache Spark 2.1.0

  • Oracle database: Oracle 11g R2, Enterprise Edition

  • Linux: SUSE Linux

To do this, we need to have the ojdbc6.jar file (the Oracle JDBC driver) in our system. You can use this link to download it.

First, we will create the tables in the Oracle database that we will later read from Spark, and insert sample data into them.

INSERT INTO EMP VALUES (7369, 'SMITH', 'CLERK', 7902, 800, 50, 20);
INSERT INTO EMP VALUES (7499, 'ALLEN', 'SALESMAN', 7698, 1600, 300, 30);
INSERT INTO EMP VALUES (7521, 'WARD', 'SALESMAN', 7698, 1250, 500, 30);
INSERT INTO EMP VALUES (7566, 'JONES', 'MANAGER', 7839, 2975, NULL, 20);
INSERT INTO EMP VALUES (7654, 'MARTIN', 'SALESMAN', 7698, 1250, 1400, 30);
INSERT INTO EMP VALUES (7698, 'BLAKE', 'MANAGER', 7839, 2850, NULL, 30);
INSERT INTO EMP VALUES (7782, 'CLARK', 'MANAGER', 7839, 2450, NULL, 10);
INSERT INTO EMP VALUES (7788, 'SCOTT', 'ANALYST', 7566, 3000, NULL, 20);
INSERT INTO EMP VALUES (7844, 'TURNER', 'SALESMAN', 7698, 1500, 0, 30);
INSERT INTO EMP VALUES (7876, 'ADAMS', 'CLERK', 7788, 1100, NULL, 20);
INSERT INTO EMP VALUES (7900, 'JAMES', 'CLERK', 7698, 950, NULL, 30);
INSERT INTO EMP VALUES (7902, 'FORD', 'ANALYST', 7566, 3000, NULL, 20);
INSERT INTO EMP VALUES (7934, 'MILLER', 'CLERK', 7782, 1300, NULL, 10);
CREATE TABLE DEPT

Now we start Apache Spark from the Linux terminal with the PySpark (Python) interface.

/spark-2.1.0-bin-hadoop2.7/bin/pyspark --jars "/home/jars/ojdbc6.jar" --master yarn-client --num-executors 10 --driver-memory 16g --executor-memory 8g

We started Apache Spark. Now let’s write the Python code to read the data from the database and run it.

empDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:oracle:thin:username/password@//hostname:portnumber/SID") \
    .option("dbtable", "hr.emp") \
    .option("user", "db_user_name") \
    .option("password", "password") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .load()

Now that the data has been loaded into the empDF DataFrame, let’s take a look at its contents.


Yes, I connected directly to the Oracle database with Apache Spark. Likewise, it is possible to get a query result in the same way.

query = "(select empno, ename, dname from emp, dept where emp.deptno = dept.deptno) emp"
empDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:oracle:thin:username/password@//hostname:portnumber/SID") \
    .option("dbtable", query) \
    .option("user", "db_user_name") \
    .option("password", "password") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .load()
empDF.printSchema()

As you can see from the examples above, it is very easy and practical to use.

With this method, it is possible to load large tables directly and in parallel, but I will do the performance evaluation in another article.
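The parallel load mentioned above works by passing the JDBC reader a numeric partitionColumn together with lowerBound, upperBound, and numPartitions options; Spark then splits the column range into one WHERE clause per partition and issues the queries concurrently. The sketch below is a simplification of that splitting logic, not Spark's exact implementation:

```python
def jdbc_partition_predicates(column, lower, upper, num_partitions):
    """Split [lower, upper) into num_partitions ranges, producing one
    WHERE clause per partition, roughly as Spark's JDBC reader does."""
    stride = (upper - lower) // num_partitions
    predicates = []
    for i in range(num_partitions):
        lo = lower + i * stride
        if i == 0:
            # First partition also picks up NULLs and anything below the range
            predicates.append(f"{column} < {lo + stride} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # Last partition is open-ended to cover values above upperBound
            predicates.append(f"{column} >= {lo}")
        else:
            predicates.append(f"{column} >= {lo} AND {column} < {lo + stride}")
    return predicates

for p in jdbc_partition_predicates("EMPNO", 7000, 8000, 4):
    print(p)
```

In PySpark, the equivalent options are .option("partitionColumn", "EMPNO"), .option("lowerBound", "7000"), .option("upperBound", "8000"), and .option("numPartitions", "4") on the same spark.read chain used above.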

Original Link

12 Frameworks Java, Web, and Mobile Programmers Can Learn in 2018

Hello guys, the New Year has already started and everybody is ready with their goals, but if you haven’t decided what to learn in 2018, then you have come to the right place. In today’s post, I’ll share some of the best frameworks you can learn to improve your knowledge of mobile and web development, as well as Big Data technologies.

In today’s world, good knowledge of various frameworks is very important. They allow you to quickly develop both prototypes and real projects. If you are working in the startup space, you are expected to whip up something cool in no time, and that’s where knowledge of a framework excels.

It also helps you to get a better job and take your career to the next level if you are stuck doing boring work, like just starting and stopping servers, setting up cron jobs, and replying to the same old emails while maintaining legacy applications.

In this article, I have shared 12 useful frameworks related to Java development, mobile app development, web development, and Big Data. If you have another good framework which you think Java and web developers should learn in 2018, then feel free to share it in the comments. 

1) Angular

This is another JavaScript framework, which is on my list of things to learn in 2018. It provides a completely client-side solution. You can use AngularJS to create dynamic web pages on the client side. It provides declarative templates, dependency injection, end-to-end tooling, and integrated best practices to solve common development challenges on the client side.

Learn Angular JS Framework in 2018

Since it’s a JavaScript library, you can include it on your HTML page using the <script> tag. It extends HTML attributes with directives and binds data to HTML using expressions.

Since Google is behind Angular, you can rest assured in terms of performance and regular updates. I strongly believe AngularJS is here for the long run, hence, investing time in it is completely justified. If you decide to learn Angular in 2018, then Angular 5  – The Complete Guide from Udemy is a good starting point.

2) Node.js

There is no doubt that JavaScript is the #1 programming language and Node.js has a big part to play in that. Traditionally, JavaScript is used as a client-side scripting language, where it is used with HTML to provide dynamic behavior on the client side. It runs on the web browser, but Node.js allows you to run JavaScript on the server side.

Node.js is an open-source, cross-platform JavaScript run-time environment for executing JavaScript code server-side. You can use Node.js to create dynamic web pages on the server side before you send them to the client.

Image title

This means you can develop a front to back, client-server application in JavaScript. I purchased The Complete Node.js Developer Course last month on Udemy’s $10 sale and I look forward to learning it in 2018.

3) Spring Boot

I have been using the Spring framework for many years, so when I was first introduced to Spring Boot, I was totally surprised by the relative lack of configuration. Writing a Spring-based Java application using Spring Boot was as simple as writing a core Java application using the main() method.

Even though I have tried Spring Boot, I have yet to learn many things, and that’s why it’s on my list of things to learn in 2018.

Learn Spring Boot 2.0 Framework in 2018

I have also bought the Spring Boot Masterclass, one of the best online courses to learn Spring Boot, from Udemy for just $10 last month, and I look forward to using that in 2018.

4) React

React is another JavaScript library or framework for building user interfaces. It’s like AngularJS but maintained by Facebook, Instagram, and a community of individual developers and corporations. It allows web developers to create large web applications that use data that can change over time, without reloading the page.

The web development world is divided between Angular and React and it’s up to you what you choose. Most of the time, it’s dictated by circumstances; for example, if you are working in a React-based project, then obviously, you need to learn React.

Image title

If you decide to learn React in 2018, then the React.js: Getting Started course from Pluralsight is a good starting point.

5) Bootstrap

This is another popular open-source, front-end web framework for designing websites and web applications. Initially brought to us by Twitter, Bootstrap provides HTML- and CSS-based design templates for typography, forms, buttons, navigation, and other interface components, as well as optional JavaScript extensions.

Learn BootStrap 4 Framework in 2018

Bootstrap supports responsive web design, which means the layout of web pages adjusts dynamically depending on the browser’s screen size. In the mobile world, Bootstrap leads the way with its mobile-first philosophy, emphasizing responsive design by default.

If you are a web developer and don’t know Bootstrap, 2018 is the right time to get started with it. BootStrap 4 From Scratch is a good starting point for your Bootstrap journey in 2018.

6) jQuery

This is another JavaScript framework which rules the world. jQuery has been my favorite for a long time and I advise every developer to learn jQuery. It makes client-side scripting really easy.

You can do animation, send HTTP requests, reload pages, and perform client-side validation by writing just a couple of lines of code.

Learn jQuery Framework in 2018

If you decide to learn jQuery in 2018, then I suggest you take a look at this jQuery master class, a free online course from Udemy for learning jQuery.

7) Spring Security 5

There is no substitute for security, and in 2018, it will be even more important. Since Spring Security has become synonymous with web security in the Java world, it makes perfect sense to update yourself with the latest release of Spring Security in 2018.

Image title

The new version 5.0 of Spring Security includes many bug fixes and a complete new OAuth 2.0 module. Even if you don’t know Spring Security, you should consider learning it in 2018, and there is no better way than joining Eugen Paraschiv’s Learn Spring Security MasterClass.

8) Apache Hadoop

Big Data and automation are the focus of many companies in 2018, and that’s why it becomes important for programmers to learn Big Data technologies like Hadoop and Spark. Apache Hadoop is a framework which allows distributed processing of large data sets across clusters of computers using simple programming models.

Learn Apache Hadoop in 2018

It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. It’s based upon the popular MapReduce pattern and is key for developing reliable, scalable, distributed software applications.

I have already enrolled in The Ultimate Hands-On Hadoop last month, and if you decide to learn Hadoop in 2018, you can join, too, on Udemy.

9) Apache Spark

This is another Big Data framework which is gaining popularity. Apache Spark is a fast, in-memory data processing engine with elegant and expressive development APIs to allow data workers to efficiently execute streaming, machine learning, or SQL workloads that require fast iterative access to datasets.

Learn Apache Spark in 2018

You can use Spark for in-memory computing for ETL, machine learning, and data science workloads on Hadoop. I have already shortlisted the Apache Spark Fundamentals course from Pluralsight to go through in 2018. If you are in the same boat, you can check out that course to get some inspiration.

10) Cordova

Apache Cordova is another mobile application development framework originally created by Nitobi. Adobe Systems purchased Nitobi in 2011, rebranded it as PhoneGap, and later released an open source version of the software called Apache Cordova.

Image title

It allows you to use standard web technologies (HTML5, CSS3, and JavaScript) for cross-platform development, and it is one of the hot technologies to learn in 2018. If you’d like to learn Cordova in 2018, then please check out Build iOS & Android Apps with Angular and Cordova, one of the exciting courses for learning Cordova.

11) Firebase

Firebase is Google’s mobile platform that helps you quickly develop high-quality mobile apps and grow your business. You can choose Firebase as a backend for your Android or iOS application.

Image title

If you are looking to move into the lucrative business of mobile application development in 2018, then learning Firebase is a very good idea and Advanced iOS and Firebase: Rideshare is a good place to start.

12) Xamarin

Xamarin is a way to make mobile apps quickly for all platforms with a single, shared C# code base. You can build a custom native user interface for each platform, or use Xamarin.Forms to write a single shared user interface across platforms.

It’s owned by Microsoft and is quickly becoming popular among C, C++, and C# developers for creating mobile apps.

Image title

If you already know one of the C languages and you’re looking for a career in mobile app development, then I strongly suggest you learn Xamarin in 2018, and The Complete Xamarin Developer Course: iOS And Android! is a good course to start with.

That’s all about what to learn in 2018. These frameworks are in great demand, particularly Spring, Node.js, and AngularJS. Learning these frameworks will not only improve your chances of getting a job, but also open many doors of opportunity.

Even if you are settled down in your job, keeping yourself up-to-date with the latest and greatest technologies is essential for your career growth.

So, I suggest you pick a couple of these frameworks and learn them in 2018. If you are a Java developer, then Apache Spark is a good choice, but if you are interested in a programming language rather than a framework or library, then Kotlin looks good for 2018.

Original Link

How Does Spark Use MapReduce?

In this article, we will talk about an interesting question: does Spark use MapReduce or not? The answer is yes, but only the idea, not the exact implementation. Let’s look at an example. To read a text file from Spark, all we do is:


val textRDD = sc.textFile("path/to/file.txt")

But do you know how it actually works? Try to Ctrl+click on the textFile method. You will find this code:

/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

As you can see, it calls the hadoopFile method of the Hadoop API with the file path, the input format class (TextInputFormat), the key class (LongWritable), the value class (Text), and the minimum number of partitions.

So, it doesn’t matter whether you are reading a text file from the local file system, S3, or HDFS: Spark will always use the Hadoop API to read it.

hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)

Can you understand what this code is doing? The first parameter is the path of the file, the second parameter is the input format to use when reading it, and the third and fourth parameters are the key and value types produced by the record reader: the byte offset of each line, and the line itself.

Now, you might be thinking, “Why don’t we get this offset back when we are reading the file?” The reason is the following line:

.map(pair => pair._2.toString)

This is mapping over all the key-value pairs but only collecting the values.
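The effect of that map is easy to mimic outside Spark. Given (byte offset, line) pairs like those the Hadoop record reader produces, only the second element of each pair is kept. This is a plain-Python illustration, not Spark code:

```python
lines = ["first line", "second line", "third line"]

# Simulate what TextInputFormat's record reader yields: (byte offset, line)
pairs = []
offset = 0
for line in lines:
    pairs.append((offset, line))
    offset += len(line) + 1  # +1 for the newline separator

# The equivalent of .map(pair => pair._2.toString): drop the offsets
values = [pair[1] for pair in pairs]
print(pairs)   # [(0, 'first line'), (11, 'second line'), (23, 'third line')]
print(values)  # ['first line', 'second line', 'third line']
```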

Original Link

Qubole Offering Apache Spark on AWS Lambda

Qubole is announcing the availability of a working implementation of Apache Spark on AWS Lambda. This prototype has been able to show a successful scan of 1 TB of data and sort 100 GB of data from AWS Simple Storage Service (S3). This article dives into the technical details of how we built this prototype and the code changes required on top of Apache Spark 2.1.0.


AWS Lambda is a serverless compute service. It allows you to scale when needed while paying for only the compute used and avoiding the need to provision servers. This allows applications to be highly elastic in terms of the compute demands and still run efficiently.

Apache Spark is a fast, general-purpose big data processing engine. It is growing in popularity due to the developer-friendly APIs, ability to run many different kinds of data processing tasks, and superior performance. The ability to run Apache Spark applications on AWS Lambda would, in theory, give all the advantages of Spark while allowing the Spark application to be a lot more elastic in its resource usage. We started the Spark On Lambda project to explore the viability of this idea.

Qubole implementation of Spark on AWS Lambda allows:

  • Elasticity: We can now run bursty workloads that require thousands of Spark executors using AWS Lambda invocations — without waiting for machines to spin up.

  • Simplicity: At Qubole, we run some of the largest auto-scaling Spark clusters in the cloud. The self-managing clusters have allowed democratization and wide-spread adoption of big data technologies. However, a Lambda-based implementation further simplifies configuration and management for administrators.

  • Serverless: The popular ways of running Spark are using the Standalone mode or on Apache Hadoop’s YARN. These methods presuppose the existence of a cluster and elasticity of these applications is limited by the underlying cluster’s ability to scale up and down. With Spark on Lambda, the concept of a cluster goes away entirely.

  • Transparency: A Spark application invokes a number of AWS Lambda functions – each with a well defined cost. This allows us to calculate, exactly, the cost of running each Spark workload.

Use Cases

Some of the common use cases that can be tackled using this combination include:

  • Data pre-processing and preparation: Transformation of logs like clickstream and access logs for ETL or for data mining can be done using AWS Lambda.

  • Interactive data analysis: Ad hoc interactive queries are a good fit for Spark on Lambda as we can provision a large amount of compute power quickly.

  • Stream processing: Processing a discrete flow of events is also a candidate use case for Spark on Lambda.

Key Challenges

While AWS Lambda comes with the promise of fast burst provisioning of nearly infinite cloud computing resources — it has some limitations that make it a challenge to run Spark. Our implementation had to overcome these key challenges:

  1. Inability to communicate directly: Spark’s DAG execution framework spawns jobs with multiple stages. For inter-stage communication, Spark requires data transfer across executors. AWS Lambda does not allow direct communication between two Lambda functions. This poses a challenge for running executors in this environment.

  2. Extremely limited runtime resources: AWS Lambda invocations are currently limited to a maximum execution duration of 5 minutes, 1536 MB memory, and 512 MB disk space. Spark loves memory, can have a large disk footprint, and can spawn long-running tasks. This makes Lambda a difficult environment to run Spark on.

These limitations force non-trivial changes to Spark to make it run successfully on Lambda.

In the following sections, we describe the changes to make Spark work on serverless runtimes like AWS Lambda and the applications that were run using Spark on Lambda.


The implementation is based on two key architectural changes to Spark:

  1. Spark executors are run from within an AWS Lambda invocation.

  2. Shuffle operations use external storage to avoid limits on the size of the local disk and to avoid inter-communication between Lambda invocations.


Qubole Running Apache Spark on AWS Lambda

Qubole Running Apache Spark on AWS Lambda

The Spark driver JVM runs on an EC2 instance in a VPC. The security group on the EC2 instance running Spark driver allows incoming connections from the executors running on Lambda. The Lambda functions are run as a part of the same VPC.

We have trimmed the Spark, Hadoop, and Hive distributions (JARs) to create a smaller package that can be deployed within the 512 MB disk limit of a Lambda instance.

Spark Executors as Lambda Functions

Generally, Spark Executors are launched on machines with plenty of disk space, where the Spark libraries are pre-installed. However, an AWS Lambda function can only be launched with a deployment package of at most 50 MB (ZIP/JAR file). In order to run Spark Executors via Lambda, we:

  1. Launch AWS Lambda using a barebones Python Lambda function.

  2. Immediately after launch, the Python code bootstraps Lambda runtime by downloading Spark libraries from a zipped S3 package, extracts the archive under /tmp (only writable location in Lambda), and starts the Spark Executor by executing the Java command line passed as part of the Lambda request.

  3. This Executor then joins the Spark application by sending heartbeats back to the Spark Driver.

Starting executors from scratch with the above strategy can be fairly slow. However, AWS Lambda allows subsequent invocations to start much faster than the first call. In the cold-start case, we observed an executor startup time of around two minutes; in the warm case (where Lambda already has a provisioned container), startup took around 4 seconds. This contrasts with provisioning EC2 instances for capacity expansion, which takes around one to two minutes to become operational.

New Spark Scheduler

Spark has a pluggable scheduler backend known as  CoarseGrainedSchedulerBackend that works with different cluster managers like Apache Hadoop YARN, Mesos, Kubernetes, Standalone, etc. for requesting resources. Spark driver internally creates the respective scheduler backend during the launch of the Spark application which deals with the registration of the application, stopping the application, requesting more executors or killing of existing executors.

For example, with YARN as the cluster manager, YarnSchedulerBackend submits a YARN application, along with the information necessary to launch an ApplicationMaster, to YARN’s ResourceManager using the YARN client. Once the application is accepted by the ResourceManager, it launches the ApplicationMaster, which negotiates resources with YARN throughout the lifecycle of the Spark application.

Similarly, we implemented a new backend for AWS Lambda. Whenever the Spark driver needs more resources, the LambdaSchedulerBackend makes an API call to AWS Lambda to get a new invocation. Once the invocation is available, the Executor is started as described previously. Once an executor’s five-minute runtime is exhausted, Spark’s auto-scaling component asks for new executors, and new API calls are made to AWS Lambda.

An additional change to the Spark scheduler stops the scheduling of tasks to an executor once it is close to the expiry of its execution duration. To reduce the possibility of failures of running tasks, the scheduler stops assigning tasks to such executors after a configurable interval (four minutes in our case).
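A minimal version of that cutoff check might look like the following. The four-minute threshold comes from the article; the constant and function names are made up for illustration:

```python
MAX_DURATION_S = 300          # AWS Lambda's 5-minute execution limit
ASSIGNMENT_CUTOFF_S = 240     # stop scheduling after 4 minutes, per the text

def can_assign_task(executor_elapsed_s):
    """Only hand a new task to a Lambda-backed executor if it is not
    about to hit its execution-duration limit."""
    return executor_elapsed_s < ASSIGNMENT_CUTOFF_S

print(can_assign_task(120))  # plenty of runtime left
print(can_assign_task(250))  # too close to the 5-minute limit
```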

During our experiments, we also noticed that AWS Lambda starts throttling the requests after a certain rate is exceeded. To account for these, we modified the Scheduler backend to make requests at a constant linear rate instead of making a large number of requests at once.

State Offload

The change to Spark’s shuffle infrastructure was the most important one we had to make to run Spark on Lambda. The stateless architecture of Lambda, its runtime limits, and the inability to communicate between two Lambda functions meant that we needed an external store to manage state. With Lambda being an AWS service, S3 became the natural choice to store the shuffle data. The mapper tasks write the data directly to S3 using a directory layout scheme, which allows any executor (tasks in the downstream stage) to read the shuffle data without the need for an external shuffle service. The writes to S3 are streamed, with large files uploaded as blocks.
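The directory-layout idea can be sketched with a dict standing in for S3: mappers write blocks under a predictable key, and any reducer can locate its blocks without ever contacting the mapper. The key scheme below is illustrative, not the one Qubole used:

```python
fake_s3 = {}  # key -> bytes, standing in for an S3 bucket

def shuffle_key(shuffle_id, map_id, reduce_id):
    # A predictable layout lets any downstream task find its input
    return f"shuffle/{shuffle_id}/map-{map_id}/reduce-{reduce_id}"

def mapper_write(shuffle_id, map_id, partitioned_output):
    # Each mapper writes one block per reduce partition straight to the store
    for reduce_id, data in partitioned_output.items():
        fake_s3[shuffle_key(shuffle_id, map_id, reduce_id)] = data

def reducer_read(shuffle_id, reduce_id, num_mappers):
    # No executor-to-executor connection needed: just read from the store
    return [fake_s3[shuffle_key(shuffle_id, m, reduce_id)]
            for m in range(num_mappers)]

mapper_write(0, 0, {0: b"a", 1: b"b"})
mapper_write(0, 1, {0: b"c", 1: b"d"})
print(reducer_read(0, 0, num_mappers=2))  # [b'a', b'c']
```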

Although we initially ran all the experiments using S3 as the shuffle store, internally we use the Hadoop FileSystem interface, which makes the shuffle store pluggable. This makes it possible to easily replace S3 with HDFS or Amazon Elastic File System (EFS).

In this architecture, since the shuffle data of tasks is persisted to S3, we also made scheduler changes to avoid resubmitting a stage when handling executor (Lambda) failures.

Performance Measurements

The big question is how this implementation performs on the everyday use cases we set out to solve. Results on two simple, realistic use cases are reported next.

Scanning 1 TB of Data

A line count operation on a dataset involving the read of 1 TB data using 1000 Lambda executors took only 47 seconds. Given the AWS cost of $0.000002501 per 100ms compute time, the cost turns out to be $1.18.
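The billing arithmetic behind that figure can be checked directly, using the per-100ms price quoted in the article:

```python
PRICE_PER_100MS = 0.000002501  # Lambda price quoted in the article

def lambda_cost(num_invocations, seconds_each):
    """Cost of running num_invocations Lambda functions for a given duration.
    Billing is per 100 ms unit, so each second is 10 billed units."""
    billed_units = seconds_each * 10
    return num_invocations * billed_units * PRICE_PER_100MS

# 1000 executors running for 47 seconds
print(round(lambda_cost(1000, 47), 2))  # 1.18
```

The same function reproduces the $7.5 estimate given later for the 100 GB sort: lambda_cost(1000, 300), i.e. 1000 functions at the full five-minute duration.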

The time spent is in sharp contrast to the time required to bring up a cluster and perform the same operation: the average time to bring up a Spark cluster in the cloud is two to three minutes. Even an already-running cluster with sufficient capacity would, given the memory requirements and concurrency of this operation, need at least 50 r3.xlarge instances, plus the overhead of maintaining those instances and optimizing their lifecycle.

Sorting 100 GB of Data

Using Spark on Lambda, we were able to sort 100 GB of data in a little less than ten minutes (579.7s, exactly), with 400 concurrent executors running at any time and 800 Lambda functions in total.

Technical Details

  1. We found 128 MB to be the right split size. A split size of 64 MB results in lots of S3 calls, which introduces latency and increases the end-to-end runtime of the application. On the other hand, processing a 256 MB split hits the memory limit of an AWS Lambda execution.

  2. AWS Lambda can execute thousands of Lambda functions concurrently. For this application, we had to limit it to 400 concurrent Lambda functions, as the concurrent write of shuffle data to a single bucket led to throttling from the S3 service. This can be overcome by changing the shuffle write scheme to partition writes based on a prefix generated by the application.

The setup should also extend to sorting larger datasets.

Cost Estimate

Overall it took 1000 Lambda functions running for five minutes with 1.5 GB memory. Lambda function with this specification costs $0.000002501 per 100ms.

Total cost = $0.000002501 * 5 * 60 * 10 * 1000 = $7.5

This is definitely expensive, largely due to occasional S3 throttling and expiring Lambda execution durations causing tasks to fail and be retried, which adds both cost and time. We have listed a few changes in the Future Work section below that should help reduce the runtime of this benchmark and lower the Lambda costs. This example also reiterates that applications that need large amounts of data to be preserved as state can still enjoy the benefits of Lambda, such as simplicity, elasticity, and transparency, but might incur an additional cost overhead.

GitHub Repository

The code for Apache Spark (based on version 2.1.0) to work with AWS Lambda is available here.

Future Work

The above work has helped in classifying applications that can be a great fit for Spark on Lambda. We have also identified additional areas where improvements can be made to increase the scale and subset of suitable applications:

  • Scheduler: AWS Lambda limits the maximum execution duration to 5 minutes. One can go beyond the scheduling changes suggested above and schedule tasks intelligently by looking at the maximum runtime of a task in a stage and avoiding scheduling it on a Lambda executor that doesn't have sufficient remaining execution duration.

  • Shuffle: With AWS S3, occasional eventual-consistency issues can arise as a result of the rename operation on shuffle data. These renames on S3 can either be eliminated or handled in a manner that avoids eventually consistent operations.

  • Shuffle: With more than 400 concurrent Lambda executions and write to an S3 bucket, the application can experience throttling. This can be scaled for better performance by partitioning the shuffle data written to S3 with a unique prefix to perform load balancing of keys.

  • Shuffle: The latency while fetching multiple shuffle blocks from S3 can be masked by parallelizing these requests in the new block fetcher added for Spark on Lambda functionality.

  • The executor bring-up and shuffle data write times could be further optimized if AWS Lambda had support for other scalable file storage solutions, such as Amazon Elastic File System (EFS).
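The shuffle-fetch parallelization mentioned above can be sketched with CompletableFuture; fetchBlock here is a stand-in for an S3 block read, not the connector's real API:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ParallelBlockFetch {
    // Stand-in for a slow S3 GET of one shuffle block.
    static byte[] fetchBlock(String blockId) {
        try { Thread.sleep(50); } catch (InterruptedException ignored) {}
        return blockId.getBytes();
    }

    static List<byte[]> fetchAll(List<String> blockIds) {
        // Issue all fetches at once; total latency approaches the
        // slowest single request instead of the sum of all of them.
        List<CompletableFuture<byte[]>> futures = blockIds.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> fetchBlock(id)))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<byte[]> blocks = fetchAll(List.of("shuffle_0_0", "shuffle_0_1", "shuffle_0_2"));
        System.out.println("fetched " + blocks.size() + " blocks");
    }
}
```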

Original Link

Optimizing Spark Job Performance With Apache Ignite (Part 1)

Portions of this article were taken from my book, High-Performance In-Memory Computing With Apache Ignite. If this post got you interested, check out the rest of the book for more helpful information.

Apache Ignite offers several ways to improve a Spark job's performance: Ignite RDD, which represents an Ignite cache as a Spark RDD abstraction, and Ignite IGFS, an in-memory file system that can be transparently plugged into Spark deployments. Ignite RDD makes it easy to share state in memory between different Spark jobs or applications. With Ignite's in-memory shared RDDs, any Spark job can put some data into an Ignite cache that other Spark jobs can access later. Ignite RDD is implemented as a view over the Ignite distributed cache, which can be deployed either within the Spark job execution process or on a Spark worker.

Shared RDD

Before we move on to more advanced topics, let’s have a look at the history of Spark and what kinds of problems can be solved by Ignite RDDs.

Apache Spark was created at UC Berkeley's AMPLab for fast computation. It builds on the Hadoop MapReduce model and extends it to efficiently support more types of operations, such as interactive queries and stream processing.

The main difference between Spark and Hadoop MapReduce is that during execution, Spark tries to keep data in memory, whereas Hadoop MapReduce shuffles data into and out of disk. Hadoop MapReduce takes significant time to write intermediate data to disk and read it back. The elimination of these redundant disk operations makes Spark orders of magnitude faster. Spark can store intermediate data in memory without any I/O, so you can keep operating on the same data very quickly.

In order to store data in memory, Spark provides a special dataset abstraction named the Spark RDD, which stands for Resilient Distributed Dataset. RDDs are a fundamental component of the Apache Spark large-scale data processing framework. The following illustration shows iterative operations on a Spark RDD.

Spark architecture

Note that the above figure is obtained from the Spark documentation. Spark RDD is an immutable, fault-tolerant distributed collection of data elements. You can imagine Spark RDD as a Hadoop HDFS in memory. Spark RDD supports two types of operations:

  1. Transformations, which create a new dataset from an existing one

  2. Actions, which return a value by performing a computation on the RDD
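The lazy-transformation/eager-action split has a close analogue in Java's own Stream API, which makes the distinction easy to see without a cluster (this is plain Java as an analogy, not Spark code):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyVsEager {
    public static void main(String[] args) {
        AtomicInteger evaluations = new AtomicInteger();

        // "Transformation": declaring the map does not run it yet.
        Stream<Integer> doubled = Stream.of(1, 2, 3)
                .map(x -> { evaluations.incrementAndGet(); return x * 2; });
        System.out.println("after map: " + evaluations.get());      // 0, nothing evaluated

        // "Action": the terminal operation forces evaluation.
        List<Integer> result = doubled.collect(Collectors.toList());
        System.out.println("after collect: " + evaluations.get()); // 3
        System.out.println(result);                                 // [2, 4, 6]
    }
}
```

Spark RDDs behave the same way: a chain of transformations builds a plan, and only an action such as count or collect triggers execution.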


Spark RDD is created through the use of Spark transformation functions. Spark transformation functions can create Spark RDDs from various sources, such as text files. In addition to creating Spark RDDs from the text files, Spark RDDs may be created from external storage such as RDBMS, HBase, Cassandra, or any other data source compatible with Hadoop input format.

Most of the time, Spark RDDs are transformed from one RDD to another new Spark RDD in order to prepare the dataset for future processing. Let’s consider the following data transformations steps in Spark:

  1. Load a text file with airline names and arrival times for any airport in RDD1.

  2. Load a text file with airline names and flight delay information for any airport into RDD2.

  3. Join RDD1 and RDD2 by airline names to get RDD3.

  4. Map on RDD3 to get a nice report for each airline as RDD4.

  5. Save RDD4 to file.

  6. Map RDD2 to extract the information of flight delay for certain airlines to get RDD5.

  7. Aggregate the RDD5 to get a count of how many flights are delayed for each airline as RDD6.

  8. Save the RDD6 into HDFS.

Spark RDDs are utilized to perform computations on a dataset through Spark actions such as count or reduce. But there is one problem with Spark RDDs: an RDD cannot be shared between Spark jobs or SparkContexts, because an RDD is bound to a single Spark application. With the native Spark distribution, the only way to share RDDs between different Spark jobs is to write the dataset to HDFS (or somewhere else in the file system) and then pull it within the other jobs. However, the same functionality can be achieved by using Alluxio (formerly Tachyon) or Apache Ignite.

Apache Ignite’s memory-centric architecture enables RDD sharing in a very efficient and effective way. Apache Ignite provides IgniteContext and IgniteRDD to share RDDs between Spark applications.

  1. IgniteContext: IgniteContext is the main entry point to the Spark-Ignite integration. To create an instance of an Ignite context, a user must provide an instance of SparkContext and a closure creating IgniteConfiguration (a configuration factory). The Ignite context will make sure that server or client Ignite nodes exist in all involved job instances. Alternatively, a path to an XML configuration file can be passed to the IgniteContext constructor, which will be used for the nodes being started.

  2. IgniteRDD: IgniteRDD is an implementation of the Spark RDD abstraction representing a live view of an Ignite cache. IgniteRDD is not immutable; all changes in the Ignite cache (regardless of whether they were caused by another RDD or by external changes in the cache) will be visible to RDD users immediately. IgniteRDD utilizes the partitioned nature of Ignite caches and provides partitioning information to the Spark executor. The number of partitions in an IgniteRDD equals the number of partitions in the underlying Ignite cache. IgniteRDD also provides affinity information to Spark via getPreferredLocations so that RDD computations use data locality.

In the next part of this series, we are going to install Apache Spark and do the following:

  1. Run the wordcount example to verify the Spark installation.

  2. Configure Apache Ignite to share RDDs between Spark applications.

  3. Run Spark applications through Spark Shell to use Ignite RDD.

  4. Develop a Scala Spark application to put an Ignite RDD into the Ignite cluster and pull it from another Scala Spark application.

Original Link

Hadoop MapReduce vs. Apache Spark

The term big data has created a lot of hype already in the business world. Hadoop and Spark are both big data frameworks; they provide some of the most popular tools used to carry out common big data-related tasks. In this article, we will cover the differences between Spark and Hadoop MapReduce.


Spark: It is an open-source big data framework. It provides a faster and more general-purpose data processing engine. Spark is basically designed for fast computation. It also covers a wide range of workloads — for example, batch, interactive, iterative, and streaming.

Hadoop MapReduce: It is also an open-source framework for writing applications. It also processes structured and unstructured data that are stored in HDFS. Hadoop MapReduce is designed in a way to process a large volume of data on a cluster of commodity hardware. MapReduce can process data in batch mode.

Data Processing

Spark: Apache Spark is a good fit for both batch processing and stream processing, meaning it’s a hybrid processing framework. Spark speeds up batch processing via in-memory computation and processing optimization. It’s a nice alternative for streaming workloads, interactive queries, and machine learning. Spark can also work with Hadoop and its modules. Its real-time data processing capability makes Spark a top choice for big data analytics.

Its resilient distributed dataset (RDD) allows Spark to transparently store data in-memory and send to disk only what’s important or needed. As a result, a lot of time that’s spent on the disk read and write is saved.

Hadoop: Apache Hadoop provides batch processing. The Hadoop ecosystem has invested a great deal in creating new algorithms and component stacks to improve access to large-scale batch processing.

MapReduce is Hadoop's native batch processing engine. Several components or layers (like YARN, HDFS, etc.) in modern versions of Hadoop allow easy processing of batch data. Since MapReduce uses permanent storage, it stores data on disk, which means it can handle large datasets. MapReduce is scalable and has proved its efficacy in dealing with tens of thousands of nodes. However, Hadoop's data processing is slow, as MapReduce operates in various sequential steps.

Real-Time Analysis

Spark: It can process real-time data, i.e. data coming from real-time event streams at the rate of millions of events per second, such as Twitter and Facebook data. Spark’s strength lies in its ability to process live streams efficiently.

Hadoop MapReduce: MapReduce fails when it comes to real-time data processing, as it was designed to perform batch processing on voluminous amounts of data.

Ease of Use

Spark: Spark is easier to use than Hadoop, as it comes with user-friendly APIs for Scala (its native language), Java, Python, and Spark SQL. Since Spark provides a way to perform streaming, batch processing, and machine learning in the same cluster, users find it easy to simplify their infrastructure for data processing. An interactive REPL (Read-Eval-Print Loop) allows Spark users to get instant feedback for commands.

Hadoop: Hadoop, on the other hand, is written in Java, is difficult to program, and requires abstractions. Although there is no interactive mode available with Hadoop MapReduce, tools like Pig and Hive make it easier for adopters to work with it.

Graph Processing

Spark: Spark comes with a graph computation library called GraphX to make things simple. In-memory computation coupled with in-built graph support allows the algorithm to perform much better than traditional MapReduce programs. Netty and Akka make it possible for Spark to distribute messages throughout the executors.

Hadoop: Most processing algorithms, like PageRank, perform multiple iterations over the same data. MapReduce reads data from the disk and, after a particular iteration, sends results to the HDFS, and then again reads the data from the HDFS for the next iteration. Such a process increases latency and makes graph processing slow.

In order to evaluate the score of a particular node, message passing needs to contain scores of neighboring nodes. These computations require messages from its neighbors, but MapReduce doesn’t have any mechanism for that. Although there are fast and scalable tools like Pregel and GraphLab for efficient graph processing algorithms, they aren’t suitable for complex multi-stage algorithms.

Fault Tolerance

Spark: Spark uses RDDs and their lineage information for fault tolerance, minimizing network I/O. In the event of the loss of an RDD partition, the RDD rebuilds that partition from the lineage information it already has. So, Spark does not rely on replication for fault tolerance.

Hadoop: Hadoop achieves fault tolerance through replication. MapReduce uses the TaskTracker and JobTracker for fault tolerance. However, the TaskTracker and JobTracker were replaced in the second version of MapReduce by the NodeManager and ResourceManager/ApplicationMaster, respectively.


Security

Spark: Spark's security is currently in its infancy, offering only authentication support through a shared secret (password authentication). However, organizations can run Spark on HDFS to take advantage of HDFS ACLs and file-level permissions.

Hadoop MapReduce: Hadoop MapReduce has better security features than Spark. Hadoop supports Kerberos authentication, which is a good security feature but difficult to manage. Hadoop MapReduce can also integrate with Hadoop security projects, like Knox Gateway and Sentry. Third-party vendors also allow organizations to use Active Directory Kerberos and LDAP for authentication. Hadoop’s Distributed File System is compatible with access control lists (ACLs) and a traditional file permissions model.


Cost

Both Hadoop and Spark are open-source projects and therefore come for free. However, Spark uses large amounts of RAM to run everything in memory, and RAM is more expensive than hard disks. Hadoop is disk-bound, so it saves the cost of buying expensive RAM, but it requires more systems to distribute the disk I/O over multiple machines.

As far as costs are concerned, organizations need to look at their requirements. If it’s about processing large amounts of big data, Hadoop will be cheaper since hard disk space comes at a much lower rate than memory space.


Compatibility

Hadoop and Spark are compatible with each other. Spark can integrate with all the data sources and file formats that are supported by Hadoop. So, it's not wrong to say that Spark's compatibility with data types and data sources is similar to that of Hadoop MapReduce.

Both Hadoop and Spark are scalable. One may think of Spark as a better choice than Hadoop. However, MapReduce turns out to be a good choice for businesses that need huge datasets brought under control by commodity systems. Both frameworks are good in their own sense. Hadoop has its own file system that Spark lacks, and Spark provides a way for real-time analytics that Hadoop does not possess.

Hence, the differences between Apache Spark and Hadoop MapReduce show that Apache Spark is a much more advanced cluster computing engine than MapReduce. Spark can handle any type of requirement (i.e. batch, interactive, iterative, streaming, graph), while MapReduce is limited to batch processing.

Happy learning!

This article was first published on the Knoldus blog.

Original Link

Big Data Is Growing and Apache Hadoop Is Legion

Some pundits with no experience at major corporations and who haven’t written big data code in the last five years may have delusions of Apache Hadoop shrinking or vanishing into the cloud or into some imagined Apache Spark ecosphere.

This is beyond wrong.

Apache Hadoop is evolving to the point where people don’t even need to mention it by name. It’s an “everyone” platform that is taken for granted. Most major players have adopted it and have been running it. Many are moving all of their legacy data to Apache Hadoop and sunsetting dozens of systems from proprietary data warehouses, legacy relational databases, failed weird NoSQL stores, and a mishmash of various data sources.


Apache Hadoop and Apache Spark are part of the Apache big data environment that work together like peanut butter and jelly. There’s really no good reason to not run Apache Spark on top of YARN in Apache Hadoop. You have the powerful nodes, right near the data they need. Apache Spark SQL is great, but by using the Apache Hive context, you get catalogs and access to all your Apache Hive tables. By running Apache Spark inside Apache Hadoop, you get the advantage of row and column-level control with Apache Ranger.

Apache Spark is a popular execution engine that plugs in well to Apache Hadoop. But so do Apache Storm, Apache Flink, Apache Apex, and dozens more. Fortunately, Google has put out Apache Beam to help consolidate this execution engine sprawl.

Running Apache Spark without Apache Hadoop is perhaps okay for temporary, ephemeral data science purposes, but even then I don't think so. Security, data governance, users, groups, execution queues, data catalog, data model management, machine learning model management, and dozens of other real concerns for real enterprise users require more than just Apache Spark. Apache Spark wasn't designed to replace Hadoop, since it has no storage. Compute and storage need to work together for real-world applications. You need to run lots of batch and streaming workloads on top of a cluster as well as store petabytes of data. This same environment allows for deep learning, machine learning, IoT, computer vision, and all other big data concerns to be addressed and run at scale.

Apache NiFi also makes Apache Hadoop the core place to store and retrieve all the data you need for all the IoT, mobile, AI, and “real-time” applications that enterprises need.

For amateur developers, maybe you can just run Apache Spark and Apache NiFi on your desktop and not use Apache Hadoop. You would be losing out on things like Apache Zeppelin for notebooks to easily run and develop machine learning and data federation applications.

One has to remember that Apache Hadoop is not one thing — it’s a platform of tools, libraries, and services integrated together for NoSQL, SQL, batch, streaming, storage, and many other purposes.

Apache Hadoop is now in people’s on-premises data centers, multiple clouds, and hybrid combinations of the two. Apache Hadoop is inside Azure HDInsight, Hortonworks Data Cloud in Amazon, Hortonworks CloudBreak for Every Cloud… it’s hard to avoid Apache Hadoop.

Apache Hadoop may not look like the MapReduce-only data of old. It’s now a multifaceted distributed compute and storage platform that includes streaming, NoSQL, real-time SQL, batch SQL, batch jobs, Apache Spark jobs, deep learning, machine learning, messaging, IoT, and more.   

Apache Hadoop is far from dead; Apache Hadoop is legion. Perhaps MapReduce is on the way out, as most services now run on Apache Tez, Apache Spark, and other engines inside the Apache Hadoop big data platform. The highlighted projects could exist as services on their own, but as part of an integrated platform they become incredibly powerful and easy to use.


Let’s not forget some of the projects:

  • Apache Hive (this is the SQL you are looking for)
  • Apache Spark
  • Apache HBase
  • Apache Phoenix
  • Apache Atlas
  • Apache Ranger
  • Apache Storm
  • Apache Accumulo
  • Apache Pig
  • Druid
  • Apache Sqoop
  • Apache SuperSet
  • Apache NiFi
  • Apache Kafka
  • Apache Knox
  • Hortonworks Streaming Analytics Manager
  • Hortonworks Schema Registry
  • SnappyData
  • DL4J
  • TensorFlow
  • IBM BigSQL
  • Apache HAWQ
  • Apache Calcite
  • Apache Ambari
  • Apache Oozie
  • Apache ZooKeeper
  • Apache Zeppelin

These projects all have huge ecosystems and a large number of users. When we factor all of this together, Apache Hadoop is huge and growing. If we look at Google Trends, we see that Apache Spark, Apache Hadoop, and Apache Kafka trend together since they should be thought of as a bundle of awesome big data services and projects.


Original Link

ETL Pipeline to Analyze Healthcare Data With Spark SQL, JSON, and MapR-DB

This post is based on a recent workshop I helped develop and deliver at a large health services and innovation company’s analytics conference. This company is doing a lot of interesting analytics and machine learning on top of the MapR Converged Data Platform, including an internal “Data Science University.” In this post, we will:

  • Extract Medicare Open payments data from a CSV file and load into an Apache Spark Dataset.
  • Analyze the data with Spark SQL.
  • Transform the data into JSON format and save to the MapR-DB document database.
  • Query and Load the JSON data from MapR-DB back into Spark.

CSV Spark SQL and MapR-DB

A large health payment dataset, JSON, Apache Spark, and MapR-DB are an interesting combination for a health analytics workshop because:

  • JSON is an open-standard and efficient format that uses human-readable text to represent, transmit, and interpret data objects consisting of attribute-value pairs. Because JSON is easy for computer languages to manipulate, JSON has supplanted XML for web and mobile applications.
  • Newer standards for exchanging healthcare information such as FHIR are easier to implement because they use a modern web-based suite of API technology, including REST and JSON.
  • Apache Spark SQL, DataFrames, and datasets make it easy to load, process, transform, and analyze JSON data.
  • MapR-DB, a high-performance NoSQL database, supports JSON documents as a native data store. MapR-DB makes it easy to store, query, and build applications with JSON documents.

Apache Spark and MapR-DB

Apache Spark and MapR-DB

One of the challenges that comes up when you are processing lots of data is where you want to store it. With MapR-DB (HBase API or JSON API), a table is automatically partitioned into tablets across a cluster by key range, providing for scalable and fast reads and writes by row key.

Fast Reads and Writes by Key

The MapR-DB OJAI Connector for Apache Spark makes it easier to build real-time or batch pipelines between your JSON data and MapR-DB and leverage Spark within the pipeline. Included is a set of APIs that enable MapR users to write applications that consume MapR-DB JSON tables and use them in Spark.

Spark MapR-DB connector

The Spark MapR-DB Connector leverages the Spark DataSource API. The connector architecture has a connection object in every Spark Executor, allowing for distributed parallel writes, reads, or scans with MapR-DB tablets.

Connection in every Spark Executor

Example Use Case Dataset

Since 2013, Open Payments has been a federal program that collects information about the payments drug and device companies make to physicians and teaching hospitals for things like travel, research, gifts, speaking fees, and meals.

The Facts About Open Payments Data

Below is an example of one line from an Open Payments CSV file:

"NEW","Covered Recipient Physician",,,,"132655","GREGG","D","ALZATE",,"8745 AERO DRIVE","STE 200","SAN DIEGO","CA","92123","United States",,,"Medical Doctor","Allopathic & Osteopathic Physicians|Radiology|Diagnostic Radiology","CA",,,,,"DFINE, Inc","100000000326","DFINE, Inc","CA","United States",90.87,"02/12/2016","1","In-kind items and services","Food and Beverage",,,,"No","No Third Party Payment",,,,,"No","346039438","No","Yes","Covered","Device","Radiology","StabiliT",,"Covered","Device","Radiology","STAR Tumor Ablation System",,,,,,,,,,,,,,,,,"2016","06/30/2017"

There are a lot of fields in this file that we will not use; we will select the following fields:

CSV Fields

And transform them into the following JSON object:

{
  "_id": "317150_08/26/2016_346122858",
  "physician_id": "317150",
  "date_payment": "08/26/2016",
  "record_id": "346122858",
  "payer": "Mission Pharmacal Company",
  "amount": 9.23,
  "Physician_Specialty": "Obstetrics & Gynecology",
  "Nature_of_payment": "Food and Beverage"
}

Apache Spark SQL, Datasets, and DataFrames

A Spark dataset is a distributed collection of data. Dataset is a newer interface, which provides the benefits of the older RDD interface (strong typing, ability to use powerful lambda functions) combined with the benefits of Spark SQL’s optimized execution engine. Datasets also provide faster performance than RDDs with more efficient object serialization and deserialization.


A DataFrame is a Dataset organized into named columns, i.e. a Dataset[Row]. (In Spark 2.0, the DataFrame API was merged with the Dataset API.)

Unified Apache Spark 2.0 API

Read the Data From a CSV File Into a Dataframe

In the following code:

  1. The SparkSession read method loads a CSV file and returns the result as a DataFrame.
  2. A user-defined method is used to convert the amount column from a string to a double.
  3. A local temporary view is created in order to easily use SQL.

Read the data from CSV file into a Dataframe

One row of the DataFrame is shown below:

One row from DataFrame

Transform Into a Dataset of Payment Objects

Next, we want to select only the fields that we are interested in and transform them into a Dataset of payment objects. First, we define the payment object schema with a Scala case class:

Define the Payment Schema

Next, we use Spark SQL to select the fields we want from the DataFrame and convert this to a Dataset[Payment] by providing the Payment class. Then, we replace the Payment view.

Create a Dataset of Payment classes

One row of the Dataset[Payment] is shown below:

One row of the Dataset\[Payment\]

Explore and Query the Open Payment Data With Spark Dataset

Datasets provide a domain-specific language for structured data manipulation in Scala, Java, and Python; below are some examples. The Dataset show() action displays the top 20 rows in a tabular form.

Domain-specific language

Dataset’s printSchema() prints the schema to the console in a tree format:

printSchema() prints to console in tree format

Here are some example queries using the Scala Dataset API on the payments Dataset.

What is the Nature of Payments with reimbursement amounts greater than $1,000 ordered by count?

What are the Nature of Payments with payments &gt; $1000 with count

What are the top five Nature of Payments by count?

What are the Top 5 Nature of Payments by count

You can register a Dataset as a temporary table using a given name and then run Spark SQL. With the Zeppelin Notebook, you can display query results in table or chart formats. Here are some example Spark SQL queries on the payments dataset.

What are the top ten Nature of Payments by count?

What are the top 10 nature of payments by count?

What are the top ten Nature of Payments by total amount?

What are the top 10 nature of payments by total amount?

What are the top five physician specialties by total amount?

What are the top 5 physician specialties by total amount?

Here is the same query with the result displayed in a pie chart:

What are the Top 5 Physicians by total amount? (Chart)

Saving JSON Documents in a MapR-DB JSON Table

In order to save the JSON objects to MapR-DB, the first thing we need to do is define the _id field, which is the row key and primary index for MapR-DB. In the function below, we create an object with the id equal to a combination of the physician ID, the date, and the record ID. This way the payments will be grouped by physician and date. Next, we use a map operation with the createPaymentwId function to convert the Dataset[Payment] to a Dataset[PaymentwId], then we convert this to an RDD of JSON documents. (Note that with MapR-DB v6, the Spark connector will support Datasets.)
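As a sketch of that row-key construction (plain string concatenation; makeId is a hypothetical helper, not the workshop's actual createPaymentwId code), using the field values from the JSON example earlier:

```java
public class PaymentRowKey {
    // Compose the MapR-DB row key from physician ID, payment date, and
    // record ID so that payments sort grouped by physician and date.
    static String makeId(String physicianId, String datePayment, String recordId) {
        return physicianId + "_" + datePayment + "_" + recordId;
    }

    public static void main(String[] args) {
        System.out.println(makeId("317150", "08/26/2016", "346122858"));
    }
}
```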

Transform Dataset into RDD of JSON documents

One row of the RDD of JSON documents is shown below:

One row of the RDD of JSON documents

In the code below, we save the RDD of JSON objects into a MapR-DB JSON table:

Save JSON RDD to MapR-DB

Note that in this example, the table was already created. To create a table using the shell, execute the following at the Linux command line:

mapr dbshell

After starting the shell, run the create command. See mapr dbshell.

Loading Data From a MapR-DB JSON Table

The code below loads the documents from the /user/user01/testable table into an RDD and prints out two rows:

Load the Payments from MapR-DB

Projection Pushdown and Predicate Pushdown for the Load API

The “load” API of the connector also supports select and where clauses. These can be used for projection pushdown of subsets of fields and/or can filter out documents by using a condition.
Here is an example of how to use the where clause to restrict the rows:

Load the Payments for a physician from MapR-DB

nature_of_payment and payer fields

Similarly, if one wants to project only the nature_of_payment and payer fields, and to use the where clause to restrict the rows by amount, the following code will generate the required output:

Load the Payment where the amount is greater than 100 from MapR-DB



In this blog post, you’ve learned how to ETL Open Payments CSV file data to JSON, explore with SQL, and store in a document database using Spark Datasets and MapR-DB.

Original Link

Apache Spark Word Count: Data Analytics With a Publicly Available Dataset

In a previous post, I discussed my findings for developing a simple word count app in Java and running it against the text of a typical novel, such as Alice in Wonderland.

Here’s a summary of finding so far:

  • 2012 MacBook Pro with a 2.4GHz i7 and SanDisk SSD — it completed in 100ms
  • 2015 MacBook Pro with a 2.2GHz i7 and stock SSD — it completed in 82ms
  • Ubuntu 16.04 server running under VMware ESXi on my HP DL380 Gen7 rack server configured with 2 x 2.4 GHz vCPUs, 6GB RAM — it completed in 250ms 
    • The slower time probably can be accounted for slower data access from the WD Black 7200rpm HDDs in my server, which is likely an I/O bottleneck compared to loading the same data file from SSD

Now, let’s take it up a notch and throw some larger datasets into the mix using the Yelp dataset — taking the reviews.json file exported as a CSV file, running the same app on the same Ubuntu server in a VM on the DL380, and performing the same word count against a 2.8GB file. With the same Java word count app, this took 434,886ms to complete, or just over seven minutes. OK, now we’ve got something large enough to play with as a benchmark:

kev@esxi-ubuntu-mongodb1 : ~ $ java -jar wordcount.jar ./data/yelp/reviewtext.csv 
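For reference, the core counting logic of a plain-Java word count app like the one benchmarked above can be sketched as follows (an illustrative simplification; the original app's source isn't shown here):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

public class WordCount {
    // Split each line on whitespace and tally occurrences in a HashMap.
    static Map<String, Long> count(Stream<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        lines.flatMap(line -> Stream.of(line.split("\\s+")))
             .filter(w -> !w.isEmpty())
             .forEach(w -> counts.merge(w, 1L, Long::sum));
        return counts;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Long> counts = count(Files.lines(Paths.get(args[0])));
        // Print the top 20 words in descending order of count.
        counts.entrySet().stream()
              .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
              .limit(20)
              .forEach(e -> System.out.println(e.getKey() + " : " + e.getValue()));
    }
}
```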

Incidentally, if you’re interested in what the word counts from the Yelp reviews.json data file look like, here are the first few counts in descending order before we get to some terms you’d expect in Yelp reviews, like “good” and “food”:

the : 22169071
and : 18025282
I : 13845506
a : 13416437
to : 12952150
was : 9340074
of : 7811688
: 7600402
is : 6513076
for : 6207595
in : 5895711
it : 5604281
The : 4702963
that : 4446918
with : 4353165
my : 4188709
but : 3659431
on : 3642918
you : 3570824
have : 3311139
this : 3270666
had : 3015103
they : 3001066
not : 2829568
were : 2814656
are : 2660714
we : 2639049
at : 2624837
so : 2378603
place : 2358191
be : 2276537
good : 2232567
food : 2215236

Now, let’s look at rewriting the analysis using Apache Spark. The equivalent code using the Spark API for loading the dataset and performing the word count turned out to be like this (although if you search for “Apache Spark word count,” there’s many different ways you could use the available APIs to implement a word count):

JavaRDD<String> lines = sc.textFile(inputPath); // sc is the JavaSparkContext; inputPath is the CSV file
JavaPairRDD<String, Integer> words = lines
    .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
    .mapToPair(word -> new Tuple2<>(word, 1))
    .reduceByKey((x, y) -> x + y);
List<Tuple2<String, Integer>> results = words.collect();

Submitting the job to run on a standalone Spark node (an Ubuntu Server 16.04 VM on ESXi) with 1 core (--master local[1]):

./bin/spark-submit --class "kh.textanalysis.spark.SparkWordCount" --master local[1] ../spark-word-count-0.0.1-SNAPSHOT.jar ../data/yelp/reviewtext.csv

The job completes in 336,326ms (approx. 5.6 minutes). At this point, this is with minimal understanding of how best to approach or structure an effective Spark job, but we’ve already made an improvement, and this first test is with a single local Spark node and no additional worker nodes in the cluster.

Next, with a standalone local node and 2 cores (--master local[2]): 170,261ms, or 2.8 minutes. Now we’re talking.

Let’s try deploying the master node, and then add some worker nodes.

17/11/07 18:43:33 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://…
17/11/07 18:43:33 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master

My first guess is that the default 7077 port is not open, so:

sudo ufw allow 7077

Retrying, the job now submits and starts up, but gives this error:

17/11/07 18:47:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 22 tasks
17/11/07 18:47:44 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

Taking a look at the master node web UI on 8080:

True, we have the master started, but no workers yet — so let’s start up 1 slave node first (another Ubuntu Server 16.04 VM on ESXi):

$ ./ -m 1G spark://
starting org.apache.spark.deploy.worker.Worker, logging to /home/kev/spark-2.2.0-bin-hadoop2.7/logs/spark-kev-org.apache.spark.deploy.worker.Worker-1-ubuntuspk1.out

Now, we’ve got one worker up and ready:

Submitting the same job again to the master node, there’s a FileNotFound exception for the file we’re attempting to process:

Exception in thread “main” org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6,, executor 0): File file:/home/kev/data/yelp/reviewtext.csv does not exist

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running ‘REFRESH TABLE tableName’ command in SQL or by recreating the Dataset/DataFrame involved.

I was curious how the data file would be shared with the slave nodes, and this answers my question: it isn’t (or, at least, the way I’ve implemented my job so far assumes the data file is local). Clearly, I have some more work to do in this area to work out the approach for sharing the data file between the nodes. In the meantime, I’m just going to copy the same file across each of the worker nodes to get something up and running. I’m guessing a better way would be to mount a shared drive between the workers, but I’ll come back to this later.

With the data file copied to my worker VMs and the job restarted, we’ve now got the job running on one slave with one core and 1GB:

Here’s an interesting view of the processing steps in my job:

Taking a quick look at the HP server ILO — the CPUs are barely breaking a sweat and fan speeds are still low:

The Spark dashboard shows completion times, so now we’re at 5.3 minutes:

Let’s add an extra vCPU to the first worker node (I need to reconfigure my VM in ESXi and then restart). The console’s showing an additional stopped worker from a second slave VM that I’ll start up for the last test. First, two vCPUs, starting up with -c 2 for two cores:

./ -c 2 -m 1G spark://

166,773ms, or 2.7 minutes. Looking good!

Now, with the additional second slave node, also started with two vCPUs, we have two worker slave nodes (four vCPUs total). During the middle of the run, checking fan speeds — warming up and fans are running a little faster — but nothing crazy so far:

102,977ms, or 1.7 minutes!

Curious how far we can go… let’s try two worker slave nodes, four vCPUs each, and bump up the available RAM to 3GB. Reconfigure my VMs, and off we go:

./ -c 4 -m 3G spark://

81,998ms, or 1.4 minutes!

Still pretty good, although the performance gains from doubling the cores per worker and adding more RAM seem to be leveling off; we’re no longer seeing the same magnitude of improvement. At this point, it might be more interesting to add additional slave worker nodes. I’ve been creating my ESXi VMs by hand up until now, so maybe this is the time to look into some automation for spinning up multiple copies of the same VM.

Let’s summarize the results so far: the standalone Java 8 app run on the same HP server as Spark, compared against the various Spark configurations:

I’ve got ample resources left on my HP DL380 G7 rack server to run a few more VMs equally sized to what I have running, so maybe if I can work out an easy way to template the VMs I have so far, I’ll spin up some additional VMs as worker nodes and see what’s the fastest processing time I can get with the hardware I have. More updates to come later.

Original Link

Running Your First Apache Spark App [Code Snippets]

The Spark Getting Started Guide is pretty good, but it’s not immediately obvious that you can’t run an app built with the Spark API as a standalone executable. If you try, you’ll get an error like this:

17/11/07 19:15:20 ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:376)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2509)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:909)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:901)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:901)
at kh.textanalysis.spark.SparkWordCount.workCount(
at kh.textanalysis.spark.SparkWordCount.main(

Instead, if using Maven, package the app with mvn package and start a local master node:


Then, you submit it to your Spark node for processing:

./bin/spark-submit \
  --class "MyApp" \
  --master local[1] \
  target/MyApp-1.0.jar

And that’s it! 

Original Link

Applying SQL Analytics and Windowing Functions to Spark Data Processing

The purpose of this post is to share my latest experience with Talend in the field, which is also the first time I have gotten to see the capacity Talend has to perform SQL queries inside any Talend big data batch jobs using the Spark framework. In doing so, I want to teach you how to apply SQL analytics and windowing functions to process data inside Spark!

Depending on how familiar you are with the Talend platform, you may or may not know that our big data integration solution gives developers and power users the ability to generate code that is natively executable on a Hadoop cluster, whether it’s MapReduce, Spark, Spark Streaming, or Storm.

Technically, Talend does not require an agent to be installed on your Hadoop cluster; a connection to YARN is the only prerequisite. This is the beauty of using a solution based on open-source standards that has always taken a no-vendor-lock-in approach.

Because Talend’s framework is open, users will be able to inject code inside their integration jobs. Most of the time, they will reuse a Java routine when they can’t easily achieve a processing step on the data they are working with. In today’s blog, I want to focus on how to leverage Spark SQL code within a big data Spark job, and why it matters a lot!

The Need for Speed!

Everything started with what looked like a simple use case for a recent big data POC. It was a classic banking use case where we needed to show how to calculate a running balance from transaction data. Basically, we had two data sources:

  1. The list of transactions of the day to process (3M+ records)
  2. The historical table of end-of-the-day balance for all the accounts in the bank (8M+ records)

While this scenario would have been simple on the whiteboard, when it finally came time to implement the use case in Talend Studio, and it must be performant, this is when the rubber met the road!

Being kind of a brute-force approach guy, I thought:

“Get the transactions table sorted, add a sequence number, get the end-of-day balance value you need from that 9M+ rows historic table, then put that in the cache (thanks to a nice tCacheOutput component). Then you’ll need to do a join between your cached transactions table and the original transactions table, get some join there and BOOM, calculation, computation, magic, BOOM, result.”

Of course, that sounds like a bit of overkill, as it would have required using a lot of memory and comparing a lot of rows just to get the previous amount of each row. This approach wouldn’t have worked anyway, given that the prospect informed us, “Oh, by the way, your competition did that in three minutes.”

All right, it was time to think about a smarter way, brute force has its limitations after all!

SQL Analytics Function Baby!

When it comes to smart stuff in big data, truth be told, my peers don’t look at me first; they look at our internal Black Belt Bullet Proof Certified Fellows here at Talend. I did the same and got this Jedi-like response: “Little Padawan, the way of tSQLRow, take you must.”

My Jedi Data Master got quite excited when the challenge came, so he helped build the first draft of jobs using a tSQLRow.

The beauty of tSQLRow in a Spark job is that you can use SQL queries inside your job, and that query will apply ON THE JOB DATA! Yeah, yeah, not a database or any other hybrid stuff, the actual data that is gathered inside your job! So all the WHERE, ORDER BY, GROUP BY, SUM(), COUNT(), and other funny operations can be done through that SQL API inside a job. Yes, that’s cool!

These specific functions have existed since Spark 1.4; Talend is already at Spark 2.1, so they’re usable there.

Thinking about the use case, it was about:

  • Getting the latest value for an end-of-day balance
  • Summing up transaction amount row by row inside each account

Not to mention the need to have some temporary variable to deal with the other constraints of the use case that are not part of the explanation here (i.e. dealing with figure transactions values, generating a row number, etc.).

And this is where I discovered the existence of analytics and windowing functions in SQL; probably not a surprise for some of you reading this article, but a totally new discovery for me!

This where I started getting my hands dirty, and I must say, I just couldn’t get enough!

Partition it, sort it, window it, compute it, filter it, shuffle it…

First, let’s have a look at the data used for my local test with local Spark engine on my Talend Studio (BTW, I’m using Talend Studio 6.3.1 — the Big Data Platform edition).

EOD balance sample data:

100|12345|2016-06-08 00:00:00|1.02
100|12345|2016-06-07 00:00:00|0.02
102|20006|2016-06-07 00:00:00|5.02
102|20006|2016-06-08 00:00:00|6.02

Transactions sample data:

103|20007|2016-06-09 02:00:00|105508585836|2016-06-10 00:00:00|F|6.90|D|20160609
100|12345|2016-06-09 06:00:00|111454018830|2016-06-12 00:00:00|C|0.6|D|20160609
102|20006|2016-06-09 01:00:00|125508585836|2016-06-09 00:00:00|F|5.50|D|20160609
100|12345|2016-06-09 02:00:00|33042764824|2016-06-08 00:00:00|B|0.05|D|20160609
101|22222|2016-06-09 02:00:00|121554018830|2016-06-09 00:00:00|C|0.5|D|20160609
100|12345|2016-06-09 02:00:00|33042764825|2016-06-08 00:00:00|B|0.08|D|20160609
100|12345|2016-06-09 03:00:00|33042764830|2016-06-09 00:00:00|C|1.06|D|20160609
100|12345|2016-06-09 05:00:00|110451035129|2016-06-11 00:00:00|C|0.21|D|20160609
100|12345|2016-06-09 07:00:00|185508585836|2016-06-13 00:00:00|F|0.38|D|20160609
100|12345|2016-06-09 04:00:00|33042766082|2016-06-10 00:00:00|C|4.51|D|20160609
101|22222|2016-06-09 01:00:00|101554018830|2016-06-08 00:00:00|C|0.8|C|20160609

See below the job design I used to test my logic on dummy data. The first two components are tFixedflow, and I used the above sample data as my test data to validate that it worked correctly.

The initial steps mainly filter, sort the data correctly (by account number and transaction date), and retrieve the latest EOD balance for each account when it exists (otherwise, the account is new and this is the first time there has been a transaction on it). They also create a unique transaction ID for each transaction row using a sequence function (available in the numeric library in Talend). This is not just for fun; one of the main headaches I experienced was understanding the behavior of analytics functions such as SUM() or LAST_VALUE(), and having a unique identifier to sort the data inside the partition is mandatory to get the result you want in some cases. This was the case for the SUM() function.

The first tSQLRow component happens right after the join of the data is done. Here is the content of it:

"select posting_transit, posting_acct_num, business_date, system_time, posting_date,
    business_date_indi, txn_amt, dr_cr_ind, proc_dt, end_of_day_bal, isknownaccnt,
    ROW_NUMBER() OVER (PARTITION BY posting_transit, posting_acct_num
                       ORDER BY business_date, posting_date ASC) as rowNum,
    sum(txn_amt) OVER (PARTITION BY posting_transit, posting_acct_num
                       ORDER BY business_date, posting_date, seq ASC) as balance
 from out1"

The focus here is on the two functions and the partitioning of the data:

  1. ROW_NUMBER(): returns the row number in order of appearance inside the partition created (so related to the ORDER BY operation).
  2. SUM(txn_amt): sums the txn_amt value row by row as long as the ORDER BY criteria is unique. That’s a critical step for the running balance calculation. If the ORDER BY criteria of the partition (here business_date, posting_date, seq ASC; made unique thanks to the seq variable) were not unique, we would end up with a sum of all the txn_amt that happen on the same business_date and posting_date, which is not what we want.

This is pretty well explained here with sample data.
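To make the SUM() OVER (PARTITION BY … ORDER BY …) behavior concrete outside of Spark, here’s a minimal plain-Java sketch of the semantics (the Txn record, field names, and sample values are hypothetical, and the EOD starting balance is ignored): within each partition, rows are sorted by a unique key and the amounts accumulate row by row.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RunningBalance {
    // Hypothetical transaction row: a partition key, a unique ordering key, and an amount.
    record Txn(String account, int seq, double amount) {}

    // Emulates SUM(amount) OVER (PARTITION BY account ORDER BY seq):
    // within each account, amounts accumulate row by row in seq order.
    static Map<String, List<Double>> runningSums(List<Txn> txns) {
        Map<String, List<Double>> out = new LinkedHashMap<>();
        Map<String, List<Txn>> partitions = txns.stream()
                .collect(Collectors.groupingBy(Txn::account, LinkedHashMap::new,
                        Collectors.toCollection(ArrayList::new)));
        partitions.forEach((account, rows) -> {
            rows.sort(Comparator.comparingInt(Txn::seq)); // the unique ORDER BY key
            List<Double> balances = new ArrayList<>();
            double sum = 0;
            for (Txn t : rows) {
                sum += t.amount();
                balances.add(sum);
            }
            out.put(account, balances);
        });
        return out;
    }

    public static void main(String[] args) {
        List<Txn> txns = List.of(
                new Txn("100|12345", 1, 1.0),
                new Txn("100|12345", 2, 2.0),
                new Txn("101|22222", 1, 0.5));
        System.out.println(runningSums(txns)); // {100|12345=[1.0, 3.0], 101|22222=[0.5]}
    }
}
```

Without the unique seq key, two rows sharing the same ordering values would receive the same (combined) sum, which is exactly the pitfall described above.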

When It Comes to Size, Partition Matters!

The OVER(PARTITION BY a,b ORDER BY c,d) instruction is key in our job. Because Spark is an in-memory framework that can be run in parallel in a grid, and especially in a Hadoop cluster, partitioning the data makes a lot of sense to be performant!

Without being able to give details, the work of the PARTITION BY a,b will be to select a group of data logically to apply a function only to that group of data.

In my use case, I wanted to partition the data by account number (an aggregation of transit_number and acct_num), so I only apply my ORDER BY instruction and my analytic functions SUM() and ROW_NUMBER() to that particular chunk of data. So, instead of 3M+ transaction rows in one group, I now have X transactions spread across Y partitions; here, I’d estimate around 600,000 groups with an average of five transactions per partition.

So instead of having a huge processing of all the data, I now have a very quick processing in parallel in-memory of numerous, very small groups of data.

Yeah, I know, that’s cool! And that should also lead to a pretty good performance result! (Well, that depends on a lot of things, but this partitioning definitely helps!)

After that first tSQLRow, my data looks like this:

The SUM(txn_amnt) Mechanism

The data is sorted and grouped, and the Double value you see (with a lot of digits after the decimal point) is the running balance calculated without the EOD balance taken into consideration yet. This incremental row-by-row behavior is due to the fact that we ordered the partition with a unique identifier for each row (yes, I insist on that, as it took me half a day to understand it and create that seq sequence). Without that, you’ll end up (like me at first sight) with a real sum of all the txn_amt sharing the same business_date and posting_date.

Look at the first seven rows — they are part of the same group (account: 100 12345).

  • 1.02 (row 1 EOD balance) – 0.05 (row 1 txn_amount) = 0.97 (row 1 balance)

  • 0.97 (row 1 balance, previously calculated) – 0.08 (row 2 txn_amount) = 0.89 (row 2 balance)

  • 0.89 (row 2 balance, previously calculated) – 1.06 (row 3 txn_amount) = –0.17 (row 3 balance)

  • And so on!

The ROW_NUMBER() Mechanism

You can easily see the result of the ROW_NUMBER() function applied by partition, as it resets to 1 after each new transit_number/accnt_number.
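In plain Java terms, that reset-to-1 behavior can be sketched like this (a toy illustration over an already-sorted list of partition keys, not Spark code):

```java
import java.util.ArrayList;
import java.util.List;

public class RowNumber {
    // Emulates ROW_NUMBER() OVER (PARTITION BY key): numbering restarts at 1
    // whenever the partition key changes in already-sorted input.
    static List<Integer> rowNumbers(List<String> sortedKeys) {
        List<Integer> nums = new ArrayList<>();
        String prev = null;
        int n = 0;
        for (String key : sortedKeys) {
            n = key.equals(prev) ? n + 1 : 1;
            nums.add(n);
            prev = key;
        }
        return nums;
    }

    public static void main(String[] args) {
        System.out.println(rowNumbers(List.of("100|12345", "100|12345", "101|22222")));
        // [1, 2, 1]
    }
}
```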

Last, But Not Least!

The final step of my job was to deal with future transactions. These transactions don’t require a running balance calculation like the others (the current and backdated transactions) do. A future transaction (indicated with an F inside the business_date_indi field) should instead take as its running balance the previous end-of-day balance value, or “null”, or the last value calculated before it.

Let’s say I already have three current or backdated transactions for my account 100 12345 with an already calculated running balance. If I then have one or many future transactions for that same account, I want to set their running balance to the last calculated running balance value (yeah, I know… it sounds logical on your bank account report or your credit card report).

That’s where I looked at the function called LAST_VALUE(), and I used it in the last tSQLRow component in my job.

See the content below:

"select posting_transit, posting_acct_num, business_date, system_time, posting_date,
    business_date_indi, txn_amt, dr_cr_ind, proc_dt, end_of_day_bal, rowNum, balance,
    last_value(balance, true) OVER (PARTITION BY posting_transit, posting_acct_num) as last_balance,
    max(F_tx_only) OVER (PARTITION BY posting_transit, posting_acct_num) as F_tx_only,
    isknownaccnt
 from bal
 ORDER BY posting_transit, posting_acct_num, business_date, system_time"

Let’s focus on this particular piece:

last_value(balance, true) OVER (PARTITION BY posting_transit, posting_acct_num) as last_balance

So, now you get the partition piece of the story. You’ll notice the absence of an ORDER BY instruction. This is on purpose, as using another ORDER BY here would result in an error. I think it’s likely because we previously ORDER BY-ed the same partition but, honestly, there might be an explanation I don’t get (comments appreciated).

So, last_value(balance) returns the last value of the balance calculated inside that partition; but in the case of a future transaction, we don’t have any running balance calculated.

So one of the tricks I used was to set the “balance” to null in a previous step when it was a future transaction. But the last_value(balance) would still return that null value by default.

To avoid that, this is what makes the difference: last_value(balance, true). I hope you’ve read this far, because it took me five hours of googling to get it right, and when I found it, that’s when I decided to write my first blog article!

So, this “, true” parameter tells the last_value() function to skip null values in its computation. Now you understand why I set those future transaction balance values to null earlier: it was to avoid them later!

(<private joke> for French readers. I tried the so-called “Rémy OffTheWood” trick and replaced “true” by “trou”…well it doesn’t work, don’t try it at home) </private joke>)
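The null-skipping behavior of last_value(balance, true) can be mimicked in plain Java (a sketch of the semantics only, not the Spark implementation): scan the partition and keep the last non-null value seen.

```java
import java.util.Arrays;
import java.util.List;

public class LastValue {
    // last_value(col, true) ignoring nulls: the last non-null element, or null if none.
    static Double lastNonNull(List<Double> partition) {
        Double last = null;
        for (Double v : partition) {
            if (v != null) last = v;
        }
        return last;
    }

    public static void main(String[] args) {
        // Future transactions carry a null balance; last_value(balance, true) skips them.
        System.out.println(lastNonNull(Arrays.asList(0.97, 0.89, -0.17, null))); // -0.17
    }
}
```

Without the null-skipping flag, the trailing null from the future transaction would be returned instead of -0.17, which is exactly the problem described above.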


So, how did all this end up in terms of performance against our competition? Well, our first test of the job, with no tuning at all, took something like six minutes to compute. Of course, we wanted to do better, so we applied the Spark configuration tuning capabilities inside Talend jobs.

And after one round of tuning, we got to a 2-minute, 30-second execution time: the most performant result overall!

Next, our prospect tested against more volume, increasing both the transactions table and the EOD balance table (70 million EOD balance records and 38 million transactions).

With no change to the job (not even tuning properties), it ran in ten minutes!

So, basically, multiplying the volume by roughly 10x in both the lookup and main data only took 4x the time to process, without any change. This is what I call native scalability. Oh, and the competition wasn’t even close.


I hope this quick read will save you the hours of Googling that a common integration scenario like this cost me. This function inside the Spark framework is great, and combined with Talend Studio’s capacity, it’s just awesome!

Ease-of-use-wise, it’s just great to be able to reuse SQL skills and apply them to the big data world this easily!

Here is the result of my job:

|posting_transit|posting_acct_num|business_date|system_time |posting_date |business_date_indi|txn_amt|dr_cr_ind|proc_dt |end_of_day_bal|running_balance|
|100 |12345 |1465437600000|33042764824 |1465344000000|B |-0.05 |D |20160609|1.02 |0.97 |
|100 |12345 |1465437600000|33042764825 |1465344000000|B |-0.08 |D |20160609|1.02 |0.89 |
|100 |12345 |1465441200000|33042764830 |1465430400000|C |-1.06 |D |20160609|1.02 |-0.17 |
|100 |12345 |1465444800000|33042766082 |1465516800000|C |-4.51 |D |20160609|1.02 |-4.68 |
|100 |12345 |1465448400000|110451035129|1465603200000|C |-0.21 |D |20160609|1.02 |-4.89 |
|100 |12345 |1465452000000|111454018830|1465689600000|C |-0.6 |D |20160609|1.02 |-5.49 |
|100 |12345 |1465455600000|185508585836|1465776000000|F |-0.38 |D |20160609|1.02 |-5.49 |
|101 |22222 |1465434000000|101554018830|1465344000000|C |0.8 |C |20160609|0.0 |0.8 |
|101 |22222 |1465437600000|121554018830|1465430400000|C |-0.5 |D |20160609|0.0 |0.3 |
|102 |20006 |1465434000000|125508585836|1465430400000|F |-5.5 |D |20160609|6.02 |6.02 |
|103 |20007 |1465437600000|105508585836|1465516800000|F |-6.9 |D |20160609|0.0 |null |


Original Link

Using Kylo for Self-Service Data Ingestion, Cleansing, and Validation

Kylo is a feature-rich data lake platform built on Apache Hadoop and Apache Spark. Kylo provides a business-friendly data lake solution and enables self-service data ingestion, data wrangling, data profiling, data validation, data cleansing/standardization, and data discovery. Its intuitive user interface allows IT professionals to access the data lake (without having to code).

Though many tools ingest either batch data or streaming/real-time data, Kylo supports both. It provides a plug-in architecture with a variety of extensions, and Apache NiFi templates provide incredible flexibility for batch and streaming use cases.

In this post, let’s discuss ingesting data from Apache Kafka, performing data cleansing and validation at real-time, and persisting the data into Apache Hive table.


  • Install Kafka
  • Deploy Kylo, where the deployment requires knowledge on different components/technologies such as:
    • AngularJS for Kylo UI
    • Apache Spark for data wrangling, data profiling, data validation, data cleansing, and schema detection
    • JBoss ModeShape and MySQL for Kylo Metadata Server
    • Apache NiFi for pipeline orchestration
    • Apache ActiveMQ for interprocess communication
    • Elasticsearch for search-based data discovery
    • All Hadoop technologies but most preferably HDFS, YARN, and Hive

To learn more about basics and installation of Kylo in an AWS EC2 instance, refer to our previous blog on Kylo setup for data lake management.

Data Description

A user transaction dataset with 68K rows, generated by the Treselle team, is used as the source file. The input dataset has time, UUID, user, business, address, amount, and disputed columns.

Sample dataset:

Examples of invalid and missing values in the dataset:

Use Case

  • Publish user transaction dataset into Kafka.
  • Ingest data from Kafka using Kylo data ingestion template and standardize and validate data.


  • Customize data ingest pipeline template.
  • Define categories for feeds.
  • Define feeds with source and destination.
  • Cleanse and validate data.
  • Schedule feeds.
  • Monitor feeds.

Self-Service Data Ingest, Data Cleansing, and Data Validation

Kylo utilizes Spark to provide a pre-defined pipeline template, which implements multiple best practices around data ingestion. By default, it comes up with file system and databases. It helps business users in simplifying the configuration of ingest data from new sources such as JMS, Kafka, HDFS, HBase, FTP, SFTP, REST, HTTP, TCP, IMAP, AMQP, POP3, MQTT, WebSocket, Flume, Elasticsearch and Solr, Microsoft Azure Event Hub, Microsoft Exchange using Exchange Web Services (EWS), Couchbase, MongoDB, Amazon S3, SQS, DynamoDB, and Splunk.

Apache NiFi, a scheduler and orchestration engine, provides an integrated framework for designing new types of pipelines with 250+ processors (data connectors and transforms). The pre-defined data ingest template is modified by adding Kafka, S3, HDFS, and FTP as shown in the below screenshot:

Get, Consume, and Fetch processors are used to ingest the data. The Get and Consume versions of the Kafka processors in NiFi are as follows:

  • GetKafka 1.3.0: Fetches messages from the earlier version of Apache Kafka (specifically 0.8.x versions). The complementary NiFi processor used to send messages is PutKafka.

  • ConsumeKafka_0_10 1.3.0: Consumes messages from the newer version of Apache Kafka specifically built against the Kafka 0.10.x Consumer API.

Based on need, a custom processor or other custom extension for NiFi can be written and packaged as a NAR file and deployed into NiFi.

Customizing Data Ingest Pipeline Template

Upon updating and saving the data ingest template in NiFi, the same template can be customized in Kylo UI. The customization steps involve:

  • Customizing feed destination table
  • Adding input properties
  • Adding additional properties
  • Performing access control
  • Registering the template


Defining Categories for Feeds

All the feeds created in Kylo should be categorized. The process group in NiFi is launched to execute the feeds. The “Transaction raw data” category is created to categorize the user transaction feeds.

Defining Feeds With Source and Destination

Kylo UI is self-explanatory to create and schedule feeds. To define feeds, perform the following:

  • Choose the data ingest template.
  • Provide feed name, category, and description.


  • Choose input Data Source to ingest data.
  • Customize the configuration parameter related to that source; for example, transactionRawTopic in Kafka and batch size 10000.


  • Define output feed table using either of the following methods:
    • Manually define the table columns and its data type.
    • Upload the sample file and update the data type as per the data in the column.
  • Preview the data under Feed Details section in the top right corner:


  • Define partitioning output table by choosing Source Field and Partition Formula, for example, time as source field and year as partition formula to partition the data.


Cleansing and Validating Data

The feed creation wizard UI allows end-users to configure cleansing and standardization functions to manipulate data into conventional or canonical formats (for example, simple data type conversion such as dates or stripping special characters) or data protection (for example, masking credit cards, PII, and so on).

It allows users to define field-level validation to protect data against quality issues and provides schema validation automatically. It provides an extensible Java API to develop custom validation, custom cleansing, and standardization routines as needed, along with predefined rules for standardizing and validating different data types.

To clean and validate data, perform the following:

  • Apply different predefined standardization rules for time, user, address, and amount columns as shown below:


  • Apply standardization and validation for different columns as shown in the below screenshot:


  • Define the data ingestion merge strategy in the output table.
  • Choose Dedupe and merge to ignore duplicated batch data and insert it into the desired output table.


  • Use the Target Format section to define data storage and compression options.
    • Supported storage formats: ORC, Parquet, Avro, TextFile, and RCFile
    • Compression options: Snappy and Zlib


Scheduling Feeds

Feeds can be scheduled using a cron- or timer-based mechanism. Enable the Enable feed immediately option to run the feeds immediately, without waiting for the cron job or timer criteria.

Monitoring Feeds

After scheduling the feeds, the actual execution is performed in NiFi. Feed status can be monitored, feed details can be changed at any time, and the feeds can be re-scheduled.


An overview of created feed job statuses can be seen under jobs in the Operations section. By drilling down into the jobs, you can identify the details of each job and debug failed feed job executions.


The Job Activity section provides details (completed, running, and so on) of a specific feed’s recurring activity.


The Operational Job Statistics section provides details such as success rate, flow rate per second, flow duration, and steps duration of specific job statistics.



In this blog, we discussed data ingestion, cleansing, and validation without any coding in the Kylo data lake platform. The ingested data output from Kafka, shown in a Hive table in Ambari, looks as follows:

In my next blog, we’ll discuss data profiling and search-based data discovery.



Original Link

What’s New in Apache Spark 2.2?

Apache recently released a new version of Spark: 2.2. The newest version comes with improvements as well as the addition of new functionalities.

The major addition to this release is structured streaming. It has been marked as production-ready and its experimental tag has been removed.

As a whole, some of the high-level changes and improvements are:

  • Production-ready structured streaming
  • Expanding SQL functionalities
  • New distributed machine learning algorithms in R
  • Additional algorithms in MLlib and GraphX

The production-ready structured streaming comes with additional high-level changes:

  • Kafka source and sink: In the previous Spark version, Kafka was supported only as a source. But in the current release, we can use Kafka both as a source and as a sink.
  • Kafka improvements: Now, a cached instance of a Kafka producer will be used for writing to Kafka sinks, reducing latency.
  • Additional stateful APIs: Support for complex stateful processing and timeouts using [flat]MapGroupsWithState. 
  • Run once triggers: Allows for triggering only one-time execution, lowering the cost of clusters.

Spark 2.2 adds a number of SQL functionalities:

  • API updates: Added support for creating Hive tables with DataFrameWriter and Catalog, support for LATERAL VIEW OUTER explode(), and a unified CREATE TABLE syntax for data sources and Hive serde tables. Added broadcast hints, including BROADCAST, BROADCASTJOIN, and MAPJOIN, for SQL queries, as well as support for session-local timezones when machines or users are in different timezones. It also adds support for ADD COLUMNS with the ALTER TABLE command.
  • Overall performance and stability:
    • Cost-based optimizer: Cardinality estimation for filter, join, aggregate, project, and limit/sample operators. It decides the join order of a multi-way join query based on the cost function. TPC-DS performance improvements using star-schema heuristics.
    • File listing/IO improvements for CSV and JSON.
    • Partial aggregation support of HiveUDAFFunction.
    • Introduce a JVM object-based aggregate operator.
    • Limiting the max number of records written per file.
  • Other notable changes:
    • Support for parsing multiline JSON and CSV files.
    • Analyze table commands on partitioned tables.
    • Drop staging directories and data files after completion of insertions/CTAS against Hive serde tables.
    • More robust view canonicalization without full SQL expansion.
    • Support reading data from Hive metastore 2.0/2.1.
    • Removed support for Hadoop 2.5 and earlier.
    • Removed Java 7 support.

A major set of changes in Spark 2.2 focuses on advanced analytics and Python. PySpark is now published to PyPI and can be installed with pip install pyspark.

A few new algorithms were also added to MLlib and GraphX:

  • Locality sensitive hashing
  • Multiclass logistic regression
  • Personalized PageRank

Spark 2.2 also adds support for the following distributed algorithms in SparkR:

  • ALS
  • Isotonic regression
  • Multilayer perceptron classifier
  • Random forest
  • Gaussian mixture model
  • LDA
  • Multiclass logistic regression
  • Gradient boosted trees

The main focus of SparkR in the 2.2.0 release was adding extensive support for existing Spark SQL features:

  • Structured Streaming API for R
  • Support complete catalog API in R
  • Column functions to_json and from_json
  • Coalesce on DataFrame and coalesce on column
  • Support DataFrame checkpointing
  • Multi-column approxQuantile in R

Support for Python 2.6 has been dropped, and features like createExternalTable have been deprecated.

Happy learning!

Original Link

How Neo4j Is Making Graph Technology More Accessible

Thanks to Jeff Morris, Head of Product Marketing at Neo4j, for taking me through two new additions that are making graph technology more accessible for developers, big data IT, and data scientists. The graph database ecosystem is evolving quickly as business owners express an interest in better data presentations.

The new contribution to the Hadoop ecosystem enables graph analytic capabilities for Spark, making Cypher available to the popular in-memory analytic engine.

The company has released the preview version of Cypher for Apache Spark (CAPS) language toolkit. This combination allows big data analysts to incorporate graphs and graph algorithms into their work, which will broaden how they reveal connections in their data. Spark joins Neo4j, SAP HANA, Redis, and AgensGraph, among others in supporting Cypher, the world’s leading declarative graph query language, as the openCypher initiative expands its reach.

As graph-powered applications and analytic projects gain success, big data teams are looking to connect more of their data and personnel into this work. This is happening at places like eBay (recommendations via conversational commerce), Telia (smart home), and Comcast (smart home content recommendations). Until now, graph pattern matching has been unavailable to data scientists using Spark. Now, with Cypher for Apache Spark, these scientists can iterate easier and connect adjacent data sources to their graph applications much more quickly.

“Cypher for Apache Spark is an important milestone in both the pervasiveness of graph technology and in the evolution of the Cypher query language itself,” explains Philip Rathle, VP of product at Neo4j. “In making Cypher available for Apache Spark, we looked closely at the way data scientists work with Spark, and then in coordination with the openCypher group, used the latest features in the language to enable patterns of Cypher querying that would be most suitable for Apache Spark users. Cypher for Apache Spark enables full language composability: it can not only return tables of data but also return graphs themselves as a result of queries. This allows data scientists to chain queries together with in-memory Spark-based graph representations between steps. This capability lets Spark users carry out sophisticated graph analytics much more easily, directly within their Hadoop environment.”

Neo4j is releasing Cypher for Apache Spark under the Apache 2.0 license in order to unite Cypher with the broadest community of big data analysts, data scientists, and IT architects so they, too, can experience the transformative influence of connected data.

“As data accumulates in lakes at accelerating speeds and in unprecedented volumes, the challenge of extracting value from it by traversing differentiated structures and inferring context from them grows exponentially,” says Stephen O’Grady, analyst and co-founder at RedMonk. “Neo4j and its Cypher graph query language intend to be the de facto solution to precisely this problem.”

With the widespread popularity of graph databases, Neo4j moves up the stack with advanced analytics for artificial intelligence applications and powerful visualization for non-technical users.

Neo4j has also unveiled its new Native Graph Platform. The platform adds analytics, data import and transformation, visualization, and discovery, all on top of Neo4j’s cross-industry graph database. This new offering expands Neo4j’s enterprise footprint by establishing relationships with a variety of new users and roles, including data scientists, big data IT, business analysts and line of business managers.

Whether for increased revenue, fraud detection or planning for a more connected future, building networks of connected data proves to be the single biggest competitive advantage for companies today. This will become even more evident in the future as machine learning, intelligent devices, and real-time activities like conversational commerce are all dependent on connections. This is why Neo4j is extending the reach of its native graph stack, which has already seen success across multiple use cases with organizations ranging from NASA to eBay to Comcast, to link together a broader set of users, functionality, and technologies.

“Our customers’ needs have changed. Many companies started with us for retail recommendation engines or fraud detection, but now they need to drive their next generation of connected-data to power complex artificial intelligence applications,” says Emil Eifrem, CEO of Neo4j. “Our customers not only need a high performance, scalable graph database, they need algorithms to feed it, they need visualization tools to illustrate it, they need data integration to dive deeply into their data lakes. Our connections-first approach, facilitated by this new Native Graph Platform, makes it possible for our customers, like NASA, ICIJ, Comcast, Telia, and eBay to reach for the stars. And that’s what today’s GraphConnect is all about.”

The Native Graph Platform

Neo4j is making data connections more accessible, effective, and actionable for organizations. Its new Native Graph Platform takes a connection-first approach to query, visualize, and analyze data, making it more meaningful, more useful, and more easily adopted.

The Native Graph Platform introduces new features and products to serve a broader range of individuals, including:

  • Performance boost: For new or existing users, the Native Graph Platform introduces a performance boost in the Neo4j 3.3 database, which has expanded its use of native indexing, re-factored the Cypher query interpreter, and sped up write and update performance by as much as 55 percent over version 3.2 and 346 percent over version 2.3.

  • Server-to-server encryption: Data center and cloud administrators will rest easy knowing that Neo4j 3.3 Enterprise Edition clusters support intra-server encryption for all operations, across geographies and cloud zones.

  • Neo4j ETL: Data lake architects in IT will enjoy how fast it has become to prepare and import data into the graph platform using Neo4j ETL, which not only reveals data connections but also materializes these connections across a variety of relational sources and raw data formats living in Hadoop or other systems.

  • Advanced analytics for artificial intelligence: Allows data scientists to use Neo4j’s graph algorithms in developing AI logic for forward-looking projects, while they can also use Cypher for Apache Spark as a means to traverse gargantuan data volumes as graphs.

  • Integration with graph discovery and visualization applications: Allows business users to visualize, understand, analyze, and explore their graph data via a variety of industry-leading partners.

  • Neo4j Mission Control package: A developer and user’s launch pad for connecting to, exploring, and developing with a local copy of Neo4j Enterprise Edition and its associated platform components like APOC and algorithm libraries.

Original Link

Making Apache Spark the Most Versatile and Fast Data Platform Ever

We released SnappyData 1.0 GA last week. We couldn’t be more proud of our team for their effort, dedication, and commitment. It has been a long, enduring test for most of the team, having worked on distributed in-memory computing for so many years, going back to our days at VMware and Pivotal. Just in the last year, the team has closed about 1,000 JIRAs and improved performance 5-10 fold, while supporting several customers and the community. The project added roughly 200K source lines and another 70K lines of test code.

If you have been following SnappyData closely, here is the list of improvements since the 0.9 release. 

In this post, I will focus on the broader audience still trying to grasp SnappyData and its positioning with respect to Spark and other fast data systems. I am hoping you will be intrigued enough to give SnappyData a try and star our Git repository.

While Apache Spark is a general-purpose engine for both real-time and batch big data processing, its core is designed for high throughput and batch processing. It cannot process events one at a time, do point reads/writes, or manage mutable state. Spark has to rely on an external store to share data and get low-latency access, updates, and high concurrency. And when coupled with external stores, its in-memory performance is heavily impacted by the frequent data transfers required from such stores into Spark’s memory. The schematic below captures this challenge.

Figure 1: Spark’s runtime architecture.

Figure 2: Spark enhanced with hybrid (row + column) store.

SnappyData adds mutability with transactional consistency into Spark, permits data sharing across applications, and allows for a mix of low-latency operations (such as a KV read/write operations) with high-latency ones (an expensive aggregation query or ML training job).

SnappyData introduces hybrid in-memory data management into Spark. SnappyData’s Spark++ clusters can now analyze streams as well as manage transactional and historical states for advanced insight using Spark as its programming model. The single cluster strategy for compute and data provides the best possible performance while avoiding expensive stitching of complex distributed systems (the norm today).

The schematic above depicts this fusion of Spark with a hybrid in-memory DB. Applications can submit Spark jobs to be executed inside the cluster achieving up to 3 orders of magnitude performance gains, compared to a design where Spark is purely a computational layer. SnappyData also provides a very high-performance connector for Spark so any Spark cluster can connect to SnappyData as a store. In this “smart connector” mode, unlike other connectors, SnappyData uses the same columnar format as Spark for data movement along with several other optimizations making it significantly faster than every store we have tried in the market today.

The details of how SnappyData is architected are described in this CIDR paper. The schematic below captures its key components and the ecosystem.

Figure 3: SnappyData core, Spark Facade, and eco-system. The components in the center and highlighted in light green are some of the SnappyData extensions into the Spark runtime.

Deep Insights on Live Data

We have seen a flurry of stream processing engines (Google Dataflow, Flink, Storm, and IBM InfoSphere, to name a few), all aimed at capturing perishable insights — insights from events that, if not acted upon immediately, lose their value.

They all support programming constructs that can use either custom application logic or SQL to detect a condition or pattern within the stream. For instance, the most popular products selling now, top-K URLs, etc. can all be continuously captured as KPIs and made accessible to any application. For deeper insight, you often need to correlate current patterns to historical patterns or relate current patterns to other contextual datasets (Are sudden changes in temperature and pressure correlated to previously known patterns in a manufacturing device? Did the environment play a role?). Such insight, once presented to users, solicits other, deeper questions.

Often, this requires large-scale distributed joins, integration with disparate data sources, evaluating or incrementally training ML/DL models, and even instant visualization/data exploration tools that pose ad-hoc questions on all of this data. While some existing tools permit joining a stream to related datasets, what we find is that these related datasets are managed in enterprise repositories that are themselves large, diverse (NoSQL, SQL, text, etc.), and constantly changing. Imagine a telco operator placing location-sensitive offers/ads on mobile devices, which requires offers and subscriber profiles from CRM systems or from partner systems. You have to combine a high-velocity CDR (call data record) stream with live data sets that reside in CRM systems.

Executing a join against CRM systems in real time is not feasible. What you need is an engine that replicates changes in the CRM system (CDC) into a system that also manages the stream state. Moreover, this dependent state can itself be large. Most current solutions fall short: streaming systems focus on how to manage streams and offer only primitive state management.

True analytics on live data requires a different design center that can consume any “live” data set in the enterprise, not just incoming streams.

SnappyData aims to offer true deep insight on any live data — event streams (sensor streams), trapping continuous changes in enterprise databases (MySQL, Cassandra, etc.), historical data in-memory, or even datasets in remote sources. For instance, you can run a continuous or interactive query that combines windowed streams, reference data, and even large datasets in S3/HDFS. You can even use probabilistic data structures to condense extremely large datasets into main-memory and make instant decisions using approximate query processing. The SnappyData design center is more like a highly scalable MPP database that runs in-memory and offers streaming support. 
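Approximate query processing covers a family of probabilistic techniques. As one generic, hedged illustration (not SnappyData's implementation), reservoir sampling condenses an arbitrarily large stream into a fixed-size uniform sample from which aggregates can be estimated:

```python
import random

# Generic sketch of approximate query processing via reservoir sampling
# (Algorithm R); illustrative only, not SnappyData's implementation.
def reservoir_sample(stream, k, seed=42):
    """Keep a uniform random sample of k items from a stream of unknown length."""
    rng = random.Random(seed)
    sample = []
    for i, item in enumerate(stream):
        if i < k:
            sample.append(item)
        else:
            j = rng.randint(0, i)
            if j < k:  # item i is kept with probability k / (i + 1)
                sample[j] = item
    return sample

data = range(1_000_000)               # pretend this is too large to keep exactly
sample = reservoir_sample(data, k=1000)
estimate = sum(sample) / len(sample)  # approximates the true mean (~499999.5)
print(estimate)
```

The estimate's error shrinks with the sample size, so memory can be traded directly for accuracy.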

Figure 4 shows what the current state-of-the-art is for a streaming system. Figure 5 depicts what a SnappyData-enabled system might look like.

Figure 4: Challenging to run complex analytics with streaming systems.

Figure 5: SnappyData’s architecture for live analytics

Don’t All Modern Business Intelligence Tools Support Live Analytics?

While there are several BI tools in the market that support live analytics by connecting directly to the source, most don’t scale or perform. The prolific response in the BI tools community has been to pre-aggregate or generate multi-dimensional cubes, cache these in-memory, and allow the BI visualizations to be driven from this cache. Unfortunately, this doesn’t work for two reasons:

  1. These caches are read-only, take time to construct, and don’t provide access to the live data we expect.
  2. Increasingly, analytics requires working with many data sources, fluid NoSQL data, and too many dimensions. It is far too complex and time-consuming to generate multi-dimensional cubes.

Figure 6 captures the challenges in existing tools for business intelligence.

Figure 6: Challenges in business intelligence tools.

SnappyData manages data in distributed memory, offering extreme performance through columnar data management, code generation, vectorization, and statistical data structures. And it natively supports all the Spark data types: nested objects, JSON, text, and of course, structured Spark SQL types.

The net effect is to enable access to live data in streams, tables, and external data sources to any modern BI tool. Our goal is to offer interactive analytic performance even for live big data across many concurrent users.

Why Not Spark Itself?

If you are Spark-versed, you might be wondering why this isn’t possible in Spark itself. All in-memory state in Spark is immutable, requiring applications to relegate mutable state to external data stores like Cassandra. All analytical queries require repeated copying and even complex deserialization, making them expensive to execute. Furthermore, all queries in Spark are scheduled as jobs, often consuming all available CPU while executing one query at a time, and hence offering low concurrency in query processing. In analytics, workloads are often a mix of expensive scans/aggregations and drill-down questions that look for pinpointed datasets. Unlike Spark, SnappyData distinguishes low-latency queries from high-latency ones and ensures that application requests are handled with high concurrency.

Working With Heterogenous Data

Live data often arrives in a range of formats, such as text, XML, JSON, or custom objects in streams. Data can be self-describing, nested, composed as a graph, and not compliant with a pre-defined schema.

SnappyData capitalizes on Spark’s ability to connect to virtually any data source and infer its schema. Here is a code snippet to read a collection of JSON documents from MongoDB and store in memory as a column table in SnappyData. Note that there was no need to specify the schema for the table. Internally, each JSON document in the collection is introspected for its schema. All these individual schema structures are merged to produce the final schema for the table.

val mongoDataFrame = snappySparkSession.loadFromMongoDB(
  ReadConfig(Map("uri" -> "mongodb://"))
)
mongoDataFrame.printSchema() // The schema is automatically inferred by sampling documents

// Run queries on nested JSON objects
snappySparkSession.sql("select avg(product.retailPrice - discount) from aColumnTable")
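The schema-merge step described above can be pictured in miniature: infer each document's field-to-type mapping, then union the mappings (a deliberate simplification; Spark's actual JSON inference also handles nested structs and numeric type widening):

```python
# Simplified sketch of JSON schema inference and merging; Spark's real
# inference is considerably more sophisticated.
def infer_schema(doc):
    """Map each top-level field to its Python type name."""
    return {key: type(value).__name__ for key, value in doc.items()}

def merge_schemas(docs):
    """Union per-document schemas; conflicting types fall back to 'str'."""
    merged = {}
    for doc in docs:
        for field, typ in infer_schema(doc).items():
            if field in merged and merged[field] != typ:
                merged[field] = "str"  # strings can represent any conflicting value
            else:
                merged.setdefault(field, typ)
    return merged

docs = [{"product": {"retailPrice": 9.99}, "discount": 1},
        {"product": {"retailPrice": 5.0}, "qty": 2}]
print(merge_schemas(docs))  # {'product': 'dict', 'discount': 'int', 'qty': 'int'}
```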

SnappyData applications can connect, transform, and import/export data from S3, CSV, Parquet, Hadoop, NoSQL stores like Cassandra, Elastic, and HBase, all relational databases, object data grids, and more. Essentially, the data model in SnappyData is the same as Spark.

Speed Matters

It is difficult to find big data stores that don’t claim interactive speeds. The common myth is that if data is managed in memory combined with enough CPU (and even GPU), queries will execute at interactive speeds. Practitioners will note that this is often not the case. For instance, you may be surprised to find that Spark executes queries faster on Parquet files than on its in-memory cache (see charts below). This has much to do with the layout of the data in memory and whether the code is written with modern multi-core CPU architectures in mind.
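The layout point can be seen in miniature even in plain Python (a toy sketch with invented sizes; real engines add vectorization and code generation on top): an aggregate over a columnar layout touches one contiguous array instead of hopping through per-row objects.

```python
import timeit

# Toy illustration, not a real benchmark: 100K values stored row-wise
# (list of dicts) vs. column-wise (one contiguous list).
n = 100_000
rows = [{"id": i, "value": i * 0.5} for i in range(n)]  # row layout
values = [i * 0.5 for i in range(n)]                     # column layout

def row_sum():
    return sum(r["value"] for r in rows)

def col_sum():
    return sum(values)

assert row_sum() == col_sum()  # same answer either way
# Summing the column avoids a dict lookup per row and is typically much faster:
print(timeit.timeit(col_sum, number=10), timeit.timeit(row_sum, number=10))
```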

SnappyData heavily leverages columnar storage in-memory, uses row storage when appropriate, co-locates related data sets to avoid shuffles, and applies a range of other optimizations as described in an earlier blog.

Here are the results from a simple performance benchmark on a MacBook Pro (4 cores) comparing Spark caching, Spark + Cassandra, and Spark + Kudu. We measured the load times for 100 million records as well as the time for both analytic and point queries. We used a single table with two columns (ID and symbol). We made sure all comparisons were fair and went through the basic documented tuning options.

Figure 7: Note that lower is better.

Figure 8

Figure 9: SnappyData and Spark did not define ID as a key in the above test.

I encourage you to download and try a quick performance test for yourself.


  1. SnappyData Git repository  
  2. Try SnappyData now
  3. A more technical white paper
  4. SnappyData documentation
  5. Ask questions now – SnappyData community
  6. Even more resources

Original Link

IoT With Amazon Kinesis and Spark Streaming on Qubole

The Internet of Things (IoT) is increasingly becoming an important topic in the world of application development. This is because these devices are constantly sending a high velocity of data that needs to be processed and analyzed. Amazon Kinesis and Amazon IoT are a perfect pair for receiving and analyzing this data and Spark Streaming can be used to process the data as it arrives.

In this blog post, we will look at Kinesis, Apache Spark, Amazon IoT, and Qubole to build a streaming pipeline. Amazon IoT and Kinesis are services that can be provisioned easily on AWS and for Spark streaming, we will use the Qubole platform. Qubole offers a greatly enhanced, easy-to-use, and cloud-optimized Spark as a service for running Spark applications on AWS.

Sample IoT Dataset

Amazon provides an IoT data generator called Simple Beer Simulator (SBS) that generates random JSON dataset from a simulated IoT device connected to a beer dispenser. Sample data includes temperature, humidity, and flow rate. See below.

{"deviceParameter": "Sound", "deviceValue": 109, "deviceId": "SBS03", "dateTime": "2017-08-19 23:57:26"}
{"deviceParameter": "Temperature", "deviceValue": 35, "deviceId": "SBS04", "dateTime": "2017-08-19 23:57:27"}
{"deviceParameter": "Temperature", "deviceValue": 23, "deviceId": "SBS03", "dateTime": "2017-08-19 23:57:28"}
{"deviceParameter": "Humidity", "deviceValue": 86, "deviceId": "SBS01", "dateTime": "2017-08-19 23:57:29"}

This sample data will be streamed into Amazon IoT and passed via a rule to Kinesis.
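To get a feel for these records before wiring up the pipeline, here is a small, Spark-independent sketch that parses the sample records above and averages the temperature readings per device:

```python
import json

# Three of the SBS sample records shown above, one JSON document per line
records = """\
{"deviceParameter": "Temperature", "deviceValue": 35, "deviceId": "SBS04", "dateTime": "2017-08-19 23:57:27"}
{"deviceParameter": "Temperature", "deviceValue": 23, "deviceId": "SBS03", "dateTime": "2017-08-19 23:57:28"}
{"deviceParameter": "Humidity", "deviceValue": 86, "deviceId": "SBS01", "dateTime": "2017-08-19 23:57:29"}"""

# Collect deviceValue per device, keeping only Temperature readings
totals = {}
for line in records.splitlines():
    event = json.loads(line)
    if event["deviceParameter"] == "Temperature":
        totals.setdefault(event["deviceId"], []).append(event["deviceValue"])

averages = {device: sum(vals) / len(vals) for device, vals in totals.items()}
print(averages)  # {'SBS04': 35.0, 'SBS03': 23.0}
```

The Spark Streaming job later in this post performs essentially this parse step, just distributed across the cluster.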

Creating the Kinesis Stream

Log into your AWS console, navigate to Kinesis and create a stream called iot-stream.

Note: One shard is good for this example because we won’t be stressing the application with a large volume of devices and data. In a real-world scenario, increasing the number of shards in a Kinesis stream will improve application scalability.
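The shard math can be made concrete. Per the AWS documentation, each shard accepts up to 1 MB/s or 1,000 records/s of writes, so a rough sizing helper looks like the following (the example workload numbers are invented):

```python
import math

# Per-shard Kinesis write limits, from the AWS documentation:
# 1 MB/s of data and 1,000 records/s per shard.
def required_shards(records_per_sec, avg_record_bytes):
    by_throughput = records_per_sec * avg_record_bytes / 1_000_000  # MB/s needed
    by_count = records_per_sec / 1000                               # record-rate limit
    return max(1, math.ceil(max(by_throughput, by_count)))

# e.g. 5,000 small IoT messages/s of ~120 bytes each is record-count bound
print(required_shards(5000, 120))  # 5
```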

Create an IoT Rule

Log into your AWS console, navigate to IoT, and create a new rule as follows.


  • Topic filter: /sbs/devicedata/#

  • Attribute: *

Create an IoT Action

Navigate to IoT and create a new action as follows.

Select Kinesis as a destination for your messages. On the next screen, you will need to create a rule to publish to Kinesis.

Click Create a new role to automatically create an IAM role with the correct policies. Click through to complete creating the rule with all the defaults. If you are using an existing role, you may want to click the update role button. This will add the correct Kinesis stream to the role policy.

Create IAM User

In order for the SBS to be able to publish messages to Amazon IoT, it uses boto3 and therefore requires permissions for the appropriate resources. Create a user with AWSIoTFullAccess and generate an access key and secret. In the SBS directory, there is a credentials file that should be updated with your access key and secret.
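Assuming the SBS credentials file follows the standard AWS/boto3 credentials format, it will look like this (placeholder values, not real keys):

```ini
[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY
```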


Build the docker container for SBS:

docker build -t sbs .

Run the docker container:

docker run -ti sbs

At this point, you should have data being sent to Kinesis via Amazon IoT.

Spark Streaming

The Scala app reads data from Kinesis and saves the result to a CSV file. You will need to create a user that has access to read from the Kinesis stream. This credential will be different from the one used for the SBS. Note: For this sample application, we’re using a key that has admin access to everything in the account. In a real-world scenario, you should restrict this key to reading only the iot-stream.

val awsAccessKeyId = "your access key"
val awsSecretKey = "your secret key" // referenced below when creating the stream

Define a case class to use as a holder for the JSON data we receive from Kinesis.

case class Beer(deviceParameter:String, deviceValue:Int, deviceId:String,dateTime:String);

Connect to the Kinesis stream:

val kinesisStream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisStreamName,
  kinesisEndpointUrl,
  RegionUtils.getRegionMetadata.getRegionByEndpoint(kinesisEndpointUrl).getName(),
  StorageLevel.MEMORY_AND_DISK_SER_2, awsAccessKeyId, awsSecretKey)

// Kinesis records arrive as byte arrays; convert them to JSON strings
val iot = => new String(bytes))

iot.foreachRDD { rdd =>
  val sqlContext = new SQLContext(SparkContext.getOrCreate())
  import sqlContext.implicits._
  // Parse each JSON record into the Beer case class
  val jobs = { jstr =>
    implicit val formats = DefaultFormats
    val parsedJson = parse(jstr)
    parsedJson.extract[Beer]
  }
  // Append the parsed records as CSV to S3
  jobs.toDF().write.mode(SaveMode.Append).csv("s3://your-bucket/streamingsbs")
}

The complete code can be found here.

Build a fat JAR using SBT:

sbt assembly

Copy the JAR to S3 using the AWS CLI or using the tool of your preference.

aws s3 cp target/scala-2.11/MyProject-assembly-0.1.jar s3://your-bucket/apps/MyProject-assembly-0.1.jar

Spark on Qubole

In order to run this application, we will use Spark running on Qubole. Run the following command in the Analyze interface.

spark-submit --class example.SBSStreamingReader --master local[8] s3://your-bucket/apps/MyProject-assembly-0.1.jar

Let the job run for a while and you should see the data being written to the S3 directory specified in the streaming class. Note: Qubole will continue to run this Spark streaming job for 36 hours or until you kill it. Alternatively, what’s illustrated here can be achieved with Kinesis Firehose, but this post shows you the use of Apache Spark with Kinesis.

Why Spark on Qubole?

Qubole offers a greatly enhanced and optimized Spark as a service. It makes for a perfect deployment platform.

  • Auto-scaling Spark clusters

    • In the open-source version of auto-scaling in Apache Spark, the required number of executors for completing a task is added in multiples of two. In Qubole, we’ve enhanced the auto-scaling feature to add the required number of executors based on a configurable SLA.

    • With Qubole’s auto-scaling, cluster utilization is matched precisely to the workloads, so there are no wasted compute resources, which also lowers TCO. Based on our benchmarks of performance and cost savings, we estimate that auto-scaling saves a Qubole customer over $300K per year for just one cluster.

  • Heterogeneous Spark clusters on AWS

    • Qubole supports heterogeneous Spark clusters for both on-demand and spot instances on AWS. This means that the slave nodes in Spark clusters may be of any instance type.

    • For on-demand nodes, this is beneficial in scenarios when the requested number of primary instance type nodes are not granted by AWS at the time of the request. For spot nodes, it’s advantageous when either the spot price of primary slave type is higher than the spot price specified in the cluster configuration or the requested number of spot nodes are not granted by AWS at the time of the request. For more details, click here.

  • Optimized split computation for Spark SQL

    • We’ve implemented optimization with regards to AWS S3 listings which enables split computations to run significantly faster on Spark SQL queries. As a result, we’ve recorded up to 6X and 81X improvements on query execution and AWS S3 listings respectively. For more details, click here.

To learn how you can use Qubole for various workload types, click here.

Stay Tuned!

In the next post, you will learn how to visualize this data in real time using a Spark notebook running on Qubole.

Original Link

Comparison API for Apache Kafka

With the demand for processing large amounts of data, Apache Kafka is a standard message queue in the big data world. Apache Kafka is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service, and it has convenient APIs for many languages.

In this article, I would like to share my experience leveraging Kafka’s API for multiple purposes — from consuming and writing data to streams to a more reactive approach with Akka. All examples in this tutorial are written in Scala; if you use another programming language, you can easily adapt the code.

First of all, you need to install Kafka. For this, I use a Docker image:

docker run --rm -it \
  -p 2181:2181 -p 3030:3030 -p 7081:8081 \
  -p 7082:8082 -p 7083:8083 -p 9092:9092 \
  -e ADV_HOST= \
  landoop/fast-data-dev

Of course, you can use another image or launch Kafka manually; it’s up to you.

Integrating Spark Streaming and Kafka is incredibly easy. Your middleware, backend (proxy-like), or IoT devices can send millions of records per second to Kafka, and it handles them efficiently. Spark Streaming provides simple parallelism, a 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata. First, we need to pass Kafka’s parameters to Spark — the host, port, offset-committing strategy, etc.

def kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "" -> "mygroup1",
  "auto.offset.reset" -> "latest",
  "" -> (false: java.lang.Boolean)
)

After setting the necessary configurations, we can deal with the direct stream. All logic with creating streams is located in the KafkaUtils class: 

val topics = Array("sample_topic")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent, // consistently distributes partitions across all executors
  Subscribe[String, String](topics, kafkaParams)
) => (record.key, record.value)).print()

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { _ =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}

Note: The code above uses the Spark Streaming API, which we will discuss below.

Spark operates on RDDs (the basic abstraction in Spark: an immutable, partitioned collection of elements that can be operated on in parallel). All RDDs in a given batch interval can be manipulated with the foreachRDD method. In this example, we simply print the topic name, offset, and partition. You can also use more complicated logic, like retrieving data from a stream of tweets.

There are three different ways to guarantee fault-tolerant behavior when consuming from Kafka with Spark Streaming. The first is checkpointing. The Spark documentation says:

“A streaming application must operate 24/7 and hence must be resilient to failures unrelated to the application logic (e.g., system failures, JVM crashes, etc.). For this to be possible, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system such that it can recover from failures. There are two types of data that are checkpointed.”

In code, it looks as follows:

def kafkaStream010Checkpointing() =
  launchWithCheckpointing(kafkaStreaming010,
    appName = "Kafka010_DirectStream",
    checkpointPath = "checkpointing")

I call the second strategy the “Kafka itself” strategy. Kafka has an offset commit API that stores offsets in a special Kafka topic. By default, the new consumer will periodically auto-commit offsets. After the output from Kafka has been consumed by the streaming application, you can commit offsets to Kafka using the commitAsync API. Kafka is not transactional, so your outputs must still be idempotent. In code, it looks as follows:

def storingOffsetsItself(stream: InputDStream[ConsumerRecord[String, String]]) = {
  stream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // Commit offsets back to Kafka once the output has been processed
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}

And the last strategy is using your own data store. You can use storage like an RDBMS or ZooKeeper for storing offsets — this is a very popular solution, and it gives the equivalent of exactly-once semantics. Applying this strategy is especially useful in situations when it’s hard to make the logic idempotent, such as complicated aggregations:

val fromOffsets = { resultSet =>
  new TopicPartition(resultSet.string("topic"),"partition")) -> resultSet.long("offset")
}.toMap
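Reading offsets back, as above, is only half of the strategy; the other half is committing outputs and offsets in a single transaction. A minimal sketch of that idea using SQLite as a stand-in for the RDBMS (the table names and schema are invented for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE results (word TEXT, cnt INTEGER)")
conn.execute("CREATE TABLE offsets (topic TEXT, part INTEGER, off INTEGER, "
             "PRIMARY KEY (topic, part))")

def process_batch(records, topic, partition, until_offset):
    """Write outputs and the new offset atomically: both commit or neither does."""
    with conn:  # one transaction gives exactly-once relative to this store
        for word, count in records:
            conn.execute("INSERT INTO results VALUES (?, ?)", (word, count))
        conn.execute("INSERT OR REPLACE INTO offsets VALUES (?, ?, ?)",
                     (topic, partition, until_offset))

process_batch([("spark", 3), ("kafka", 5)], "sample_topic", 0, 20)
restart_from = conn.execute(
    "SELECT off FROM offsets WHERE topic = ? AND part = ?", ("sample_topic", 0)
).fetchone()[0]
print(restart_from)  # 20: on restart, resume the stream from this offset
```

If the process crashes mid-batch, the transaction rolls back, so a restart reprocesses the batch from the last committed offset without duplicating output.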

If you want to read data between certain offsets, you can simply obtain RDDs that represent content in this range in the topic:

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("sample_topic", 0, 10, 20),
  OffsetRange("sample_topic", 1, 10, 20)
)
val params = new ju.HashMap[String, Object](kafkaParams)
val kafkaRDD = KafkaUtils.createRDD[String, String](sparkContext, params, offsetRanges, PreferConsistent)
println(kafkaRDD.count()) // e.g. print how many records fall inside the ranges

Pay attention to the difference between createStream and createDirectStream. You can read more about the differences between them here. This is an important concept; you must distinguish the use cases for them.

Also, Kafka provides seamless integration with binary protocols like Avro and Protobuf. The integration of Apache Spark with Kafka and Avro can be organized in a separate module, so include it on demand (using Twitter's Bijection library simplifies the transformation code):

@transient lazy implicit private val avroSpecificInjection = SpecificAvroCodecs.toBinary[A]

def decodeValue(payload: KafkaPayload): Option[A] = {
  val decodedTry = Injection.invert[A, Array[Byte]](payload.value)
  decodedTry match {
    case Success(record) =>
      Some(record)
    case Failure(ex) =>
      logger.warn("Could not decode payload", ex)
      None
  }
}

def encodeValue(value: A): KafkaPayload = {
  val encoded = Injection[A, Array[Byte]](value)
  KafkaPayload(None, encoded)
}

Spark Structured Streaming API

Spark Structured Streaming is one of the most exciting ideas in Apache Spark and the next step in the evolution of Spark Streaming. Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. Structured Streaming makes it possible to build ETL pipelines in a very clear way. The resulting code is similar to the Java 8 Stream API, so if you don't know Scala but know Java, it will not be difficult for you to understand what is happening:

val sparkSession = SparkSession
  .builder
  .master("local")
  .appName("kafka")
  .getOrCreate()

sparkSession.sparkContext.setLogLevel("ERROR")

import sparkSession.implicits._

val kafkaDF = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "")
  .option("subscribe", "structured_topic")
  .load()

val data: Dataset[(String, String)] = kafkaDF
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

kafkaDF.printSchema()

data.writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()

Writing Data to Kafka Stream

There is a third-party library for this not-so-standard task. This tool provides a simple API for writing data to a stream. The next example shows how to read data from a socket and write it to the stream:

val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9087)

lines.writeToKafka(numbersProducerConfig,
  s => new ProducerRecord[String, String](topic, "key " + s, s.toString)
)

ssc.start()
ssc.awaitTermination()

Akka Streams

Akka Streams Kafka, also known as Reactive Kafka, is an Akka Streams connector to Apache Kafka. Akka Streams allows you to write data to Kafka topics via a Sink API:

val done = Source(50 to 60)
  .map(_.toString)
  .map { elem =>
    println(s"PlainSinkProducer produce: $elem")
    new ProducerRecord[Array[Byte], String](topic, elem)
  }
  .runWith(Producer.plainSink(producerSettings))

And exactly the same via a Flow API:

val done = Source(100 to 111)
  .map { n =>
    val partition = 1
    ProducerMessage.Message(
      new ProducerRecord[Array[Byte], String](topic, partition, null, n.toString),
      n
    )
  }
  .via(Producer.flow(producerSettings))
  .map { result =>
    val record = result.message.record
    println(s"${record.topic}/${record.partition} ${result.offset}: ${record.value}" +
      s" (${result.message.passThrough})")
    result
  }
  .runWith(Sink.ignore)

Consuming data with Akka Streams is very clear; you can build sophisticated data flows with the Graph DSL where Kafka is part of the graph:

val done = Consumer.committableSource(consumerSettings, Subscriptions.topics(topic1))
  .map { msg =>
    println(s"topic1 -> topic2: $msg")
    ProducerMessage.Message(
      new ProducerRecord[Array[Byte], String](topic2, msg.record.value),
      msg.committableOffset
    )
  }
  .via(Producer.flow(producerSettings))
  .mapAsync(producerSettings.parallelism) { result =>
    result.message.passThrough.commitScaladsl()
  }
  .runWith(Sink.ignore)

As in the example with Apache Spark, you can save offsets in a database or in ZooKeeper:

private val offset = new AtomicLong(2)

def save(record: ConsumerRecord[Array[Byte], String]): Future[Done] = {
  println(s" ${record.value}")
  offset.set(record.offset)
  Future.successful(Done)
}

def loadOffset(): Future[Long] =
  Future.successful(offset.get)

def update(data: String): Future[Done] = {
  println(s"DB.update: $data")
  Future.successful(Done)
}

Akka Actors

Akka gives you the opportunity to implement the logic for producing/consuming messages from Kafka with the Actor model. This is very convenient if actors are already widely used in your code, and it significantly simplifies building data pipelines with actors. For example, suppose you have an Akka Cluster, one part of which crawls web pages while the other part indexes them and sends the indexed data to Kafka. A consumer can then aggregate this logic. Producing data to Kafka looks as follows:

def actorProducer() = {
  val system = ActorSystem()
  val producer = system.actorOf(KafkaProducerActor.props(kafkaProducerConf))
  val batch: Seq[ProducerRecord[String, String]] = Seq(
    KafkaProducerRecord(topic, "foo"),
    KafkaProducerRecord(topic, "key", "value"),
    KafkaProducerRecord(topic, "bar")
  )
  val message = ProducerRecords(batch)
  producer ! message
}

Consuming messages is straightforward: you set a supervisor strategy for handling failures and write the logic for incoming records in the receive method:

class ConsumerRecovery(kafkaConfig: KafkaConsumer.Conf[String, String],
                       actorConfig: KafkaConsumerActor.Conf) extends Actor with ActorLogging {

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10) {
      case _: KafkaConsumerActor.ConsumerException =>"Consumer exception caught. Restarting consumer.")
        SupervisorStrategy.Restart
      case _ =>
        SupervisorStrategy.Escalate
    }

  val recordsExt: Extractor[Any, ConsumerRecords[String, String]] =
    ConsumerRecords.extractor[String, String]

  val consumer: ActorRef = context.actorOf(
    KafkaConsumerActor.props(kafkaConfig, actorConfig, self)
  )
  consumer ! Subscribe.AutoPartition(List(topic))

  override def receive: Receive = {
    // Consume from Kafka
    case recordsExt(records) =>
      processRecords(records.pairs)
      sender() ! Confirm(records.offsets, commit = true)
  }

  private def processRecords(records: Seq[(Option[String], String)]) =
    records.foreach { case (key, value) =>"Received [$key,$value]")
    }
}

Kafka has supported SSL/TLS since version 0.10, and I strongly recommend using encryption everywhere in a production environment. Configuring keys and certificates in multiple locations is a routine task, so I have collected all the necessary scripts and configuration for this task here.
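For reference, the SSL-related part of a Kafka client configuration boils down to a handful of properties like the following (the keystore paths and passwords here are placeholders):

```properties
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=changeit
ssl.keystore.location=/var/private/ssl/client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
```

The broker side needs matching listener and keystore settings, which is exactly the multi-location routine the scripts mentioned above automate.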

That’s all. You can find a full listing of the source code on my GitHub repository.

Original Link

Apache Flink vs. Apache Spark

If you look at this image with a list of big data tools, it may seem that all possible niches in this field are already occupied. With so much competition, it should be very tough to come up with groundbreaking technology.

Apache Flink's creators have a different view. It started as a research project called Stratosphere. Stratosphere was forked, and this fork became what we know as Apache Flink. In 2014, it was accepted as an Apache Incubator project, and just a few months later, it became a top-level Apache project. At the time of this writing, the project has almost twelve thousand commits and more than 300 contributors.

Why is there so much attention? Apache Flink has been called a new-generation big data processing framework and has enough innovations under its belt to replace Apache Spark and become the new de facto tool for batch and stream processing.

Should you switch to Apache Flink? Should you stick with Apache Spark for a while? Or is Apache Flink just a new gimmick? This article will attempt to give you answers to these and other questions.

Unless you have been living under a rock for the last couple of years, you have heard about Apache Spark. It looks like every modern system that does any kind of data processing is using Apache Spark in one way or another.

For a long time, Spark was the latest and greatest tool in this area. It delivered some impressive features compared to its predecessors, such as:

  • Impressive speed: It is ten times faster than Hadoop if data is processed on a disk and up to 100 times faster if data is processed in memory.
  • Simpler directed acyclic graph model: Instead of defining your data processing jobs with the rigid MapReduce framework, Spark allows defining a graph of tasks that can implement complex data processing algorithms.
  • Stream processing: With the advent of new technologies such as the Internet of Things, it is not enough to simply process a huge amount of data. Now, we need to process huge amounts of data as they arrive in real time. This is why Apache Spark introduced stream processing, which makes it possible to process a potentially infinite stream of data.
  • Rich set of libraries: In addition to its core features, Apache Spark provides powerful libraries for machine learning, graph processing, and performing SQL queries.

To get a better idea of how you write applications with Apache Spark, let’s take a look at how you can implement a simple word count application that would count how many times each word was used in a text document:

// Read file
val file = sc.textFile("file/path")

val wordCount = file
  // Extract words from every line
  .flatMap(line => line.split(" "))
  // Convert words to pairs
  .map(word => (word, 1))
  // Count how many times each word was used
  .reduceByKey(_ + _)

If you know Scala, this code should seem straightforward and similar to working with regular collections. First, we read a list of lines from a file located at file/path. This file can be either a local file or a file in HDFS or S3.

Then, every line is split into a list of words using the flatMap method that simply splits a string by the space symbol. Then, to implement the word counting, we use the map method to convert every word into a pair where the first element of the pair is a word from the input text and the second element is simply a number one.

The last step simply counts how many times each word was used by summing up numbers for all pairs for the same word.
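These three steps can be traced on a plain Scala collection, since the RDD transformations mirror the standard collection operations (reduceByKey corresponds to groupBy followed by summing the per-word counts):

```scala
val lines = Seq("to be or not to be")

val wordCount = lines
  .flatMap(line => line.split(" "))                     // extract words
  .map(word => (word, 1))                               // pair each word with 1
  .groupBy { case (word, _) => word }                   // group pairs by word
  .map { case (word, pairs) => word -> } // sum the counts

println(wordCount) // e.g. Map(not -> 1, be -> 2, or -> 1, to -> 2)
```

The only difference on an RDD is that the grouping and summing happen in a distributed fashion across partitions.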

Apache Spark seems like a great and versatile tool. But what does Apache Flink bring to the table?

At first glance, there do not seem to be many differences. The architecture diagram looks very similar:

If you take a look at the code example for the word count application for Apache Flink, you would see that there is almost no difference:

val file = env.readTextFile("file/path")

val counts = file
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .groupBy(0)
  .sum(1)

A few notable differences are that in this case, we need to use the readTextFile method instead of textFile, and that we need to use a pair of methods, groupBy and sum, instead of reduceByKey.

So what is all the fuss about? Apache Flink may not have many visible differences on the outside, but it definitely has enough innovations to become the next-generation data processing tool. Here are just some of them:

  • Implements actual stream processing: When you process a stream in Apache Spark, it is treated as many small batch problems, making stream processing a special case. Apache Flink, in contrast, treats batch processing as the special case and does not use micro-batching.
  • Better support for cyclical and iterative processing: Flink provides some additional operations that allow implementing cycles in your streaming application and algorithms that need to perform several iterations on batch data.
  • Custom memory management: Apache Flink is a Java application, but it does not rely entirely on the JVM garbage collector. It implements a custom memory manager that stores the data to process in byte arrays. This reduces the load on the garbage collector and increases performance. You can read about it in this blog post.
  • Lower latency and higher throughput: Multiple tests done by third parties suggest that Apache Flink has lower latency and higher throughput than its competitors.
  • Powerful window operators: When you process a stream of data, in most cases you need to apply a function to a finite group of elements in the stream. For example, you may need to count how many clicks your application has received in each five-minute interval, or you may want to know what the most popular tweet on Twitter was in each ten-minute interval. While Spark supports some of these use cases, Apache Flink provides a vastly more powerful set of operators for stream processing.
  • Implements lightweight distributed snapshots: This allows Apache Flink to provide low-overhead, exactly-once processing guarantees in stream processing, without using micro-batching as Spark does.
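To illustrate the windowing point above, counting clicks per page in tumbling five-minute windows might look like the following sketch in Flink's DataStream Scala API (the socket source, host, and port are placeholders):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Hypothetical input: one page name per line arriving over a socket.
val clicks: DataStream[(String, Int)] = env
  .socketTextStream("localhost", 9000)
  .map(page => (page, 1))

// Key the stream by page and count clicks per tumbling five-minute window.
val clickCounts = clicks
  .keyBy(_._1)
  .timeWindow(Time.minutes(5))
  .sum(1)

clickCounts.print()
env.execute("click-counts")
```

Flink also offers sliding and session windows plus custom triggers and evictors, which is where the "vastly more powerful" claim comes from.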

So, you are working on a new project, and you need to pick software for it. What should you use? Spark? Flink?

Of course, there is no right or wrong answer here. If you need to do complex stream processing, then I would recommend using Apache Flink. It has better support for stream processing and some significant improvements.

If you don’t need bleeding-edge stream processing features and want to stay on the safe side, it may be better to stick with Apache Spark. It is a more mature project with a bigger user base, more training materials, and more third-party libraries. But keep in mind that Apache Flink is closing this gap by the minute, and more and more projects are choosing it as it matures.

If, on the other hand, you like to experiment with the latest technology, you definitely need to give Apache Flink a shot.

Does all this mean that Apache Spark is obsolete and in a couple of years we all are going to use Apache Flink? The answer may surprise you. While Flink has some impressive features, Spark is not staying the same. For example, Apache Spark introduced custom memory management in 2015 with the release of project Tungsten, and since then, it has been adding features that were first introduced by Apache Flink. The winner is not decided yet.

In the upcoming blog posts I will write more about how you can use Apache Flink for batch and stream processing, so stay tuned!

If you want to know more about Apache Flink, you can take a look at my Pluralsight course, where I cover Apache Flink in more detail. Here is a short preview of this course.

Original Link