
Query PostgreSQL Data From Node.js

The API Server exposes Web services that allow connectivity to your data. Use the OData endpoint of the CData API Server to execute CRUD queries to PostgreSQL data from Node.js.

The CData API Server, when paired with the ADO.NET Provider for PostgreSQL, exposes PostgreSQL data (or data from any of 120+ other ADO.NET Providers) as an OData endpoint, which can be queried from Node.js using simple HTTP requests. This article shows how to use the API Server to request JSON-formatted PostgreSQL data in Node.js.
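
The article's examples are in Node.js, but the OData request itself is plain HTTP, so the same idea can be sketched in any client. The snippet below is an illustration only; the endpoint URL, resource name, and query options are placeholders I have assumed, not details from the article:

import requests

# Hypothetical OData endpoint URL and resource name; substitute the address of
# your own API Server deployment and the table you exposed.
ODATA_URL = "http://localhost:8080/api.rsc/Orders"

params = {
    "$select": "ShipName,ShipCity",     # OData option: limit the columns returned
    "$filter": "ShipCountry eq 'USA'",  # OData option: restrict the rows returned
}

# Add whatever authentication your endpoint requires (token header, basic auth, etc.).
response = requests.get(ODATA_URL, params=params)
response.raise_for_status()

for row in response.json()["value"]:    # OData responses wrap result rows in a "value" array
    print(row)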

Original Link

Connect to Google Analytics Data in Python on Linux/UNIX

The CData ODBC Driver for Google Analytics enables you to create Python applications on Linux/UNIX machines with connectivity to Google Analytics data. Leverage the pyodbc module for ODBC in Python.

The rich ecosystem of Python modules lets you get to work quicker and integrate your systems more effectively. With the CData Linux/UNIX ODBC Driver for Google Analytics and the pyodbc module, you can easily build Google Analytics-connected Python applications. This article shows how to use the pyodbc built-in functions to connect to Google Analytics data, execute queries, and output the results.
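
As a minimal sketch of that pattern (the DSN, table, and column names below are assumptions for illustration, not details from the article), a pyodbc connection and query look roughly like this:

import pyodbc

# "GoogleAnalyticsDSN" is a placeholder; use the DSN you configured for the
# Google Analytics ODBC driver in odbc.ini.
cnxn = pyodbc.connect("DSN=GoogleAnalyticsDSN", autocommit=True)
cursor = cnxn.cursor()

# Hypothetical query; the available tables and columns depend on the driver's schema.
cursor.execute("SELECT Source, Sessions FROM Traffic")
for row in cursor.fetchall():
    print(row.Source, row.Sessions)

cursor.close()
cnxn.close()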

Original Link

Upload Files With Python

Python is eating the world! You will find many passionate Python programmers and just as many critics, but there’s no denying that Python is a powerful, relevant, and constantly growing force in software development today.

Python is just a language, though, and languages can’t solve business problems like workflow, architecture, and logistics; these things are up to you, the developer! The packages you choose, the architecture you implement, and strategy you follow will all impact the success of your Python project. Let’s take a look at the logistics behind uploading a file to the cloud using Python. I’ll discuss some of the considerations every team faces when implementing a file upload and management solution and then end with a concise recipe for you to upload files with Python using Filestack’s Python SDK.
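
To give a flavor of that recipe (a sketch only, assuming the filestack-python SDK; the API key and file path are placeholders), the upload itself can be just a few lines:

from filestack import Client

# Placeholder API key; create one in your Filestack dashboard.
client = Client("YOUR_FILESTACK_API_KEY")

# Upload a local file; the SDK returns a link object for the stored file.
filelink = client.upload(filepath="path/to/report.pdf")
print(filelink.url)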

Original Link

Real-Time Log Management System

It’s really important to know what’s happening in your application. The most common example of this is being notified if anything is going wrong; from infrastructure issues like CPU and memory or disk usage, through to in-application warnings, errors, and exceptions. These events are highly important as they may directly impact your business and customers.
We know that when any event takes place with an application, there is an associated data payload and therefore real-time data. This information may be available in a number of ways, but one of the most common ways is through some sort of logging solution: system stat logs, access logs, and application logs.

For that, a real-time log management system is required. To break it down into steps:

Original Link

Evolving From Descriptive to Prescriptive Analytics: Part 4, Eating the Ugly Frogs

In our previous blog posts, we discussed gaining leadership support, acquiring data science skills, and having the tools to manage your data. With this post, we’ll discuss ensuring your data scientists are productive and happy.

What activities keep data scientists happy? A recent CrowdFlower report about data scientists says that mining data for patterns, building models for data, and refining algorithms are the three favorite tasks among data scientists. Most other tasks are not nearly as interesting. We call these other tasks the ugly frogs of data science. Only 19% of data scientists can spend most of their time doing their favorite tasks, while the others spend most of their time on tasks they loathe. Wouldn’t it be nice if the data scientists could be liberated from doing what they like least and allowed to do what they enjoy most?

Original Link

DataWorks Summit 2018 Apache NiFi Wrapup

First up, Andy LoPresto spoke about everything Apache NiFi; his slides are here. We did an awesome meetup with lots of beer, pretzels and NiFi. I ran a live demo of a Raspberry Pi analyzing pretzels. Did I mention pretzels? Germany has a lot of pretzels.   

My talk with MiniFi and Raspberry Pis is here.

Then Andy taught an awesome Crashcourse on Apache NiFi. The videos from both Berlin and San Jose are now available on YouTube.

I did a talk on Apache NiFi + IoT + Apache MXNet; it’s a pretty interesting use case for open-source IoT with AI.

Best Practices with Apache NiFi at Renault! I am really impressed with what they have done with their manufacturing process throughout Europe with industrial scale Apache NiFi.

Andy did a killer Birds of a Feather session with Apache NiFi and some Apache Beam. If you have never done a BOF, I highly recommend it. It’s informal and everyone gets a chance to participate with committers and users.

The next day, Andy gave a great MiniFi talk, which really set the focus for the day.

I gave a talk on Apache NiFi + Apache MXNet which focuses on how to run deep learning at the edge, on HDP nodes, on HDF nodes, with MXNet Model Server and through YARN.

There was also a great Apache Metron talk. Apache Metron does some awesome cybersecurity work, and it uses Apache NiFi for some of its ingest.

Andy gave a BotNet + Apache NiFi talk, which should be of interest to any cybersecurity analyst or researcher.

Andy gave another Apache NiFi talk on Software Development Lifecycles. 

A list of all my talks is here:

GitHub Repos:

Original Link

Why We Advised Our Customer Against GraphQL

GraphQL has captured a lot of developer interest over the last year or so, with many production-level systems making the switch from REST-style APIs to GraphQL. When one of our existing customers asked us to decide whether we should switch their existing APIs over to GraphQL, the team was pretty excited to finally get to work on this new style of API!

However, as we evaluated the technology and assessed its compatibility with the existing system, we decided not to make the switch. Here is the case under consideration.

The Existing System

  • SQL Server DB with SPs, functions, views, and queries. SPs are non-modular in the sense that they join various tables and return values specific to the calls being made.
  • The connection to the DB uses basic ADO.NET, not EF/LINQ, etc.
  • ASP.NET WebAPI2.0 (Not .NET Core) and Java-based APIs.
  • Native Android, iOS and Web Clients. Some portion of the web clients bypasses the API and talks to the DB directly.
  • WebClients: ASP.NET and React.
  • A React Native-based app.

System Maturity

  • The system has been in production for a few years now and is stable.
  • As the system is in a competitive space, new features are always getting added apart from usual maintenance.

Customer Ask

  • Whether it makes sense to wrap our existing APIs into a GraphQL Endpoint.
  • For a new feature in the React app, evaluate building the new .NET-based APIs in GraphQL.

There is not much documentation on implementing GraphQL servers. Most tutorials focus on GraphQL clients or, at most, JavaScript and .NET Core servers. Apollo GraphQL, which is typically used for servers and other features like tracking, is not available for .NET servers. So while GraphQL is server agnostic, there was not much available to give us a kickstart on the .NET Framework apart from the promise that it was possible.

We finally got started with a combination of these blogs/articles!

With a good understanding of how the system will work we then implemented a couple of APIs for the new system in GraphQL to see how it will fit in with the existing system.

Existing APIs:

For an existing call like GetChildrenInterestedInHobby(int hobbyID), if the current SP returns only child names and IDs and not the ParentID, then a client query trying to access the child’s parent name won’t really return any data anyway, as there is no way to resolve the parent info.

New APIs:

  • The new feature requirement could, as such, be implemented reusing a lot of the existing SPs.
  • In order to use GraphQL for these APIs, however, we could not reuse the existing SPs as GraphQL works best when we resolve one entity at a time rather than an SP which joins data across tables and returns a consolidated object. So this meant we would need to replicate the SPs and mark them for use with GraphQL.
  • Even with the use of DataLoader to optimize the DB calls we would end up with a few more DB hits than the existing approach where we make one call per API to a specific SP.
  • The new feature request was pretty small and while a good candidate to get started with GraphQL it would cause code repetition and maintenance overhead if added with existing APIs.
  • On the client-side, also, the GraphQL code would be intermixed with calling older REST APIs.

Our Suggestion to the Customer

  • While GraphQL is server agnostic and can work with any server implementation including the ability to wrap existing implementations under an endpoint, it is best utilized when the server-DB interaction is per entity level so that each entity can get resolved independently.
  • The existing system is working well and has been optimized heavily from a database point of view. Re-wiring it and re-wiring all clients just to use GraphQL is not advisable at this time.
  • The new feature, which plugs very closely into the existing system, is too small to justify adding a new framework to the mix; it would cause repetition of code functionality and a maintenance overhead.
  • We should look to add GraphQL with the next big subsystem being planned, which will involve new tables, new SPs, and a new set of APIs.

With that, we concluded our exercise, and the team is looking forward to using GraphQL in the new subsystem that is being envisioned! Meanwhile, we will work to put up a walkthrough of a sample GraphQL server on the .NET Framework with mutations and the works, and hopefully save a fellow .NET developer some time.

Original Link

Maintaining a Data Warehouse

In more traditional IT projects, when a successful system is tested, deployed and in daily operation, its developers can usually sit back and take a well-deserved rest as users come on-board, and leave ongoing maintenance to a small team of bug-fixers and providers of minor enhancements. At least until the start of the next major release cycle. Developers of today’s data warehouses have no such luxury.

The measure of success of a data warehouse is only partly defined by the number and satisfaction level of active users. The nature of creative decision-making support is that users are continuously discovering new business requirements, changing their mind about what data they need, and thus demanding new data elements and structures on a weekly or monthly basis. Indeed, in some cases, the demands may arrive daily!

This need for agility in regularly delivering new and updated data to the business through the data warehouse has long been recognized by vendors and practitioners in the space. Unfortunately, such agility has proven difficult to achieve in the past. Now, ongoing digitalization of business is driving ever higher demands for new and fresh data. Current—and, in my view, short-sighted—market thinking is that a data lake filled with every conceivable sort of raw, loosely managed data will address these needs. That approach may work for non-critical, externally sourced social media and Internet of Things data. However, it really doesn’t help with the legally-binding, historical, and (increasingly) real-time internally and externally sourced data currently delivered via the data warehouse.

Fortunately, the agile and automated characteristics of the Data Vault/data warehouse automation (DWA) approach described in the design, build, and operate phases discussed in earlier posts apply also to the maintenance phase. In fact, it may be argued that these characteristics are even more important in the maintenance phase than in the earlier ones of data warehouse development.

One explicit design point of the Data Vault data model is agility. A key differentiator between Hub, Link, and Satellite tables is that they have very different usage types and temporal characteristics. Such separation of concerns allows changes in both data requirements (frequent and driven by business needs) and data sources (less frequent, but often requiring deep “data archeology”) to be handled separately and more easily than in traditional designs. In effect, the data warehouse is structured according to good engineering principles, while the data marts flow with user needs. This structuring enables continuous iteration of agile updates to the warehouse, continuing through to the marts, by reducing or eliminating rework of existing tables when addressing new needs. For a high-level explanation of how this works, see Sanjay Pande’s excellent “Agile Data Warehousing Using the Data Vault Architecture” article.

The engineered components and methodology of the Data Vault approach are particularly well-suited to the application of DWA tools, as we saw in the design and build phases. However, it is in the maintain phase that the advantages of DWA become even more apparent. Widespread automation is essential for agility in the maintenance phase, because it increases developer productivity, reduces cycle times, and eliminates many types of coding errors. WhereScape Data Vault Express incorporates key elements of the Data Vault approach within the structures, templates, and methodology it provides to improve a team’s capabilities to make the most of potential automation gains.

Furthermore, WhereScape’s metadata-driven approach means that all the design and development work done in preceding iterations of data warehouse/mart development is always immediately available to the developers of a subsequent iteration. This is provided through the extensive metadata that WhereScape stores in the relational database repository and makes available directly to developers of new tables and/or population procedures. This metadata plays an active role in the development and runtime processes of the data warehouse (and marts) and is thus guaranteed to be far more consistent and up-to-date than typical separate and manually maintained metadata stores such as spreadsheets or text documents.

In addition, WhereScape automatically generates documentation, which is automatically maintained, and related diagrams, including impact analysis, track back/forward, and so on. These artifacts aid in understanding and reducing the risk of future changes to the warehouse, by allowing developers to discover and avoid possible downstream impacts of any changes being considered.

Another key factor in ensuring agility and success in the maintenance phase is the ongoing and committed involvement of business people. WhereScape’s automated, templated approach to the entire design, build, and deployment process allows business users to be involved continuously and intimately during every stage of development and maintenance of the warehouse and marts.

With maintenance, we come to the end of our journey through the land of automating warehouses, marts, lakes, and vaults of data. At each step of the way, combining the use of the Data Vault approach with data warehouse automation tools simplifies technical procedures and eases the business path to data-driven decision making. WhereScape Data Vault Express represents a further major stride toward the goal of fully agile data delivery and use throughout the business. 

Original Link

Kafka Python Tutorial for Fast Data Architecture

Fast Data Series Articles

  1. Installing Apache Mesos 1.6.0 on Ubuntu 18.04
  2. Kafka Tutorial for Fast Data Architecture
  3. Kafka Python Tutorial for Fast Data Architecture

This is the third article in my Fast Data Architecture series that walks you through implementing Big Data using a SMACK Stack. This article builds on the others, so if you have not read through those, I highly suggest you do so that you have the infrastructure you need to follow along in this tutorial.

This article will walk you through pulling website metrics from Clicky.com. I have another article where we will pull metrics from Google Analytics and publish the metrics to Apache Kafka: Kafka Python and Google Analytics.

In order to demonstrate how to analyze your big data, we will be configuring a big data pipeline that will pull site metrics from Clicky.com and push those metrics to a Kafka topic on our Kafka Cluster.

This is just one pipeline that you might want to implement in your Big Data implementation. Website statistics can be a valuable part of your data, as they can give you information about website visitors, pages visited, and so on. Combine this data with other data, like social media shares, when you perform your data analytics, and you will be able to make some pretty neat business decisions about the best time to post site updates to social media in order to attract the most visitors. That is the main benefit of implementing big data: not necessarily the raw data itself, but the knowledge you can extract from that raw data to make more informed decisions.

[Diagram: site metrics pulled from the Clicky API and pushed to the admintome-pages Kafka topic]

In this example, we will pull the ‘pages’ statistics from the Clicky.com API and push them to the admintome-pages Kafka topic. This will give us JSON data from AdminTome’s top pages.

Clicky Web Analytics

In order to fully follow along in this article, you will need to have a website linked to Clicky.com. It’s free, so why not? Register your site at clicky.com. I personally use it because it has better metrics reporting for blogs (like abandon rate) than Google Analytics does. You will need to add some code to your page so that Clicky can start collecting metrics.

After your page is sending metrics to Clicky, you will need to get some values in order to use the Clicky API and pull metrics from our Python application. Go to the preferences for your site and you will see two numbers that we will need:

  • Site ID

  • Site key

Don’t publish these anywhere because they could give anyone access to your website data. We will need these numbers later when we connect to the API and pull our site statistics.

Preparing Kafka

First, we need to prepare our Kafka cluster by adding the topic that we will use to send messages to. As you can see from the diagram above, our topic in Kafka is going to be admintome-pages.

Log in to the Mesos master you ran kafka-mesos from. If you followed the previous article, the master I used was mesos1.admintome.lab. Next, we will create the topic using the kafka-mesos.sh script:

$ cd kafka/
$ ./kafka-mesos.sh topic add admintome-pages --broker=0 --api=http://mslave2.admintome.lab:7000

Notice that the --api parameter points to the Kafka scheduler we created using kafka-mesos in the last article. You can verify that you now have the correct topics:

$ ./kafka-mesos.sh topic list --api=http://mslave2.admintome.lab:7000
topics:
name: __consumer_offsets
partitions: 0:[0], 1:[0], 2:[0], 3:[0], 4:[0], 5:[0], 6:[0], 7:[0], 8:[0], 9:[0], 10:[0], 11:[0], 12:[0], 13:[0], 14:[0], 15:[0], 16:[0], 17:[0], 18:[0], 19:[0], 20:[0], 21:[0], 22:[0], 23:[0], 24:[0], 25:[0], 26:[0], 27:[0], 28:[0], 29:[0], 30:[0], 31:[0], 32:[0], 33:[0], 34:[0], 35:[0], 36:[0], 37:[0], 38:[0], 39:[0], 40:[0], 41:[0], 42:[0], 43:[0], 44:[0], 45:[0], 46:[0], 47:[0], 48:[0], 49:[0]
options: segment.bytes=104857600,cleanup.policy=compact,compression.type=producer

name: admintome
partitions: 0:[0]

name: admintome-pages
partitions: 0:[0]

And there is our new topic ready to go! Now it’s time to get to the fun stuff and start developing our Python application.

Now that we have Kafka ready to go we will start to develop our Kafka producer. The producer will get page metrics from the Clicky API and push those metrics in JSON form to our topic that we created earlier.

I assume that you have Python 3 installed on your system and virtualenv installed as well.

To get started we will need to setup our environment.

$ mkdir ~/Development/python/venvs
$ mkdir ~/Development/python/site-stats-intake
$ cd ~/Development/python/site-stats-intake
$ virtualenv ../venvs/intake
$ source ../venvs/intake/bin/activate
(intake) $ pip install kafka-python requests
(intake) $ pip freeze > requirements.txt

Next, we need to create our classes.

Clicky Class

We will create a new Python class called Clicky that we will use to interact with the Clicky API. Create a new file called clicky.py and add the following content:

import requests
import json


class Clicky(object):

    def __init__(self, site_id, sitekey):
        self.site_id = site_id
        self.sitekey = sitekey
        self.output = "json"

    def get_data(self, data_type):
        click_api_url = "https://api.clicky.com/api/stats/4"
        payload = {"site_id": self.site_id,
                   "sitekey": self.sitekey,
                   "type": data_type,
                   "output": self.output}
        response = requests.get(click_api_url, params=payload)
        raw_stats = response.text
        return raw_stats

    def get_pages_data(self):
        data = self.get_data("pages")
        return json.loads(data)

Save the file and exit.

In order to get our metrics, we need to send an HTTP GET request to the Clicky API URL which is

https://api.clicky.com/api/stats/4

We also need to include several parameters:

  • site_id: This is the Site ID number that we got earlier.
  • sitekey: This is the Site key value that we also got earlier.
  • type: To get our top pages we set the type to ‘pages.’
  • output: We set this to “json” so that the API will return JSON data.

Finally, we call the requests Python module to perform an HTTP GET against our API URL with the parameters we specified. In the get_pages_data method, we return a dict that represents our JSON data. Next, we will code our Kafka class implementation.

MyKafka Class

This class will interact with our Kafka cluster and push website metrics to our topic for us. Create a new file called mykafka.py and add the following content:

from kafka import KafkaProducer
import json


class MyKafka(object):

    def __init__(self, kafka_brokers):
        self.producer = KafkaProducer(
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            bootstrap_servers=kafka_brokers
        )

    def send_page_data(self, json_data):
        self.producer.send('admintome-pages', json_data)

First, we import the kafka-python library, specifically the KafkaProducer class, that will let us code a Kafka producer and publish messages to our Kafka Topic.

from kafka import KafkaProducer

We now define our MyKafka class and create the constructor for it:

class MyKafka(object):

    def __init__(self, kafka_brokers):

This takes an argument that represents the Kafka brokers that will be used to connect to our Kafka cluster. This is an array of strings in the form of:

[ "broker:ip", "broker:ip" ]

We will use only one broker, which is the one we created in the last article: mslave1.admintome.lab:31000:

[ "mslave1.admintome.lab:31000" ]

We next instantiate a new KafkaProducer object named producer. Since we will be sending data to Kafka in the form of JSON, we tell the KafkaProducer to serialize the data with json.dumps via the value_serializer parameter. We also tell it to use our brokers with the bootstrap_servers parameter.

self.producer = KafkaProducer(
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    bootstrap_servers=kafka_brokers
)

Finally, we create a new method that we will use to send the messages to our admintome-pages topic:

def send_page_data(self, json_data):
    self.producer.send('admintome-pages', json_data)

That’s all there is to it. Now we will write our Main class that will control everything.

Main Class

Create a new file called main.py and add the following content:

from clicky import Clicky
from mykafka import MyKafka
import logging
import time
import os
from logging.config import dictConfig


class Main(object):

    def __init__(self):
        if 'KAFKA_BROKERS' in os.environ:
            kafka_brokers = os.environ['KAFKA_BROKERS'].split(',')
        else:
            raise ValueError('KAFKA_BROKERS environment variable not set')

        if 'SITE_ID' in os.environ:
            self.site_id = os.environ['SITE_ID']
        else:
            raise ValueError('SITE_ID environment variable not set')

        if 'SITEKEY' in os.environ:
            self.sitekey = os.environ['SITEKEY']
        else:
            raise ValueError('SITEKEY environment variable not set')

        logging_config = dict(
            version=1,
            formatters={
                'f': {'format': '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'}
            },
            handlers={
                'h': {'class': 'logging.StreamHandler',
                      'formatter': 'f',
                      'level': logging.DEBUG}
            },
            root={
                'handlers': ['h'],
                'level': logging.DEBUG,
            },
        )
        self.logger = logging.getLogger()
        dictConfig(logging_config)
        self.logger.info("Initializing Kafka Producer")
        self.logger.info("KAFKA_BROKERS={0}".format(kafka_brokers))
        self.mykafka = MyKafka(kafka_brokers)

    def init_clicky(self):
        self.clicky = Clicky(self.site_id, self.sitekey)
        self.logger.info("Clicky Stats Polling Initialized")

    def run(self):
        self.init_clicky()
        starttime = time.time()
        while True:
            data = self.clicky.get_pages_data()
            self.logger.info("Successfully polled Clicky pages data")
            self.mykafka.send_page_data(data)
            self.logger.info("Published page data to Kafka")
            time.sleep(300.0 - ((time.time() - starttime) % 300.0))


if __name__ == "__main__":
    logging.info("Initializing Clicky Stats Polling")
    main = Main()
    main.run()

The end state of this example is to build a Docker container that we will then run on Marathon. With that in mind, we don’t want to hardcode some of our sensitive information (like our Clicky site ID and site key) in our code. We want to be able to pull those from environment variables. If they are not set, then we throw an exception and exit.

if 'KAFKA_BROKERS' in os.environ:
    kafka_brokers = os.environ['KAFKA_BROKERS'].split(',')
else:
    raise ValueError('KAFKA_BROKERS environment variable not set')

if 'SITE_ID' in os.environ:
    self.site_id = os.environ['SITE_ID']
else:
    raise ValueError('SITE_ID environment variable not set')

if 'SITEKEY' in os.environ:
    self.sitekey = os.environ['SITEKEY']
else:
    raise ValueError('SITEKEY environment variable not set')

We also configure logging so that we can see what is going on with our application. I have coded an infinite loop that will poll Clicky and push the metrics to our Kafka topic every five minutes.

def run(self):
    self.init_clicky()
    starttime = time.time()
    while True:
        data = self.clicky.get_pages_data()
        self.logger.info("Successfully polled Clicky pages data")
        self.mykafka.send_page_data(data)
        self.logger.info("Published page data to Kafka")
        time.sleep(300.0 - ((time.time() - starttime) % 300.0))

Save the file and exit.

Running Our Application

To test that everything works you can try running the application after you set your environment variables:

(intake) $ export KAFKA_BROKERS="mslave1.admintome.lab:31000"
(intake) $ export SITE_ID="{your site id}"
(intake) $ export SITEKEY="{your sitekey}"
(intake) $ python main.py
2018-06-25 15:34:32,259 root INFO Initializing Kafka Producer
2018-06-25 15:34:32,259 root INFO KAFKA_BROKERS=['mslave1.admintome.lab:31000']
2018-06-25 15:34:32,374 root INFO Clicky Stats Polling Initialized
2018-06-25 15:34:32,754 root INFO Successfully polled Clicky pages data
2018-06-25 15:34:32,755 root INFO Published page data to Kafka

We are now sending messages to our Kafka Topic! We will build our Docker container next and deploy it to Marathon. Finally, we will wrap up by writing a test consumer that will get our messages from our topic.

I have created a GitHub repository for all the code used in this article: https://github.com/admintome/clicky-state-intake

Now that we have our application code written, we can create a Docker container so that we can deploy it to Marathon. Create a Dockerfile file in your application directory with the following contents:

FROM python:3

WORKDIR /usr/src/app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD [ "python", "./main.py" ]

Build the container:

$ docker build -t {your docker hub username}/site-stats-intake .

After the Docker build is completed, you will want to push it to your Docker repository that your Mesos Slaves have access to. For me, this is Docker Hub:

$ docker push admintome/site-stats-intake

Then log in to each of your Mesos slaves and pull the image down:

$ docker pull admintome/site-stats-intake

We are now ready to create a Marathon application deployment for our application.

Go to your Marathon GUI.

http://mesos1.admintome.lab:8080

Click on the Create Application Button. Then click the JSON mode button:

Paste in the following JSON code:

{ "id": "site-stats-intake", "cmd": null, "cpus": 1, "mem": 128, "disk": 0, "instances": 1, "container": { "docker": { "image": "admintome/site-stats-intake" }, "type": "DOCKER" }, "networks": [ { "mode": "host" } ], "env": { "KAFKA_BROKERS": "192.168.1.x:port", "SITE_ID": "{your site_id}", "SITEKEY": "{your sitekey}" }
}

Be sure to substitute the correct values for KAFKA_BROKERS, SITE_ID, and SITEKEY in the env section for your environment.

Finally, click on the Create Application button to deploy the application. After a few seconds, you should see the application is Running.

To see the logs click on the site-stats-intake application then click on the stderr link to download a text file containing the logs.

Now that we have our application deployed to Marathon we will write a short consumer that we will run on our development system to show us what messages have been received.

This will be a simple Kafka consumer that will check out the topic and display all messages on the topic. Not really useful at this point but it lets us know that our little polling application is working correctly.

Create a new file called consumer.py and add the following contents:

import sys
from kafka import KafkaConsumer

consumer = KafkaConsumer('admintome-pages',
                         bootstrap_servers="mslave1.admintome.lab:31000",
                         auto_offset_reset='earliest')

try:
    for message in consumer:
        print(message.value)
except KeyboardInterrupt:
    sys.exit()

Save and exit the file. This has the Kafka broker hardcoded because we are simply using it to test everything. Make sure to update the bootstrap_servers parameter with your broker name and port.

Now run the command and you should see a ton of JSON that represents your most visited pages:

(intake) $ python consumer.py
b'[{"type": "pages", "dates": [{"date": "2018-06-25", "items": [{"value": "145", "value_percent": "43.2", "title": "Kafka Tutorial for Fast Data Architecture - AdminTome Blog", "stats_url": "http://clicky.com/stats/visitors?site_id=101045340&date=2018-06-25&href=%2Fblog%2Fkafka-tutorial-for-fast-data-architecture%2F", "url": "http://www.admintome.com/blog/kafka-tutorial-for-fast-data-architecture/"},...

We now have a data pipeline with some data that we can use. The next step will be to analyze that data. In the next article, we will install and configure the next part of our SMACK stack, Apache Spark, and configure it to analyze our data and give us something meaningful.

Original Link

How to Build a Data-Driven Culture

The importance of data has been around for centuries, dating back to the times of astronomy and scientific research. Early on, we saw famous scientists and astronomers like Charles Darwin, Galileo Galilei, Marie Curie, and Nikola Tesla gather data to prove hypotheses and evaluate discoveries.

Fast forward to today and the value of data continues to increase with it being the single most important factor in unfurling the truth and discovering new frontiers. In a recent EY report, 81 percent of surveyed organizations say that data should be used to analyze each business decision. By using data in their analyses, organizations are more prepared to uncover opportunities or threats and increase business performance and efficiency.

In fact, many corporations have embedded the importance of data into their core values. Amazon, for example, includes data as the backbone of many of its leadership principles. To achieve success, employees at Amazon are constantly measured against those principles and are pushed to “dive deep” into the data.

To build a data-driven culture, you must shift how your organization uses data to help meet the company’s business goals. Below are three things you should consider:

  1. Define company metrics – most successful organizations set business goals and define metrics to measure their progress and success. If a company’s goal is to improve sales revenues by 20 percent, then teams across the business should tap into its data and analyze how their department can contribute to the company’s revenue improvement. In addition to making teams accountable, these metrics also allow teams to objectively analyze how viable their business strategies are and to eliminate ambiguity.
  2. Democratize data – data has historically been in the hands of a few. If data is available beyond key stakeholders, employees are mobilized and empowered to conduct their own analysis by slicing and dicing the data. They can then contribute their findings to strengthen or invalidate their business strategies or ideas.
  3. Deliver consistent data – as organizations become larger, business processes become more complex. And analyzing the right, relevant data becomes more complicated when working with duplicative data from siloed sources and departments. By making data consistent across the organization, employees and departments have a common understanding of the data used in analyses, which leads to better cross-team collaboration.

Building a data-driven culture is not easy, but with the right organization and technology in place, along with common goals across teams, you can provide the foundation needed to get you there.

Original Link

Building Real-Time Analytics Dashboards With Postgres and Citus

Citus scales out Postgres for a number of different use cases, both as a system of record and as a system of engagement. One use case we’re seeing implemented a lot these days is using the Citus database to power customer-facing real-time analytics dashboards, even when dealing with billions of events per day. Dashboards and pipelines are easy to handle when you’re at 10 GB of data; as you grow, even basic operations like a count of unique users require non-trivial engineering work to get performing well.

Citus is a good fit for these types of event dashboards because of its ability to ingest large amounts of data, perform rollups concurrently, mix raw unrolled-up data with pre-aggregated data, and support a large number of concurrent users. Adding all these capabilities together, the Citus extension to Postgres works well for end users where a data warehouse may not work nearly as well.

We’ve talked some here about various parts of building a real-time customer-facing dashboard, but today, we thought we’d go one step further and give you a guide for doing it end-to-end.

Ingesting Data

The first step is data ingestion. In any POC, we generally start with single-record inserts. This is usually the quickest item to put in place and we can start ingesting data easily at tens of thousands of events per second. We generally opt for single-row inserts as the quickest thing to set up, but before production, we move to a micro-batching approach.

With the \copy Postgres bulk loading utility, we’re able to ingest millions of events per second, if needed. \copy is fully transactional and the load is distributed across all nodes with Citus, making your Citus coordinator node less of a bottleneck than you would expect.

Most event pipelines we’ve observed have some upstream process already handling events, such as Kafka or Kinesis, which makes batching easy to put in place. As for the batching process itself, you can opt to do this either on a time basis (say, minutely batches) or every X records (this could also be every five minutes or even hourly depending on your requirements.) Your batches don’t have to be hundreds of thousands of events; even something like every 1,000 records can give you a nice performance boost, as you can have multiple \copy processes running in parallel.
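
To make the micro-batching idea concrete, here is a minimal sketch of my own (not from the article) that flushes a buffered batch with COPY ... FROM STDIN, which is what \copy wraps, using psycopg2. The connection string is a placeholder, and the column list matches the events table defined in the next section:

import io
import psycopg2

def flush_batch(conn, rows):
    # Each row is a tuple matching the events table:
    # (id, timestamp, customer_id, event_type, country, browser, device_id, session_id)
    buf = io.StringIO()
    for r in rows:
        buf.write("\t".join(str(col) for col in r) + "\n")
    buf.seek(0)
    with conn.cursor() as cur:
        cur.copy_expert(
            "COPY events (id, timestamp, customer_id, event_type, country, "
            "browser, device_id, session_id) FROM STDIN", buf)
    conn.commit()

if __name__ == "__main__":
    # Placeholder connection string; point it at your Citus coordinator.
    conn = psycopg2.connect("postgresql://user:pass@coordinator:5432/analytics")
    sample_batch = [
        (1, "2018-06-25 15:34:32", 1, "click", "US", "Chrome", 42, 7),
    ]
    flush_batch(conn, sample_batch)

In a real pipeline, rows would be accumulated from the upstream Kafka or Kinesis process and flushed every N records or every few seconds, and several of these loaders can run in parallel.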

Structuring Your Raw Events

Your raw event table will vary based on your use case, but there are some commonalities across most. In almost all cases, you have the time of the event and some customer identifier associated with it. Typically, there will be some categorization and details about the event. These details can be broken out as columns or could also be contained within a JSONB datatype. For this example dashboard, we’ll use the following schema:

CREATE TABLE events(
  id bigint,
  timestamp timestamp,
  customer_id bigint,
  event_type varchar,
  country varchar,
  browser varchar,
  device_id bigint,
  session_id bigint
);

SELECT create_distributed_table('events','customer_id');

Rolling Up Your Data

Once you’ve got some raw data coming in, we can now start rolling up data. To do this, we’re going to first create several roll-up tables — some for five-minute intervals, some for hourly, and some for daily.

CREATE TABLE rollup_events_5min (
  customer_id bigint,
  event_type varchar,
  country varchar,
  browser varchar,
  minute timestamp,
  event_count bigint,
  device_distinct_count hll,
  session_distinct_count hll
);
CREATE UNIQUE INDEX rollup_events_5min_unique_idx ON rollup_events_5min(customer_id,event_type,country,browser,minute);
SELECT create_distributed_table('rollup_events_5min','customer_id');

CREATE TABLE rollup_events_1hr (
  customer_id bigint,
  event_type varchar,
  country varchar,
  browser varchar,
  hour timestamp,
  event_count bigint,
  device_distinct_count hll,
  session_distinct_count hll
);
CREATE UNIQUE INDEX rollup_events_1hr_unique_idx ON rollup_events_1hr(customer_id,event_type,country,browser,hour);
SELECT create_distributed_table('rollup_events_1hr','customer_id');

One thing you’ll notice in our roll-up tables is the use of the HLL (HyperLogLog) data type. HyperLogLog is a sketch algorithm that allows you to do operations over unique buckets. HyperLogLog makes it easy to find intersections, unions, etc. across various buckets, making it incredibly useful for the types of reports your dashboard may generate.

In the above example, we have chosen (customer_id, event_type, country, browser, minute/hour) as the dimensions on which we evaluate metrics such as event_count, device_distinct_count, session_distinct_count, etc. Based on your query workload and performance requirements, you can choose the dimensions that make sense for you. If needed, you can create multiple roll-up tables to serve different query types (just don’t go too crazy with a table for every dimension).

Ideally, you should choose dimensions that you get suitable compression (>5-10x) compared to the raw tables. Based on our customers’ experiences, we have, at times, seen orders of magnitude in compression after roll-ups — up to 100x or 1000x.

For our roll-up query, we’re going to do an INSERT INTO... SELECT, which will run across all the nodes in our cluster and parallelize.

Note: Our raw tables and roll-up tables are sharded on the same key. In this case, the sharding key is the customer_id, which is more or less a proxy for customer/tenant ID. Other granular columns can also be chosen as shard keys depending on use case.

To perform our roll-up, we’ll create the function:

CREATE OR REPLACE FUNCTION compute_rollups_every_5min(start_time TIMESTAMP, end_time TIMESTAMP) RETURNS void LANGUAGE PLPGSQL AS $function$
BEGIN
  RAISE NOTICE 'Computing 5min rollups from % to % (excluded)', start_time, end_time;
  RAISE NOTICE 'Aggregating data into 5 min rollup table';
  EXECUTE $$
    INSERT INTO rollup_events_5min
    SELECT customer_id,
           event_type,
           country,
           browser,
           date_trunc('seconds', (timestamp - TIMESTAMP 'epoch') / 300) * 300 + TIMESTAMP 'epoch' AS minute,
           count(*) as event_count,
           hll_add_agg(hll_hash_bigint(device_id)) as device_distinct_count,
           hll_add_agg(hll_hash_bigint(session_id)) as session_distinct_count
    FROM events
    WHERE timestamp >= $1 AND timestamp <= $2
    GROUP BY customer_id, event_type, country, browser, minute
    ON CONFLICT (customer_id, event_type, country, browser, minute)
    DO UPDATE
    SET event_count = excluded.event_count,
        device_distinct_count = excluded.device_distinct_count,
        session_distinct_count = excluded.session_distinct_count;$$
  USING start_time, end_time;

  RAISE NOTICE 'Aggregating/Upserting into 1 hr rollup table';
  EXECUTE $$
    INSERT INTO rollup_events_1hr
    SELECT customer_id,
           event_type,
           country,
           browser,
           date_trunc('hour', timestamp) as hour,
           count(*) as event_count,
           hll_add_agg(hll_hash_bigint(device_id)) as device_distinct_count,
           hll_add_agg(hll_hash_bigint(session_id)) as session_distinct_count
    FROM events
    WHERE timestamp >= $1 AND timestamp <= $2
    GROUP BY customer_id, event_type, country, browser, hour
    ON CONFLICT (customer_id, event_type, country, browser, hour)
    DO UPDATE
    SET event_count = rollup_events_1hr.event_count + excluded.event_count,
        device_distinct_count = rollup_events_1hr.device_distinct_count || excluded.device_distinct_count,
        session_distinct_count = rollup_events_1hr.session_distinct_count || excluded.session_distinct_count;$$
  USING start_time, end_time;
END;
$function$;

For rolling up the last five minutes of data, we can now trigger our function:

SELECT compute_rollups_every_5min((now()-interval '5 minutes')::timestamp, now()::timestamp);

Automating Your Roll-Ups

Scheduling your own background job to run and perform the roll-ups is an option. When using Citus, you can just as easily schedule a job directly in your database since the database is doing all the heavy lifting. With pg_cron, you can schedule your rollup and call the function compute_rollups_every_5min to run every five minutes on top of the raw data.
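
For example (a sketch only; it assumes the pg_cron extension is already installed in the database and uses a placeholder connection string), the roll-up can be registered to run every five minutes like this:

import psycopg2

# Placeholder connection string; pg_cron jobs are registered in the database
# where the extension is installed.
conn = psycopg2.connect("postgresql://user:pass@coordinator:5432/analytics")
with conn.cursor() as cur:
    # Re-aggregate the trailing five minutes of raw events, every five minutes.
    cur.execute("""
        SELECT cron.schedule(
            '*/5 * * * *',
            $$SELECT compute_rollups_every_5min(
                  (now() - interval '5 minutes')::timestamp,
                  now()::timestamp)$$
        );
    """)
conn.commit()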

Querying the Real-Time Events Dashboard

With our system now ingesting data and performing roll-ups, we can get to the fun part: querying. Because all of our roll-up tables are significantly smaller than the raw data, you’ll see performant queries against them. And all the while, Citus continually rolls up the raw data and stores it in an efficient data type (HyperLogLog) to let you compose all sorts of reports based on the data.

Let’s look at a few examples of queries you may want to run:

-- Get me the total number of events and count of distinct devices in the last 5 minutes?
SELECT sum(event_count), hll_cardinality(sum(device_distinct_count))
FROM rollup_events_5min
WHERE minute >= now() - interval '5 minutes' AND minute <= now() AND customer_id = 1;

-- Get me the count of distinct sessions over the last week?
SELECT sum(event_count), hll_cardinality(sum(device_distinct_count))
FROM rollup_events_1hr
WHERE hour >= date_trunc('day', now()) - interval '7 days' AND hour <= now() AND customer_id = 1;

-- Get me the trend of my app usage in the last 2 days broken by hour
SELECT hour,
       sum(event_count) event_count,
       hll_cardinality(sum(device_distinct_count)) device_count,
       hll_cardinality(sum(session_distinct_count)) session_count
FROM rollup_events_1hr
WHERE hour >= date_trunc('day', now()) - interval '2 days' AND hour <= now() AND customer_id = 1
GROUP BY hour;

Data is everywhere in this day and age, and key to being successful is being able to derive insights from your data. By making data available in real time, you’re able to create more value for your users. With Citus, you can both have real-time ingestion as well as real-time reporting with high concurrency without having to throw out SQL, which is the lingua franca when it comes to data. 

Original Link

The Effect of Cardinality on Data Ingest (Part 1)

In my role as a Sales Engineer here at InfluxData, I get to talk to a lot of clients about how they’re using InfluxDB and the rest of the TICK Stack. We have a large number of very big clients using InfluxEnterprise for metrics collection, analysis, visualization, and alerting in their DevOps area, so we’ve done a lot of scale-out testing for these clients. In these tests, we see very linear scale-out as we add additional nodes to an InfluxEnterprise Cluster. I’ll talk about this in my next blog post.

Over the last six months, I’ve seen more and more large manufacturers, energy companies, and utilities coming to us for collecting metrics from their IoT devices. Many times, they’re working with consulting companies that specialize in building IoT solutions, and these companies bring InfluxDB into the solution because we’re so well-suited for time series applications.

A few things stand out with these IoT applications. Many times, there is a need for a local instance of InfluxDB running in the factory, alerting locally on anything they’re monitoring. In these cases, the equipment they have to run on is pretty lightweight, so it’s just as important to understand how we scale down as it is to understand how we scale up. The other thing is that the cardinality of the data can be rather large compared to the amount of data to be ingested. So, I thought I’d do some scale-down testing, as well as measure the impact of cardinality on write throughput.

That’s what this blog post is about. It’s the first of a series I’m doing on performance testing of InfluxEnterprise. So if you’re interested in this topic, stay tuned.

The Setup

For the purposes of this test, I’ll be spinning up a cluster in AWS using some utilities we’ve built to make this easy. If you haven’t worked with InfluxData’s TICK Stack before, you’ll be surprised how easy it is to install and set up. In fact, one of my peers, David Simmons, wrote another post on that topic. Check it out!

For running InfluxDB on AWS, we’ve found that the R4 type of instances that are optimized for memory-intensive applications work best. These also use SSD storage, which is recommended for your data, wal, and hh directories when running InfluxDB or InfluxEnterprise.

For the testing, I’ll be spinning up the following size clusters on AWS:

  1. 2 nodes with 2 cores and 15.25 GB of memory (r4.large)
  2. 2 nodes with 4 cores and 30.5 GB of memory (r4.xlarge)
  3. 2 nodes with 8 cores and 61 GB of memory (r4.2xlarge)
  4. 2 nodes with 16 cores and 122 GB of memory (r4.4xlarge)

And I’ll test these using data cardinality series of 10,000, 100,000, and 1,000,000 to see how the number of unique series affects the ingest rate and the heap used.

For Part 2 of this series, I’ll also scale out to 4, 6, 8, and 10 node clusters and increase cardinality to show how well InfluxEnterprise scales horizontally.

To generate data for testing with the correct granularity, I’ll be using a utility developed by one of our engineers called inch, which stands for INflux benCHmarking. This is an awesome utility for simulating streaming data for benchmarking purposes. It’s written in Go and is available out on GitHub. If you type inch -h, you’ll get help on using the utility. I’ve listed the options below:

Usage of inch:
  -b int
        Batch size (default 5000)
  -c int
        Concurrency (default 1)
  -consistency string
        Write consistency (default any) (default "any")
  -db string
        Database to write to (default "stress")
  -delay duration
        Delay between writes
  -dry
        Dry run (maximum writer perf of inch on box)
  -f int
        Fields per point (default 1)
  -host string
        Host (default "http://localhost:8086")
  -m int
        Measurements (default 1)
  -max-errors int
        Terminate process if this many errors encountered
  -p int
        Points per series (default 100)
  -password string
        Host Password
  -report-host string
        Host to send metrics
  -report-password string
        Password Host to send metrics
  -report-tags string
        Comma separated k=v tags to report alongside metrics
  -report-user string
        User for Host to send metrics
  -shard-duration string
        Set shard duration (default 7d) (default "7d")
  -t string
        Tag cardinality (default "10,10,10")
  -target-latency duration
        If set inch will attempt to adapt write delay to meet target
  -time duration
        Time span to spread writes over
  -user string
        Host User
  -v
        Verbose

Using inch, I’ll generate data from two client nodes running on AWS m4.2xlarge nodes that have 8 cores each and 32 GB of memory. I’ll be running 8 streams on each client for a total of 16 concurrent writers.

The difference in performance was minimal scaling up to 32 writers, so I decided not to include the numbers.

In summary, I’ll use the following constants for my testing:

  • 2 m4.2xlarge nodes running 8 write streams each
  • Batch size for writes = 10,000
  • Consistency = ANY
  • Replication factor = 2
  • Number of points to write per series = 100,000

The Testing

For this test, I’m only using 2 node clusters that provide high availability, but since we’re replicating writes across both nodes in the cluster, I’m not testing scale-out horizontally. In fact, due to cluster overhead, this performance would be slightly less than you’d expect on a single node of InfluxDB. Since most of our customers want high availability and InfluxDB provides a very high ingest rate even on smaller servers, this is a common configuration we see.

After spinning up the cluster on AWS, the first thing I did was create my database with a replication factor of 2. I called my database stress and used the CLI to create it:

influx -execute 'create database stress with replication 2'

Next, I logged into my client nodes and entered the following inch commands to start generating my workload for the 10,000 unique series test:

inch -v -c 8 -b 10000 -t 1,5000,1 -p 100000 -consistency any
inch -v -c 8 -b 10000 -t 1,1,5000 -p 100000 -consistency any

Now, let me explain the command line options for the above inch commands. -v tells inch to print out detailed stats as it’s running so I can see how many points have been written, the ingest rate, and other details about the test. -c tells inch how many write streams to run concurrently. I’m running 8 each, so 16 concurrent write streams total. -b allows me to set the batch size. A batch size of 5,000 to 10,000 is recommended for InfluxDB, so I chose 10,000. -t allows me to define the shape of my data; in other words, the number of tags and how many unique values to generate for each tag. Client one generated 3 tags, the second one having 5,000 unique values, and client two generated 3 tags, with the third one having 5,000 unique values, for a combined 10,000 unique values overall. -p indicates how many points to generate per series, and the -consistency option allows me to set my write consistency, which I set to any.

Here is a sample of what the generated data looks like:

The Details

So here are the results of my testing. As you can see, the vertical scaling was very linear as I tested on systems with more cores. Also, as the cardinality increased, it definitely had an impact on the ingestion rate, and I found that there is a performance hit as new series are created for the first time. But once all the series are created, ingestion performance levels off to the rates you can see in the chart below.

I’ve also included the detailed numbers below:

Observations

I was pleasantly surprised by how much data a cluster with 2-core nodes could handle, since many IoT use cases have minimally sized servers at the edge of the network, where there’s sometimes a need for some local storage, visualization, and alerting.

I was also pleased to see how linear the vertical scaling was as cores were added and as the cardinality of the data increased. The amount of memory needed as the cardinality increased 10x from 100,000 to 1,000,000 also increased about 10x, which again was very predictable; that predictability is helpful when doing capacity planning for your InfluxEnterprise environment.

What’s Next?

Stay tuned for Part 2, where I’ll test horizontal cluster scale out.

If you’d also like to see some comparison benchmarks of InfluxDB vs. OpenTSDB, ElasticSearch, Cassandra, or Mongo, check out these other benchmarks that have been run.

Original Link

Using WebSockets With Apache NiFi

I wanted to test out the new WebSocket listener in Apache NiFi 1.x, but I needed a server to serve up my HTML client. So, I ran that web server with NiFi, as well. Now, my full solution is hosted and runs on Apache NiFi.

This simple WebSockets server and client is like the “Hello World” of WebSockets. It’s like an echo — whatever the client sends, we send it back!

My suggested use cases for WebSockets are:

  • WebSocket client to Slack interface.
  • WebSocket client to email.
  • WebSocket chat stored to Apache Phoenix.
  • WebSocket to communicate with mobile web apps.
  • WebSocket to send detailed log details from enterprise web applications directly to the log ingestion platform, bypassing the local filesystem.

Step 1: HandleHTTPRequest accepts the HTTP calls from browsers.

Step 2: ExecuteStreamCommand returns the HTML page (you could use GetFile or any number of other ways of getting the HTML as a flowfile).

Step 3: HandleHttpResponse serves up our web page to browsers. A StandardHTTPContextMap is required to store HTTP requests and responses and share them through the stream.

Step 4: PutFile keeps logs of what’s going on; I save all the flow files to the local file system.

Step 5: ListenWebSocket is the actual WebSocket server listener; it’s what our client will talk to.

Step 6: PutWebSocket sends the reply to the WebSocket client.

WebSockets server:

WebSockets client (static HTML5 page with Javascript) hosted on NiFi:

WebSocket conversation on the client side:

Here’s a script to output the HTML5 JavaScript WebSocket client:

cat server.sh
cat /wsclient.html

<!DOCTYPE HTML>
<html>
<head>
<script type="text/javascript">
function WebSocketTest() {
  if ("WebSocket" in window) {
    alert("WebSocket is supported by your Browser!");
    // Let us open a web socket
    var ws = new WebSocket("ws://localhost:9998/echo");
    ws.onopen = function() {
      // Web Socket is connected, send data using send()
      ws.send("MSG: NIFI IS AWESOME");
      ws.send("Message to send");
      alert("Message is sent...");
    };
    ws.onmessage = function (evt) {
      var received_msg = evt.data;
      alert("Message is received...");
    };
    ws.onclose = function() {
      // websocket is closed.
      alert("Connection is closed...");
    };
  } else {
    // The browser doesn't support WebSocket
    alert("WebSocket NOT supported by your Browser!");
  }
}
</script>
</head>
<body>
<div id="sse">
  <a href="javascript:WebSocketTest()">Run WebSocket</a>
</div>
</body>
</html>
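
If you want to exercise the ListenWebSocket endpoint without a browser, a quick Python check works too. This is just a sketch; it assumes the websocket-client package and the same ws://localhost:9998/echo address used by the HTML client above:

# pip install websocket-client
from websocket import create_connection

# Same listener address as the HTML client; adjust host/port for your NiFi instance.
ws = create_connection("ws://localhost:9998/echo")
ws.send("MSG: NIFI IS AWESOME")
print("Echoed back:", ws.recv())  # the flow replies with whatever the client sent
ws.close()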

Resources

Original Link

MariaDB Energizes the Data Warehouse With Open-Source Analytics Solution

MariaDB® Corporation has introduced new product enhancements to MariaDB AX to deliver a modern approach to data warehousing that enables customers to perform fast and scalable analytics more cost-effectively. MariaDB AX expands the MariaDB Server, creating a solution that enables high-performance analytics with distributed storage and parallel processing and scales with existing commodity hardware on premises or across any cloud platform.

“MariaDB AX is a powerful, open-source solution for performing custom and complex analytics,” says David Thompson, VP of Engineering at MariaDB Corporation. “In order to fully realize the power of big data, our customers need the ability to gather insights in near-real-time, regardless of where the data is coming from. With MariaDB AX, it’s easier than ever to ingest and analyze streaming data from disparate sources, while ensuring the highest level of reliability through new high availability and backup capabilities.”

Data warehouses are traditionally expensive and complex to operate. Driven by the need for more meaningful and timely analytics that meet hardware and cost pressures, companies are reassessing their data warehouse and analytics strategy. Built for performance and scalability, MariaDB AX uses a distributed and columnar open-source storage engine with parallel query processing that allows customers to store more data and analyze it faster. MariaDB AX supports a wide range of advanced analytic use cases across every industry; for example, identifying health trends to inform healthcare programs and policy, behavioral analysis to inform customer service and sales strategies, and analysis of financial anomalies to inform fraud programs.

MariaDB AX includes new features that enable easy, high-performance data analytics, such as:

  • Simpler and streamlined data ingestion: Make data available for analysis quickly and easily. MariaDB AX, with the introduction of data adapters and direct access to the underlying storage engine, removes the need for complex, time-consuming batch processes to import data. By reducing the time it takes to make data available for analysis, organizations can realize a faster time-to-insight and the competitive advantages it brings. New data adapters include:

    • Bulk data adapters: Develop applications and services to continuously collect and write large amounts of data to MariaDB AX for analysis, or publish machine learning results to MariaDB AX for data scientists and others to view or further analyze via SQL.

    • Streaming data adapter for MariaDB MaxScale: Analyze operational data in near real time, within seconds of being created. When MariaDB TX is deployed for operational workloads, the data adapter uses MariaDB MaxScale and change-data-capture streams to replicate data from the operational environment to MariaDB AX automatically and continuously.

    • Streaming data adapter for Apache Kafka: Combine and analyze data from multiple sources by aggregating data from web, mobile, and Internet of Things applications. The data adapter, currently in beta, consumes messages published to Apache Kafka topics and writes the data to MariaDB AX, enabling data scientists to analyze related data from multiple sources.

  • More powerful, custom analytics: Extend the analytical capabilities of MariaDB AX by creating custom analytical functions for relational and non-relational data such as JSON. The introduction of an API for creating user-defined aggregate and window functions, combined with support for text and binary data, enables data scientists to analyze structured, semi-structured and unstructured data using custom analytical functions.

  • Improved high availability and disaster recovery: Ensure data is always available for analysis and simplify the process of backing up and restoring data. MariaDB AX adds support for GlusterFS to provide data high availability without the need for an expensive SAN, while a new tool automatically uses multiple, concurrent connections to back up and restore distributed data on multiple servers from a single location.

“Like so many organizations today, for us, data is the lifeblood of our operations,” says Patrice Linel, Head of Scientific Computing at Genus plc, the parent company of ABS Global and PIC, helping farmers produce better quality meat and milk more efficiently and sustainably. “With the new data ingestion capabilities in MariaDB AX, we are freed from the manual, time-consuming practice of moving massive amounts of genetic data from its source format for analysis. Now with MariaDB AX, we can queue and instantly move data directly from Python into MariaDB where real-time analytics happens. The effort saved is substantial and the data is always the most current.”

MariaDB AX is available for download now.

Original Link

The Secret to Getting Data Lake Insight: Data Quality

More and more companies around the globe are realizing that big data and deeper analytics can help improve their revenue and profitability. As such, they are building data lakes using new big data technologies and tools, so they can answer questions such as, How do we increase production while maintaining costs? How do we improve customer intimacy and share of wallet? What new business opportunities should we pursue? Big data is playing a major role in digital transformation projects; however, companies that do not have trusted data at the heart of their operations will not realize the full benefits of their efforts.

Instituting Sustainable Data Quality and Governance Measures

If big data is to be used, organizations need to make sure that this data collection is under control and held to a high standard. Yet, according to a recent report by KPMG, 56% of CEOs are concerned about the quality of the data on which they base decisions. To improve the trustworthiness of data as it flows through the enterprise, companies need to look at the entire data quality lifecycle, including metadata management, lineage, preparation, cleansing, profiling, stewardship, privacy, and security.

A few weeks ago, Gartner released the 2017 Gartner Magic Quadrant for Data Quality Tools — a report that reviews the data quality lifecycle and showcases innovative technologies designed to “meet the needs of end-user organizations in the next 12 to 18 months.”

The report highlights the increasing importance of data quality for the success of digital transformation projects, the need to use data quality as a means to reduce costs, and the changing requirements to be a leader. Some of the trends highlighted in the report that speak directly to data lake development and usage include:

  • The need to capture and reconcile metadata.
  • The ability to connect to a wide variety of on-premises and cloud structured and unstructured data sources.
  • The importance of DevOps and integration interoperability in the data quality environment.
  • How business users are now the primary audience and need data quality workflow and issue resolution tools.
  • The increasing requirement for real-time data quality services for low-latency applications.

Machine Learning and Natural Language Processing to the Rescue

As companies ingest large amounts of unstructured and unknown data, it can be a challenge to validate, cleanse, and transform the data in sufficient time without delaying real-time decisions and analytics. This does not mean that 100% of the data lake needs to be sanctioned data, as companies will create a data lake partition of “raw data” which data scientists often prefer for analysis. In addition, raw and diverse data can be provisioned among different roles before enrichment, shifting from a single version of the truth model to a more open and trustworthy collaborative governance model.

In the past, data quality would rely solely on a complex algorithm; for example, probabilistic matching for deduplicating and reconciling records. An important trend we are seeing at Talend and outlined in the Gartner report is the use of machine learning with data quality to assist with matching, linking, and merging data. With the sheer volume and variety of data in the data lake, using Hadoop, Spark, and machine learning for data quality processing means faster time to trusted insight. Data science algorithms can quickly sift through gigabytes of data to identify relationships between data, duplicates, and more. Natural language processing can help reconcile definitions and provide structure to unstructured text, providing additional insight when combined with structured data.
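As a small illustration of the matching idea (not Talend's implementation), the sketch below uses plain Python string similarity to flag likely duplicate records; the field names and the 0.85 threshold are arbitrary choices for the example.

from difflib import SequenceMatcher

# Toy records; in a real data lake these would come from the raw zone.
records = [
    {"id": 1, "name": "Acme Corporation",      "city": "New York"},
    {"id": 2, "name": "Acme Corporation Inc.", "city": "New York"},
    {"id": 3, "name": "Globex",                "city": "Springfield"},
]

def similarity(a, b):
    """Return a ratio in [0, 1] of how alike two strings are, ignoring case."""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()

def likely_duplicates(rows, threshold=0.85):
    """Yield pairs of record IDs whose name and city are both very similar."""
    for i in range(len(rows)):
        for j in range(i + 1, len(rows)):
            name_sim = similarity(rows[i]["name"], rows[j]["name"])
            city_sim = similarity(rows[i]["city"], rows[j]["city"])
            if name_sim >= threshold and city_sim >= threshold:
                yield rows[i]["id"], rows[j]["id"], round(name_sim, 2)

for pair in likely_duplicates(records):
    print("Possible duplicate:", pair)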

Machine learning can be a game changer because it can capture tacit knowledge from the people that know the data best, then turn this knowledge into algorithms, which can be used to automate data processing at scale. Furthermore, through smarter software that uses machine learning and smart semantics, any line of business user can become a data curator – making data quality a team sport! For example, tools such as Talend Preparation and Data Stewardship combine a highly interactive, visual and guided user experience with these features to make the data curation process easier and the data cleansing process faster.

Devising a Plan for Agile Data Quality in the Data Lake

Implementing a data quality program for big data can be overwhelming. It is important to come up with an incremental plan and set realistic goals; sometimes getting to 95% is good enough.

  1. Roles: Identify roles, including data stewards and users of data.

  2. Discovery: Understand where data is coming from, where it is going, and what shape it is in. Focus on cleaning your most valuable and most used data first.

  3. Standardization: Validate, cleanse, and transform data. Add metadata early so that data can be found by humans and machines. Identify and protect personal and private organizational data with data masking (a minimal masking sketch follows this list).

  4. Reconciliation: Verify that data was migrated correctly.

  5. Self-service: Make data quality agile by letting the people who know the data best clean their own data.

  6. Automate: Identify where machine learning in the data quality process can help, such as data deduplication.

  7. Monitor and manage: Get continuous feedback from users and define data quality metrics to measure improvement.
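For step 3, a masking pass can be as simple as the sketch below, which hashes direct identifiers and drops the most sensitive fields before data lands in the sanctioned zone; the field list and salt are placeholders for this example, not a recommendation for any particular tool.

import hashlib

# Placeholder configuration: which fields to pseudonymize and which to drop outright.
HASH_FIELDS = {"email", "customer_id"}
DROP_FIELDS = {"credit_card"}
SALT = "replace-with-a-secret-salt"  # assumption: in practice, load this from a secrets store

def mask_record(record):
    """Return a copy of the record that is safe to expose to analysts."""
    masked = {}
    for key, value in record.items():
        if key in DROP_FIELDS:
            continue  # never let the raw value through
        if key in HASH_FIELDS:
            digest = hashlib.sha256((SALT + str(value)).encode("utf-8")).hexdigest()
            masked[key] = digest[:16]  # shortened pseudonym that is still join-able
        else:
            masked[key] = value
    return masked

print(mask_record({"customer_id": "C-1001",
                   "email": "jane@example.com",
                   "credit_card": "4111 1111 1111 1111",
                   "country": "US"}))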

In summary, for companies to get the most out of their digital transformation projects and build an agile data lake, they need to design data quality processes from the start.

*Gartner does not endorse any vendor, product or service depicted in its research publications, and does not advise technology users to select only those vendors with the highest ratings or other designation. Gartner research publications consist of the opinions of Gartner’s research organization and should not be construed as statements of fact. Gartner disclaims all warranties, expressed or implied, with respect to this research, including any warranties of merchantability or fitness for a particular purpose.

Original Link

Using Kylo for Self-Service Data Ingestion, Cleansing, and Validation

Kylo is a feature-rich data lake platform built on Apache Hadoop and Apache Spark. Kylo provides a business-friendly data lake solution and enables self-service data ingestion, data wrangling, data profiling, data validation, data cleansing/standardization, and data discovery. Its intuitive user interface allows IT professionals to access the data lake (without having to code).

While many tools ingest either batch data or streaming and real-time data, Kylo supports both. It provides a plug-in architecture with a variety of extensions. Apache NiFi templates provide incredible flexibility for batch and streaming use cases.

In this post, let's discuss ingesting data from Apache Kafka, performing data cleansing and validation in real time, and persisting the data into an Apache Hive table.

Prerequisites

  • Install Kafka
  • Deploy Kylo, which requires knowledge of different components/technologies such as:
    • AngularJS for Kylo UI
    • Apache Spark for data wrangling, data profiling, data validation, data cleansing, and schema detection
    • JBoss ModeShape and MySQL for Kylo Metadata Server
    • Apache NiFi for pipeline orchestration
    • Apache ActiveMQ for interprocess communication
    • Elasticsearch for search-based data discovery
    • Hadoop technologies, most importantly HDFS, YARN, and Hive

To learn more about basics and installation of Kylo in an AWS EC2 instance, refer to our previous blog on Kylo setup for data lake management.

Data Description

A user transaction dataset with 68K rows, generated by the Treselle team, is used as the source file. The input dataset has time, UUID, user, business, address, amount, and disputed columns.

Sample dataset:

Examples of invalid and missing values in the dataset:

Use Case

  • Publish the user transaction dataset into Kafka (a minimal producer sketch follows this list).
  • Ingest data from Kafka using the Kylo data ingestion template, then standardize and validate the data.
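For reference, publishing the source file into Kafka can be done with a short script like the sketch below (it is not part of Kylo). It assumes the kafka-python package, a broker at localhost:9092, the transactionRawTopic topic configured later in the feed, and a local transactions.csv file.

# publish_transactions.py -- push the CSV rows into Kafka for the Kylo feed to consume
# Assumptions: pip install kafka-python; broker at localhost:9092; transactions.csv in the working directory
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

with open("transactions.csv", "r", encoding="utf-8") as f:
    next(f)  # skip the header row; the Kylo feed defines the schema separately
    for line in f:
        # One CSV row per Kafka message; the ingest feed parses and validates it downstream.
        producer.send("transactionRawTopic", value=line.strip().encode("utf-8"))

producer.flush()  # make sure everything reaches the broker before exiting
producer.close()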

Synopsis

  • Customize data ingest pipeline template.
  • Define categories for feeds.
  • Define feeds with source and destination.
  • Cleanse and validate data.
  • Schedule feeds.
  • Monitor feeds.

Self-Service Data Ingest, Data Cleansing, and Data Validation

Kylo utilizes Spark to provide a pre-defined pipeline template, which implements multiple best practices around data ingestion. By default, it comes with file system and database sources. It helps business users simplify the configuration of data ingestion from new sources such as JMS, Kafka, HDFS, HBase, FTP, SFTP, REST, HTTP, TCP, IMAP, AMQP, POP3, MQTT, WebSocket, Flume, Elasticsearch and Solr, Microsoft Azure Event Hub, Microsoft Exchange using Exchange Web Services (EWS), Couchbase, MongoDB, Amazon S3, SQS, DynamoDB, and Splunk.

Apache NiFi, a scheduler and orchestration engine, provides an integrated framework for designing new types of pipelines with 250+ processors (data connectors and transforms). The pre-defined data ingest template is modified by adding Kafka, S3, HDFS, and FTP, as shown in the screenshot below:

Get, Consume, and Fetch processors are used to ingest the data. The Get and Consume versions of the Kafka processors in NiFi are as follows:

  • GetKafka 1.3.0: Fetches messages from the earlier version of Apache Kafka (specifically 0.8.x versions). The complementary NiFi processor used to send messages is PutKafka.

  • ConsumeKafka_0_10 1.3.0: Consumes messages from the newer version of Apache Kafka specifically built against the Kafka 0.10.x Consumer API.

Based on need, a custom processor or other custom extension for NiFi can be written and packaged as a NAR file and deployed into NiFi.

Customizing Data Ingest Pipeline Template

Upon updating and saving the data ingest template in NiFi, the same template can be customized in Kylo UI. The customization steps involve:

  • Customizing feed destination table
  • Adding input properties
  • Adding additional properties
  • Performing access control
  • Registering the template


Defining Categories for Feeds

All the feeds created in Kylo should be categorized. A process group is launched in NiFi to execute the feeds. Here, the “Transaction raw data” category is created to categorize the feeds.

Defining Feeds With Source and Destination

The Kylo UI makes it straightforward to create and schedule feeds. To define feeds, perform the following:

  • Choose the data ingest template.
  • Provide feed name, category, and description.


  • Choose input Data Source to ingest data.
  • Customize the configuration parameters related to that source; for example, the transactionRawTopic Kafka topic and a batch size of 10000.


  • Define output feed table using either of the following methods:
    • Manually define the table columns and their data types.
    • Upload a sample file and update the data types to match the data in each column.
  • Preview the data under the Feed Details section in the top-right corner:


  • Define partitioning for the output table by choosing Source Field and Partition Formula; for example, time as the source field and year as the partition formula.


Cleansing and Validating Data

The feed creation wizard UI allows end users to configure cleansing and standardization functions that manipulate data into conventional or canonical formats (for example, simple data type conversions such as dates, or stripping special characters) or apply data protection (for example, masking credit cards, PII, and so on).

It allows users to define field-level validation to protect data against quality issues and provides schema validation automatically. An extensible Java API is available for developing custom validation, cleansing, and standardization routines as needed, and predefined rules are provided for standardizing and validating different data types.

To clean and validate data, perform the following:

  • Apply different predefined standardization rules for time, user, address, and amount columns as shown below:


  • Apply standardization and validation for different columns as shown in the below screenshot:


  • Define the data ingestion merge strategy in the output table.
  • Choose Dedupe and merge to ignore duplicated batch data and insert it into the desired output table.


  • Use the Target Format section to define data storage and compression options.
    • Supported storage formats: ORC, Parquet, Avro, TextFile, and RCFile
    • Compression options: Snappy and Zlib


Scheduling Feeds

Feeds can be scheduled using a cron or timer-based mechanism. Alternatively, turn on the Enable feed immediately option to start the feed right away without waiting for the cron or timer criteria to be met.

Monitoring Feeds

After the feeds are scheduled, the actual execution is performed in NiFi. Feed status can be monitored, feed details can be changed at any time, and feeds can be rescheduled.


An overview of feed job status can be seen under Jobs in the Operations section. By drilling down into the jobs, you can view the details of each job and debug failed feed executions.


The Job Activity section provides details, such as completed and running counts, for a specific feed's recurring activity.


The Operational Job Statistics section provides details such as success rate, flow rate per second, flow duration, and step duration for a specific job.


Conclusion

In this blog, we discussed data ingestion, cleansing, and validation in the Kylo data lake platform without any coding. The data ingested from Kafka, viewed in a Hive table in Ambari, looks as follows:

In my next blog, we’ll discuss data profiling and search-based data discovery.


Original Link

Import and Ingest Data Into HDFS Using Kafka in StreamSets

StreamSets provides state-of-the-art data ingestion to easily and continuously ingest data from various origins, such as relational databases, flat files, and AWS, and write it to various systems, such as HDFS, HBase, and Solr. Its configuration-driven UI helps you design pipelines for data ingestion in minutes. Data is routed, transformed, and enriched during ingestion and made ready for consumption and delivery to downstream systems.

Kafka, used as an intermediate data store, makes it easy to replay ingestion, consume datasets across multiple applications, and perform data analysis.

In this blog, let’s discuss reading the data from different data sources such as Amazon Simple Storage Service (S3) and flat files, and writing the data into HDFS using Kafka in StreamSets.

Prerequisites

  • Install Java 1.8
  • Install streamsets-datacollector-2.6.0.1

Use Case

Import and ingest data from different data sources into HDFS using Kafka in StreamSets.

Data Description

Network data of outdoor field sensors is used as the source file. Additional fields, dummy data, empty data, and duplicate data were added to the source file. The dataset has a total record count of 600K, including 3.5K duplicate records.

Sample data:

{
  "ambient_temperature": "16.70",
  "datetime": "Wed Aug 30 18:42:45 IST 2017",
  "humidity": "76.4517",
  "lat": 36.17,
  "lng": -119.7462,
  "photo_sensor": "1003.3",
  "radiation_level": "201",
  "sensor_id": "c6698873b4f14b995c9e66ad0d8f29e3",
  "sensor_name": "California",
  "sensor_uuid": "probe-2a2515fc",
  "timestamp": 1504098765
}

Synopsis

  • Read data from the local file system and produce data to Kafka.
  • Read data from Amazon S3 and produce data to Kafka.
  • Consume streaming data produced by Kafka.
  • Remove duplicate records.
  • Persist data into HDFS.
  • View data loading statistics.

Reading Data From Local File System and Producing Data to Kafka

To read data from the local file system, perform the following:

  • Create a new pipeline.
  • Configure the File Directory origin to read files from a directory.
  • Set Data Format as JSON and JSON content as Multiple JSON objects.
  • Use Kafka Producer processor to produce data into Kafka. (Note: If there are no Kafka processors, install the Apache Kafka package and restart SDC.)
  • Produce the data under topic sensor_data.


Reading Data From Amazon S3 and Producing Data to Kafka

To read data from Amazon S3 and produce data into Kafka, perform the following:

  • Create another pipeline.
  • Use Amazon S3 origin processor to read data from S3. (Note: If there are no Amazon S3 processors, install the Amazon Web Services 1.11.123 package available under Package Manager.)
  • Configure processor by providing Access Key ID, Secret Access Key, Region, and Bucket name.
  • Set the data format as JSON.
  • Produce data under the same Kafka topic, sensor_data.


Consuming Streaming Data Produced by Kafka

To consume streaming data produced by Kafka, perform the following:

  • Create a new pipeline.
  • Use Kafka Consumer origin to consume Kafka produced data.
  • Configure processor by providing the following details:
    • Broker URI
    • ZooKeeper URI
    • Topic: Set the topic name as sensor_data (same data produced in previous sections)
  • Set the data format as JSON.


Removing Duplicate Records

To remove duplicate records using Record Deduplicator processor, perform the following:

  • Under the Deduplication tab, provide the following fields to compare and find duplicates:
    • Max. Records to Compare
    • Time to Compare
    • Compare
    • Fields to Compare (for example, find duplicates based on sensor_id and sensor_uuid)
  • Move the duplicate records to Trash.
  • Store the unique records in HDFS.

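Outside StreamSets, the consume-and-deduplicate logic above can be approximated in a few lines of Python. The sketch below uses the kafka-python package with an assumed broker at localhost:9092 and keeps a bounded set of (sensor_id, sensor_uuid) keys, roughly mirroring what the Record Deduplicator does with its comparison fields.

import json
from collections import OrderedDict
from kafka import KafkaConsumer

# Assumptions: pip install kafka-python; broker at localhost:9092; JSON messages on the sensor_data topic.
consumer = KafkaConsumer(
    "sensor_data",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
)

MAX_KEYS = 500000        # bound on how many keys we remember, like "Max. Records to Compare"
seen = OrderedDict()

for message in consumer:
    record = message.value
    key = (record.get("sensor_id"), record.get("sensor_uuid"))
    if key in seen:
        continue         # duplicate: drop it (StreamSets routes these to Trash)
    seen[key] = None
    if len(seen) > MAX_KEYS:
        seen.popitem(last=False)  # forget the oldest key once the cache is full
    # Unique record: hand it to the next stage (here we just print it).
    print(record)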

Persisting Data into HDFS

To load data into HDFS, perform the following:

  • Configure the Hadoop FS destination processor from stage library HDP 2.6.
  • Select the data format as JSON. (Note: the core-site.xml and hdfs-site.xml files are placed in the hadoop-conf directory, /var/lib/sdc-resources/hadoop-conf. The sdc-resources directory is created when StreamSets is installed.)

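To verify the output outside of StreamSets, the snippet below writes JSON lines to HDFS over WebHDFS using the hdfs Python package; the NameNode URL, user, and target path are assumptions about the environment and should be adjusted to match your cluster.

import json
from hdfs import InsecureClient

# Assumptions: pip install hdfs; WebHDFS reachable at this URL; a writable /user/sdc directory.
client = InsecureClient("http://namenode-host:50070", user="sdc")

records = [
    {"sensor_id": "c6698873b4f14b995c9e66ad0d8f29e3", "ambient_temperature": "16.70"},
]

# Write one JSON object per line, overwriting any previous test file.
with client.write("/user/sdc/sensor_data/sample.json", encoding="utf-8", overwrite=True) as writer:
    for record in records:
        writer.write(json.dumps(record) + "\n")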

Viewing Data Loading Statistics

Data loading statistics, after removing duplicates from different sources, look as follows:

Original Link

This Month in Big Data: The Kafka Edition

This month in big data, we’re doing something a little different. In the past, we’ve looked at a variety of topics in these posts — like in August, when we talked about analytics, design, and Donald Trump. But for September, we’re going to wrap the month up with a deep dive into the wonderful world of Kafka.


Top 5 Kafka Articles on DZone

Check out the top five Kafka-related articles on DZone that have been around since we were a wee little baby called Java Lobby. These articles are best read in order, so follow along and enhance your Kafka knowledge!

  1. What Is Kafka? by Jean-Paul Azar. First things first: you need to learn what Kafka is in the first place if you don’t know already. Come glean some insight on why one-third of all Fortune 500 companies use Kafka and check out a few specific Kafka use cases.

  2. Spark Streaming vs. Kafka Streaming by Mahesh Chand Kandpal. Now that you know about Kafka, you may be comparing it to Spark. Which should you choose? Depending on your specific needs, one or the other will be right for you.

  3. Big Data Ingestion: Flume, Kafka, and NiFi by Tony Siciliani. Teamwork makes the dream work, right? So there’s no reason for Kafka to be used alone. Learn how these Apache ingesting tools fare in dealing with the high volume, variety, and velocity of data showing up at the gates of what would typically be a Hadoop ecosystem.

  4. Evaluating Message Brokers: Kafka vs. Kinesis vs. SQS by Swaroop Ramachandra. Now that we’ve brought them together, let’s pit them against each other! Kafka, Kinesis, and SQS are some of the best message brokers for big data applications. But who should reign on top?

  5. Kafka Avro Scala Example by Sushil Kumar Singh. Lastly, let’s look at a specific example of using Kafka. In this code-heavy tutorial, see how you can use Kafka and Avro to create a messaging system with complex data.

PS: Are you interested in contributing to DZone? Check out our brand new Bounty Board, where you can apply for specific writing prompts and win prizes! 


Kafka Abroad

Let’s journey outside of DZone and check out some recent news, conferences, and more that should be of interest to Kafka newbies and experts alike.

  • The Kafka Summit took place last month in San Francisco, CA and will be happening April 23-24, 2018 in London, England! This summit is the world’s largest Apache Kafka event. Companies large and small, from Netflix to Uber to Yelp, come to share the experiences they’ve had on their transformational Kafka journeys. Weren’t lucky enough to make it to 2017’s Summit? No worries! Check out the presentations here.

  • LinkedIn announces open-source tool to keep Kafka clusters running. One major announcement at August’s Kafka Summit was that LinkedIn now has a load balancing tool that recognizes when clusters are about to break. The tool, called Cruise Control, is open-source and helps keep Kafka clusters up and running.

  • 2017 Apache Kafka Report. In this free download, you’ll learn about the latest breakthroughs in Kafka technology, how Kafka is helping to fill the gap in the data infrastructure of growing companies, and the impact that Apache Kafka and streaming data have on business.


Dive Deeper Into Kafka

DZone has Guides and Refcardz on pretty much every tech-related topic, but if you’re specifically interested in Kafka, these will appeal the most to you. 

  1. The DZone Guide to Big Data: Data Science and Advanced Analytics. Explore the critical capabilities in next-generation self-service data preparation tools and dive deep into applications and languages affiliated with big data.

  2. Data Warehousing: Best Practices for Collecting, Storing, and Delivering Decision-Support Data. In this DZone Refcard, gain a complete understanding of data modeling, infrastructure, relationships, attributes, and speedy history downloading and recording with atomic data.


Jobs for the Kafka Expert

Below are a few jobs that may pique your interest if you’re a Kafka aficionado. Check them out and apply today!

Network Data Scientist
Thousand Eyes
Location: Anywhere!
Experience: Ph.D. or MS in Computer Science; experience with analyzing BGP routing streams, traffic analysis, tcpdump and libpcap, and distributed real-time systems.

Python Developer
Citrus Bits
Location: Los Angeles, California, United States
Experience: In-depth knowledge of Python; either mobile experience, AWS experience, or video domain experience; will be optimizing a mobile app that streams real-time video and audio running on a Python backend and AWS.

That’s all for September’s This Month in Big Data post! What would you like to learn about next time? Let us know in the comments!

Original Link

Graph Algorithms: Make Election Data Great Again

Editor’s Note: This presentation was given by John Swain at GraphConnect San Francisco in October 2016.

In this presentation, learn how John Swain of Right Relevance (and Microsoft Azure) set out to analyze Twitter conversations around both Brexit and the 2016 U.S. Presidential election data using graph algorithms.

To begin, Swain discusses the role of social media influencers and debunks the common Internet trope of the Law of the Few, rechristening it as “the Law of Quite a Few.”

Swain then dives into his team’s methodology, including the OODA (observe, orient, decide, and act) loop approach borrowed from the British Navy. He also details how they built the graph for the U.S. Presidential election and how they ingested the data.

Next, Swain explains how they analyzed the election graph using graph algorithms, from PageRank and betweenness centrality to Rank (a consolidation of metrics) and community detection algorithms.

Ryan Boyd then guest presents on using graph algorithms via the APOC library of user-defined functions and user-defined procedures.

Swain then puts it all together to discuss their final analysis of the U.S. Presidential election data as well as the Brexit data.

Graph Algorithms: Make Election Data Great Again

What we’re going to be talking about today is how to use graph algorithms to effectively sort through the election noise on Twitter:

John Swain: Let’s start right off by going to October 2, 2016, the date we published our first analysis of the data we collected on Twitter conversations surrounding the U.S. Presidential Election.

On that day, the big stories were Hillary Clinton’s physical collapse and her comment about the “basket of deplorables” — which included talk about her potentially resigning from the race. It was a very crowded conversation covered intensely by the media. We wanted to demonstrate that, behind all the noise and obvious stories, there were some things contained in this data that were not quite so obvious.

We analyzed the data and created a Gephi map of the 15,000 top users. One of the clusters we identified included journalists, the most prominent of whom was Washington Post reporter David Fahrenthold. Five days later, Fahrenthold broke the story about Donald Trump being recorded saying extremely lewd comments about women.

We're going to go over how we discovered this group of influencers. Even though there was a bit of luck involved, we hope to show that it wasn't just a fluke and is in fact repeatable.

In this presentation, we’re going to go over the problem we set out to solve and the data we needed to solve that problem, how we processed the graph data (with Neo4j and R), and how Neo4j helped us overcome some scalability issues we encountered.

I started this as a volunteer project about two years ago with the Ebola crisis, which was a part of the Statistics Without Borders project for the United Nations. We were looking for information like the below in the Twitter conversation about Ebola to identify people who were sharing useful information:

Because there was no budget, I had to use open-source software and started with R and Neo4j Community Edition.

I quickly ran into a problem. There was a single case of Ebola that hit the United States in Dallas, which happened to coincide with the midterm elections. The Twitter conversation about Ebola got hijacked by the political right and an organization called Teacourt, all of whom suggested that President Obama was responsible for this incident and that you could catch Ebola in all kinds of weird ways.

This crowded out the rest of the conversation, and we had to find a way to get to the original information that we were seeking. I did find a solution, which we realized we could apply to other situations that were confusing, strange, or new — which pretty much described the 2016 U.S. Presidential election.

Debunking the Law of the Few

So, where did we start? It started with something that everybody’s pretty familiar with — the common Internet trope about the Law of the Few, which started with Stanley Milgram’s famous experiment that showed we are all connected by six degrees of separation. This spawned things like the Kevin Bacon Index and was popularized by the Malcolm Gladwell book The Tipping Point.

Gladwell argues that any social epidemic is dependent on people with a particular and rare set of social gifts spreading information through networks. Whether you’re trying to push your message into a social network or are listening to messages coming out, the mechanism is the same.

Our plan was to collect the Twitter data, mark these relationships, and then analyze the mechanism for the spread of information so that we could separate out the noise.

To do this, we collected data from the Twitter API and built a data model in Neo4j:

The original source code — the Python scripts and important routines for pulling this into Neo4j — is also still available on Nicole White’s GitHub.

However, we encountered a problem. At the scale we wanted to conduct our analysis, we couldn’t collect all of the followers and following information that we wanted because the rate limits on the Twitter API are too limiting. So, we hit a full stop and went back to the drawing board.

Through this next set of research, we found two really good books by Duncan Watts: Everything Is Obvious and Six Degrees. He is one of the first people to do empirical research on the Law of the Few (six degrees of separation), which showed that there is actually a problem with this theory because any process that relies on targeting a few special individuals is bound to be unreliable. No matter how popular and how compelling the story, it simply doesn’t work that way.

For that reason, we rechristened it “The Law of Quite a Few” and call the people responsible for spreading information through social networks ordinary influencers. These aren't just anybody; they're people with some skills, but they're not just a very few special individuals.

Methodology

We borrowed a methodology from military intelligence in the British Navy called the OODA loop: observe, orient, decide, and act. Below is a simplified version:

The key thing we learned in the research is that people are not disciplined about following the process of collecting data. Instead, we typically perform some initial observations, orient ourselves, decide what’s going on, and take some actions — but we shortcut the feedback loop to what we think we know the situation is instead of going back to the beginning and observing incoming data.

Using a feedback loop like this is essentially hindsight bias:

Hindsight bias is the belief that if you’d looked hard enough at the information that you had, the events that subsequently happened would’ve been predictable — that with the benefit of hindsight we could see how it was going to happen.

This gets perverted to mean that if you'd looked harder at the information you had, it would have been predictable, when in fact you needed information you didn't have at the time. Events aren't predictable, even if they seem predictable when you play the world backward.

Building the Graph

Using that methodology, we committed to building the graph with Neo4j. This involved ingesting the data into Neo4j, building a simplified graph, and processing with R and igraph.

Ingesting the Data

The first part of the process is to ingest the data into Neo4j, which gets collected from the Twitter API and comes in as JSON. We scale this up so we can use the raw API rather than the Twitter API, have our libraries in Python, push that into a message queue, and store this in a document store, MongoDB.

Whether you’re pulling this from the raw API or whether you’re pulling it from a document store, you get a JSON document. We pushed a Python list into this Cypher query and used the UNWIND command, and included a reference to an article. Now, the preferred method is to use the apoc.load.json library:
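As a rough illustration of that pattern (with made-up labels and properties rather than the talk's actual schema, and placeholder connection details), the sketch below pushes a Python list of tweets through a single UNWIND statement using the official neo4j driver.

from neo4j import GraphDatabase

# Placeholder connection details.
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

tweets = [
    {"id": 1, "text": "example tweet", "user": "alice", "mentions": ["bob"]},
    {"id": 2, "text": "another tweet", "user": "bob",   "mentions": ["alice", "carol"]},
]

# One round trip: UNWIND the whole list and MERGE users, tweets, and relationships.
INGEST_QUERY = """
UNWIND $tweets AS t
MERGE (u:User {screen_name: t.user})
MERGE (tw:Tweet {id: t.id})
  SET tw.text = t.text
MERGE (u)-[:POSTS]->(tw)
WITH tw, t
UNWIND t.mentions AS mentioned
MERGE (m:User {screen_name: mentioned})
MERGE (tw)-[:MENTIONS]->(m)
"""

with driver.session() as session:
    session.run(INGEST_QUERY, tweets=tweets)

driver.close()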

We were interested in a simplified version of the graph containing only retweets and mentions, which we use to build the graph we analyze. The simplified graph is just the relationship between each pair of users, with a weight counting every time a retweet or mention happens.

The R code calls a queryString, which is a Cypher query that essentially says: MATCH users who post tweets that mention other users, with some conditions about the time period, that they're not the same user, and so on. Below is the Cypher code:

This builds a very simple relationship list for each pair of users and the number of times in each direction they’re mentioned, which results in a graph that we need to make some sense out of.
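To give a concrete picture of that projection step (again with illustrative labels and placeholder connection details, and using python-igraph rather than the R igraph the team used), the sketch below pulls a weighted user-to-user edge list out of Neo4j and loads it into an igraph graph.

from neo4j import GraphDatabase
import igraph

EDGE_QUERY = """
MATCH (u1:User)-[:POSTS]->(t:Tweet)-[:MENTIONS]->(u2:User)
WHERE u1 <> u2
RETURN u1.screen_name AS source, u2.screen_name AS target, count(t) AS weight
"""

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    rows = [(r["source"], r["target"], r["weight"]) for r in session.run(EDGE_QUERY)]
driver.close()

# Build a directed, weighted graph from the (source, target, weight) tuples.
g = igraph.Graph.TupleList(rows, directed=True, weights=True)

pagerank = g.pagerank(weights="weight")
betweenness = g.betweenness()

# Top users by PageRank.
top = sorted(zip(g.vs["name"], pagerank), key=lambda x: x[1], reverse=True)[:10]
print(top)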

Analyzing the Graph: Graph Algorithms

The key point at this stage is that we have no external training data to do things like sentiment analysis because we have a cold start problem. Often, we’re looking at a brand new situation that we don’t have any information about.

The other issue is that social phenomena are inherently unknowable. No one could have predicted that this story was going to break or that a certain person was going to be an Internet sensation at a certain time. This requires the use of unsupervised learning algorithms to make sense of the graph that we’ve created.

Pagerank

The first algorithm we used is the well-known PageRank, a graph algorithm created by Larry Page and originally used by Google to rank the importance of web pages; it is a type of eigenvector centrality algorithm. It ranks web pages, or any other nodes in a graph, according to how important they are in relation to all the pages that link to them, recursively.
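For reference, the standard formulation, with damping factor $d$ (usually 0.85), $N$ nodes, $B_u$ the set of pages linking to $u$, and $L(v)$ the number of outgoing links from $v$, is:

PR(u) = \frac{1 - d}{N} + d \sum_{v \in B_u} \frac{PR(v)}{L(v)}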

Below is an example of what we can do with PageRank. This is the same graph we started with at the beginning with top PageRank-ed users:

Here, the three users — Hillary Clinton, Joe Biden, and Donald Trump — heavily skewed the PageRank. There were a couple of other interesting users visible in this graph, including Jerry Springer, who had an enormous number of retweets, which illustrates the temptation to pay special attention to what certain people say.

Looking backward, it’s very easy to put together a plausible reason why Jerry Springer was so successful. He had some special insight because of the people he has on his show. But the reality is, it was just luck. It could have been one of the 10,000 A-list, B-list, or C-list celebrities these days. It’s tempting to look back and rationalize what happened and believe that you could have predicted it — but that’s a myth.

Betweenness Centrality

The next graph algorithm we use is betweenness centrality, which, for each user, measures how many of the shortest paths between all other pairs of users pass through that user. This tends to identify brokers of information in the network because information is passing through those nodes like an airport hub.
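In the usual notation, with $\sigma_{st}$ the number of shortest paths from $s$ to $t$ and $\sigma_{st}(v)$ the number of those paths that pass through $v$:

C_B(v) = \sum_{s \neq v \neq t} \frac{\sigma_{st}(v)}{\sigma_{st}}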

We also calculate some other basic stats from the graph, such as in-degree, i.e., the overall number of times a user is mentioned or retweeted; retweet, reply, and mention counts; plus some information returned from the API.

And what we create is a set of derivatives which answer some natural questions. An example of that is a metric that we call Talked About:

The natural question is: Who is talked about? This is from the night of the first debate and measures the ratio of the number of times someone’s retweeted to the number of times they’re mentioned, corrected for the number of followers and a couple of other things as well.

Katy Perry is always mentioned more than anyone else simply because she has 80 million followers, so we adjust for that to measure the level of importance from outside the user’s participation in a conversation. For example, there can be an important person who isn’t very active on Twitter or involved in the conversation but who is mentioned a lot.

On this night, the most talked-about person was Lester Holt. He was obviously busy that night moderating the presidential debate and wasn’t tweeting a lot, but people were talking about him.

Rank: Consolidated Metrics

We consolidate all of these metrics into an overall measure that we call Rank:

Rank includes PageRank, betweenness centrality, and a measure we call Interestingness, which is the difference between someone's actual PageRank and what you would expect that PageRank to be, given a regression on various factors like the number of followers and reach. Someone who has a very successful meme that's retweeted a lot and gets lots of mentions can be influential in networks, but we try to correct for that as just being noise instead of actually valuable information.

This image above is the same graph as before, and it’s natural that Donald Trump and Hillary Clinton are continually the top influencers in their network on any graph of this subject. But Rank evens out those distortions and skews from some other metrics to give you a good idea of who was genuinely important.

We’re talking about influencers, which is not something you can directly measure or compare. There’s not necessarily any perfect right or wrong answer, but you get a good indication of any given time period who has been important or influential in that conversation.

Community Detection Algorithm

Community detection separates groups of people by the connections between them. In the following example, it’s easy to see the three distinct communities of people:

In reality, we're in multiple communities at any given time. We might have a political affiliation but also follow four different sports teams. The algorithms that calculate this overlapping membership of communities are very computationally intensive.

Our solution was to run a couple of algorithms on multiple subgraphs. We take subgraphs based on in-degree and the giant component, which is the most centrally connected part of the graph, run the algorithms several times, and bring together the results to create multiple memberships.
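A rough python-igraph equivalent of that workflow (the team used R igraph; the graph and weights here are placeholders) is to take the giant component, run Walktrap and Infomap on it, and keep both memberships side by side:

import igraph

# Placeholder graph: in practice g comes from the weighted edge list built earlier.
g = igraph.Graph.Famous("Zachary")
g.es["weight"] = [1.0] * g.ecount()

# Work on the giant (largest connected) component only.
giant = g.clusters().giant()

# Walktrap: tends to produce fewer, larger communities.
walktrap = giant.community_walktrap(weights="weight").as_clustering()

# Infomap: tends to produce many smaller communities.
infomap = giant.community_infomap(edge_weights="weight")

# Record both memberships per vertex so a node can belong to more than one grouping.
membership = {
    v.index: {"walktrap": walktrap.membership[v.index],
              "infomap": infomap.membership[v.index]}
    for v in giant.vs
}
print(list(membership.items())[:5])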

When you visualize this, it looks something like the below. This is back to the UK Brexit conversation, with about two million tweets in this particular example:

We have two types of graphs above: one based on retweets and one based on mentions. The “retweets” graph always creates this clear separation of communities. No matter what people say on their Twitter profiles, retweets do mean endorsements on aggregate; people congregate very clearly in groups where they share common beliefs.

Mentions, including retweets, give you a very different structure that is not quite so clear. You can see that there are two communities, but there’s a lot more interaction between them.

The same is true with the community detection algorithms. The two we most frequently use are Walktrap and Infomap. Walktrap tends to create fewer, larger communities. When you combine that with retweets, you get a very clear separation.

Conversely, the Infomap algorithm creates a number of much smaller communities. In this case, it wasn’t a political affiliation, it was a vote to either leave the EU or to remain — a very clear separation. At the same time, people’s existing political affiliations overlap with that vote. It’s not usually this easy to see on the 2D visualization with color, but you get some idea of what’s going on.

At this point, we get some sense of what’s going on in the conversation. If we go back to the first US presidential debate, below is the community that we detected for Joe Biden:

We call these kinds of communities — which are people active in that conversation in a certain period of time — flocks. These results are from totally unsupervised learning. And you can see that, by and large, it pretty accurately reflects a coherent, sensible community of people sharing certain political affiliations.

We were happy going along doing this kind of analysis and getting bigger and bigger graphs. And then the Brexit campaign created this huge volume of tweets, and we hit a brick wall in scalability. We realized that we didn't have the capacity to handle 20 million tweets each week, and we needed to scale the graph algorithms.

We looked at various options, including GraphX on Apache Spark. But after talking to Ryan and Michael, we found that we could do this natively in Neo4j using APOC. We're currently processing about 20 million tweets, but our target is to reach a billion-node capacity. And Ryan Boyd with Neo4j is going to talk more about that below.

Neo4j User-Defined APOC Procedures

Ryan Boyd: Let’s start with an overview of user-defined procedures, which are the ability to write code that executes on the Neo4j server alongside your data:

To increase the performance of any sort of analytics process, you can either bring the processing to the data or the data to the processing. In this case, we're moving the processing to the data. You have your Java stored procedure that runs in the database, Neo4j can call that through Cypher, and your applications can also issue Cypher requests.

At the bottom of the image is an example call showing the procedures and the results they YIELD. First, you use APOC to create a UUID and a timestamp of the current time, and then CREATE a node that includes the UUID and the timestamp yielded from those first two procedures.

You can do this all in Cypher, but now Neo4j 3.1 has user-defined functions that allow you to call these as functions rather than procedures:

If you look at the bottom right where you CREATE your document node, you can set the ID property to apoc.create.uuid() and then set the created property using apoc.date.format() and your timestamp. This makes them easier to call directly.
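Assuming APOC is installed, a call along those lines might look like the sketch below (run here from Python, with placeholder connection details and an illustrative Document label):

from neo4j import GraphDatabase

# Placeholder connection details; assumes the APOC plugin is installed on the server.
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

# apoc.create.uuid() and apoc.date.format() are APOC user-defined functions,
# so they can be used inline in Cypher expressions rather than via CALL.
CREATE_DOC = """
CREATE (d:Document {
  id:      apoc.create.uuid(),
  created: apoc.date.format(timestamp(), 'ms', 'yyyy-MM-dd HH:mm:ss')
})
RETURN d.id AS id, d.created AS created
"""

with driver.session() as session:
    record = session.run(CREATE_DOC).single()
    print(record["id"], record["created"])

driver.close()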

We've taken a lot of the procedures in the APOC library and converted them to functions wherever it made sense, and the next version of APOC, built for Neo4j 3.1, is out there for testing.

APOC is an open-source library populated with contributions from the community, including those from Neo4j. It has tons of different functionality: to call JDBC databases, to integrate with Cassandra or Elasticsearch, ways to call HDP APIs and integrate pulling data in from web APIs like Twitter, etc.

But it also has things like graph algorithms. John’s going to talk a bit more about their work with graph algorithms that they have written and contributed as a company to the open-source APOC library that is now accessible to everyone.

Swain: We’ve started creating the graph algorithms that we are going to need to migrate everything from running the algorithms in igraph in R, to running it natively in Neo4j.

We started with PageRank and betweenness centrality, and we are working on two community detection algorithms: Walktrap and Infomap. Everything is available on GitHub, and we hope that people will contribute and join us. It’s just the tip of the iceberg, and we have a long way to go until we can complete the process and run this end-to-end.

Below is the result from three different time periods of our Brexit data:

The igraph implementation of PageRank is pretty efficient, so we’re only getting a relatively minor performance improvement. But with betweenness centrality we have a much larger performance improvement.

Because we can run this natively in Neo4j, we don’t have to build that graph projection and move it into igraph, which is a big win. When we do this with R, on fairly small graphs we get a huge improvement, but at a certain point, we just run out of memory.

Putting It All Together

Let’s turn back to where we started and how we discovered what we discovered. We had to pull together important people in the conversation (flocks), topics of conversation, and topical influence (tribes):

We’ve already gone over special people versus ordinary influencers. With the Right Relevance system, we have approximately 2.5 million users on 50,000 different topics, and we give everyone a score of their topical influence in those different topics.

Let’s turn back to journalist David Fahrenthold, who has significant influence in lots of related topics — some of which were in that conversation that we looked at right at the beginning.

What we're trying to do is find the intersection of three things: the conversation, the trending topics — the topics that are being discussed in that conversation — and the tribes. The topics are defined by an initial search, but it can be quite difficult to define the track phrases (as they're called) used for pulling data from the Twitter API.

This means you can get multiple conversations and still not really know what the topics are going to be. This kind of influence is what we call tribes. People who are in the same tribe tend to have the same intrinsic values, demographic and psychographic qualities.

People who support a football team are the perfect example of a tribe because it changes only very slowly, if at all. If I support Manchester United, I might not be doing anything about that quality today. But if I’m going to a game, look at a particular piece of news about players being signed, or whatever, then I’m engaged in a conversation. People who are involved in that conversation are organized in flocks.

Below is Twitter information that was pulled on September 11, 2016:

This image above includes trending terms, hashtags, topics, and users. The people in this conversation had expertise or influence on these topics. That’s just a filter which selects the people in that flock, so it is now the intersection between people with certain topical influence and people in a certain flock, which includes active reporters and journalists.

You have to be really careful with reviewing and going back to the observation phase. Below is a later analysis, which shows something happening slowly but detectably, and we expected after the next debate that this process would accelerate.

Basically, establishment commentators and media have gradually become more and more prevalent on the Hillary Clinton side of the graph, leaving the Trump side of the graph quite sparse in terms of the number of influencers:

Everyone on the Hillary side of the network was starting to listen more and more to those people, and the information was filtered and became self-reinforcing.

It’s very similar to what we detected on Brexit, only it’s the other way around:

The “remain” side was very much establishment and the status quo, so people were not so active, whereas in the US presidential election both sides were very active, which is one main difference. In the Brexit campaign in the UK, anybody who was anybody really was supporting remain. The main proponents of Brexit didn’t really believe it was going to happen, but it did. There was a complacency on the other side, and the turnout ended up being very low.

Original Link