23 Useful Elasticsearch Example Queries

To illustrate the different query types in Elasticsearch, we will be searching a collection of book documents with the following fields: title, authors, summary, release date, and number of reviews.

But first, let’s create a new index and index some documents using the bulk API:
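As a sketch of what that bulk request could be assembled from, here is the newline-delimited body the `_bulk` endpoint expects, built in Python. The index name `bookdb_index`, the type name `book`, and the sample books are illustrative assumptions, not taken from the original article:

```python
import json

# Hypothetical book documents with the fields named above (title, authors,
# summary, release date, number of reviews) -- contents are made up.
books = [
    {"title": "Elasticsearch in Practice",
     "authors": ["jane doe", "john roe"],
     "summary": "A distributed, real-time search and analytics engine",
     "publish_date": "2015-02-07",
     "num_reviews": 20},
    {"title": "Taming Search",
     "authors": ["grant example"],
     "summary": "organize and search text",
     "publish_date": "2013-01-24",
     "num_reviews": 12},
]

def bulk_body(index, doc_type, docs):
    """Build the NDJSON body for the _bulk API: for each document, an
    action line ({"index": ...}) followed by the document source."""
    lines = []
    for i, doc in enumerate(docs, start=1):
        lines.append(json.dumps({"index": {"_index": index, "_type": doc_type, "_id": i}}))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"   # the bulk body must end with a newline

body = bulk_body("bookdb_index", "book", books)
# POST this body to http://localhost:9200/_bulk
# with the header Content-Type: application/x-ndjson
```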


Reporting and Analysis With Elasticsearch

With the popularity of NoSQL and Big Data exploding in recent years, keeping up with the latest trends in databases, search engines, and business analytics is vital for developers.

And it’s hard not to be overwhelmed by the number of solutions available on the market: Amazon CloudSearch, Elasticsearch, Swiftype, Algolia, Searchify, Solr, and others.


Elasticsearch Documents and Mappings

In Elasticsearch parlance, a document is serialized JSON data. In a typical ELK setup, when you ship a log or metric, it is sent along to Logstash, which groks, mutates, and otherwise handles the data, as defined by the Logstash configuration. The resulting JSON is indexed in Elasticsearch.
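As a toy illustration of that pipeline, here is roughly what "grokking" a raw log line into an indexable JSON document amounts to. The log format and field names below are made up for illustration, not from any real Logstash config:

```python
import json
import re

# A raw log line as it might arrive from a shipper (format is an assumption).
line = "2018-10-05T12:00:00Z INFO user=alice action=login"

# A stand-in for a Logstash grok pass: extract named fields from the line...
pattern = re.compile(
    r"(?P<timestamp>\S+)\s+(?P<level>\w+)\s+user=(?P<user>\w+)\s+action=(?P<action>\w+)")
fields = pattern.match(line).groupdict()

# ...and serialize them as the JSON document that Elasticsearch indexes.
doc = json.dumps(fields)
```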

Elasticsearch documents live in the segments of a shard, and each shard is itself a Lucene index. As additional documents are shipped, the segments grow. Whenever a search is executed, Elasticsearch checks each segment that is stored in a shard. This means that as the segments grow in quantity, searches become increasingly inefficient. To combat this, Elasticsearch periodically merges similarly sized segments into a single, larger segment and deletes the original, smaller segments.


Analyze Elasticsearch Data in R

You can access Elasticsearch data with pure R script and standard SQL on any machine where R and Java can be installed. You can use the CData JDBC Driver for Elasticsearch and the RJDBC package to work with remote Elasticsearch data in R. By using the CData Driver, you are leveraging a driver written for industry-proven standards to access your data in the popular, open-source R language. This article shows how to use the driver to execute SQL queries to Elasticsearch and visualize Elasticsearch data by calling standard R functions.

Install R

You can match the driver’s performance gains from multithreading and managed code by running the multithreaded Microsoft R Open or by running open R linked with the BLAS/LAPACK libraries. This article uses Microsoft R Open 3.2.3, which is preconfigured to install packages from the Jan. 1, 2016 snapshot of the CRAN repository. This snapshot ensures reproducibility.


Watching/Alerting on Real-Time Data in Elasticsearch Using Kibana and SentiNL

In the previous post, we set up an ELK stack and ran data analytics on application events and logs. In this post, we will discuss how you can watch real-time application events that are being persisted in the Elasticsearch index and raise alerts if the condition for the watcher is breached using SentiNL (a Kibana plugin).

A few examples of alerting for application events (see previous posts) are:


Time-Based Indexing in Elasticsearch Using Java

Anybody who uses Elasticsearch for indexing time-based data, such as log events, is accustomed to the index-per-day pattern: use an index name derived from the timestamp of the logging event rounded to the nearest day and new indices pop into existence as soon as they are required. It’s a classic use-case.
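For instance, deriving an index name from an event's timestamp might look like the following sketch. The `logs-` prefix and the Logstash-style `YYYY.MM.dd` rounding are assumptions, not from the original article:

```python
from datetime import datetime, timezone

def index_for(event_time, prefix="logs"):
    # Round the event timestamp down to its day and derive the
    # daily index name, e.g. logs-2018.10.05.
    return "{}-{:%Y.%m.%d}".format(prefix, event_time)

index_for(datetime(2018, 10, 5, 23, 59, tzinfo=timezone.utc))  # "logs-2018.10.05"
```

Any event logged on the same day lands in the same index; the first event of a new day causes a new index to be created on demand.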

Need for Time-Based Indexing

Most traditional use cases for search engines involve a relatively static collection of documents that grow slowly. Searches look for the most relevant documents, regardless of when they were created.


Data Analytics on Application Events and Logs Using Elasticsearch, Logstash, and Kibana

In this post, we will learn how to use Elasticsearch, Logstash, and Kibana for running analytics on application events and logs. Firstly, I will install all these applications on my local machine.


You can read my previous posts on how to install Elasticsearch, Logstash, Kibana, and Filebeat on your local machine.


Install Elasticsearch on Ubuntu 18.04.1

In this post, we will install Elasticsearch on Ubuntu 18.04.1, including Logstash and Kibana. Elasticsearch lets you search and visualize your data.


We will begin by starting with a fresh installation of Ubuntu Server 18.04.1 and running all the updates.


Elasticsearch Cluster with Vagrant and Virtualbox

A simple way to simulate a distributed storage and compute environment is with Virtualbox as the provider of VMs (‘Virtual Machines’) and Vagrant as the front-end scripting engine to configure, start, and stop those VMs. The goal for this post is to build a clustered virtual appliance offering Elasticsearch as a service that can be consumed/controlled by a host machine. The artifacts used in this article can be downloaded from GitHub.

1. Background

Backend capacity scaling in the face of increasing front-end demand has generally been addressed by replacing weaker servers with more powerful ones, CPU/RAM/disk-wise — so-called ‘Vertical Scaling’. This is as opposed to ‘Horizontal Scaling’, where more servers are simply added to the mix to handle the extra demand. Intuitively, the latter model is appealing as it sounds like less work! In traditional RDBMS-centric applications, there was no choice, and vertical scaling actually made sense because it is difficult to do joins across large distributed data tables. But vertical scaling has its limits and, more importantly, becomes very expensive well before hitting those limits. NoSQL databases that skimp on relations (the ‘R’ of RDBMS) to allow for simpler horizontal scaling have become the go-to datastores nowadays for applications that need to scale large, as in Facebook/Google large.

The reader is referred to Hadoop: the Definitive Guide, where Tom White goes over these scale issues in depth. Applications running on distributed storage & CPU have to deal with their own issues like keeping a CPU busy on the data that is ‘local’ to it, making sure that cluster members are aware of one another and know who has what piece of the data, and perhaps elect a leader/master as needed for coordination, writes etc., as the implementation details vary across systems. We are not going to delve into all that here but our goals for this post are more pragmatic:

  1. Develop a means to run a virtual cluster of a few nodes (‘guests’) where the guests for now are carved out of my laptop by Virtualbox. Later we will extend the same means to run services on a cluster of nodes provided by AWS
  2. Install a distributed data store on this cluster of guests. Elasticsearch right now, so we can go through the mechanics
  3. Confirm that this ‘virtual Elasticsearch appliance’ offers a completely controllable service from the host.

2. VirtualBox

We use Oracle’s Virtualbox as the provider of guest virtual hosts. Virtualbox is free to use, runs very well on my Linux laptop (Ubuntu 15.04 64bit on my laptop with 8 core i7, 2.2GHz CPU, 16GB RAM), and has extensive documentation on how to control the various aspects of the hosts to be created. There are prebuilt images as well of any number of open source Linux distributions that you can simply drop in for the guest OS. It offers a variety of networking options (sometimes daunting as I found out) to expand/limit the accessibility/capability of the guests. For our purposes, we prefer a ‘host-only’, ‘private’ network with the following criteria.

  • The guests and hosts should be able to talk to each other. We want the guests to form a cluster and work together to enable a service. The host should be able to control & consume the services offered by the cluster of guests.
  • The guests should be able to access the internet. This is so they can download any OS updates, software packages they need in order to run whatever application.
  • The guests cannot be accessed from outside. This is just a made up requirement at this time as I do not want to expose the service to the outside. The host is the consumer of the service and it may roll that into its own service that it can offer to the outside if it so desires.
  • Lastly, for ease of use and portability, each guest should have an IP address & name ‘assigned’ at the time of its creation.

Installing Virtualbox and creating VMs of various kinds is quite straightforward. Based on a prebuilt image that I downloaded, I could set up a single VM fine the way I wanted: NAT for adapter 1, host-only for adapter 2, and activating the host-only interface on the VM. I wanted to clone it and build other guests, but I had trouble getting the networking right in a reliable/repeatable fashion. Networking was never my strong suit, and after playing some with their networking options both via the GUI and the command line, I gave up trying to master it. I am sure the networking gurus out there can do it, so it is certainly not a limitation of Virtualbox but a limitation on my part.

But more reasonably, I did not want to be logging into the guest to set up stuff or, worse, changing settings for each guest via the GUI that VirtualBox offers. That would definitely not scale, would be a pain to reproduce, and is error-prone. I wanted a turn-key solution of sorts wherein I could script out all aspects of the VM cluster creation upfront and simply run it to have that cluster created with all the tools installed, started, and raring to go.

Vagrant allows one to do that easily, as I happily found out. Basically, they have already figured out the exact sequence of ‘vboxmanage’ commands (and their options!) to run to set up a cluster specified by some high-level requirements — which was what I was trying to do, and they have already done it! Plus, as the cluster setup with Vagrant is file-based, we can version it and share it (small compared to an OVA file) to have the cluster reproduced exactly elsewhere. Maybe I am biased because of the issues I had with the networking setup, but the reader is referred to discussions similar to Why Vagrant? or Why should I use Vagrant instead of just VirtualBox? The real appeal of Vagrant, in the end, was that it can seamlessly work with other VM providers such as AWS and VMware via plugins, so the same config files/scripts can be reused simply by changing the provider name. Carving resources out of my laptop to build VMs is fine here for getting the mechanics down, but it is not going to give a performant cluster!

3. Vagrant

Having spent a lot of words trying to get here, we jump right in with no further ado. We prepare a text file by the name ‘Vagrantfile’ with high-level details on the cluster we are going to build. Running vagrant init at the command prompt will generate a sample file that can be edited to our liking. Here is how our file looks in order to meet the requirements laid out in section 2.


# -*- mode: ruby -*-
# vi: set ft=ruby :
nguests = 2
box = "hashicorp/precise64"
memory = 8256/nguests # memory per box in MB
cpuCap = 100/nguests
ipAddressStart = '' # base IP prefix; the actual value is elided in the original
Vagrant.configure("2") do |config|
  (1..nguests).each do |i|
    hostname = 'guest' + i.to_s
    ipAddress = ipAddressStart + i.to_s
    config.vm.define hostname do |hostconfig| = box
      hostconfig.vm.hostname = hostname :private_network, ip: ipAddress
      hostconfig.vm.provider :virtualbox do |vb|
        vb.customize ["modifyvm", :id, "--cpuexecutioncap", cpuCap, "--memory", memory.to_s]
      end
      hostconfig.vm.provision :shell, path: "scripts/", args: [nguests, i, memory, ipAddressStart]
    end
  end
end

It is a Ruby script but one does not need to know a lot of Ruby, which I did not. Here is a quick rundown on what it does.

  • We want to set up a 2 node cluster (Line #3).
  • We choose hashicorp/precise64 as the OS image on each. Vagrant downloads it if that image has not already been downloaded before to the local repo (‘The Default Machine Folder’ for VirtualBox) (Line #4)
  • My laptop has 16GB RAM and I want to leave 8GB for the host at all times. The rest is divided equally among the guests. Likewise, the guests are limited to a fractional use of the CPU. (Lines 5, 6)
  • We loop over each guest:
    • Setting its image (#13), and name (#14).
    • We choose a ‘private_network‘ mode and set the IP address ( # 15). This gives us the network model we wanted in Section 2.
    • Line #19 is about provisioning the VM with tools and apps. Extremely powerful and handy. We can automate the process of bringing up each member of the cluster with just the apps we want that guest to be responsible for. No need to ssh to each guest and go through separate installs — a great time saver! Besides simple shell scripts, Vagrant allows for other mechanisms like Docker, Chef, Ansible, Puppet, etc., for the provisioning process. Here we use a shell script to which we pass the arguments we need to set up Elasticsearch.

That is all for Vagrant, really. The rest is all good old shell scripting at which we are old hands — fabulous! Once the scripts are ready, we run vagrant up to have the cluster come up, do our work, and run vagrant halt to power the cluster down. Until we run vagrant destroy, the cluster will retain its apps/config/data, so we can run vagrant up anytime to use the cluster and its services.

4. Provisioning Elasticsearch

This is fairly straightforward. The key thing to know is that Vagrant automatically enables one shared directory between the host and guests: the directory where the file ‘Vagrantfile’ is located. On the guests, this directory is accessed as ‘/vagrant’. So if we have a file ‘a/b/c/some_file’ at the location where ‘Vagrantfile’ is on the host, that ‘some_file’ can be accessed on the guest as ‘/vagrant/a/b/c/some_file’. We use this feature to share pre-downloaded packages we need to install on guests, and any scripts we want to run post boot time. The provisioning script is as follows.

#!/usr/bin/env bash

nguests=$1
guestNumber=$2
memory=$3
ipAddressStart=$4

# Install some utilities that we will need
apt-get -y install unzip
apt-get -y install curl

# Install java
mkdir -p /opt/software/java
cd /opt/software/java ; tar zxvf /vagrant/tools/jdk-8u65-linux-x64.tar.gz

# Install & Start up elasticsearch
/vagrant/scripts/ $nguests $guestNumber $memory $ipAddressStart

We install some utilities we will need in lines #9 and #10, install Java from the shared location in lines #13 and #14, and finally run the script below to install Elasticsearch in line #17.

#!/usr/bin/env bash

usage="Usage: nguests thisguest memory ipAddressStart. Need the number of guests in the cluster, this guest number, es-heap memory in MB like 2048m, and startingIp like if clustered ... "

# Build the unicast host list for the cluster
function setUnicastHosts() {
  local unicast_guests=" ["   # the setting name preceding '[' is elided in the original
  for i in $(seq 1 $nguests); do
    unicast_guests+='"guest-es'$i
    unicast_guests+=':9310"'
    if [ "$i" -ne "$nguests" ]; then
      unicast_guests+=','
    fi
  done
  unicast_guests+=']'
  echo "$unicast_guests"
}

# Add to /etc/hosts for convenience & restart networking...
function setEtcHosts() {
  guest_list=""
  for i in $(seq 1 $nguests); do
    guest_list+=$ipAddressStart$i' guest-es'$i$'\n'
  done
  echo "$guest_list" > guests_to_be_added
  cat /etc/hosts guests_to_be_added > tmp ; mv tmp /etc/hosts
  /etc/init.d/networking restart
}

if [ "$#" -eq 4 ]; then
  nguests=$1
  thisguest=$2
  memory=$(expr $3 / 2)
  memory+="m"
  ES_HEAP_SIZE=$memory
  ipAddressStart=$4
  ES_HOME=/opt/software/elasticsearch/elasticsearch-1.7.2
  mkdir -p /opt/software/elasticsearch
  # archive and start-script file names below are elided in the original listing
  cd /opt/software/elasticsearch ; unzip /vagrant/tools/
  cp /vagrant/elastic/ $ES_HOME
  cp /vagrant/elastic/ $ES_HOME
  cp /vagrant/elastic/elasticsearch.yml $ES_HOME/config
  guest_name="guest-es"$thisguest
  node_name=$guest_name"-node1"
  unicast_guests=$(setUnicastHosts)
  setEtcHosts
  if [ "$thisguest" -eq 1 ]; then
    mkdir -p $ES_HOME/plugins/kopf
    cd $ES_HOME/plugins/kopf ; tar zxvf /vagrant/elastic/kopf.tar.gz
  fi
  perl -0777 -pi -e "s|ES_HOME=/opt/elasticsearch|ES_HOME=$ES_HOME|" $ES_HOME/
  perl -0777 -pi -e "s/ES_HEAP_SIZE=2g/ES_HEAP_SIZE=$memory/" $ES_HOME/
  perl -0777 -pi -e "s/host_name=localhost/host_name=$guest_name/" $ES_HOME/
  perl -0777 -pi -e "s/host_name=localhost/host_name=$guest_name/" $ES_HOME/
  perl -0777 -pi -e "s/node_name=node0/node_name=$node_name/" $ES_HOME/
  perl -0777 -pi -e "s/$/\n$unicast_guests/" $ES_HOME/config/elasticsearch.yml
else
  echo $usage
  exit 1
fi

An Elasticsearch node is a running instance of Elasticsearch, and a server can run multiple instances — resources permitting, of course. All the nodes that are part of a cluster share the same cluster name. Starting with some boilerplate configuration files that are shared between the host and guests, the script above modifies them based on the arguments passed to each guest during provisioning. The file ‘config/elasticsearch.yml’ for all guest nodes will be augmented with the unicast host list of all members of the cluster: ["guest-es1:9310","guest-es2:9310"]
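The list the shell function builds can be mirrored in a few lines of Python, which makes the expected output easy to check (port 9310 is the transport port used throughout this post):

```python
def unicast_hosts(nguests, port=9310):
    # Mirror of the shell function setUnicastHosts: one "guest-esN:port"
    # entry per guest, joined into a bracketed, comma-separated list.
    return "[" + ",".join('"guest-es%d:%d"' % (i, port)
                          for i in range(1, nguests + 1)) + "]"

unicast_hosts(2)  # '["guest-es1:9310","guest-es2:9310"]'
```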

The function setEtcHosts appends a line for each guest, mapping its IP address to its hostname (guest-es1, guest-es2), to the ‘/etc/hosts’ file on each guest and restarts the network. The start script prepared for ‘guest2’ runs the following command to start up the Elasticsearch node ‘guest-es2-node1’.

/opt/software/elasticsearch/elasticsearch-1.7.2/bin/elasticsearch -d \ \
  -Des.http.port=9210 -Des.transport.tcp.port=9310 \
  -Des.path.logs=/opt/software/elasticsearch/elasticsearch-1.7.2/logs \
  -Des.path.plugins=/opt/software/elasticsearch/elasticsearch-1.7.2/plugins \
  -Des.path.conf=/opt/software/elasticsearch/elasticsearch-1.7.2/config \
  -p /opt/software/elasticsearch/elasticsearch-1.7.2/pid

where ‘es-dev’ is the name of the cluster we are building. The command on ‘guest1’ to start ‘guest-es1-node1’ would be identical to the above, except for replacing ‘es2’ with ‘es1’. 

We fire up our virtual elastic cluster simply by running vagrant up. Because we have installed the ‘kopf’ plugin on ‘guest1’ during provisioning, we can verify that the cluster is up, accessible from the host, and ready to be put to work.


We shut the cluster off by running vagrant halt. Whenever we are ready to work with it again from the host, we simply run vagrant up and the cluster will be back up. Success! We have put in place a mechanism to bring up Elasticsearch as a service, as needed, on a virtual cluster.

That is all for this post. In future posts, we will look at extending this to create appliances on AWS so we can do real work.


ELK Stack With Vagrant and Ansible

I had been playing with ELK on a routine basis, so, for what I thought to be a quick win, I decided to add to my earlier blog post on Building Elasticsearch clusters with Vagrant. Well, it did not quite turn out that way and I had to cover a good bit of ground and publish code to other repos in order for this blog to be useful.

To recap, that post used (a) VirtualBox as the means to build the VMs for the cluster, and (b) a shell script to orchestrate the installation and configuration of an Elasticsearch cluster on those VMs. In this post, we will still use VirtualBox for giving us the VMs, but enhance the provisioning in two ways.

  1. We will build a full ELK stack where application logs are shipped by Beats to a Logstash host for grokking and posting to an ES cluster hooked to Kibana for querying and dashboards. Here is a schematic.
  2. The provisioning (install and config) of the software for each of E (Elasticsearch), L (Logstash), K (Kibana), and the Filebeat plugin is done via Ansible playbooks. Why? While provisioning with shell scripts is very handy, it is programmatic and can make building complex coupled software systems across a cluster of hosts too complicated. Ansible hides much of that and instead presents a more or less declarative way (playbooks!) of orchestrating the provisioning. While there are alternatives, Ansible has become insanely popular lately in the DevOps world.

You can download the code from GitHub to play along with the build out.

1. The Inventory

We need 7 VMs — 2 for applications with Filebeat, 1 ES master node, 2 ES data nodes, and 1 each for Logstash and Kibana. The names and IP addresses for these VMs will be needed both by Vagrant for creating them and, later, by Ansible for provisioning. So we prepare a single inventory file and use it with both Vagrant and Ansible. Further, this file rations the cpu/memory resources on my 8-core, 16GB memory laptop across these 7 VMs. The file is simply YAML that is processed in Ruby by Vagrant and in Python by Ansible. Our inventory.yml file looks like:

es-master-nodes:
  hosts:
    es-master-1:                # hostname
      ansible_host:             # ip address (elided in the original)
      ansible_user: vagrant
      memory: 2048              # ram to be assigned in MB
      ansible_ssh_private_key_file: .vagrant/machines/es-master-1/virtualbox/private_key
es-data-nodes:
  hosts:
    es-data-1:
      ansible_host:
      ansible_user: vagrant
      memory: 2048
      ansible_ssh_private_key_file: .vagrant/machines/es-data-1/virtualbox/private_key
    es-data-2:
      ansible_host:
      ansible_user: vagrant
      memory: 2048
      ansible_ssh_private_key_file: .vagrant/machines/es-data-2/virtualbox/private_key
kibana-nodes:
  hosts:
    kibana-1:
      ansible_host:
      ansible_user: vagrant
      memory: 512
      ansible_ssh_private_key_file: .vagrant/machines/kibana-1/virtualbox/private_key
logstash-nodes:
  hosts:
    logstash-1:
      ansible_host:
      ansible_user: vagrant
      memory: 1536
      ansible_ssh_private_key_file: .vagrant/machines/logstash-1/virtualbox/private_key
filebeat-nodes:
  hosts:
    filebeat-1:
      ansible_host:
      ansible_user: vagrant
      memory: 512
      ansible_ssh_private_key_file: .vagrant/machines/filebeat-1/virtualbox/private_key
    filebeat-2:
      ansible_host:
      ansible_user: vagrant
      memory: 512
      ansible_ssh_private_key_file: .vagrant/machines/filebeat-2/virtualbox/private_key

2. The Vagrantfile

The Vagrantfile below builds each of the 7 VMs as per the specs in the inventory.

require 'rbconfig'
require 'yaml'

DEFAULT_BASE_BOX = "bento/ubuntu-16.04"
cpuCap = 10 # Limit to 10% of the cpu
inventory = YAML.load_file("inventory.yml") # Get the names & ip addresses for the guest hosts
VAGRANTFILE_API_VERSION = '2'

Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
  config.vbguest.auto_update = false
  inventory.each do |group, groupHosts|
    next if (group == "justLocal")
    groupHosts['hosts'].each do |hostName, hostInfo|
      config.vm.define hostName do |node| = hostInfo['box'] || DEFAULT_BASE_BOX
        node.vm.hostname = hostName # Set the hostname :private_network, ip: hostInfo['ansible_host'] # Set the IP address
        ram = hostInfo['memory'] # Set the memory
        node.vm.provider :virtualbox do |vb|
 = hostName
          vb.customize ["modifyvm", :id, "--cpuexecutioncap", cpuCap, "--memory", ram.to_s]
        end
      end
    end
  end
end

The VMs are created simply with the vagrant up --no-provision command and the cluster is provisioned with Ansible.

3. The Playbook

The main playbook will simply delegate the specific app provisioning to roles while overriding some defaults as needed. We override the port variables in the main playbook so we can see they match up as per our schematic for the cluster. Some other variables are overridden in group_vars/* files to keep them from cluttering the main playbook. The cluster is provisioned with ansible-playbook -i inventory.yml elk.yml where elk.yml is the file below. 

- hosts: es-master-nodes
  become: true
  roles:
    - { role: elastic.elasticsearch, cluster_http_port: 9201, cluster_transport_tcp_port: 9301 }
- hosts: es-data-nodes
  become: true
  roles:
    - { role: elastic.elasticsearch, cluster_http_port: 9201, cluster_transport_tcp_port: 9301 }
- hosts: kibana-nodes
  become: true
  roles:
    - { role: ashokc.kibana, kibana_server_port: 5601, cluster_http_port: 9201 }
- hosts: logstash-nodes
  become: true
  roles:
    - { role: ashokc.logstash, cluster_http_port: 9201, filebeat_2_logstash_port: 5044 }
- hosts: filebeat-nodes
  become: true
  roles:
    - { role: ashokc.filebeat, filebeat_2_logstash_port: 5044 }

The directory layout shows a glimpse of all that is under the hood.

├── elk.yml
├── group_vars
│ ├── all.yml
│ ├── es-data-nodes.json
│ ├── es-master-nodes.json
│ ├── filebeat-nodes.yml
│ ├── kibana-nodes.yml
│ └── logstash-nodes.yml
├── inventory.yml
├── roles
│ ├── ashokc.filebeat
│ ├── ashokc.kibana
│ ├── ashokc.logstash
│ └── elastic.elasticsearch
└── Vagrantfile

Common variables for all the host groups are specified in group_vars/all.yml. The variable ‘public_iface’ can vary depending on the VM provider. For Vagrant here, it is “eth1.” We use it to pull out the IP address of a host from ansible_facts whenever that is required in the playbook. The file group_vars/all.yml, in our case, will be:

public_iface: eth1 # For Vagrant Provider
elk_version: 5.6.1
es_major_version: 5.x
es_version: "{{ elk_version }}"
es_apt_url: deb{{ es_major_version }}/apt stable main

3.1 Elasticsearch

The provisioning of Elasticsearch on master and data nodes is delegated to the excellent role elastic.elasticsearch published by Elastic. As the role allows for multiple instances of ES on a host, we name the instances “{{cluster_http_port}}_{{cluster_transport_tcp_port}}”, which is a unique identifier. The ES cluster itself is taken to be defined by this pair of ports that are used by all the master/data members of the cluster. If we rerun the playbook with a separate pair, say 9202 and 9302, we will get a second cluster, ‘9202_9302’ (in addition to the ‘9201_9301’ that we get here on the first run), on the same set of hosts, and all would work fine.

The master node configuration variables are in the file group_vars/es-master-nodes.json shown below. The key useful thing here is that we derive the “” and “” settings for Elasticsearch from the information in the inventory file.

{
  "es_java_install" : true,
  "es_api_port": "{{cluster_http_port}}",
  "es_instance_name" : "{{cluster_http_port}}_{{cluster_transport_tcp_port}}",
  "masterHosts_transport" : "{% for host in groups['es-master-nodes'] %} {{hostvars[host]['ansible_'+public_iface]['ipv4']['address'] }}:{{cluster_transport_tcp_port}}{% endfor %}",
  "es_config": {
    "": "{{es_instance_name}}",
    "http.port": "{{cluster_http_port}}",
    "transport.tcp.port": "{{cluster_transport_tcp_port}}",
    "node.master": true,
    "": false,
    "": ["{{ hostvars[inventory_hostname]['ansible_' + public_iface]['ipv4']['address'] }}", "_local_"],
    "": "{{ masterHosts_transport.split() }}"
  }
}

The data node configuration variables in the file group_vars/es-data-nodes.json below are very similar; the only changes are the added es_data_dirs setting and the flipped node.master/ flags.

{
  "es_data_dirs" : "/opt/elasticsearch",
  "es_java_install" : true,
  "es_api_port": "{{cluster_http_port}}",
  "es_instance_name" : "{{cluster_http_port}}_{{cluster_transport_tcp_port}}",
  "masterHosts_transport" : "{% for host in groups['es-master-nodes'] %} {{hostvars[host]['ansible_'+public_iface]['ipv4']['address'] }}:{{cluster_transport_tcp_port}}{% endfor %}",
  "es_config": {
    "": "{{es_instance_name}}",
    "http.port": "{{cluster_http_port}}",
    "transport.tcp.port": "{{cluster_transport_tcp_port}}",
    "node.master": false,
    "": true,
    "": ["{{ hostvars[inventory_hostname]['ansible_' + public_iface]['ipv4']['address'] }}", "_local_"],
    "": "{{ masterHosts_transport.split() }}"
  }
}

3.2 Logstash

Logstash is provisioned with the role ashokc.logstash. The default variables for this role are overridden with group_vars/logstash-nodes.yml. Lines 4-5 specify the user and group that own this instance of Logstash. Lines 9 and 10 derive the Elasticsearch URLs from the inventory file; they are used for configuring the Elasticsearch output section.


es_java_install: True
update_java: False
logstash_version: "{{ elk_version }}"
logstash_user: logstashUser
logstash_group: logstashGroup
logstash_enabled_on_boot: yes
logstash_install_plugins:
  - logstash-input-beats
esMasterHosts: "{% for host in groups['es-master-nodes'] %} http://{{hostvars[host]['ansible_'+public_iface]['ipv4']['address'] }}:{{cluster_http_port}}{% endfor %}"
logstash_es_urls: "{{ esMasterHosts.split() }}"

A simple Elasticsearch output config and filebeat input config are enabled with the following.


output {
  elasticsearch {
    hosts => {{ logstash_es_urls | to_json }}
  }
}


input {
  beats {
    port => {{filebeat_2_logstash_port}}
  }
}

3.3 Kibana

Kibana is provisioned with the role ashokc.kibana. The default variables for this role are again overridden with group_vars/kibana-nodes.yml. Unlike Logstash, it is quite common to run multiple Kibana servers on a single host, with each instance targeting a separate ES cluster. This role allows for that and identifies the Kibana instance by the port it is running on (kibana_instance). Lines 2 and 3 specify the owner/group for the instance.


kibana_version: "{{ elk_version }}"
kibana_user: kibanaUser
kibana_group: kibanaGroup
kibana_enabled_on_boot: yes
kibana_elasticsearch_url : http://{{hostvars[groups['es-master-nodes'][0]]['ansible_'+public_iface]['ipv4']['address'] }}:{{cluster_http_port}}
kibana_instance: "{{kibana_server_port}}"

The template file for ‘kibana.yml’ below picks up the correct Elasticsearch cluster URL from the variables above.


server.port: {{ kibana_server_port }} {{ kibana_server_host }}
elasticsearch.url: {{ kibana_elasticsearch_url }}
pid.file: {{ kibana_pid_file }}
logging.dest: {{ kibana_log_file }}

3.4 Filebeat

Filebeat is provisioned with the role ashokc.filebeat. The default variables are overridden in group_vars/filebeat-nodes.yml below. Lines 5 and 6 figure out the Logstash connection to use.

filebeat_version: "{{ elk_version }}"
filebeat_enabled_on_boot: yes
filebeat_user: filebeatUser
filebeat_group: filebeatGroup
logstashHostsList: "{% for host in groups['logstash-nodes'] %} {{hostvars[host]['ansible_'+public_iface]['ipv4']['address'] }}:{{filebeat_2_logstash_port}}{% endfor %}"
filebeat_logstash_hosts: "{{ logstashHostsList.split() }}"

The output.logstash section in the template for the sample filebeat.yml below configures the output to our Logstash host at the right port.

- type: log
  enabled: true
  paths:
    - /tmp/custom.log
  fields:
    log_type: custom
    type: {{ansible_hostname}}
    from: beats
  multiline.pattern: '^\s[+]{2}\scontinuing .*'
  multiline.match: after
output.logstash:
  hosts: {{ filebeat_logstash_hosts | to_nice_yaml }}

4. Logs

The last step would be to run an application on the filebeat nodes and watch the logs flow into Kibana. Our application would simply be a Perl script that writes the log file /tmp/custom.log. We log in to each of the filebeat hosts and run the following Perl script.

#!/usr/bin/perl -w
use strict ;
no warnings 'once';
my @codes = qw (fatal error warning info debug trace) ;
open(my $fh, ">>", "/tmp/custom.log") ;
my $now = time();
for my $i (1 .. 100) {
    my $message0 = "Type: CustomLog: This is a generic message # $i for testing ELK" ;
    my $nDays = int(rand(5)) ;
    my $nHrs = int(rand(24)) ;
    my $nMins = int(rand(60)) ;
    my $nSecs = int(rand(60)) ;
    my $timeValue = $now - $nDays * 86400 - $nHrs * 3600 - $nMins * 60 - $nSecs ;
    my $now1 = localtime($timeValue) ;
    my $nMulti = int(rand(10)) ;
    my $message = "$now1 $nDays:$nHrs:$nMins:$nSecs $nMulti:$codes[int(rand($#codes))] $message0" ;
    if ($nMulti > 0) {
        for my $line (1 .. $nMulti) {
            $message = $message . "\n ++ continuing the previous line for this log error..." ;
        }
    }
    print $fh "$message\n" ;
}
close $fh ;

The corresponding sample Logstash config file for processing this log would be placed at roles/ashokc.logstash/files/custom-filter.conf.

filter {
  if [fields][log_type] == "custom" {
    grok {
      match => [ "message", "(?<matched-timestamp>\w{3}\s+\w{3}\s+\d{1,2}\s+\d{1,2}:\d{1,2}:\d{1,2}\s+\d{4})\s+(?<nDays>\d{1,3}):(?<nHrs>\d{1,2}):(?<nMins>\d{1,2}):(?<nSecs>\d{1,2})\s+(?<nLines>\d{1,2}):(?<code>\w+) Type: (?<given-type>\w+):[^#]+# (?<messageId>\d+)\s+%{GREEDYDATA}" ]
      add_tag => ["grokked"]
      add_field => { "foo_%{nDays}" => "Hello world, from %{nHrs}" }
    }
    mutate { gsub => ["message", "ELK", "BULK"] }
    date {
      match => [ "timestamp" , "EEE MMM d H:m:s Y", "EEE MMM d H:m:s Y" ]
      add_tag => ["dated"]
    }
  }
}
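To sanity-check the capture groups of a pattern like this without a running Logstash, the equivalent regular expression can be exercised with Python's re module against a line shaped like those the Perl script emits. The sample line below is constructed for illustration; Python group names cannot contain '-', so matched-timestamp and given-type become timestamp and given_type here:

```python
import re

# Python translation of the grok pattern's capture groups (minus the
# trailing GREEDYDATA, which captures nothing named).
pattern = re.compile(
    r"(?P<timestamp>\w{3}\s+\w{3}\s+\d{1,2}\s+\d{1,2}:\d{1,2}:\d{1,2}\s+\d{4})\s+"
    r"(?P<nDays>\d{1,3}):(?P<nHrs>\d{1,2}):(?P<nMins>\d{1,2}):(?P<nSecs>\d{1,2})\s+"
    r"(?P<nLines>\d{1,2}):(?P<code>\w+) Type: (?P<given_type>\w+):[^#]+# (?P<messageId>\d+)")

# A log line in the shape the Perl script produces (values made up).
line = ("Fri Oct  5 12:34:56 2018 2:3:14:9 4:error "
        "Type: CustomLog: This is a generic message # 17 for testing ELK")
m = pattern.match(line)
```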


By placing appropriate filter files for Logstash at roles/ashokc.logstash/files and a prospector config file for Filebeat at roles/ashokc.filebeat/templates/filebeat.yml.j2, one can use this ELK stack to analyze application logs. A variety of extensions are possible, for example enabling X-Pack login/security, other distributions and versions for the ‘ashokc’ roles, automated testing, etc. But then there is always more to be done, isn’t there?


Elasticsearch Setup and Configuration

What Is Elasticsearch?

Elasticsearch is a highly scalable, broadly distributed, open-source full-text search and analytics engine. It lets you store, search, and index big volumes of data in very near real time. Internally, it uses Apache Lucene for indexing and storing data. Below are a few use cases for it.

  • Product search for e-commerce websites.
  • Collecting application logs and transaction data for analyzing it for trends and anomalies.
  • Indexing instance metrics (health, stats) and doing analytics, creating alerts for instance health at regular intervals.
  • For analytics/business-intelligence applications.

Elasticsearch Basic Concepts

We will be using a few specific terms while talking about Elasticsearch. Let’s look at the basic building blocks of Elasticsearch.

Near Real-Time

Elasticsearch is near real-time. This describes the time (latency) between the indexing of a document and its availability for searching.


Cluster

It is a collection of one or more nodes (servers) that together hold all the data and provide you the ability to index and search that data.


Node

It is a single server that is part of your cluster. It can store data and participate in indexing, searching, and overall cluster management. A node can have four different flavors, i.e., master, HTTP, data, and coordinating/client nodes.


Index

An index is a collection of documents with similar kinds/characteristics. It is identified by a name (all lowercase), which is used to refer to the index when performing indexing, search, update, and delete operations against its documents.


Document

It is a single unit of information that can be indexed.

Shards and Replicas

A single index can store billions of documents, with storage taking up TBs of space. A single server could exceed its limits trying to store that massive amount of information or to perform search operations on that data. To solve this problem, Elasticsearch subdivides your index into multiple units called shards.

Replication is important primarily to have high availability in case of node/shard failure and to allow you to scale out your search throughput. By default, Elasticsearch has 5 shards and 1 replica, which could be configured at the time of creating an index.
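The shard count is fixed at index creation because Elasticsearch routes each document to a shard with the rule shard = hash(routing) % number_of_shards; changing the shard count afterward would strand existing documents. A minimal Python sketch of the idea (the `route` helper and the `crc32` hash are illustrative stand-ins; Elasticsearch actually uses a Murmur3 hash internally):

```python
import zlib

def route(doc_id: str, number_of_shards: int) -> int:
    """Mimic Elasticsearch's routing rule:
    shard = hash(routing) % number_of_shards.
    (Elasticsearch uses Murmur3; crc32 stands in here.)"""
    return zlib.crc32(doc_id.encode("utf-8")) % number_of_shards

# The same id always lands on the same shard, which is why the
# number of primary shards cannot change once an index is created.
shards = [route("book-%d" % i, 5) for i in range(10)]
print(shards)  # ten values, each between 0 and 4
```

Any node can therefore compute the owning shard for a document id and forward the request accordingly.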

Installing Elasticsearch

Elasticsearch requires Java to run. As of writing this article, Elasticsearch 6.2.X+ requires at least Java 8.

Installing Java 8

# Installing OpenJDK
sudo apt-get install openjdk-8-jdk

# Installing Oracle JDK
sudo add-apt-repository -y ppa:webupd8team/java
sudo apt-get update
sudo apt-get -y install oracle-java8-installer

Installing Elasticsearch with a tar file

curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.2.4.tar.gz
tar -xvf elasticsearch-6.2.4.tar.gz

Installing Elasticsearch with a package manager

# Import the Elasticsearch public GPG key into apt:
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -

# Create the Elasticsearch source list:
echo "deb https://artifacts.elastic.co/packages/6.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elasticsearch-6.x.list
sudo apt-get update
sudo apt-get -y install elasticsearch

Configuring an Elasticsearch Cluster

Configuration file location if you have downloaded the tar file:

vi /[YOUR_TAR_LOCATION]/config/elasticsearch.yml

Configuration file location if you used a package manager to install Elasticsearch:

vi /etc/elasticsearch/elasticsearch.yml

Cluster Name

Use a descriptive name for the cluster; Elasticsearch nodes will use this name to form and join the cluster.

cluster.name: lineofcode-prod

Node name

To uniquely identify a node in the cluster:

node.name: ${HOSTNAME}

Custom attributes for a node

Adding a rack attribute to a node to logically group the nodes placed in the same data center/physical machine:

node.attr.rack: us-east-1

Network host

A node will bind to this hostname or IP address and advertise this host to the other nodes in the cluster.

network.host: [_VPN_HOST_, _local_]

Elasticsearch does not come with authentication and authorization, so it is suggested that you never bind the network host property to a public IP address.

Cluster finding settings

To find and join a cluster, a node needs to know the hostnames or IP addresses of at least a few other nodes. This can easily be set with the discovery.zen.ping.unicast.hosts property.

Changing the HTTP port

You can configure the port number on which Elasticsearch is accessible over HTTP with the http.port property.

Configuring JVM Options (Optional for Local/Test)

You need to tweak the JVM options as per your hardware configuration. It is advisable to allocate half of the server’s total available memory to Elasticsearch; the rest will be taken up by Lucene and Elasticsearch threads.

# For example, if your server has 8 GB of RAM, then set the following properties in jvm.options:
-Xms4g
-Xmx4g

Also, to avoid performance hits from swapping, let Elasticsearch lock its memory with the bootstrap.memory_lock: true property.

Elasticsearch uses the concurrent mark-and-sweep (CMS) GC by default, and you can change it to G1GC with the following jvm.options configuration:

-XX:-UseConcMarkSweepGC
-XX:-UseCMSInitiatingOccupancyOnly
-XX:+UseG1GC

Starting Elasticsearch

sudo service elasticsearch restart

Tada! Elasticsearch is up and running on your local machine.

To have a production-grade setup, I would recommend visiting the following articles:

Digitalocean guide to set up production Elasticsearch

Elasticsearch – Fred Thoughts


#BREAKING in Big Data

Hortonworks Sandbox for HDP and HDF is your chance to get started on learning, developing, testing and trying out new features. Each download comes preconfigured with interactive tutorials, sample data and developments from the Apache community.

It’s time for another Big Data Zone monthly round-up! Last time, we focused on the basics of big data. In this article, we’ll keep it simple and cover everything you need to know about big data from the past month or so, including the top big data-related articles on DZone that you should check out, the top news that happened in the world of big data, publications related to big data, and more. Let’s get started!

DZone Big Data Bonanza

These are the top five articles from the Big Data Zone from the past month. Check them out so you’re up-to-date on the latest and greatest in big data on DZone!

  1. Top 5 Hadoop Courses to Learn Online by Javin Paul. Having a good knowledge of Hadoop will go a long way in boosting your career prospects, especially if you’re interested in big data. This article can help you get started.

  2. Collecting Elasticsearch Directly to Your Java EE Application by Otavio Santana. See how to join the best of the Java EE and NoSQL worlds using Eclipse JNoSQL.

  3. Synchronous Kafka: Using Spring Request-Reply by Gaurav Gupta. With the latest release of Spring-Kafka, request-reply semantics are available off-the-shelf. This example demonstrates the simplicity of the Spring-Kafka implementation.

  4. Python vs. R by Mike LaFleur. When they emerged in the ’90s, Python and R gave data scientists an immense amount of power to operationalize risk models, and created the Python vs. R debate that’s still argued 30 years later.

  5. Graph Algorithms in Neo4j: 15 Different Graph Algorithms and What They Do by Amy Hodler. If you’re using Neo4j, then you need to be familiar with these powerful graph algorithms. Who doesn’t want to make their job easier?

PS: Are you interested in contributing to DZone? It’s easier than ever! Check out our brand new Bounty Board, where you can apply for specific writing prompts and win prizes! 

Browsing Big Data

  1. The Best Big Data Companies and CEOs to Work for in 2018 Based on Glassdoor (Forbes). Learn about the companies that employees would most recommend to friends interested in working for business analytics, data science, machine learning, big data systems and platforms, or data management and integration.

  2. Navy to Focus on Big Data Analytics, Artificial Intelligence (The Economic Times). Top commanders of the Navy have finalized a plan to incorporate big data analytics and AI into its operational functioning. Check out this news article to learn more.

  3. Big Data and a Mathematical Formula for Great Chocolate Chip Cookies (The Spoon). Through years of extensive research, big data analytics, and many, many spreadsheets, Michael Ohene has come up with the magic chocolate chip cookie formula.

Dive Deeper Into Big Data

  1. The DZone Guide to Big Data: Data Science and Advanced Analytics. Explore the critical capabilities in next-generation self-service data preparation tools and dive deep into applications and languages affiliated with big data.

  2. Recommendations Using Redis. Learn how to develop a simple recommendation system with Redis, how to use commands, and how to optimize your system for real-time recommendations in production.





Kafka Streams: Catching Data in the Act (Part 3)

In the previous post, we designed the experiment, simulated different operational states, and confirmed that the results were as expected (more or less). Here, we go over the implementation and a few relevant code snippets before wrapping up this series of posts. As usual, the package is available for download at GitHub.

We start with the pom.

The pom

This is mostly straightforward. The relevant XML snippets are as follows. We use Elasticsearch (6.1.2) for long-term storage.

<!-- Kafka packages for producers, consumers, and streams -->
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency>
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>1.0.0</version> </dependency>

<!-- Kafka Avro serializers/deserializers -->
<dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>4.0.0</version> </dependency>
<dependency> <groupId>io.confluent</groupId> <artifactId>kafka-streams-avro-serde</artifactId> <version>4.0.0</version> </dependency>

<!-- Avro libraries -->
<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.8.2</version> </dependency>

<!-- Avro code generation -->
<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.8.2</version> </dependency>

<!-- Elasticsearch for long-term storage -->
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.1.2</version> </dependency>
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>6.1.2</version> </dependency>


Producers are the sensors at the vertices A, B, and C. Each sensor produces to an assigned partition: A => 0, B => 1, and C => 2. The sensors throw off raw measurements at a maximum rate of about 1/sec as per the following code snippet running on a thread in a loop until interrupted. Lines 5-6 update the position of the vertex. Angular velocity is one revolution per minute (2 PI/60). Line 7 computes the RawVertex object as per the AVRO spec. Lines 8-9 push the object to the rawVertex topic and wait for the full acknowledgment.

while ( !(Thread.currentThread().isInterrupted()) ) {
    String key = clientId ;
    long currentTime = System.currentTimeMillis() ;
    double rand = -error + random.nextDouble() * 2.0 * error ;
    valX = valX + amplitude * Math.sin(angularV * (currentTime - timePrev) * 0.001) * rand ;
    valY = valY + amplitude * Math.cos(angularV * (currentTime - timePrev) * 0.001) * rand ;
    rawVertex = new RawVertex (clientId, currentTime, valX, valY) ;
    ProducerRecord<String, RawVertex> record = new ProducerRecord<>(topic, partitionNumber, key, rawVertex) ;
    RecordMetadata metadata = producer.send(record).get() ;
    timePrev = currentTime ;
    Thread.sleep(1000) ;
}

The important producer configurations are in Lines 5 and 6, which indicate the KafkaAvroSerializers, and in Line 7, the URL at which to find/register the schemas.

producerProps.setProperty("acks", "all") ;
producerProps.setProperty("retries", "0") ;
producerProps.setProperty("max.in.flight.requests.per.connection", "1") ;
producerProps.setProperty("bootstrap.servers","localhost:9092") ;
producerProps.setProperty("key.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer") ;
producerProps.setProperty("value.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer") ;
producerProps.setProperty("schema.registry.url","http://localhost:8081") ;

The three producers are started off at the same time via a script (we will see this at the end of the post) that supplies the initial positions of the vertices.

Stream Processing

The VertexProcessor and TriangleProcessor work in concert to enable stress metrics on the triangle. A Kafka Stream is a consumer and/or producer as needed for receiving messages from upstream entities and producing messages to downstream entities. Thus, the serialization (for production) and deserialization (for consumption) methodologies need to be defined and be in place before a stream topology can be instantiated. Further, as the processors may employ a variety of stores (key-value and window in our case here), the means to build these stores should be in place, as well.

streamProcess.streamProps.setProperty("num.stream.threads", "3") ;
streamProcess.streamProps.setProperty("processing.guarantee", "at_least_once") ;
streamProcess.streamProps.setProperty("commit.interval.ms", "100") ;
streamProcess.streamProps.setProperty("default.timestamp.extractor", StreamTimeStampExtractor.class.getName());

A few key stream properties are shown above. We employ three stream threads (Line 1) to match the three rawVertex partition sources we have. Line 4 defines the Stream Time, the key quantity that makes the stream move or not. This is the actual measurement time in case of the rawVertex and the average of those in case of the smoothedVertex. The code snippet below shows the custom class that extends TimestampExtractor. Lines 8 and 12 extract the measurement time as the stream time.

public class StreamTimeStampExtractor implements TimestampExtractor {
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        long timeStamp = System.currentTimeMillis() ;
        Object obj = record.value() ;
        if (obj != null) {
            if (obj instanceof RawVertex) {
                RawVertex rv = (RawVertex) obj ;
                timeStamp = rv.getTimeInstant() ;
            }
            else if (obj instanceof SmoothedVertex) {
                SmoothedVertex sv = (SmoothedVertex) obj ;
                timeStamp = sv.getTimeInstant() ;
            }
        }
        return timeStamp ;
    }
}

Serialization and Deserialization

The messages in our cases have Strings/Longs as the keys but Avro objects as the values. The following snippet of code configures and returns a serializer/deserializer that we will need for the values (Lines 6 and 15 below).

KafkaAvroDeserializer getKafkaAvroDeserializer() {
    Map<String, String> kadsConfig = new HashMap<String, String>() ;
    kadsConfig.put("schema.registry.url", "http://localhost:8081") ;
    kadsConfig.put("specific.avro.reader", "true") ;
    KafkaAvroDeserializer kads = new KafkaAvroDeserializer() ;
    kads.configure(kadsConfig, false) ; // false means NOT FOR the key
    return kads ;
}

KafkaAvroSerializer getKafkaAvroSerializer() {
    Map<String, String> kasConfig = new HashMap<String, String>() ;
    kasConfig.put("schema.registry.url", "http://localhost:8081") ;
    kasConfig.put("specific.avro.reader", "true") ;
    KafkaAvroSerializer kas = new KafkaAvroSerializer() ;
    kas.configure(kasConfig, false) ; // false means NOT for the key
    return kas ;
}

Querying the schema registry shows the String and the RawVertex and SmoothedVertex schemas we defined via avsc files in the data model.

curl -X GET http://localhost:8081/subjects
["rawVertex-key","triangle-stress-monitor-rawVertexKVStateStore-changelog-value","triangle-stress-monitor-smoothedVerticesWindowStateStore-changelog-value","smoothedVertex-value","rawVertex-value","triangle-stress-monitor-smoothedVertexKVStateStore-changelog-value"]

curl -X GET http://localhost:8081/schemas/ids/1
{"schema":"\"string\""}

curl -X GET http://localhost:8081/schemas/ids/2
{"schema":"{\"type\":\"record\",\"name\":\"RawVertex\",\"namespace\":\"com.xplordat.rtmonitoring.avro\",\"fields\":[{\"name\":\"sensor\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"timeInstant\",\"type\":\"long\"},{\"name\":\"X\",\"type\":\"double\"},{\"name\":\"Y\",\"type\":\"double\"}]}"}

curl -X GET http://localhost:8081/schemas/ids/3

The Stores

The VertexProcessor employs two local stores for resiliency. One of them holds (Line 10 below) the current batch of raw vertices being smoothed. In case the processor dies, the restarted processor will read the current state from this store. The other (Line 11) will have the previously smoothed vertex object, which is needed for computing cumulative displacement of the vertex. The window store used by the TriangleProcessor is defined in Line 14, with two parameters: the length of the time window and its retention time. The right values for these depend on the overall beat of the apparatus — the production rate, expected delays in transit, a reasonable duration to average the measurements over, etc. that we discussed at length in the previous post.

Map<String, String> schemaConfig = Collections.singletonMap("schema.registry.url", "http://localhost:8081") ;

Serde<RawVertex> rawVertexAvroSerde = new SpecificAvroSerde<>() ;
rawVertexAvroSerde.configure(schemaConfig, false) ; // This "false" is for the variable "final boolean isSerdeForRecordKeys"
Serde<SmoothedVertex> smoothedVertexAvroSerde = new SpecificAvroSerde<>() ;
smoothedVertexAvroSerde.configure(schemaConfig, false) ;

// Local stores used by VertexProcessor
StoreBuilder<KeyValueStore<Long, RawVertex>> vertexProcessorRawVertexKVStateStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("rawVertexKVStateStore"), Serdes.Long(), rawVertexAvroSerde) ;
StoreBuilder<KeyValueStore<Long, SmoothedVertex>> vertexProcessorSmoothedVertexKVStateStoreBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("smoothedVertexKVStateStore"), Serdes.Long(), smoothedVertexAvroSerde) ;

// Local window store used by TriangleProcessor
StoreBuilder<WindowStore<String, SmoothedVertex>> triangleProcessorWindowStateStoreBuilder = Stores.windowStoreBuilder(Stores.persistentWindowStore("smoothedVerticesWindowStateStore", TimeUnit.MINUTES.toMillis(windowRetentionMinutes), 3, TimeUnit.SECONDS.toMillis(windowSizeSeconds), false), Serdes.String(), smoothedVertexAvroSerde) ;

The Topology

The process topology described in the earlier posts is materialized by the following code snippet.

Topology topology = new Topology() ;
topology
    .addSource("vertexSource", new StringDeserializer(), getKafkaAvroDeserializer(), "rawVertex")
    .addProcessor("vertexProcessor", () -> new VertexProcessor(smoothingIntervalSeconds, resetStores), "vertexSource")
    .addStateStore(vertexProcessorRawVertexKVStateStoreBuilder, "vertexProcessor")
    .addStateStore(vertexProcessorSmoothedVertexKVStateStoreBuilder, "vertexProcessor")
    .addSink("smoothedVerticesSink", "smoothedVertex", new StringSerializer(), getKafkaAvroSerializer(), "vertexProcessor")
    .addSource("smoothedVerticesSource", new StringDeserializer(), getKafkaAvroDeserializer(), "smoothedVertex")
    .addProcessor("triangleProcessor", () -> new TriangleProcessor(windowSizeSeconds, windowRetentionMinutes, resetStores, smoothedIndex, triangeIndex), "smoothedVerticesSource")
    .addStateStore(triangleProcessorWindowStateStoreBuilder, "triangleProcessor") ;

In Line 3, the data is read off of the rawVertex topic with the configured deserializers (String for the key and KafkaAvro for the value, which is a RawVertex object). This flows into the VertexProcessor in Line 4, which smooths the measurements over a short time interval before forwarding the average to the smoothedVertex topic in Line 7. The smoothedVertex topic serves as the source (Line 8) for the TriangleProcessor. Describing the topology that Kafka builds and executes shows two sub-topologies, as expected. Excellent!

Sub-topology: 0
    Source: smoothedVerticesSource (topics: [smoothedVertex]) --> triangleProcessor
    Processor: triangleProcessor (stores: [smoothedVerticesWindowStateStore]) --> none <-- smoothedVerticesSource
Sub-topology: 1
    Source: vertexSource (topics: [rawVertex]) --> vertexProcessor
    Processor: vertexProcessor (stores: [smoothedVertexKVStateStore, rawVertexKVStateStore]) --> smoothedVerticesSink <-- vertexSource
    Sink: smoothedVerticesSink (topic: smoothedVertex) <-- vertexProcessor

When starting a new run, we may want to clean up any held-over stores from a previous run; in case of a crash and restart, of course, we should not reset the stores. This is Line 3 below. In case of a failure or interruption, a shutdown hook allows all the state stores to be closed cleanly (Line 12).

KafkaStreams kstreams = new KafkaStreams(topology, streamProps) ;
if (resetStores) {
    kstreams.cleanUp() ;
}
kstreams.start() ;

Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        try {
            logger.error ("Interrupting stream processing...") ;
            kstreams.close() ;
        } catch (Exception e) {
            logger.error ("Errors in shutdownhook..." + e) ;
            System.exit(1) ;
        }
    }
}) ;

The Vertex and Triangle processors provide a close() implementation to take care of any other housekeeping operations. For example, the TriangleProcessor (which we will talk about soon) uses the following snippet to flush the triangle metrics and smoothed vertex measurements to Elasticsearch before exiting.

public void close() {
    if (smoothedVertexDocs.size() > 0) {
        indexThis (smoothedVertexDocs, smoothedIndex) ;
    }
    if (triangleDocs.size() > 0) {
        indexThis (triangleDocs, triangleIndex) ;
    }
    smoothedVerticesWindowStateStore.close() ;
}

The VertexProcessor

The VertexProcessor averages the incoming raw measurements over the smoothing interval and forwards the smoothedVertex to the next stage. The stores are initialized in Lines 3-4 below. In Line 5, the punctuation is scheduled to run periodically at an interval: smoothingInterval (12 seconds in our simulations). The smoothed vertex is computed (Line 14), forwarded downstream (Line 15), and saved to the local store (Line 17). We get ready for a new smoothing batch in Line 16.

public void init(ProcessorContext context) {
    this.context = context ;
    rawVertexKVStateStore = (KeyValueStore) context.getStateStore("rawVertexKVStateStore") ;
    smoothedVertexKVStateStore = (KeyValueStore) context.getStateStore("smoothedVertexKVStateStore") ;
    context.schedule(smoothingInterval, PunctuationType.STREAM_TIME, (timestamp) -> {
        if (numberOfSamplesIntheAverage > 0L) {
            SmoothedVertex prevLsv = smoothedVertexKVStateStore.get(1L) ;
            double prevX = prevLsv.getX() ;
            double prevY = prevLsv.getY() ;
            pushCount = prevLsv.getPushCount() + 1 ;
            double X = xSum / numberOfSamplesIntheAverage ;
            double Y = ySum / numberOfSamplesIntheAverage ;
            double cumulativeDisplacementForThisSensor = prevLsv.getCumulativeDisplacementForThisSensor() + findLength (X, Y, prevX, prevY) ;
            SmoothedVertex sv = new SmoothedVertex (sensor, timeStart, timeInstant, timeEnd, X, Y, numberOfSamplesIntheAverage, cumulativeDisplacementForThisSensor, pushCount, System.currentTimeMillis()) ;
            context.forward(sensor, sv) ; // Forward it to the "smoothedVertex" topic
            startASmoothingBatch = true ;
            smoothedVertexKVStateStore.put(1L, sv) ; // Update the smoothedVertexKVStateStore
            context.commit() ;
        }
    }) ;
}
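Stripped of the Kafka machinery, the punctuation above performs a batch average plus a running displacement total. A small Python sketch of the arithmetic (the names here are illustrative, not taken from the Java source):

```python
import math

def smooth_batch(batch, prev, cumulative):
    """Average a batch of raw (x, y) measurements and extend the
    cumulative displacement by the distance moved since the
    previous smoothed position."""
    n = len(batch)
    x = sum(p[0] for p in batch) / n
    y = sum(p[1] for p in batch) / n
    cumulative += math.hypot(x - prev[0], y - prev[1])
    return (x, y), cumulative

# Two raw samples averaging to (1, 1); the previous smoothed position
# was (1, 0), so the vertex moved a distance of 1.0.
smoothed, displacement = smooth_batch([(0.0, 0.0), (2.0, 2.0)], (1.0, 0.0), 0.0)
print(smoothed, displacement)  # (1.0, 1.0) 1.0
```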

In case of failure and restart, we need to load the store from disk and initialize the state (Line 3 below). An iterator over the store in Line 17 provides access to the raw vertex objects that have been pulled off the topic prior to failure but not yet processed into the smoothedVertex topic.

if (!(resetStores)) { // need to restore the state from disk...
    startASmoothingBatch = false ;
    restoreFromStore() ;
}

void restoreFromStore() {
    numberOfSamplesIntheAverage = 0L ;
    KeyValueIterator<Long, RawVertex> iter0 = rawVertexKVStateStore.all() ;
    xSum = 0.0 ;
    ySum = 0.0 ;
    while (iter0.hasNext()) {
        RawVertex rv = iter0.next().value ;
        xSum = xSum + rv.getX() ;
        ySum = ySum + rv.getY() ;
        timeStart = Math.min(timeStart, rv.getTimeInstant()) ;
        timeEnd = Math.max(timeEnd, rv.getTimeInstant()) ;
        numberOfSamplesIntheAverage++ ;
    }
    iter0.close() ;
}

The TriangleProcessor

The TriangleProcessor gets the forwarded smoothedVertex objects from the VertexProcessors and computes triangle metrics as soon as a window gets at least one of A, B, and C measurements. The metrics are saved to an Elasticsearch index for long-term storage. Line 3 in the code below keeps track of the travel time that we plotted in the previous post. This is the time that a smoothed vertex takes to reach the TriangleProcessor after it has been forwarded by the VertexProcessor. If we are restarting the stream processor after a crash, Line 9 gets engaged and reads in all the available window store data. Line 13 saves the incoming measurement to the window store.

Because the time windows have defined boundaries (first window: [0, windowSize), second window: [windowSize, 2*windowSize), etc.), we know which window an incoming measurement will fall into. This is Line 15, followed by keeping track of the counts and arrival times for this window (Lines 16-23) for the saturation time that we want to know. How long a time window needs to be retained depends on the maximum saturation time that we can expect in a simulation. We talked about this a good bit in the previous post. Every time a window gets a new measurement and it has at least one each of A, B, and C, we compute the triangle metrics. This is Line 26.

public void process (String key, SmoothedVertex value) {
    long arrivalTime = System.currentTimeMillis() ;
    long delay = arrivalTime - value.getPushTime() ;
    long measurementTime = value.getTimeInstant() ;
    String incomingSensor = value.getSensor() ;

    if (isThisANewProcessor) {
        if (!(resetStores)) {
            restoreFromStore(value.getTimeStart()) ;
        }
        isThisANewProcessor = false ; // the above block executes just once upon start-up
    }
    smoothedVerticesWindowStateStore.put(incomingSensor, value, measurementTime) ;

    long windowKey = (measurementTime / windowSize) * windowSize ;
    if (!(windowCounts.containsKey(windowKey))) {
        initializeWindow (windowKey, arrivalTime) ;
    }
    HashMap<String, Long> hm = windowCounts.get(windowKey) ;
    hm.put(incomingSensor, hm.get(incomingSensor) + 1L) ;
    hm.put("total", hm.get("total") + 1L) ;
    hm.put("latestArrivalWallClockTime", arrivalTime) ;
    windowCounts.put(windowKey, hm) ;

    if ( (hm.get("A") > 0) && (hm.get("B") > 0) && (hm.get("C") > 0) ) {
        computeTriangleMetrics (windowKey) ;
    }
}
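The window-key arithmetic relies on integer division truncating each timestamp to the start of its window. A standalone Python sketch of the rule (not code from the package):

```python
def window_key(measurement_time_ms, window_size_ms):
    """Truncate a timestamp to the start of its window:
    [0, w) -> 0, [w, 2w) -> w, and so on."""
    return (measurement_time_ms // window_size_ms) * window_size_ms

w = 60_000  # a 60-second window, as in the simulation
print(window_key(59_999, w), window_key(60_000, w), window_key(125_000, w))
# 0 60000 120000
```

Measurements from all three sensors that fall into the same bucket share a window key, which is exactly what lets the processor detect when a window has at least one each of A, B, and C.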

In the code snippet below, the vertex coordinates in a window are processed to produce a single value for A, B, and C and at a mean time. Line 3 gets an iterator over all the smoothedVertex objects in a given time window.

ArrayList<SmoothedVertex> getItemsInWindow (String sensor, long timeStart, long timeEnd) {
    ArrayList<SmoothedVertex> windowedData = new ArrayList<SmoothedVertex>() ;
    WindowStoreIterator<SmoothedVertex> iter = smoothedVerticesWindowStateStore.fetch(sensor, timeStart, timeEnd) ;
    while (iter.hasNext()) {
        windowedData.add(iter.next().value) ;
    }
    iter.close() ;
    return windowedData ;
}

int aCount = 0 ; int bCount = 0 ; int cCount = 0 ;
long time = 0L ;
double ax = 0.0 ; double ay = 0.0 ; double bx = 0.0 ; double by = 0.0 ; double cx = 0.0 ; double cy = 0.0 ;
double aDisplacement = 0.0 ; double bDisplacement = 0.0 ; double cDisplacement = 0.0 ;
for (String sensor: allSensors) {
    Iterator<SmoothedVertex> itr = getItemsInWindow(sensor, windowTimeStart, windowTimeEnd).iterator() ;
    while (itr.hasNext()) {
        SmoothedVertex sv = itr.next() ;
        time = time + sv.getTimeInstant() ;
        if (sensor.equals("A")) {
            ax = ax + sv.getX() ; ay = ay + sv.getY() ;
            aDisplacement = aDisplacement + sv.getCumulativeDisplacementForThisSensor() ;
            aCount++ ;
        }
        else if (sensor.equals("B")) {
            bx = bx + sv.getX() ; by = by + sv.getY() ;
            bDisplacement = bDisplacement + sv.getCumulativeDisplacementForThisSensor() ;
            bCount++ ;
        }
        else if (sensor.equals("C")) {
            cx = cx + sv.getX() ; cy = cy + sv.getY() ;
            cDisplacement = cDisplacement + sv.getCumulativeDisplacementForThisSensor() ;
            cCount++ ;
        }
    }
}

With a triangle at hand, we proceed to compute the metrics and save them to Elasticsearch for long-term storage. Any alerts based on the computed metrics would be injected here, but we will not delve into that. Line 12 computes the time gap between the earliest- and latest-arriving measurements in a window, so we can get a distribution of this quantity over time and use it to adjust the window retention time.

if ( (aCount >= 1) && (bCount >= 1) && (cCount >= 1) ) {
    ax = ax / aCount ; ay = ay / aCount ;
    bx = bx / bCount ; by = by / bCount ;
    cx = cx / cCount ; cy = cy / cCount ;
    aDisplacement = aDisplacement / aCount ; bDisplacement = bDisplacement / bCount ; cDisplacement = cDisplacement / cCount ;
    long timeInstant = time / (aCount + bCount + cCount) ;
    double totalDisplacement = aDisplacement + bDisplacement + cDisplacement ;
    double AB = findLength (ax, ay, bx, by) ; double BC = findLength (bx, by, cx, cy) ; double AC = findLength (ax, ay, cx, cy) ;
    double perimeter = AB + BC + AC ;
    double halfPerimeter = perimeter * 0.5 ;
    double area = Math.sqrt (halfPerimeter*(halfPerimeter-AB)*(halfPerimeter-BC)*(halfPerimeter-AC)) ;
    long wallclock_span = windowCounts.get(windowKey).get("latestArrivalWallClockTime") - windowCounts.get(windowKey).get("earliestArrivalWallClockTime") ;
}
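The perimeter and area computation above is just Heron's formula applied to the three smoothed vertex positions. As a sanity check, here is the same arithmetic in a standalone Python sketch (`triangle_metrics` is a hypothetical helper, not from the repo); a 3-4-5 right triangle should give perimeter 12 and area 6:

```python
import math

def triangle_metrics(ax, ay, bx, by, cx, cy):
    """Perimeter and area of a triangle from its vertex coordinates,
    with the area computed via Heron's formula."""
    ab = math.hypot(ax - bx, ay - by)
    bc = math.hypot(bx - cx, by - cy)
    ac = math.hypot(ax - cx, ay - cy)
    s = (ab + bc + ac) / 2.0  # the half-perimeter
    area = math.sqrt(s * (s - ab) * (s - bc) * (s - ac))
    return ab + bc + ac, area

# A 3-4-5 right triangle: perimeter 12, area 6.
print(triangle_metrics(0, 0, 3, 0, 0, 4))
```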

Running the Simulation

First, the rawVertex topic is created with three partitions and the smoothedVertex topic with one partition.

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic rawVertex
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic smoothedVertex

The three producers are started off from a script that provides all the producer configs as arguments. The production parameters are geared (no retries, synchronous send, only one message in flight, full acknowledgment, etc.) toward keeping the message order in the topic to be identical to production order. We count on this for smoothing in the VertexProcessor.

#!/bin/bash
function monitor() {
    clientId=$1        # A or B or C. The names of the vertices
    acks=$2            # We use "all" for complete acknowledgement
    retries=$3         # No retries, as we want ordered delivery
    enableIdempotence=$4
    maxInFlightRequestsPerConnection=$5   # 1, as we want ordered delivery
    sync=$6            # Wait for the send action to complete
    topics=$7          # rawVertex
    sleepTimeMillis=$8 # 1000
    amplitude=$9       # 1
    angularV=${10}     # 1. With 2.0 * Math.PI * angularV / 60.0, it takes 60 secs, i.e. 1 minute, to trace the full circle => period = 1 min
    error=${11}        # 0.001
    xReference=${12}   # Starting X position for this vertex
    yReference=${13}   # Starting Y position for this vertex
    java -cp ./core/target/rtmonitoring.jar com.xplordat.rtmonitoring.Producers $clientId $acks $retries $enableIdempotence $maxInFlightRequestsPerConnection $sync $topics $sleepTimeMillis $amplitude $angularV $error $xReference $yReference
}
monitor A all 0 false 1 true rawVertex 1000 1.0 1.0 0.001 0.0 0.0 &
monitor B all 0 false 1 true rawVertex 1000 1.0 1.0 0.001 1.0 0.0 &
monitor C all 0 false 1 true rawVertex 1000 1.0 1.0 0.001 0.5 0.866 &

Likewise, the stream processor is started (or restarted) with a script supplying the various config parameters indicated above.

#!/bin/bash
startStream=$1
if [ "$startStream" == "start" ]; then
    resetStores="true"
elif [ "$startStream" == "resume" ]; then
    resetStores="false"
else
    echo "Need 1 arg: start/resume"
    exit
fi
streamProcess() {
    applicationId=$1
    clientId=$2
    nthreads=$3
    guarantee=$4          # at_least_once
    commit_interval_ms=$5 # 100
    resetStores=$6
    windowSize=$7         # seconds
    windowRetention=$8    # minutes
    smoothingInterval=$9  # seconds
    smoothedIndex=${10}
    triangleIndex=${11}
    java -cp ./core/target/rtmonitoring.jar com.xplordat.rtmonitoring.StreamProcess $applicationId $clientId $nthreads $guarantee $commit_interval_ms $resetStores $windowSize $windowRetention $smoothingInterval $smoothedIndex $triangleIndex
}
streamProcess triangle-stress-monitor 0 3 at_least_once 100 $resetStores 60 360 12 smoothed triangle


With that, we close this post — and this series of posts on using Kafka for streaming analytics. Hopefully, the detail here is all that you need to replicate the simulations in this series. The complete code is available on GitHub.


The Basics of Databases

Welcome back to our monthly database series! Last time, we took a look at the biggest database articles and news from the month of March. In this article, we’re going to look at some introductory database articles on DZone, explore the concept of databases elsewhere on the web, and look at some publications related to databases.


Check out some of the top introductory database articles on DZone to understand the basics of databases:

  1. The Types of Modern Databases by John Hammink. Where do you begin in choosing a database? We’ve looked at both NoSQL and relational database management systems to come up with a bird’s eye view of both ecosystems to get you started.
  2. Making Graph Databases Fun Again With Java by Otavio Santana. Graph databases need to be made fun again! Not to worry — the open-source TinkerPop from Apache is here to do just that.
  3. How Are Databases Evolving? by Tom Smith. One way that databases are evolving is through the integration and convergence of technologies on the cloud using microservices.
  4. 10 Easy Steps to a Complete Understanding of SQL by Lukas Eder. Too many programmers think SQL is a bit of a beast. It’s one of the few declarative languages out there, and as such, behaves in an entirely different way from imperative, object-oriented, or even functional languages.
  5. MongoDB vs. MySQL by Mihir Shah. There are many database management systems in the market to choose from. So how about a faceoff between two dominant solutions that are close in popularity?

PS: Are you interested in contributing to DZone? Check out our Bounty Board, where you can apply for specific writing prompts and win prizes!

Databasin’ It Up

Let’s journey outside of DZone and check out some recent news, conferences, and more that should be of interest to database newbies.

Dive Even Deeper Into Database

DZone has Guides and Refcardz on pretty much every tech-related topic, but if you’re specifically interested in databases, these will appeal to you the most.

  1. The DZone Guide to Databases: Speed, Scale, and Security. Advances in database technology have traditionally been lethargic. That trend has shifted recently with a need to store larger and more dynamic data. This DZone Guide is focused on how to prepare your database to run faster, scale with ease, and effectively secure your data.
  2. Graph-Powered Search: Neo4j & Elasticsearch. In this Refcard, learn how combining technologies adds another level of quality to search results based on code and examples.


Connecting Elasticsearch Directly to your Java EE Application

The trendy term big data comes from the three Vs: volume, variety, and velocity. Volume refers to the size of the data, variety refers to its diverse types, and velocity refers to the speed at which it is processed. To persist big data, there are NoSQL databases that write and read data quickly. But given such variety in such a vast volume, finding information without significant computing power can take too much time, so a search engine is required. A search engine is a software system designed to search for information; this mechanism makes it more straightforward and clearer for users to get the information they want.

This article will cover Elasticsearch, a NoSQL database that is both a document store and a search engine.

Elasticsearch is a NoSQL document database and a search engine based on Lucene. It provides a distributed, multi-tenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is developed in Java and is released as open source under the terms of the Apache License. Elasticsearch is the most popular enterprise search engine, followed by Apache Solr, which is also based on Lucene. It is a near-real-time search platform, meaning there is a slight latency (normally one second) from the time you index a document until the time it becomes searchable.

Steps in a Search Engine

In Elasticsearch, the search process is based on the analyzer, a package containing three lower-level building blocks: character filters, tokenizers, and token filters. According to the Elastic documentation, the definitions are:

  • A character filter receives the original text as a stream of characters and can transform the stream by adding, removing, or changing characters. For instance, a character filter could be used to convert Hindu-Arabic numerals into their Arabic-Latin equivalents or to strip HTML elements from the stream.

  • A tokenizer receives a stream of characters, breaks it up into individual tokens (usually individual words), and outputs a stream of tokens. For instance, a whitespace tokenizer breaks the text into tokens whenever it sees any whitespace. It would convert the text “Quick brown fox!” into the terms [Quick, brown, fox!].

  • A token filter receives the token stream and may add, remove, or change tokens. For example, a lowercase token filter converts all tokens to lowercase, a stop token filter removes common words (stop words) like "the" from the token stream, and a synonym token filter introduces synonyms into the token stream.
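The three building blocks compose into a pipeline: characters in, tokens out. Here is a toy Python illustration of that flow (a simplified model for intuition, not Elasticsearch's actual implementation):

```python
import re

def char_filter(text):
    # strip HTML elements from the stream, like the html_strip character filter
    return re.sub(r"<[^>]+>", "", text)

def tokenizer(text):
    # whitespace tokenizer: break the text into tokens on any whitespace
    return text.split()

def token_filters(tokens, stop_words=("the", "a", "an")):
    # lowercase filter followed by a stop-words filter
    lowered = [t.lower() for t in tokens]
    return [t for t in lowered if t not in stop_words]

def analyze(text):
    # character filters -> tokenizer -> token filters
    return token_filters(tokenizer(char_filter(text)))

print(analyze("<b>The</b> Quick brown fox!"))  # ['quick', 'brown', 'fox!']
```

Note that, just as in the whitespace-tokenizer example above, the trailing punctuation of "fox!" survives, because only a token filter (or a different tokenizer) would remove it.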

How to Install Elasticsearch in Docker

The first step to using Elasticsearch is to install it. You can install it either manually or through Docker. The easiest way is with Docker, following the step below:

docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node"

Elasticsearch and Java EE Working Together

Eclipse JNoSQL is the bridge between these platforms (Java EE and the search engine). An important point to remember is that Elasticsearch is also a NoSQL document store, so a developer may model the application as such. To use both the standard document behavior and the Elasticsearch API, a programmer needs to use the Elasticsearch extension.

<dependency>
    <groupId>org.jnosql.artemis</groupId>
    <artifactId>elasticsearch-extension</artifactId>
    <version>0.0.5</version>
</dependency>

For this demo, we’ll create a contacts agenda for a developer that will have a name, address, and, of course, the languages that they know. An address has its own fields and becomes a subdocument, i.e. a document inside a document.

public class Developer {

    @Id
    private Long id;

    @Column
    private String name;

    @Column
    private List<String> phones;

    @Column
    private List<String> languages;

    @Column
    private Address address;
}

@Embeddable
public class Address {

    @Column
    private String street;

    @Column
    private String city;

    @Column
    private Integer number;
}

With the model defined, let’s set the mapping. Mapping is the process of determining how a document and the fields it contains are stored and indexed. For this example, most fields are of type keyword, which is searchable only by its exact value. There is also the languages field, which we define as text with a custom analyzer. This custom analyzer, whitespace_analyzer, has one tokenizer, whitespace, and three filters (standard, lowercase, and asciifolding).

{
  "settings": {
    "analysis": {
      "filter": {},
      "analyzer": {
        "whitespace_analyzer": {
          "type": "custom",
          "tokenizer": "whitespace",
          "filter": ["standard", "lowercase", "asciifolding"]
        }
      }
    }
  },
  "mappings": {
    "developer": {
      "properties": {
        "name": { "type": "keyword" },
        "languages": { "type": "text", "analyzer": "whitespace_analyzer" },
        "phones": { "type": "keyword" },
        "address": {
          "properties": {
            "street": { "type": "text" },
            "city": { "type": "text" },
            "number": { "type": "integer" }
          }
        }
      }
    }
  }
}
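To see what the custom analyzer buys us, consider a languages value such as "Java SE": the whitespace tokenizer splits it, and the lowercase and asciifolding filters normalize it, so a later search for "java" can match. A rough Python approximation (toy code, not the actual Lucene filters; asciifolding is approximated here with Unicode decomposition):

```python
import unicodedata

def whitespace_analyzer(text):
    # whitespace tokenizer + lowercase filter + an asciifolding approximation
    # (strip diacritics by dropping combining marks after NFKD decomposition)
    tokens = text.split()
    tokens = [t.lower() for t in tokens]
    tokens = ["".join(c for c in unicodedata.normalize("NFKD", t)
                      if not unicodedata.combining(c)) for t in tokens]
    return tokens

print(whitespace_analyzer("Java SE"))    # ['java', 'se']
print(whitespace_analyzer("São Paulo"))  # ['sao', 'paulo']
```

Because the same analyzer runs at index time and (for text fields) at query time, "java" finds the developer whose languages include "Java SE", while the keyword fields such as name still require an exact match.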

With this API, the developer can perform the basic operations of a document NoSQL database, at least a CRUD. In Elasticsearch, however, the search-engine behavior also matters and is useful; that is why there is an extension.

public class App {

    public static void main(String[] args) {
        Random random = new Random();
        Long id = random.nextLong();
        try (SeContainer container = SeContainerInitializer.newInstance().initialize()) {
            Address address = Address.builder()
                    .withCity("Salvador")
                    .withStreet("Rua Engenheiro Jose")
                    .withNumber(10).build();
            Developer developer = Developer.builder()
                    .withPhones(Arrays.asList("85 85 343435684", "55 11 123448684"))
                    .withName("Poliana Lovelace")
                    .withId(id)
                    .withAddress(address)
                    .build();
            DocumentTemplate documentTemplate = container.select(DocumentTemplate.class).get();
            Developer saved = documentTemplate.insert(developer);
            System.out.println("Developer saved: " + saved);
            DocumentQuery query = select().from("developer")
                    .where("_id").eq(id).build();
            Optional<Developer> personOptional = documentTemplate.singleResult(query);
            System.out.println("Entity found: " + personOptional);
        }
    }

    private App() {}
}

From the Elasticsearch extension, the user might use the QueryBuilders, a utility class to create search queries in the database.

public class App3 {

    public static void main(String[] args) throws InterruptedException {
        try (SeContainer container = SeContainerInitializer.newInstance().initialize()) {
            Random random = new Random();
            long id = random.nextLong();
            Address address = Address.builder()
                    .withCity("São Paulo")
                    .withStreet("Av. nove de Julho 1854")
                    .withNumber(10).build();
            Developer developer = Developer.builder()
                    .withPhones(Arrays.asList("85 85 343435684", "55 11 123448684"))
                    .withName("Maria Lovelace")
                    .withId(id)
                    .withAddress(address)
                    .withLanguage("Java SE")
                    .withLanguage("Java EE")
                    .build();
            ElasticsearchTemplate template = container.select(ElasticsearchTemplate.class).get();
            Developer saved = template.insert(developer);
            System.out.println("Developer saved: " + saved);
            TimeUnit.SECONDS.sleep(2L);
            TermQueryBuilder query = QueryBuilders.termQuery("phones", "85 85 343435684");
            List<Developer> people = template.search(query);
            System.out.println("Entity found from phone: " + people);
            people = template.search(QueryBuilders.termQuery("languages", "java"));
            System.out.println("Entity found from languages: " + people);
        }
    }

    private App3() {}
}


An intuitive way to find data is essential to an enterprise application, especially when the software handles a massive volume of data of several kinds. Elasticsearch can help the Java EE world with both NoSQL documents and a search engine. This post covered how to join the best of these two worlds using Eclipse JNoSQL.


Deploy Elasticsearch with Kubernetes on AWS in 10 Steps

Kubernetes, aka "K8s", is an open-source system for automating deployment, scaling and management of containerized applications. In this tutorial, I will show how to set up a Kubernetes cluster and deploy an Elasticsearch cluster on it in AWS. A similar setup should also work for GCE and Azure.

Configuring Kubernetes on AWS

Before we get started, you should have administrative access to the following AWS services: S3, EC2, Route53, IAM, and VPC.


Shipping Data to AWS Elasticsearch With Logagent

Elasticsearch is already quite popular and its popularity just keeps growing. Looking at a Google Trends chart for the last 5 years shows this nicely:

There are a number of reasons why Elasticsearch is popular: it is very easy to get going with Elasticsearch, it’s user-friendly, and it has great APIs, among other things. Its growing popularity is not only beneficial to Elasticsearch itself, but also to the whole community, as the ecosystem around it is growing rapidly as well. There are tools developed by Elastic (the company, not the software), like Logstash and Kibana, and there are tools provided by third-party companies and developers, like Logagent, Search Guard (you can find a blog post about it on our blog – Securing Elasticsearch and Kibana with Search Guard), Grafana, and many, many more.

In addition to Elasticsearch and the ecosystem built around it, there are also commercial offerings. From hosted ELK as a service for logs that’s a part of Sematext Cloud, to hosted Elasticsearch services like Amazon Elasticsearch Service which lets you run your own managed clusters.

Amazon Elasticsearch Service or Sematext Logsene

Before we talk about shipping data to an AWS Elasticsearch service let’s just do a quick check. Is an AWS Elasticsearch service really what you want? To help with that decision, consider the following questions:

  • What is my use case for Elasticsearch? Do I have use cases other than centralized logging?
  • Do I have enough knowledge and experience to support my own Elasticsearch cluster?
  • Do I want to take the full responsibility for Elasticsearch maintenance and scaling to support a growing volume of data and/or queries?
  • Are there enough people on my team to share the burden or is it all going to fall on my plate?

If some or most of the answers are no, then you may want to stop reading here, check Logsene and save yourself both time and money. If, however, your use cases potentially include, but are not limited to, logging, then keep reading!

Here is a diagram that helps people figure out if they should use AWS Elasticsearch, or run their own Elasticsearch, or use a service like Logsene.

If most of the answers to the questions above were yes, then you are likely considering an Amazon Elasticsearch service, and going through the above flow diagram should confirm that. We compared self-hosted Elasticsearch and the Amazon Elasticsearch service some time ago – you can read about it in the AWS Elasticsearch Service vs. Elasticsearch on EC2 blog post. The gist is that the Amazon Elasticsearch service provides:

  • Automatic failed node replacement.
  • Node adding/removal via an API.
  • Rights management via IAM.
  • Daily S3 snapshots.
  • Basic CloudWatch metrics for Elasticsearch.

But the downsides are:

  • Increased costs compared to traditional EC2 instances.
  • Fewer instance types available.
  • Limited cluster-wide changes possible.
  • Unavailability of Elasticsearch logs.
  • Limited debugging possibilities because of API restrictions.

If the pros outweigh the cons for you, and you would like to ship your logs to Amazon Elasticsearch Service, let’s see how to actually do that with Logagent, an open-source, Node.js-based log shipper.

Logagent to Amazon Elasticsearch Service

When using Amazon Elasticsearch Service you gain security as an out of the box feature, but you are also left hanging a bit as the official Elasticsearch client library doesn’t support it. You either have to disable security and allow for communication from certain hosts without authentication or choose not to rely on the official Elasticsearch client libraries. However, if your use case is log/event shipping things are not that bad – you can use Logstash with an additional plugin or the newest version of Logagent, which has lower overhead and minimal impact on the system. See Top 5 Logstash Alternatives for more details.

Configuring Logagent to ship data to the Amazon Elasticsearch Service is really quite simple. For the purpose of this blog post, I will simply send the contents of a file to Elasticsearch. Let’s assume my file is called app.log and that it lives in the /var/log/myapp/ directory. The input part of the Logagent configuration looks as follows:

input:
  stdin: true
  files:
    - /var/log/myapp/app.log

Now the data that is read from the input needs to be sent to an output – in our case our AWS Elasticsearch Service instance available at (no, not available anymore – get your own!). To do that, we will use the output module called output-aws-elasticsearch. The configuration looks as follows:

output:
  aws-es:
    module: output-aws-elasticsearch
    url:
    index: myapp_logs
    type: myapp
    awsConfigFile: ./aws-config.json
    log:
      - type: 'stdio'
        levels: []

In the above output definition we mention the auth and awsConfigFile options. These are important. Logagent uses AWS SDK libraries and supports all authentication methods provided by the AWS API including signed HTTP requests. Logagent assumes you will provide the credentials to access your AWS environment by using the JSON file specified using the awsConfigFile options. The content of the aws-config.json looks as follows:

{
  "accessKeyId": <YOUR_ACCESS_KEY_ID>,
  "secretAccessKey": <YOUR_SECRET_ACCESS_KEY>,
  "region": "us-east-1"
}

So we need to provide the AWS access key, AWS secret key, and the region where we have our AWS Elasticsearch Service instances created and we are good to go. You can get all of this via the AWS Console.

And this is really all you need to do. Logagent makes data shipping easy: it’s Apache-licensed and open-sourced on GitHub, completely pluggable, featuring a number of input, output, and processor plugins, and it’s very easy to add your own. For more information, check out the Logagent documentation. Enjoy!


Elasticsearch Tutorial: Creating an Elasticsearch Cluster

Unless you are using Elasticsearch for development and testing, creating and maintaining an Elasticsearch cluster will be a task that will occupy quite a lot of your time. Elasticsearch is an extremely powerful search and analysis engine, and part of this power lies in the ability to scale it for better performance and stability.

This tutorial will provide some information on how to set up an Elasticsearch cluster and will add some operational tips and best practices to help you get started. It should be stressed though that each Elasticsearch setup will likely differ from one another depending on multiple factors, including the workload on the servers, the amount of indexed data, hardware specifications, and even the experience of the operators.

What Is an Elasticsearch Cluster?

As the name implies, an Elasticsearch cluster is a group of one or more Elasticsearch node instances that are connected together. The power of an Elasticsearch cluster lies in the distribution of tasks, searching and indexing, across all the nodes in the cluster.

The nodes in the Elasticsearch cluster can be assigned different jobs or responsibilities:

  • Data nodes – store data and execute data-related operations such as search and aggregation.
  • Master nodes – in charge of cluster-wide management and configuration actions such as adding and removing nodes.
  • Client nodes – forward cluster requests to the master node and data-related requests to data nodes.
  • Ingest nodes – pre-process documents before indexing.

By default, each node is automatically assigned a unique identifier, or name, that is used for management purposes and becomes even more important in a multi-node, or clustered, environment.

When installed, a single Elasticsearch node will form a new single-node cluster entitled “elasticsearch,” but, as we shall see later on in this article, it can also be configured to join an existing cluster using the cluster name. Needless to say, these nodes need to be able to identify each other to be able to connect.

Installing an Elasticsearch Cluster

As always, there are multiple ways of setting up an Elasticsearch cluster. You can use a configuration management tool such as Puppet or Ansible to automate the process. In this case, though, we will be showing you how to manually set up a cluster consisting of one master node and two data nodes, all on Ubuntu 16.04 instances on AWS EC2 running in the same VPC. The security group was configured to enable access from anywhere using SSH and TCP 5601 (Kibana).

Installing Java

Elasticsearch is built on Java and requires at least Java 8 (1.8.0_131 or later) to run. Our first step, therefore, is to install Java 8 on all the nodes in the cluster. Please note that the same version should be installed on all Elasticsearch nodes in the cluster.

Repeat the following steps on all the servers designated for your cluster.

First, update your system:

sudo apt-get update

Then, install Java with:

sudo apt-get install default-jre

Checking your Java version now should give you the following output or similar:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (build 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

Installing Elasticsearch Nodes

Our next step is to install Elasticsearch. As before, repeat the steps in this section on all your servers.

First, you need to add Elastic’s signing key so that the downloaded package can be verified (skip this step if you’ve already installed packages from Elastic):

wget -qO - | sudo apt-key add -

For Debian, we need to then install the apt-transport-https package:

sudo apt-get install apt-transport-https

The next step is to add the repository definition to your system:

echo "deb stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-6.x.list

All that’s left to do is to update your repositories and install Elasticsearch:

sudo apt-get update
sudo apt-get install elasticsearch

Configuring the Elasticsearch Cluster

Our next step is to set up the cluster so that the nodes can connect and communicate with each other.

For each node, open the Elasticsearch configuration file:

sudo vim /etc/elasticsearch/elasticsearch.yml

This file is quite long and contains multiple settings for different sections. Browse through the file, and enter the following configurations (replace the IPs with your node IPs):

#give your cluster a name:
cluster.name: my-cluster
#give your nodes a name (change node number from node to node):
node.name: "es-node-1"
#define node 1 as master-eligible:
node.master: true
#define nodes 2 and 3 as data nodes:
node.data: true
#enter the private IP and port of your node:
network.host:
http.port: 9200
#detail the private IPs of your nodes:
discovery.zen.ping.unicast.hosts: ["", "", ""]

Save and exit.
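Since the same file is edited on every server with only a few values changing, it can help to script the per-node differences. A small Python sketch that renders the settings above for each node (a convenience of my own, not part of the tutorial; adapt the names and IP list to your environment):

```python
def render_config(cluster_name, node_name, is_master, is_data, host_ips):
    # Mirrors the elasticsearch.yml settings used in this tutorial.
    lines = [
        f"cluster.name: {cluster_name}",
        f'node.name: "{node_name}"',
        f"node.master: {str(is_master).lower()}",
        f"node.data: {str(is_data).lower()}",
        "http.port: 9200",
        "discovery.zen.ping.unicast.hosts: ["
        + ", ".join(f'"{ip}"' for ip in host_ips) + "]",
    ]
    return "\n".join(lines)

# One master-eligible node and two data nodes, as in this tutorial:
print(render_config("my-cluster", "es-node-1", True, False,
                    ["10.0.0.1", "10.0.0.2", "10.0.0.3"]))
```

Render one config per node (es-node-2 and es-node-3 with is_master=False, is_data=True), write each to /etc/elasticsearch/elasticsearch.yml on its server, and the nodes can discover each other via the shared cluster name and host list.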

Running Your Elasticsearch Cluster

You are now ready to start your Elasticsearch nodes and verify they are communicating with each other as a cluster.

For each instance, run the following command:

sudo service elasticsearch start

If everything was configured correctly, your Elasticsearch cluster should be up and running. To verify everything is working as expected, query Elasticsearch from any of the cluster nodes:

curl -XGET 'http://localhost:9200/_cluster/state?pretty'

The response should detail the cluster and its nodes:

{ "cluster_name" : "my-cluster", "compressed_size_in_bytes" : 351, "version" : 4, "state_uuid" : "3LSnpinFQbCDHnsFv-Z8nw", "master_node" : "IwEK2o1-Ss6mtx50MripkA", "blocks" : { }, "nodes" : { "IwEK2o1-Ss6mtx50MripkA" : { "name" : "es-node-2", "ephemeral_id" : "x9kUrr0yRh--3G0ckESsEA", "transport_address" : "", "attributes" : { } }, "txM57a42Q0Ggayo4g7-pSg" : { "name" : "es-node-1", "ephemeral_id" : "Q370o4FLQ4yKPX4_rOIlYQ", "transport_address" : "", "attributes" : { } }, "6YNZvQW6QYO-DX31uIvaBg" : { "name" : "es-node-3", "ephemeral_id" : "mH034-P0Sku6Vr1DXBOQ5A", "transport_address" : "", "attributes" : { } } }, …

Elasticsearch Cluster Configurations for Production

We already defined the different roles for the nodes in our cluster, but there are some additional recommended settings for a cluster running in a production environment.

Avoiding “Split Brain”

A “split-brain” situation is when communication between nodes in the cluster fails due to either a network failure or an internal failure with one of the nodes. In this kind of scenario, more than one node might believe it is the master node, leading to a state of data inconsistency.

For avoiding this situation, we can make changes to the discovery.zen.minimum_master_nodes directive in the Elasticsearch configuration file which determines how many nodes need to be in communication (quorum) to elect a master.

A best practice is to determine this number with the following formula: N/2 + 1, where N is the number of master-eligible nodes in the cluster, rounding the result down to the nearest integer.

In the case of a cluster with three nodes, then:

discovery.zen.minimum_master_nodes: 2
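The quorum formula is easy to get wrong during an incident, so it is worth writing down. In Python, following the N/2 + 1 rounded-down rule from the text:

```python
def minimum_master_nodes(master_eligible_nodes):
    # quorum = floor(N/2 + 1): a strict majority of master-eligible nodes
    return master_eligible_nodes // 2 + 1

# For the three-node cluster in this tutorial:
print(minimum_master_nodes(3))  # 2
```

Note that with only two master-eligible nodes the formula gives 2, meaning the cluster cannot elect a master if either node fails; this is why three master-eligible nodes is the usual minimum for high availability.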

Adjusting JVM Heap Size

To ensure Elasticsearch has enough operational leeway, the default JVM heap size (min/max 1 GB) should be adjusted.

As a rule of thumb, the maximum heap size should be set to 50% of your RAM, but no more than 32 GB (due to Java pointer inefficiency in larger heaps). Elastic also recommends that the values for maximum and minimum heap size be identical.

These values can be configured using the Xmx and Xms settings in the jvm.options file.


sudo vim /etc/elasticsearch/jvm.options

-Xms2g
-Xmx2g
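The half-of-RAM, capped-at-32-GB rule can be expressed directly. A quick sketch (my own helper, not an official tool):

```python
def recommended_heap_gb(ram_gb):
    # rule of thumb: half the RAM, capped at 32 GB
    return min(ram_gb // 2, 32)

for ram in (4, 16, 128):
    heap = recommended_heap_gb(ram)
    print(f"{ram} GB RAM -> -Xms{heap}g -Xmx{heap}g")
```

So a 4 GB instance gets a 2 GB heap (the -Xms2g/-Xmx2g above), while a 128 GB machine should still stop at 32 GB and leave the rest to the OS file-system cache.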

Disabling Swapping

Swapping out unused memory is a known behavior but, in the context of Elasticsearch, can result in disconnects, bad performance, and, in general, an unstable cluster.

To avoid swapping, you can either disable all swapping (recommended if Elasticsearch is the only service running on the server), or you can use memory locking to lock the Elasticsearch process address space into RAM.

To do this, open the Elasticsearch configuration file on all nodes in the cluster:

sudo vim /etc/elasticsearch/elasticsearch.yml

Uncomment the following line:

bootstrap.memory_lock: true

Next, open the /etc/default/elasticsearch file:

sudo vim /etc/default/elasticsearch

Make the following configuration:

MAX_LOCKED_MEMORY=unlimited
Restart Elasticsearch when you’re done.

Adjusting Virtual Memory

To avoid running out of virtual memory, increase the amount of limits on mmap counts:

sudo vim /etc/sysctl.conf

Update the relevant setting accordingly:

vm.max_map_count=262144
On DEB/RPM, this setting is configured automatically.

Increasing Open File Descriptor Limit

Another important configuration is the limit of open file descriptors. Since Elasticsearch makes use of a large number of file descriptors, you must ensure the defined limit is enough; otherwise, you might end up losing data.

The common recommendation for this setting is 65,536 or higher. On DEB/RPM, the default settings are already configured to suit this requirement, but you can, of course, fine-tune it.

sudo vim /etc/security/limits.conf

Set the limit:

 - nofile 65536
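After raising the limit, you may want to verify what a process on the host actually gets. For example, from Python's standard library (Unix-only):

```python
import resource

# Soft limit is what the process is currently held to; hard limit is the ceiling.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open file descriptors: soft={soft}, hard={hard}")
if soft != resource.RLIM_INFINITY and soft < 65536:
    print("warning: soft limit is below the 65,536 recommended for Elasticsearch")
```

You can also check the limit of the running Elasticsearch process itself via `cat /proc/<pid>/limits` or the `_nodes/stats/process` API, which reports open_file_descriptors.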

Elasticsearch Cluster APIs

Elasticsearch supports a large number of cluster-specific API operations that allow you to manage and monitor your Elasticsearch cluster. Most of the APIs allow you to define which Elasticsearch node to call using either the internal node ID, its name, or its address.

Below is a list of a few of the more basic API operations you can use. For advanced usage of cluster APIs, read this blog post.

Cluster Health

This API can be used to see general information on the cluster and gauge its health:

curl -XGET 'localhost:9200/_cluster/health?pretty'


{
  "cluster_name" : "my-cluster",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 3,
  "number_of_data_nodes" : 3,
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}
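In a monitoring script, you would typically parse this JSON and act on the status field. A minimal sketch using only the Python standard library (the sample payload is abbreviated from the output above; the field names are the real ones from the health API):

```python
import json

def health_summary(payload):
    # Pull out the fields most monitoring checks care about.
    health = json.loads(payload)
    return (health["status"], health["number_of_nodes"],
            health["active_shards_percent_as_number"])

sample = '''{"cluster_name": "my-cluster", "status": "green",
             "number_of_nodes": 3, "number_of_data_nodes": 3,
             "active_shards_percent_as_number": 100.0}'''
print(health_summary(sample))  # ('green', 3, 100.0)
```

A yellow status means all primaries are allocated but some replicas are not; red means at least one primary shard is unassigned, which is usually worth an alert.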

Cluster State

This API can be used to see a detailed status report on your entire cluster. You can filter results by specifying parameters in the call URL.

curl -XGET 'localhost:9200/_cluster/state?pretty'


{ "cluster_name" : "my-cluster", "compressed_size_in_bytes" : 347, "version" : 4, "state_uuid" : "uMi5OBtAS8SSRJ9hw1-gUg", "master_node" : "sqT_y5ENQ9SdjHiE0oco_g", "blocks" : { }, "nodes" : { "sqT_y5ENQ9SdjHiE0oco_g" : { "name" : "node-1", "ephemeral_id" : "-HDzovR0S0e-Nn8XJ-GWPA", "transport_address" : "", "attributes" : { } }, "mO0d0hYiS1uB--NoWuWyHg" : { "name" : "node-3", "ephemeral_id" : "LXjx86Q5TrmefDoq06MY1A", "transport_address" : "", "attributes" : { } }, "it1V-5bGT9yQh19d8aAO0g" : { "name" : "node-2", "ephemeral_id" : "lCJja_QtTYauP3xEWg5NBQ", "transport_address" : "", "attributes" : { } } }, "metadata" : { "cluster_uuid" : "8AqSmmKdQgmRVPsVxyxKrw", "templates" : { }, "indices" : { }, "index-graveyard" : { "tombstones" : [ ] } }, "routing_table" : { "indices" : { } }, "routing_nodes" : { "unassigned" : [ ], "nodes" : { "it1V-5bGT9yQh19d8aAO0g" : [ ], "sqT_y5ENQ9SdjHiE0oco_g" : [ ], "mO0d0hYiS1uB--NoWuWyHg" : [ ] } }, "snapshots" : { "snapshots" : [ ] }, "restore" : { "snapshots" : [ ] }, "snapshot_deletions" : { "snapshot_deletions" : [ ] }

Cluster Stats

Extremely useful for monitoring performance metrics on your entire cluster:

curl -XGET 'localhost:9200/_cluster/stats?human&pretty'


{ "_nodes" : { "total" : 3, "successful" : 3, "failed" : 0 }, "cluster_name" : "my-cluster", "timestamp" : 1517224098451, "status" : "green", "indices" : { "count" : 0, "shards" : { }, "docs" : { "count" : 0, "deleted" : 0 }, "store" : { "size" : "0b", "size_in_bytes" : 0 }, "fielddata" : { "memory_size" : "0b", "memory_size_in_bytes" : 0, "evictions" : 0 }, "query_cache" : { "memory_size" : "0b", "memory_size_in_bytes" : 0, "total_count" : 0, "hit_count" : 0, "miss_count" : 0, "cache_size" : 0, "cache_count" : 0, "evictions" : 0 }, "completion" : { "size" : "0b", "size_in_bytes" : 0 }, "segments" : { "count" : 0, "memory" : "0b", "memory_in_bytes" : 0, "terms_memory" : "0b", "terms_memory_in_bytes" : 0, "stored_fields_memory" : "0b", "stored_fields_memory_in_bytes" : 0, "term_vectors_memory" : "0b", "term_vectors_memory_in_bytes" : 0, "norms_memory" : "0b", "norms_memory_in_bytes" : 0, "points_memory" : "0b", "points_memory_in_bytes" : 0, "doc_values_memory" : "0b", "doc_values_memory_in_bytes" : 0, "index_writer_memory" : "0b", "index_writer_memory_in_bytes" : 0, "version_map_memory" : "0b", "version_map_memory_in_bytes" : 0, "fixed_bit_set" : "0b", "fixed_bit_set_memory_in_bytes" : 0, "max_unsafe_auto_id_timestamp" : -9223372036854775808, "file_sizes" : { } } }, "nodes" : { "count" : { "total" : 3, "data" : 3, "coordinating_only" : 0, "master" : 3, "ingest" : 3 }, "versions" : [ "6.1.2" ], "os" : { "available_processors" : 3, "allocated_processors" : 3, "names" : [ { "name" : "Linux", "count" : 3 } ], "mem" : { "total" : "10.4gb", "total_in_bytes" : 11247157248, "free" : "4.5gb", "free_in_bytes" : 4915200000, "used" : "5.8gb", "used_in_bytes" : 6331957248, "free_percent" : 44, "used_percent" : 56 } }, "process" : { "cpu" : { "percent" : 10 }, "open_file_descriptors" : { "min" : 177, "max" : 178, "avg" : 177 } }, "jvm" : { "max_uptime" : "6m", "max_uptime_in_millis" : 361766, "versions" : [ { "version" : "1.8.0_151", "vm_name" : "OpenJDK 64-Bit Server VM", 
"vm_version" : "25.151-b12", "vm_vendor" : "Oracle Corporation", "count" : 3 } ], "mem" : { "heap_used" : "252.1mb", "heap_used_in_bytes" : 264450008, "heap_max" : "2.9gb", "heap_max_in_bytes" : 3195076608 }, "threads" : 63 }, "fs" : { "total" : "23.2gb", "total_in_bytes" : 24962703360, "free" : "19.4gb", "free_in_bytes" : 20908818432, "available" : "18.2gb", "available_in_bytes" : 19570003968 }, "plugins" : [ ], "network_types" : { "transport_types" : { "netty4" : 3 }, "http_types" : { "netty4" : 3 } } }

Nodes Stats

If you want to inspect metrics for specific nodes in the cluster, use this API. You can see information for all nodes, a specific node, or ask to see only index or OS/process specific stats.

All nodes:

curl -XGET 'localhost:9200/_nodes/stats?pretty'

A specific node:

curl -XGET 'localhost:9200/_nodes/node-1/stats?pretty'

Index-only stats:

curl -XGET 'localhost:9200/_nodes/stats/indices?pretty'

This is just the tip of the iceberg and there are plenty more APIs available. Refer to the official Cluster API docs for reference.

What’s Next?

“When all else fails, read the fuc%^&* manual” goes the famous saying. Thing is, the manual in question, and the technology it documents, are not straightforward, to say the least.

This tutorial made a brave attempt to provide users with the basics of setting up and configuring their first Elasticsearch cluster, knowing full well that it is virtually impossible to provide instructions that suit every environment and use case.

Together with this tutorial, I strongly recommend doing additional research. Other than Elastic’s official documentation, here are some additional informative resources:

Good luck!

Original Link

Deploying Elasticsearch 6.x on Azure With Terraform

Terraform is my go-to tool for repeatable and easy infrastructure deployments. I’ve previously shared how I deploy Elasticsearch on AWS with Terraform and Packer, and since posting that, I used it to deploy many clusters and it also got picked up by quite a few others.

Our offerings at BigData Boutique are cloud-agnostic and, as such, we also help projects deployed on other clouds. Today, we will be looking at deploying a full Elasticsearch cluster using best practices end-to-end on Microsoft Azure.

You can find all relevant code and documentation here. This entire Terraforming project supports deploying both Elasticsearch 5.x and 6.x clusters.

Feel free to share your experience, report issues, and request features here.

Creating Immutable Images With Packer

To enable quickly launching machines on the cloud without waiting for lengthy installs during provisioning, and also to avoid snowflake servers, I opted for generating images of the servers we deploy and then just provisioning machines with those images loaded onto them. This is a general practice I use, and here it is really easy to see how it makes a difference.

"Packer is a tool for creating machine and container images for multiple platforms from a single source configuration."

In other words, you can easily define the steps to execute on a base image and then run it everywhere to create images that you can later deploy.

In my solution, I created two images. One is an image for an Elasticsearch node that is installed on the latest Ubuntu; the second is an image with Kibana, Grafana, and Cerebro installed that is based on the first image and will be later used as an external and internal gateway to the cluster.

More details and instructions for running this can be found in the README. You need to create those images in order to proceed to the next step.

Deploying an Elasticsearch Cluster With Terraform

Terraform is great at describing complex infrastructure easily and in a repeatable way. I find Terraform so much easier to use for deploying and amending infrastructure — especially on Azure, which, for many people, tends to be more UI-oriented.

Once you have created the machine images with Packer, all that is left for you to do is edit some configurations (i.e. machine sizes, number of nodes, Azure location, SSH keys to use) and you are set to go.

Running terraform plan and then terraform apply will create the cluster for you using scale-sets and load balancing for the client nodes and the necessary network interfaces. Everything will be set up using best practices, although your mileage may vary and you might want to fork my work and adapt it to your use case.

The recommended configuration is to have exactly three master nodes, at least two data nodes, and at least one client node (and it’s easy to add more to ensure 100% uptime). This is supported out-of-the-box. We also support a single-node mode, mostly for experimentation, but it also might be usable for very small deployments.

Elastic’s X-Pack is deployed on the cluster out of the box with monitoring enabled but security disabled — you should enable and set up X-Pack Security for any production deployment.

Full details and instructions are here.

Client Nodes With Kibana, Grafana, and Cerebro

Once deployed, the cluster is fully configured and is accessible via the deployed client nodes. The client nodes also expose Kibana instances and a Cerebro UI on top of the cluster, so everything is fully visible and ready for use. There is also Grafana installed for those who prefer using Grafana dashboards on top of Elasticsearch.

Those client nodes are also the ones your apps need to talk to (internally, of course). They are password-protected (the password is automatically generated and can be retrieved using terraform output), and you might want to remove that completely and rely on your vnet and private IPs, removing public IP access completely. I discussed security concerns in this article before.

Note: The first time Kibana is initialized, it takes about ten minutes to become available. It does some magic compressions and stuff.

Elastic Discovery on Azure

Unfortunately, the story of cluster discovery on Azure is quite bad. There is an Azure "Classic" discovery plugin that has been deprecated since circa 5.0, and Elastic is yet to release a properly working replacement. There is a PR for an Azure RM discovery plugin you can track, but it has been open for over a year now without any real progress.

A discovery plugin on a public cloud is important because it takes a lot of complexity off your hands and manages the initial cluster nodes discovery using the available cloud APIs.

Having none available, I defaulted to using the vnet and naming conventions. Another viable option is file-based discovery, which uses a file describing your cluster that you can upload to the images and use as a seed.


The Azure repository plugin is installed on the cluster and ready to be used for index snapshots and (should you ever need) a restore. Official documentation is available here.

Original Link

Data Analytics Made Easier With Elasticsearch

Companies all across the globe have cashed in on collecting as much data as possible in order to have better insights. The mindset is quite straightforward when it comes to leveraging heaps of data to drive business through better decision making.

Collecting data is good and collecting big data is better, but the process of assessing and analyzing big data is not so easy. It requires knowledge of enterprise search engines, which make content from different sources (enterprise databases, social media, sensor data, etc.) searchable to a defined audience. Elasticsearch, Apache Solr, and Sphinx are some of the free and open-source enterprise search software.

Before we dive in, let's go through the basics of Elasticsearch. It is the main product of a company called Elastic. It is a very useful tool for indexing documents coupled with full text-based search. The domain-specific query language (JSON-based) is simple yet powerful, which makes it the default standard when it comes to search integration. Elasticsearch is mainly used for web search, log analysis, and big data analytics. Often compared with Apache Solr, both depend on Apache Lucene for low-level indexing and analysis. Elasticsearch is more popular because it is easy to install, scales out to hundreds of nodes with no additional software, and is easy to work with thanks to its built-in REST API.
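For example, a basic full-text query in the JSON query DSL looks like the following (the summary field and the search text here are illustrative):

```json
{
  "query": {
    "match": {
      "summary": "search engine"
    }
  }
}
```

Sent as the body of a search request, this matches documents whose summary field contains the analyzed terms.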

Advantages of Implementing Elasticsearch

1. Developer-Friendly API

Elasticsearch is API-driven. Almost any action can be performed using a simple RESTful API with JSON over HTTP. Client libraries are available for many programming languages. It has clean and easily navigable documentation, increasing the quality and user experience of independently created applications on your platform. It can be integrated with Hadoop for fast query results. Klout, a website that measures social media influence, used this technique to scale from 100 million to 400 million users while reducing the database update time from one day down to four hours and delivering query results to business analysts in seconds rather than minutes.

2. Real-Time Analytics

Real-time analytics provide updated results of customer events such as page views, website navigation, shopping cart use, or any other kind of online or digital activity. This data is extremely important for businesses conducting dynamic analysis and reporting in order to quickly respond to trends in user behavior. With Elasticsearch, data is immediately available for search and analytics. Elasticsearch combines the speed of search with the power of analytics for better decision-making. It provides insights that streamline your business and improve your products through interactive search and other analysis features.

3. Ease of Data Indexing

Data indexing is a way of sorting a number of records on multiple fields. Elasticsearch is schema-free and document-oriented: complex real-world entities are stored as structured JSON documents. Simply index a JSON document and Elasticsearch will automatically detect the data structure and types, create an index, and make your data searchable. You also have full control to customize how your data is indexed. This simplifies the analytics process by speeding up data retrieval.
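The type-detection step can be sketched in a few lines of JavaScript. This is a rough conceptual model only, not Elasticsearch's actual implementation (the real dynamic mapping rules also detect dates and map strings to text plus a keyword sub-field):

```javascript
// Rough sketch of Elasticsearch dynamic mapping: field types are inferred
// from the first JSON document indexed (simplified on purpose).
function inferMapping(doc) {
  const esType = (v) =>
    typeof v === "boolean" ? "boolean"
    : typeof v === "number" ? (Number.isInteger(v) ? "long" : "float")
    : "text";
  return Object.fromEntries(
    Object.entries(doc).map(([field, v]) => [field, esType(v)])
  );
}

const book = { title: "Solr in Action", num_reviews: 23, in_print: true };
console.log(inferMapping(book));
// { title: 'text', num_reviews: 'long', in_print: 'boolean' }
```

Index a document with these fields and Elasticsearch infers a comparable mapping for you, which you can then override explicitly where needed.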

4. Full-Text Search

With full-text search, a search engine examines all of the words in every stored document as it tries to match search criteria. Elasticsearch builds distributed capabilities on top of Apache Lucene to provide the most powerful full-text search capabilities available in any open-source product. Its powerful, developer-friendly query API supports multilingual search, geolocation, contextual did-you-mean suggestions, autocomplete, and result-snippets.
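The data structure behind this is Lucene's inverted index: each analyzed term maps to the documents containing it, so a query looks up terms instead of scanning documents. A minimal conceptual sketch (real analyzers also lowercase, stem, and strip stopwords in configurable ways):

```javascript
// Conceptual sketch of an inverted index: term -> set of document ids.
function buildIndex(docs) {
  const index = new Map();
  for (const [id, text] of Object.entries(docs)) {
    for (const term of text.toLowerCase().split(/\s+/)) {
      if (!index.has(term)) index.set(term, new Set());
      index.get(term).add(id);
    }
  }
  return index;
}

const docs = { d1: "The quick brown fox", d2: "Quick queries with Elasticsearch" };
console.log([...buildIndex(docs).get("quick")]);
// [ 'd1', 'd2' ]
```

A search for "quick" becomes a single lookup returning both documents, which is why full-text queries stay fast as the corpus grows.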

5. Resilient Clusters

Elasticsearch clusters are resilient: they detect new or failed nodes and reorganize and rebalance data automatically to ensure that your data is safe and accessible. A cluster may contain multiple indices that can be queried independently or as a group. Index aliases allow filtered views of an index and may be updated transparently to your application.

Some of the core benefits highlighting how Elasticsearch can be useful for business include:

  • Managing huge amounts of data quickly and seamlessly compared to traditional SQL database management systems.
  • Quick access to documents, as they are stored in close proximity to their metadata in the index, reducing the number of data reads and speeding up search responses.
  • Scalability: clusters can scale up to thousands of servers and accommodate petabytes of data.

Original Link

Java High-Level REST Client: Elasticsearch

Elasticsearch is an open-source, highly scalable full-text search and analytics engine. Using it, you can easily store, search, and analyze a large amount of data in real time. The Java REST client is the official client for Elasticsearch and comes in two flavors:

  1. Java low-level REST client: It allows communicating with an Elasticsearch cluster through HTTP and leaves requests marshaling and responses un-marshaling to users.
  2. Java high-level REST client: It is based on a low-level client and exposes API-specific methods, taking care of requests marshaling and responses un-marshaling.

Our focus here will be to learn about the high-level REST client. I hope you are clear with the basics of Elasticsearch; if not, you can go through its documentation here.


The Java high-level REST client works on top of a Java low-level REST client. It is forward-compatible. It allows one to use API-specific methods that accept request objects as an argument and return response objects. Serialization and deserialization of request and response objects is handled by the client itself. Each API can be called either synchronously or asynchronously.

Let’s discuss how we can use a high-level REST client in our Scala-SBT application.

Following is the dependency you need to add to build.sbt for using the client:

"org.elasticsearch.client" % "elasticsearch-rest-high-level-client" % "6.1.2"

Since the high-level REST client depends on the Elasticsearch core, don’t forget to add an Elasticsearch core dependency.

"org.elasticsearch" % "elasticsearch" % "6.1.2"


The REST high-level client instance can be built as follows:

val client = new RestHighLevelClient( RestClient.builder(new HttpHost(HOST, PORT, "http")))

Here, you can replace HOST with the IP address on which Elasticsearch is running. 9200 is the port to send REST requests to for that node.

The Java high-level REST client supports various APIs — Index, Update, Search, Get, Delete, and Bulk are some of those APIs, and there are many more.

CRUD and Search Operations

With the help of a REST client, we can perform CRUD (Create, Read, Update, and Delete) and search operations against our indexes. Let’s just have a quick review of these features.

Index a Document in Elasticsearch

To insert a document, first, we need to create an IndexRequest — which requires index, type, and document Id as arguments. After that, the document source should be provided with the request in JSON and other supported formats. Here’s an example:

val request = new IndexRequest(index_name, type_name, id)
request.source(jsonString, XContentType.JSON)

(jsonString refers to the data you want to insert in Elasticsearch.)

Then, you can execute the request with the help of the client you created before.
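As a sketch of that execution step (method names follow the 6.1 high-level client API; the asynchronous variant assumes you have defined an ActionListener named listener):

```scala
val indexResponse = client.index(request)
// or, asynchronously:
// client.indexAsync(request, listener)
```

The synchronous call blocks and returns an IndexResponse directly; the asynchronous one reports the result through the listener.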


Update an Existing Document

To update a document, you need to prepare an UpdateRequest passing index, type, and ID as arguments and then use a script or a partial document for updating. Then, execute the update request through the client.

val updateRequest = new UpdateRequest(index_name, type_name, id)
val builder = XContentFactory.jsonBuilder
builder.startObject
builder.field(fieldName, value)
builder.endObject
updateRequest.doc(builder)
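The request is then executed through the same client; a synchronous sketch per the 6.1 client API:

```scala
val updateResponse = client.update(updateRequest)
```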

Delete Operation

Deleting a document just requires two lines of code. Create a DeleteRequest and then execute it via the REST client.

val deleteRequest = new DeleteRequest(index_name, type_name, id)

Deleting the index is also a simple task. Following is the example for that:

val request = new DeleteIndexRequest(index_name)
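Executing the two differs slightly: document deletes go through the client directly, while index-level deletes go through its indices client (a sketch per the 6.1 client API):

```scala
val deleteResponse = client.delete(deleteRequest)
val deleteIndexResponse = client.indices().delete(request)
```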

Search Documents

SearchRequest is used for any operation that has to do with searching documents, aggregations, and suggestions, and also offers ways of requesting highlighting on the resulting documents.
First, create a SearchRequest passing the index name as the argument.

val searchRequest = new SearchRequest(index_name)

After that, SearchSourceBuilder needs to be created. Add to it the query you want to execute.

val searchSourceBuilder = new SearchSourceBuilder

Lastly, execute the SearchRequest through the REST client.
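Putting the pieces together, with a match-all query as a placeholder (QueryBuilders comes from org.elasticsearch.index.query; substitute any query builder you need):

```scala
searchSourceBuilder.query(QueryBuilders.matchAllQuery())
searchRequest.source(searchSourceBuilder)
val searchResponse = client.search(searchRequest)
```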

There are several other operations you can execute via the high-level REST client. Also, you can use Kibana to search, view, and interact with data stored in Elasticsearch indices. To understand Kibana, you can go through its documentation.

The complete demo code is available here. You can check the file for instructions to run the application.


Happy blogging!

Original Link

CLI for Indexing Data From MongoDB to Elasticsearch

ElasticSearch is fantastic for indexing and filtering data. But hey, you have your data on a Mongo database in production. How do you copy all data from Mongo to Elastic? Even better, how do you keep both the data stores in sync? Is it even possible?

I am going to answer these questions in this blog post. To start off, yes it is indeed possible. We have made an awesome CLI tool called ABC that allows you to do this with a single command.

abc import --src_type=mongodb --src_uri=<uri> <elasticsearch_uri>

That’s it. Seriously, this is all you need to sync a Mongo database to an ElasticSearch index. Here’s a video showing the process.

The Steps

The first step is to install ABC if you have not done so already. Go to the GitHub releases page for ABC and download the most recent version. It's a single no-dependency binary, so put it anywhere you like. We recommend putting it inside a PATH directory so that it can be accessed from anywhere in the terminal.

Ensure that ABC is working by running the following command.

abc version


Now, let's take a Mongo database and sync it to an ElasticSearch index hosted on appbaseio. We will use the free MongoDB GUI visualizer called Compass in this tutorial. Go ahead and install it.

Once it has been installed, we will first start the Mongo daemon. This will start the MongoDB server. Go ahead and run the following command in a terminal.

mongod --smallfiles --oplogSize 50 --replSet test


Then, we will log into the Mongo shell, switch to the admin database, and initiate the replica set. Open a new shell and run the following commands.

$ mongo
> cfg = {_id: "test", members: [{_id:0, host: "localhost:27017"}]}
> use admin
> rs.initiate(cfg)


We should now have the database ready. Now it's time to enter the data. We will connect to the database using Compass. Enter the following settings on the Connection page and click the Connect button.

Once connected, we create a new collection called “users” and add some data to it.

The final Users collection looks as follows.

The Mongo test source is now complete. Its URL is:


Next, we are going to create the sink ElasticSearch index. We go to the appbaseio dashboard and create a new app called abcmongotest. The complete URL to this index looks like the following.

Now, we have both the source and the sink ready. It's time for some ABC magic. Using this source and sink, we can build the import command. It will be as follows:

abc import --src_type=mongodb --src_uri="mongodb://localhost:27017/admin" ""

Once you run this command, you should see that the command will finish in some time with no errors. Now, if you visit the appbaseio dashboard, you can see that the data has been transferred to the target ElasticSearch index.

Voila. Everything works. The data has been transferred to ElasticSearch and that too without doing anything at all. Next, we will see how to make ABC listen to the changes in the Mongo database.

Indexing Real-Time Data Changes From Mongo

If you are using Mongo as your production database system, there are good chances that your data is constantly changing. How to sync the Elasticsearch index with all the changes?

ABC has a nifty tail mode that allows synchronizing the Mongo database in real time to an Elasticsearch index. It tails MongoDB's replica set oplog to be able to do this, which is why we started mongod with the --replSet and --oplogSize options earlier.

It can be enabled by passing a --tail switch.

abc import --tail --src_type=mongodb --src_uri="mongodb://localhost:27017/admin" ""

Run the above command now and you will see that the command still keeps running even after indexing. This is because it is listening for more changes. Add a new document from the Compass GUI.


You might see the “1 item indexed” message in the import log now.

2 item(s) indexed
1 item(s) indexed

This means it worked. You should see a new entry in the app dashboard when you hit reload.

Try making some more changes and all of them will be reflected on the appbaseio-based Elasticsearch cluster. 

Transforming Data Before Indexing Into Elasticsearch

There are times when you don't need the data to go as it is from the source to the sink. You might like to change the target type name (e.g. users to accounts), remove certain fields (e.g. age), or create new fields. For all this, we have the transforms feature in ABC. It can be used via the transform_file parameter.

abc import --transform_file="transform_file.js" --src_type=mongodb --src_uri="mongodb://localhost:27017/admin" ""

The transform_file parameter takes the path to the transform file. That file, in turn, contains the JavaScript transforms that should be applied to the pipeline. Let’s take the contents of transform_file.js as follows.

t.Source("source", source, "/.*/")
  .Transform(omit({"fields":["age"]}))
  .Save("sink", sink, "/.*/")

In the above transform, you can see that we are going to omit the age field from the data transfer. Now, when we run the new import command, we should have the following result.

As you can see, the age field was omitted when data reached the sink. More documentation on the transform file can be found on GitHub. It supports lots of inbuilt functions like omit and even supports running custom JavaScript code as a transform. It’s a good idea to explore its documentation.

Further Reading

ABC’s README is a good place to start if you want to learn more about the application. You can also have a look at the MongoDB Adaptor Docs. Furthermore, you may star the repo on GitHub and watch it to stay tuned for updates.

Original Link

CLI for Indexing Data From Postgres to Elasticsearch

ElasticSearch is fantastic for indexing and filtering data. But hey, you have your data on a Postgres DB in production. How do you copy all data from Postgres to Elastic? Even better, how do you keep both the data stores in sync? Is it even possible?

I am going to answer these questions in this post. To start off, yes, it is indeed possible. We at appbaseio have made an awesome CLI tool called ABC, which will allow you to do this with a single command.

abc import --src_type=postgres --src_uri=<uri> <elasticsearch_uri>

That’s it. Seriously, this is all you need to sync a Postgres database to an ElasticSearch index. Here’s a video showing the process.

The Steps

The first step is to install ABC if you have not done so already. So, go to the GitHub releases page for ABC and download the most recent version. It's a single no-dependency binary, so put it anywhere you like. We recommend putting it inside a PATH directory so that it can be accessed from anywhere in the terminal.

Ensure that ABC is working by running the following command.

abc version

Now, let's take a Postgres database and sync it to an ElasticSearch index hosted on appbaseio.

First, we are going to create a database called ‘users’.

CREATE USER test_user with password 'test_pass';
CREATE DATABASE users with OWNER test_user;
\c users;

Next, we are going to create a table called ‘users’ inside the database ‘users’.

CREATE TABLE users (
  email varchar(500) NOT NULL,
  name varchar(100) NOT NULL,
  bio text,
  PRIMARY KEY(email)
);

After that, we will add some sample data to it.

insert into users values ('', 'foo', 'bar');
insert into users values ('', 'zak', 'ceo');

The table now looks as follows:

The Postgres test source is now complete. Its URL is:


Next, we are going to create the sink ElasticSearch index. We go to the appbaseio dashboard and create a new app called abcpostgrestest. The complete URL to this index looks like the following.

So now, we have both the source and the sink ready. It’s time for some ABC magic. Using this source and sink, we can build the import command. It will be as follows:

abc import --src_type=postgres --src_uri="postgresql://test_user:test_pass@" ""

Once you run this command, you should see that the command will finish in some time with no errors. Now, if you visit the appbaseio dashboard, you can see that the data has been transferred to the target ElasticSearch index.

Voila. Everything works. The data has been transferred to ElasticSearch and that too without doing anything at all. Next, we will see how to make ABC listen to the changes in the Postgres database.

Indexing Real-Time Data Changes From Postgres

If you are using Postgres as your production database system, there are good chances that your data is constantly changing. How to sync the Elasticsearch index with all the changes?

ABC has a nifty tail mode that allows synchronizing the Postgres database in real-time to an Elasticsearch index. It uses the replication slot feature of Postgres to be able to do this.

It can be enabled by passing a --tail switch.

abc import --tail --src_type=postgres --src_uri="postgresql://test_user:test_pass@" ""

Now, run the import command again with the tail option. You might see an error with this text:

must be superuser or replication role to use replication slots

Don't panic. This happens because you don't have replication set up on your Postgres database, which is required for the tailing feature. So, you will now have to make the database user a superuser. (Read ALTER ROLE.)


After running the above command, you should be inside the psql shell. Now, run the following command (where ‘test_user’ is your database username).
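Based on the ALTER ROLE documentation referenced above, granting superuser rights to test_user would look like this (suitable only for a local test setup; superuser is a heavy hammer for a production database):

```sql
ALTER ROLE test_user WITH SUPERUSER;
```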


Once that is done, you need to create some replication slots. Open postgresql.conf in a text editor and change wal_level and max_replication_slots as follows. You will have to restart your Postgres server after this. This step enables the replication slots feature.
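The relevant postgresql.conf lines would look something like the following (values are illustrative minimums: wal_level must be logical for the test_decoding plugin used below, and max_replication_slots just needs to be greater than zero):

```ini
wal_level = logical
max_replication_slots = 1
```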


After this, you will actually create a replication slot. Go back in the psql shell, connect to the source database, and run the following command.

\c users;
select * from pg_create_logical_replication_slot('users_replication_slot', 'test_decoding');
SELECT * FROM pg_replication_slots;

You should see the users database in the replication slot row now. This means that the replication slot is properly set up.

Now let’s try the tail again.

abc import --tail --src_type=postgres --src_uri="postgresql://test_user:test_pass@" --replication_slot="users_replication_slot" ""

Run the import again. It should work this time. 

Also, notice that the import process won’t exit when finished now. It will keep on running and will listen to the changes. Let’s add a new entry to the database and see if it reflects on the appbaseio dashboard.

\c users;
insert into users values ('', 'tony stark', 'iron man');

And yes, it works. You should see a new entry in the app dashboard when you hit reload.

Try making some more changes and all of them will be reflected on the appbaseio-based Elasticsearch cluster.

Transforming Data Before Indexing Into Elasticsearch

There are times when you don't need the data to go as it is from the source to the sink. You might like to change the target type name (e.g. users to accounts), remove certain fields (e.g. bio), or create new fields. For all this, we have the transforms feature in ABC. It can be used via the transform_file parameter.

abc import --src_type=postgres --src_uri="postgresql://test_user:test_pass@" --transform_file="transform_file.js" ""

The transform_file parameter takes the path to the transform file. That file in turn contains the JavaScript transforms that should be applied to the pipeline. Let’s take the contents of transform_file.js as follows.

t.Source("source", source, "/.*/")
  .Transform(omit({"fields":["bio"]}))
  .Save("sink", sink, "/.*/")

In the above transform, you can see that we are going to omit the bio field from the data transfer. Now when we run the new import command, we should have the following result.

As you can see, the bio field was omitted when data reached the sink. More documentation on transform file can be found on GitHub. It supports lots of inbuilt functions like omit and even supports running custom JavaScript code as a transform. It’s a good idea to explore its documentation.
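To make clear what the built-in omit transform does to each document in the pipeline, here is a plain-JavaScript equivalent (a conceptual sketch, not ABC's actual implementation; the sample document mirrors the users table above):

```javascript
// Drop the listed fields from a document before it is written to the sink,
// mimicking ABC's omit transform.
function omit(doc, fields) {
  return Object.fromEntries(
    Object.entries(doc).filter(([key]) => !fields.includes(key))
  );
}

const user = { email: "", name: "foo", bio: "bar" };
console.log(omit(user, ["bio"]));
// { email: '', name: 'foo' }
```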

Further Reading

ABC’s README is a good place to start if you want to learn more about the application. You can also have a look at the Postgres adaptor docs. Furthermore, you may star the repo on GitHub and watch it to stay tuned for updates.

Original Link

Spring Boot 2: Fluxes, From Elasticsearch to Controller

This is the final piece of the puzzle in our series on exposing reactive APIs via RESTful interfaces. Previously, we were seeding our Elasticsearch with some sample fake data. Now it’s about time to expose indexing functionality through some API. Let’s start with some simple adapter to our indexing engine:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

@Component
@RequiredArgsConstructor
class ElasticAdapter {

    private final RestHighLevelClient client;
    private final ObjectMapper objectMapper;

    Mono<IndexResponse> index(Person doc) {
        return indexDoc(doc);
    }

    private Mono<IndexResponse> indexDoc(Person doc) {
        return Mono.create(sink -> {
            try {
                doIndex(doc, listenerToSink(sink));
            } catch (JsonProcessingException e) {
                sink.error(e);
            }
        });
    }

    private void doIndex(Person doc, ActionListener<IndexResponse> listener) throws JsonProcessingException {
        final IndexRequest indexRequest = new IndexRequest("people", "person", doc.getUsername());
        final String json = objectMapper.writeValueAsString(doc);
        indexRequest.source(json, XContentType.JSON);
        client.indexAsync(indexRequest, listener);
    }

    private <T> ActionListener<T> listenerToSink(MonoSink<T> sink) {
        return new ActionListener<T>() {
            @Override
            public void onResponse(T response) {
                sink.success(response);
            }

            @Override
            public void onFailure(Exception e) {
                sink.error(e);
            }
        };
    }
}

The index() method takes a strongly typed Person object and sends it over to Elasticsearch. The doIndex() method makes the actual call to Elasticsearch, marshalling Person to JSON. Because Elastic's client reports its result through a callback of type ActionListener<IndexResponse>, we convert it to a Mono<IndexResponse> via the listenerToSink() helper method. A sequence of compose() methods is an elegant way to apply a series of metrics:

return indexDoc(doc)
    .compose(this::countSuccFail)
    .compose(this::countConcurrent)
    .compose(this::measureTime)
    .doOnError(e -> log.error("Unable to index {}", doc, e));

These methods are defined as follows:

private final Timer indexTimer = Metrics.timer("es.timer");
private final LongAdder concurrent = Metrics.gauge("es.concurrent", new LongAdder());
private final Counter successes = Metrics.counter("es.index", "result", "success");
private final Counter failures = Metrics.counter("es.index", "result", "failure");

private Mono<IndexResponse> countSuccFail(Mono<IndexResponse> mono) {
    return mono
        .doOnError(e -> failures.increment())
        .doOnSuccess(response -> successes.increment());
}

private Mono<IndexResponse> countConcurrent(Mono<IndexResponse> mono) {
    return mono
        .doOnSubscribe(s -> concurrent.increment())
        .doOnTerminate(concurrent::decrement);
}

private Mono<IndexResponse> measureTime(Mono<IndexResponse> mono) {
    return Mono
        .fromCallable(System::currentTimeMillis)
        .flatMap(time ->
            mono.doOnSuccess(response ->
                indexTimer.record(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS))
        );
}

We could technically apply these metrics without the compose() operator like this:

measureTime(
    countConcurrent(
        countSuccFail(
            indexDoc(doc)
        )
    )
);

But having a flat sequence of Mono<T> -> Mono<T> transformers seems much easier to read. Anyway, this was the write side, so let’s implement the read side.

Mono<Person> findByUserName(String userName) {
    return Mono
        .<GetResponse>create(sink ->
            client.getAsync(new GetRequest("people", "person", userName), listenerToSink(sink))
        )
        .filter(GetResponse::isExists)
        .map(GetResponse::getSource)
        .map(map -> objectMapper.convertValue(map, Person.class));
}

The procedure is pretty much the same:

  • Make an Elasticsearch request
  • Adapt it to Mono<GetResponse>
  • Verify the result and unmarshall it from Map to Person object

Interestingly, Jackson’s ObjectMapper can also convert from Maps, not only from JSON strings. Having this layer, we can use it directly in our brand new controller:

import com.google.common.collect.ImmutableMap;
import lombok.RequiredArgsConstructor;
import org.elasticsearch.action.index.IndexResponse;
import org.springframework.http.*;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;

import javax.validation.Valid;
import java.util.Map;

@RestController
@RequiredArgsConstructor
class PersonController {

    private static final Mono<ResponseEntity<Person>> NOT_FOUND =
        Mono.just(ResponseEntity.notFound().build());

    private final ElasticAdapter elasticAdapter;

    @GetMapping("/{userName}")
    Mono<ResponseEntity<Person>> get(@PathVariable("userName") String userName) {
        return elasticAdapter
            .findByUserName(userName)
            .map(ResponseEntity::ok)
            .switchIfEmpty(NOT_FOUND);
    }

    @PutMapping
    Mono<ResponseEntity<Map<String, Object>>> put(@Valid @RequestBody Person person) {
        return elasticAdapter
            .index(person)
            .map(this::toMap)
            .map(m -> ResponseEntity.status(HttpStatus.CREATED).body(m));
    }

    private ImmutableMap<String, Object> toMap(IndexResponse response) {
        return ImmutableMap
            .<String, Object>builder()
            .put("id", response.getId())
            .put("index", response.getIndex())
            .put("type", response.getType())
            .put("version", response.getVersion())
            .put("result", response.getResult().getLowercase())
            .put("seqNo", response.getSeqNo())
            .put("primaryTerm", response.getPrimaryTerm())
            .build();
    }
}

The get() method tries to find a document in Elasticsearch by “userName”. Newcomers to RxJava or Reactor are very eager to call subscribe() or block*(). Interestingly, none of these are needed in Spring WebFlux. You create a bunch of Monos or Fluxes, pass them through a series of transformations and return from your controller. It just works.

The put() method is equally simple. For debugging purposes, I convert IndexResponse to JSON in the toMap() method, but this isn’t necessary. As you can see, building reactive applications in Spring WebFlux is quite simple. We no longer need any adapting layers or blocking code. Everything is fully asynchronous and event-driven. Moreover, in this setup (see the source code), there are no servlets or Jetty/Tomcat on the CLASSPATH!

Spring has built-in reactive support for some databases like MongoDB. In these blog posts, I gave you an overview of how to integrate Reactor with Spring and other data stores that provide non-blocking APIs. You can easily adjust the code samples to work with other sources and stores.

This is part of a longer series.

Original Link

The Removal of Mapping Types in Elasticsearch 6: The Aftermath

As you may have already heard, the recent release of Elasticsearch 6 signaled the beginning of the end to the concept of mapping types. Defined as a breaking change, and relating to the core functionality and usage of Elasticsearch, this change was somewhat controversial. Some users even went as far as viewing this change as a “casus belli” of sorts.

While there is little doubt that the removal of types has huge implications for Elasticsearch users, it is important to chill the atmosphere by understanding the reasons for this move and its meaning as well as ways to adapt to it.

What are Elasticsearch mapping types?

Let’s begin with trying to understand what mapping types are and how they were used up until now.

Within Elasticsearch, mapping defines how a document is indexed and how its fields are indexed and stored. Each Elasticsearch index had one or more mapping types that were used to divide documents into logical groups. In other words, a type in Elasticsearch represented a class of similar documents.

Compared with the world of relational databases (a problematic comparison that we will get to later in the article), types are roughly analogous to tables.

Let’s say we are indexing different types of students (e.g. honor students, failing students, etc.)

The mapping type for such an index would consist of a combination of fields as seen in this example:

curl -X PUT 'http://localhost:9200/students' -d '
{
  "mappings": {
    "student": {
      "properties": {
        "name": { "type": "keyword" },
        "degree": { "type": "keyword" },
        "age": { "type": "integer" },
        "performance": { "type": "keyword" }
      }
    }
  }
}'

The index would then be built with a  _type field which was combined with the document’s _id field to generate a _uid field, allowing documents of different types to co-exist in the same index.
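As a rough sketch of that composition (the exact internal encoding is a Lucene implementation detail; the `#` separator here is illustrative):

```java
public class UidDemo {
    // Pre-6.x Elasticsearch derived a document's internal _uid from its
    // mapping type and its _id, which is what allowed documents of
    // different types to co-exist in the same index.
    static String uid(String type, String id) {
        return type + "#" + id;
    }

    public static void main(String[] args) {
        System.out.println(uid("student", "1")); // student#1
    }
}
```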

So to index a new student, we would use:

curl -X PUT 'http://localhost:9200/students/student/1' -d '
{
  "name": "Isaac Newton",
  "age": 14,
  "performance": "honor student"
}'

And when querying Elasticsearch for a student, we would use the mapping type by including it in the URL:

curl -X GET 'http://localhost:9200/students/student/_search' -d '
{
  "query": {
    "match": { "name": "Isaac Newton" }
  }
}'

This kind of search would return:

{
  "name": "Isaac Newton",
  "age": 14,
  "performance": "honor student"
}

You can read more about Elasticsearch mapping types here.

So, why are mapping types being removed?

Looking back in time to an issue opened in 2015 on the Elasticsearch GitHub page gives us insight into the main reason mapping types are being removed – to simplify the understanding and usage of the underlying data structure in Elasticsearch and to optimize performance.

When trying to understand or explain the data structure Elasticsearch is based upon, users often referred to a comparison with relational databases, where: index = database, type = table.

While there is a certain logic to this analogy, it is simply incorrect from a technical standpoint.

Unlike SQL databases, where tables are totally independent from each other, and where columns in different tables do not affect one another, Elasticsearch fields and mapping types in the same index are interrelated.

Fields that share the same name but are used in different mapping types are backed by the same Lucene field. This means, in turn, that fields with the same name, and which are used in different types in the same index must have the same mapping definition. A constraint that can be extremely limiting in some scenarios.

If we wanted to use the performance field name in another type within the same index, it would have to have the same mapping definition as defined in the student type:

curl -X PUT 'http://localhost:9200/students' -d '
{
  "mappings": {
    "student": {
      "properties": {
        "name": { "type": "keyword" },
        "degree": { "type": "keyword" },
        "age": { "type": "integer" },
        "performance": { "type": "keyword" }
      }
    },
    "teacher": {
      "properties": {
        "name": { "type": "keyword" },
        "degree": { "type": "keyword" },
        "age": { "type": "integer" },
        "performance": { "type": "keyword" }
      }
    }
  }
}'

The second issue is related to how Lucene handles documents with empty fields (fields with no value), an issue otherwise known as data sparsity. Multiple types in the same index result, in most cases, in a large number of empty fields, which, because of the way Lucene stores data, leads to suboptimal resource utilization.

Using types in Elasticsearch 6

We can start with the good news, which is that types have not been totally removed. Yet.

While indices created in version 6 are only allowed one mapping type, indices created in version 5 containing multiple types can continue to work as before. You can also use the Elasticsearch ReIndex API to convert these indices to single-type indices.

To achieve the same functionality in Elasticsearch 6, what alternative methods are there?

Other than the obvious method of placing all your properties under one single type, there are two additional methods for accomplishing the same goal achieved with multiple mapping types in previous Elasticsearch versions.

The first is to have an index per document type. In our example, you could put honor students in one index, failing students in another index, and so forth for each type of student.

Another solution is to create a custom field that works much like the good old _type meta-field did:

curl -X PUT -H "Content-Type: application/json" 'http://localhost:9200/students' -d '
{
  "mappings": {
    "doc": {
      "properties": {
        "type": { "type": "keyword" },
        "name": { "type": "text" },
        "age": { "type": "integer" }
      }
    }
  }
}'

Continuing with our previous example, we can populate this field with some data:

curl -X POST -H "Content-Type: application/json" 'http://localhost:9200/students/doc/1' -d '
{
  "type": "honor student",
  "name": "Isaac Newton",
  "age": 14
}'

So to search for our honor students, we would use:

curl -X GET -H "Content-Type: application/json" 'http://localhost:9200/students/_search' -d '
{
  "query": {
    "bool": {
      "must": { "match_all": {} },
      "filter": {
        "term": { "type": "honor student" }
      }
    }
  }
}'

Summing it up

The removal of mapping types is a process that started back in version 5, took a serious step further in version 6, and is planned to continue in subsequent versions until their complete removal in version 9 (stay updated, and take a look at the planned changes here).

Breaking changes are always a pain, especially fundamental changes that require rethinking the way we operate, but at the end of the day, the end game is all that matters. While indeed a breaking change in any Elasticsearch index with multiple types, it also seems to be an important change with obvious benefits in terms of ease-of-use and performance. Ultimately, forcing us to use indices in a way that is more suited to the underlying data structure should speed up searches.

It seems that most users tend to agree with this assertion.

As we begin the migration process to Elasticsearch 6, we will document and report any issues we come across in respect to this change, so stay tuned for news.

Original Link

Solr vs. Elasticsearch: An Introductory Comparison

Cloud computing and data growth are more relevant than ever before, with the applications people are using on their computers, smartphones, and tablets generating and processing several zettabytes (1e+9 terabytes) of data. As all of the information continues to accumulate and people demand even better performance, figuring out how to search through all of that data in an effective and efficient manner proves to be quite a challenging task. Without a fast, organized, and reliable way of handling the data they are working with, developers will struggle to attract and retain users.

This article will shed some light on two of the most popular open-source search engines available: Solr and Elasticsearch. Both of these engines were developed using Apache Lucene, so it will not come as a surprise that the two have very similar functionalities. That being said, the two have differences when it comes to scalability, ease of deployment, and other features. Before jumping into these two engines, some context on Lucene may be helpful.

What Is Apache Lucene?

Apache Lucene, initially released in 1999, is a freely available Java-based text search engine library. Lucene is distributed under the Apache open-source license, meaning that developers have the freedom to alter the library to suit their own needs. The Lucene API maintains its form irrespective of the format of the file being indexed, making it popular for internet search engines and single-site search operations.

What Is Apache Solr?

Apache Solr is an open-source search platform built on the Java library Lucene, first released in 2004. Solr offers Lucene’s search features in a very user-friendly way, and since it has been around for over a decade, it is a mature product with a strong community and a broad array of users.

Solr offers such features as replication, load-balanced querying, automated failover and recovery, and distributed indexing. If developers are able to successfully deploy and manage Solr, it is capable of being a very reliable, scalable, and safe search engine. Companies like Amazon, Instagram, and Netflix use Solr because it allows them to index and search multiple sites at once.

What Is Elasticsearch?

Publicly released in 2010, Elasticsearch was introduced some time after Solr. Elasticsearch offers a multitenant-capable, distributed, full-text search engine with an HTTP web interface. Multi-tenancy, a software architecture in which a single running instance of the software serves multiple tenants, is one of the biggest features Elasticsearch provides. The search engine also allows developers to divide indices into shards, each of which can have multiple replicas.

Which Performs With More Efficiency?

While both Solr and Elasticsearch share many of the same functionalities, there are some features that have made Elasticsearch a more popular product.

The reputable German iX magazine has listed the main advantages of using Elasticsearch over Solr, which may help individuals trying to determine which option to go with:

  • Elasticsearch is distributed. There is no need for separate project files and replicas are near real-time, providing a snappier and quicker experience.
  • Elasticsearch supports the near real-time search capabilities of the original Apache Lucene API.
  • Multi-tenancy is much easier to set up than the more complex configuration that Solr requires.
  • Elasticsearch introduces the concept of “The Gateway,” which has ultimately made it easier to perform and restore full backups.

Original Link

Spring, Reactor, and ElasticSearch: Benchmarking With Fake Test Data

In the previous article, we created a simple adapter from ElasticSearch’s API to Reactor’s Mono, which looks like this:

import reactor.core.publisher.Mono;

private Mono<IndexResponse> indexDoc(Doc doc) {
    //...
}

Now we would like to run this method at a controlled concurrency level — millions of times. Basically, we want to see how our indexing code behaves under load by benchmarking it.

Fake Data With jFairy

First, we need some good looking test data. For that purpose, we’ll use the handy jFairy library. The document we’ll index is a simple POJO:

class Doc {
    private final String username;
    private final String json;
    //...
}

The generation logic is wrapped inside a Java class:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.codearte.jfairy.Fairy;
import io.codearte.jfairy.producer.person.Address;
import io.codearte.jfairy.producer.person.Person;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.RandomUtils;
import org.springframework.stereotype.Component;

import java.util.Arrays;

@Component
@RequiredArgsConstructor
class PersonGenerator {

    private final ObjectMapper objectMapper;
    private final Fairy fairy;

    private Doc generate() throws JsonProcessingException {
        Person person = fairy.person();
        final String username =
                person.getUsername() + RandomUtils.nextInt(1_000_000, 9_000_000);
        final ImmutableMap<String, Object> map = ImmutableMap.<String, Object>builder()
                .put("address", toMap(person.getAddress()))
                .put("firstName", person.getFirstName())
                .put("middleName", person.getMiddleName())
                .put("lastName", person.getLastName())
                .put("email", person.getEmail())
                .put("companyEmail", person.getCompanyEmail())
                .put("username", username)
                .put("password", person.getPassword())
                .put("sex", person.getSex())
                .put("telephoneNumber", person.getTelephoneNumber())
                .put("dateOfBirth", person.getDateOfBirth())
                .put("company", person.getCompany())
                .put("nationalIdentityCardNumber", person.getNationalIdentityCardNumber())
                .put("nationalIdentificationNumber", person.getNationalIdentificationNumber())
                .put("passportNumber", person.getPassportNumber())
                .build();
        final String json = objectMapper.writeValueAsString(map);
        return new Doc(username, json);
    }

    private ImmutableMap<String, Object> toMap(Address address) {
        return ImmutableMap.<String, Object>builder()
                .put("street", address.getStreet())
                .put("streetNumber", address.getStreetNumber())
                .put("apartmentNumber", address.getApartmentNumber())
                .put("postalCode", address.getPostalCode())
                .put("city", address.getCity())
                .put("lines", Arrays.asList(address.getAddressLine1(), address.getAddressLine2()))
                .build();
    }
}

Quite a bit of boring code that actually does something cool. Every time we run it, it generates random, but reasonable JSON like so:

{
  "address": {
    "street": "Ford Street",
    "streetNumber": "32",
    "apartmentNumber": "",
    "postalCode": "63913",
    "city": "San Francisco",
    "lines": [ "32 Ford Street", "San Francisco 63913" ]
  },
  "firstName": "Evelyn",
  "middleName": "",
  "lastName": "Pittman",
  "email": "",
  "companyEmail": "",
  "username": "epittman5795354",
  "password": "VpEfFmzG",
  "sex": "FEMALE",
  "telephoneNumber": "368-005-109",
  "dateOfBirth": "1917-05-14T16:47:06.273Z",
  "company": {
    "name": "Woods LLC",
    "domain": "",
    "email": "",
    "vatIdentificationNumber": "30-0005081",
    "url": ""
  },
  "nationalIdentityCardNumber": "713-79-5185",
  "nationalIdentificationNumber": "",
  "passportNumber": "jVeyZLSt3"
}

Neat! Unfortunately, it’s not documented whether jFairy is thread-safe, so just in case, I’m using ThreadLocal. OK, so we have one document, but we need millions! Using a for-loop is so old-fashioned.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

private final Scheduler scheduler =
        Schedulers.newParallel(PersonGenerator.class.getSimpleName());

Mono<Doc> generateOne() {
    return Mono
            .fromCallable(this::generate)
            .subscribeOn(scheduler);
}

Flux<Doc> infinite() {
    return generateOne().repeat();
}

generateOne() wraps the blocking generate() method in a Mono<Doc>. Additionally, generate() is run on the parallel Scheduler.

Why? It turned out that jFairy wasn’t fast enough on a single core (lots of random number generation, table lookups, etc.), so I had to parallelize data generation. That shouldn’t normally be an issue, but when generating fake data is slower than your reactive application that touches an external server, that tells you something about the performance of Netty-based Spring WebFlux.

Calling ElasticSearch Concurrently

All right, having an infinite stream of good looking fake test data, we now want to index it in ElasticSearch.

void startIndexing() {
    index(1_000_000, 1_000);
}

private void index(int count, int maxConcurrency) {
    personGenerator
            .infinite()
            .take(count)
            .flatMap(this::indexDocSwallowErrors, maxConcurrency)
            .window(Duration.ofSeconds(1))
            .flatMap(Flux::count)
            .subscribe(winSize -> log.debug("Got {} responses in last second", winSize));
}

private Mono<IndexResponse> indexDocSwallowErrors(Doc doc) {
    return indexDoc(doc)
            .doOnError(e -> log.error("Unable to index {}", doc, e))
            .onErrorResume(e -> Mono.empty());
}

When the application starts, it initiates indexing of 1 million documents. Notice how easy it is to tell Reactor (same for RxJava) that it should invoke up to one thousand concurrent requests to ElasticSearch. Once every second, we count how many responses we received:

Got 2925 responses in last second
Got 2415 responses in last second
Got 3336 responses in last second
Got 2199 responses in last second
Got 1861 responses in last second

Not bad! Especially when you consider that there were up to one thousand concurrent HTTP requests, while our application barely started 30 threads at peak! Alright, it’s localhost <-> localhost, guilty! But how do we actually know all of that? Logging is fine, but it’s the 21st century. We can do better! Monitoring will be the subject of the next installment.
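Under the hood, flatMap’s maxConcurrency argument plays the role of a semaphore with a fixed number of permits. A plain-Java sketch of the same back-pressure idea (a blocking toy for intuition, not how Reactor actually implements it):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrencyCap {
    static final int MAX_CONCURRENCY = 4;
    static final AtomicInteger maxSeen = new AtomicInteger();

    public static void main(String[] args) {
        Semaphore permits = new Semaphore(MAX_CONCURRENCY);
        AtomicInteger inFlight = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(16);
        try {
            for (int i = 0; i < 100; i++) {
                permits.acquire();                // block once MAX_CONCURRENCY requests are in flight
                pool.submit(() -> {
                    int now = inFlight.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(5);          // stand-in for the HTTP round-trip
                    } catch (InterruptedException ignored) {
                    }
                    inFlight.decrementAndGet();
                    permits.release();
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (maxSeen.get() > MAX_CONCURRENCY) throw new AssertionError("cap exceeded");
        System.out.println("ok, max in flight: " + maxSeen.get());
    }
}
```

The reactive version achieves the same cap without blocking a thread per pending request, which is why 30 threads suffice for a thousand in-flight calls.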

The source code is available in the reactive-elastic-search branch.

Original Link

Spring, Reactor, and ElasticSearch: From Callbacks to Reactive Streams

Spring 5 (and Boot 2, when it arrives in a couple of weeks) is a revolution. Not the “annotations over XML” or “Java classes over annotations” type of revolution. It’s truly a revolutionary framework that enables writing a brand new class of applications.

In recent years, I became a little bit intimidated by this framework. Spring Cloud is a framework that simplifies the usage of Spring Boot, which is a framework that simplifies the usage of Spring, which is a framework that simplifies enterprise development. The Spring Initializr website (also known as “start… dot spring… dot I… O“) lists 120 different modules (!) that you can add to your service. Spring, these days, has become an enormous umbrella project, and I can imagine why some people (still!) prefer Java EE (or whatever it’s called these days).

But Spring 5 brings the reactive revolution. It’s no longer only a wrapper around a blocking servlet API and various web frameworks. Spring 5, on top of Project Reactor, allows writing high-performance, extremely fast, and scalable servers, avoiding the servlet stack altogether.

Damn, there is no Jetty or even servlet API on the CLASSPATH! At the heart of Spring 5 WebFlux, we will find Netty, a low-level framework for writing asynchronous clients and servers. Finally, Spring becomes a first-class citizen in the family of reactive frameworks.

Java developers can implement fast services without leaving their comfort zone for an entirely different stack. Spring 5 is a fully reactive, modern tool for building highly scalable and resilient applications. Nevertheless, the underlying principles like controllers, beans, and dependency injection are all the same. Moreover, the upgrade path is smooth, and we can gradually add features rather than learning a brand new, alien framework.

Enough talking, let’s write some code.

In this article, we will write a simple headless application that indexes documents in ElasticSearch in large volume. We will aim for thousands of concurrent connections with just a handful of threads, even when the server becomes slow.

However, unlike e.g. Spring Data MongoDB, Spring Data ElasticSearch does not natively support non-blocking repositories. Well, the latter doesn’t even seem to be maintained anymore, with the current version being 3 years old. Many articles target Spring 5 + MongoDB with its repositories returning non-blocking streams (Flux or Flowable from RxJava). This one will be a little bit more advanced.

The ElasticSearch 6 Java API uses a RESTful interface and is implemented using a non-blocking HTTP client. Unfortunately, it uses callbacks rather than something sane like CompletableFuture. So let’s build the client adapter ourselves.

ElasticSearch Client Using Fluxes and Monos

Source code for this article is available on the reactive-elastic-search branch.

We would like to build an ElasticSearch Java client that supports Project Reactor by returning Flux or Mono. Of course, we get the greatest benefit if the underlying stream is fully asynchronous and does not consume threads. Luckily, the Java API is just like that. First, let’s set up ElasticSearch’s client as a Spring bean:

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

@Bean
RestHighLevelClient restHighLevelClient() {
    return new RestHighLevelClient(
            RestClient
                    .builder(new HttpHost("localhost", 9200))
                    .setRequestConfigCallback(config -> config
                            .setConnectTimeout(5_000)
                            .setConnectionRequestTimeout(5_000)
                            .setSocketTimeout(5_000)
                    )
                    .setMaxRetryTimeoutMillis(5_000));
}

In real life, we would obviously parametrize most of this stuff. We will be indexing simple JSON documents. For the time being, their content is not important:

class Doc {
    private final String username;
    private final String json;
    //...
}

The code we will write wraps RestHighLevelClient and makes it even more high-level by returning Mono<IndexResponse>. Mono is pretty much like CompletableFuture, but with two differences:

  • It’s lazy — as long as you don’t subscribe, no computation is started
  • Unlike CompletableFuture, Mono can complete normally without emitting any value

The second difference was always a bit misleading to me. In RxJava 2.x, there are two distinct types: Single (always completes with a value or an error) and Maybe (like Mono). Too bad Reactor doesn’t make this distinction. Never mind. What does the adapter layer look like? The plain Elastic API looks as follows:

client.indexAsync(indexRequest, new ActionListener<IndexResponse>() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        //got response
    }

    @Override
    public void onFailure(Exception e) {
        //got error
    }
});

You can see where this is going: callback hell. Rather than exposing a custom ActionListener as an argument to this logic, let’s wrap it in Mono:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

private Mono<IndexResponse> indexDoc(Doc doc) {
    return Mono.create(sink -> {
        IndexRequest indexRequest = new IndexRequest("people", "person", doc.getUsername());
        indexRequest.source(doc.getJson(), XContentType.JSON);
        client.indexAsync(indexRequest, new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                sink.success(indexResponse);
            }

            @Override
            public void onFailure(Exception e) {
                sink.error(e);
            }
        });
    });
}

We must create an IndexRequest wrapping a JSON document and send it via the RESTful API. But that’s not the point. We are using the Mono.create() method. It has some drawbacks, but more on that later. Mono is lazy, so merely calling indexDoc() doesn’t suffice; no HTTP request has been made to ElasticSearch yet.

However, every time someone subscribes to this one-element source, the logic inside create() will be executed. The crucial lines are sink.success() and sink.error(). They propagate results from ElasticSearch (coming from the background, asynchronous thread) into the stream. How do we use such a method in practice? It’s very simple!

Doc doc = //...
indexDoc(doc)
        .subscribe(
                indexResponse -> log.info("Got response")
        );
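The laziness that distinguishes Mono from CompletableFuture can be demonstrated without Reactor or Elasticsearch on the CLASSPATH. A toy comparison in plain Java, using a Supplier as a stand-in for a lazy Mono (the names here are invented for illustration):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyVsEager {
    static final AtomicInteger calls = new AtomicInteger();

    // Stand-in for the expensive work (e.g. the HTTP round-trip to ElasticSearch)
    static String doWork() {
        calls.incrementAndGet();
        return "indexed";
    }

    // Eager: the work runs as soon as the future is created
    static CompletableFuture<String> eager() {
        return CompletableFuture.completedFuture(doWork());
    }

    // Lazy, Mono-like: nothing runs until someone "subscribes" (calls get())
    static Supplier<CompletableFuture<String>> lazy() {
        return () -> CompletableFuture.completedFuture(doWork());
    }

    public static void main(String[] args) {
        Supplier<CompletableFuture<String>> mono = lazy();
        if (calls.get() != 0) throw new AssertionError("lazy creation must not trigger work");
        if (!"indexed".equals(mono.get().join())) throw new AssertionError();
        if (calls.get() != 1) throw new AssertionError("work runs once, on subscription");
        eager(); // result ignored, yet the work has already run
        if (calls.get() != 2) throw new AssertionError("eager creation triggers work immediately");
        System.out.println("ok");
    }
}
```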

Of course, the true power of reactive stream processing comes from composing multiple streams. But we made our first steps: transforming a callback-based asynchronous API into a generic stream. If you are (un)lucky enough to use MongoDB, it has built-in support for reactive types like Mono or Flux right in the repositories. The same goes for Cassandra and Redis. In the next article, we will learn how to generate some fake data and index it concurrently.
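The callback-to-stream bridge we built is not Reactor-specific; the same shape works with a plain CompletableFuture. A self-contained toy version (the Listener interface and the fake indexAsync() are stand-ins for Elasticsearch’s ActionListener and client, invented for illustration):

```java
import java.util.concurrent.CompletableFuture;

public class CallbackBridge {
    // Stand-in for Elasticsearch's callback-style ActionListener
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Fake async client that calls back immediately (real ones call back later)
    static void indexAsync(String doc, Listener<String> listener) {
        listener.onResponse("indexed:" + doc);
    }

    // The bridge: adapt the callback pair into a single CompletableFuture
    static CompletableFuture<String> indexDoc(String doc) {
        CompletableFuture<String> future = new CompletableFuture<>();
        indexAsync(doc, new Listener<String>() {
            @Override public void onResponse(String r) { future.complete(r); }
            @Override public void onFailure(Exception e) { future.completeExceptionally(e); }
        });
        return future;
    }

    public static void main(String[] args) {
        System.out.println(indexDoc("doc-1").join()); // indexed:doc-1
    }
}
```

The Mono version in the article follows exactly this pattern, with sink.success()/sink.error() in place of complete()/completeExceptionally(), plus laziness on top.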

Original Link

Monitor TFS Search Data Usage

In a previous post, I explained how to move the search component to a different machine in a TFS 2018 installation. Now, it is time to understand how to monitor that instance.

First of all, you should monitor the folder that physically contains data. In my installation, it is C:\tfs\ESDATA. This is a parameter that you choose when you configure the Search Server with the Configure-TFSSearch.ps1 -Operation install PowerShell script. The data folder should be placed on a fast disk; usually, SSD is the best choice. If the SSD ever fails, you can always restart ES with an empty data folder and use scripts to force TFS to reindex everything.

Data in ElasticSearch can be wiped away without a problem; like a data warehouse, its content can always be restored.

If you want to have a better idea of what is happening to your search instance, you can install plugins to ease management. The first step is to connect to the machine where the Search Service is running and locate the folder where ElasticSearch was installed. A simple trick is checking the corresponding Windows Service.

Figure 1: Locate ElasticSearch binaries from Windows Service

Now, simply open a command prompt and change the directory to the bin installation directory of ElasticSearch. From here, you can install various plugins to simplify ES management. I usually start with the HQ plugin; you can install it with the simple instruction:

plugin install royrusso/elasticsearch-HQ/v2.0.3

You should check the correct version on the HQ home site — currently, version 2.0.3 is the latest version that works with ES 2.4.1, the version used by TFS Search Service. For this command to be able to run, the machine should have an internet connection and the GitHub site should not be blocked.


Figure 2: HQ was correctly installed

Now, you can just browse to http://localhost:9200/_plugin/HQ to open the web interface of the HQ plugin and connect to the actual instance.


Figure 3: HQ plugin up-and-running and ready to be connected to a local instance of ES


Figure 4: The homepage of HQ, where you can find the total size used by the server (1) as well as some node diagnostics (2)

From the Node Diagnostics tool, you can spot statistics that are off. In my example, the Search Server runs on a machine with a slow disk, and query time is sub-optimal. Usually, ES is supposed to answer in less than 50 milliseconds; in my installation, I see an average time of 660 ms.


Figure 5: Statistics on search.

If you move the mouse over a number, the tool gives you some hints on how that specific number is calculated and why it is not considered good.


Figure 6: Help on the various numbers gives you an idea of what is wrong and how you could solve it.

I suggest that you navigate the HQ interface to familiarize yourself with the information it can give you, especially by clicking on the name of a single node (there is only one node in my installation). You can get some interesting data on that single node, especially on RAM usage.


Figure 7: RAM usage of a single node.

There are many other plugins that can show useful information about your ElasticSearch instance. Their installation is similar to HQ’s, and you can install and use them with very few clicks.

Original Link

Elastic Stack 6: What You Need to Know

Elastic Stack 6 was released last November, and now is as good a time as any to evaluate whether to upgrade. To help you make that call, we are going to take a look at some of the major changes included in the different components in the stack and review the main breaking changes.

The main changes to Elasticsearch are designed to boost performance, improve reliability, and make recovery easier. The most significant changes in Logstash are focused on allowing users to run multiple self-contained pipelines on the same JVM. There are no major changes to Kibana, but a relatively large number of minor usability improvements were added. With the exception of the addition of Auditbeat, the main changes made to Beats are focused on improving the performance of existing log shippers and enhancing their capabilities.

Let’s take a closer look.

Elasticsearch 6

Changes to Elasticsearch are mostly internal and shouldn’t require most organizations to alter how they configure or administer Elasticsearch, with the big exception being the change to mapping types.

Sparse Doc Values

A sparse values situation (when documents do not have values for each of the fields in our indices) results in the use of a large amount of disk space and file-system cache. The change to Lucene 7 allows Elasticsearch to now support sparse doc values, a new encoding format that reduces disk space and improves query throughput.

Index Sorting

Lucene 7 also brings the ability to specify a sort order to indices. This boosts performance by allowing for the sorting of indices during re-indexing (while documents are written) instead of when documents are read. The indices are written to disk in the order of the specified sort.

Specifying an index sort means that a search can terminate early once it has found the documents requested by the query. For example, if the sort is alphabetical, then a search for entries beginning with “E” can stop when it reaches “F,” because at that point all items beginning with “E” have already been read.
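The early-termination idea can be sketched in a few lines of plain Java — a toy scan over a sorted list, not Lucene’s actual implementation:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SortedScan {
    // Collect entries starting with the given prefix from an alphabetically
    // sorted list, stopping as soon as a later entry is reached.
    static List<String> prefixSearch(List<String> sorted, String prefix) {
        List<String> hits = new ArrayList<>();
        for (String s : sorted) {
            if (s.startsWith(prefix)) {
                hits.add(s);
            } else if (s.compareTo(prefix) > 0) {
                break; // sorted order guarantees no further matches
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        List<String> index = Arrays.asList("Alice", "Edgar", "Evelyn", "Frank", "Grace");
        System.out.println(prefixSearch(index, "E")); // [Edgar, Evelyn]
    }
}
```

“Frank” and “Grace” are never examined; on an unsorted list, every entry would have to be checked.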

No More Mapping Types

This is one of the most talked-about changes in Elasticsearch 6, and a somewhat controversial one at that.

In this version, indices can only have one mapping type. Indices created in 5.x with multiple mapping types will continue to function as before. The plan is to remove mapping types completely in version 7.

The main reason for this move is to simplify the understanding and usage of the underlying data structure in Elasticsearch. Comparisons to RDBMS databases have led to a faulty understanding that types can be compared to tables. This has led, in turn, to an expectation that fields are independent across types, whereas fields with the same name in the same index must in fact share the same definition.

While this change fundamentally changes the way we index data, and as such has received a decent amount of criticism in the community, it also promises to speed up searches by forcing users to use indices in a fashion tailored to the underlying database structure. The common existing practice of treating indices like databases and types like tables is suboptimal. Enforcing a single type per index should provide significant performance increases.

Better Shard Recovery

A new feature called Sequence IDs promises to guarantee more successful and efficient shard recovery.

Every index, update, and delete operation receives an ID that is logged in the primary shard’s transaction log. A replica can now refer to the operations recorded in this log and use them to update itself without needing to copy all the files, thus making recovery much faster. You’ll be able to configure how long to keep these transaction logs.

Replicas can lag behind the primary by some number of operations; this means that if a primary shard fails, the replicas will be able to sync with the new primary shard without waiting for a full file-based recovery.
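To get an intuition for why operation-based recovery is cheaper than file copying, here is a toy sketch in plain Java (the names and data structures are invented for illustration; real translogs, global checkpoints, and retention policies are far more involved):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SeqIdRecovery {
    static final class Op {
        final long seqNo;
        final String docId;
        final String value;
        Op(long seqNo, String docId, String value) {
            this.seqNo = seqNo;
            this.docId = docId;
            this.value = value;
        }
    }

    static final List<Op> translog = new ArrayList<>();        // primary's operation log
    static final Map<String, String> replica = new LinkedHashMap<>();
    static long replicaCheckpoint = -1;                        // highest seqNo the replica applied

    static void indexOnPrimary(String docId, String value) {
        translog.add(new Op(translog.size(), docId, value));   // each op gets the next sequence ID
    }

    // Recovery replays only the operations the replica has not yet seen,
    // instead of copying every segment file from the primary.
    static int recoverReplica() {
        int replayed = 0;
        for (Op op : translog) {
            if (op.seqNo > replicaCheckpoint) {
                replica.put(op.docId, op.value);
                replicaCheckpoint = op.seqNo;
                replayed++;
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        indexOnPrimary("isaac", "v1");
        indexOnPrimary("marie", "v1");
        System.out.println("first recovery replayed " + recoverReplica() + " ops");
        indexOnPrimary("isaac", "v2");
        System.out.println("second recovery replayed " + recoverReplica() + " op");
    }
}
```

The second recovery replays a single operation rather than re-copying the whole replica state, which is the essence of the improvement.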


Easier Upgrades

Updating to the new Elasticsearch version is made easier with a series of upgrade improvements that aim to tackle some of the traditional hurdles facing upgrade procedures.

A new rolling restart feature negates the need for a full cluster restart and thus minimizes downtime. Elasticsearch 5.x indices will be able to be searched using cross-cluster searching: a new approach to cross-cluster operations that replaces the traditional tribe-node based approach. Deprecation logs have been reinforced with important info on breaking changes. And if you’re an X-Pack user, you will be able to manage the upgrade via the UI.

Logstash 6

Logstash was originally intended to handle one type of event per instance, and prior to this version, each Logstash instance supported only a single event pipeline. Users can circumvent this restriction using conditional statements in the configuration, which often leads to a new set of problems.

Logstash now has native support for multiple pipelines. These pipelines are defined in a pipelines.yml file, which is loaded by default. If Logstash is started with the -r flag, it will periodically reread the pipelines.yml file looking for changes, making dynamic multiple pipelines possible for a single Logstash instance.

Each pipeline’s configuration is independent, meaning that each pipeline can be assigned a different level of resources. A single Logstash instance could, for example, run several low-volume pipelines with a single worker thread each alongside a high-volume pipeline with ten worker threads. Combining this with Logstash’s ability to periodically reload configurations means dynamic reconfiguration without having to tear the entire application down to meet changing needs.
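
As a sketch, a pipelines.yml along these lines defines two such pipelines with different worker counts (the pipeline IDs and config paths are made up for illustration):

```yaml
# pipelines.yml — one entry per pipeline
- pipeline.id: apache-logs          # low-volume pipeline
  path.config: "/etc/logstash/conf.d/apache.conf"
  pipeline.workers: 1
- pipeline.id: metrics              # high-volume pipeline
  path.config: "/etc/logstash/conf.d/metrics.conf"
  pipeline.workers: 10
```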

X-Pack users will be able to manage multiple pipelines within Kibana. This solution uses Elasticsearch to store pipeline configurations and allows for on-the-fly reconfiguration of Logstash pipelines.

Another noteworthy addition to Logstash is a new conversion tool to help those using Elasticsearch Ingest Nodes to begin using Logstash. The tool allows users to enter their Ingest Node configuration and outputs a Logstash pipeline configuration.

Last but not least, a new pipeline viewer now allows users to monitor and analyze Logstash pipelines in Kibana. Pipelines are displayed on a graph where each component is accompanied by relevant metrics. I explored the pipeline viewer in this article.

Kibana 6

As mentioned in the introduction to this article, there are no major changes to the stack’s UI, but there is a nice list of usability improvements that users will most likely enjoy.

The main changes to the UI include a new CSV export option, new colors for better contrast, and improved screen reading and keyboard navigation.

Dashboarding in Kibana has two new features. A new full-screen mode was added for viewing dashboards, allowing users to enjoy all the Kibana goodness. In addition, the new dashboard-only mode enables administrators to share dashboards safely: using a new kibana_dashboard_only_user role, specific users can be given limited access to Kibana with read-only data visibility and no ability to modify or delete any of the dashboards.

Also of note is a new querying language called Kuery that seeks to address the issues of previous search mechanisms and that is meant to ultimately simplify querying and filtering in Kibana. Disabled by default, the new language supports all the query types that are supported by the Elasticsearch DSL, including those not supported by Lucene query syntax (such as aggregations and visualizations) — directly in the query language.

Beats 6

Auditbeat is the only new shipper released in this version and is basically a native and easy-to-use auditing tool for the Elastic Stack. I tried the beta back in September and am pretty sure this will become one of the more popular beats.

Most of the other changes center around improving support and performance of existing beats, mostly Filebeat and Metricbeat.

Filebeat and Metricbeat have a tighter integration with Docker and Kubernetes now with new processors that add metadata to the logs collected from these environments, such as details on the container name, image, labels, pod name, and so forth. A new Kubernetes module was also added to Metricbeat that gives insights into your Kubernetes deployment.
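
A minimal filebeat.yml fragment enabling these processors might look like the following (the Docker socket path and the in-cluster setting are assumptions that depend on your environment):

```yaml
# filebeat.yml fragment — enrich events with container/pod metadata
processors:
  - add_docker_metadata:
      host: "unix:///var/run/docker.sock"
  - add_kubernetes_metadata:
      in_cluster: true
```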

Additional changes to Filebeat and Metricbeat include deployment manifests for easier setup in Kubernetes and a new modules.d directory that contains configuration files for the different supported modules.

A new set of commands for Beats allows users to list, enable, and disable modules, export configurations and the Elasticsearch mapping template, and test Logstash or Elasticsearch connectivity.

Last but not least, internal changes were introduced in the Beats pipeline architecture to improve performance, including a transition to an asynchronous pipeline that allows other processes to continue even as one hangs in a wait state. This also made the Filebeat internal spooler obsolete. For Metricbeat specifically, polling frequency was reduced and sharding behavior changed; that, coupled with the changes made to Elasticsearch, results in less storage used.

What Will Break?

We’re going to end this piece with a summary of the main gotchas that need to be considered before upgrading. Keep in mind this is a partial list of breaking changes only. As always, we recommend you read the official documentation carefully and test the upgrade process.


Elasticsearch

  • As mentioned above, the main breaking change in this version is the gradual removal of mapping types from indices, so Elasticsearch 6.0 will only be able to read indices created in version 5.0 or above.
  • Elasticsearch 6.0 requires a reindexing before full functionality can be achieved; this is because of a number of changes to the Elasticsearch indices. These are accompanied by changes to the CAT, Document, and Java APIs and the Search and Query DSL, as well as REST changes. The result is a number of key changes that affect Elasticsearch deeply, altering everything from the structure of queries and scripts to how internal components communicate.
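
Reindexing itself is done with the _reindex API; a minimal request, with hypothetical index names, looks like this:

```
POST /_reindex
{
  "source": { "index": "old-index" },
  "dest":   { "index": "new-index" }
}
```

The destination index must be created (with the new settings and mappings) before the reindex runs.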

A full list of the breaking changes is available here.


Logstash

Logstash 6 involves several breaking changes, including a number of changes to the input and output options. There are also numerous plugin changes and a change to the config.reload.interval configuration option, which now uses time value strings (5m, 10s, etc.) instead of millisecond time values.

A full list of the breaking changes is available here.


Kibana

To migrate existing Kibana installations to Kibana 6.0, the Kibana index needs to be re-indexed. A full list of the breaking changes is available here.


Beats

  • The main breaking change in Beats 6 is the removal of the Filebeat internal spooler, as described above. This results in the removal of some configuration options and the addition of others.
  • Following the changes made to the pipeline architecture, there is no longer an option to enable two outputs at the same time (you can use multiple outputs in Logstash instead or run multiple instances of your beat shipper).
  • If you’re outputting to Logstash from your beat, you now need to include a version in the Logstash index setting.

A full list of the breaking changes is available here.

Summing It Up

As with any major release of the stack, there are quite a large number of changes in Elastic Stack 6.0. Many of these changes stem from the upgrade to Lucene 7, the search library underlying Elasticsearch, but just as many are part of a general push towards increased efficiency and performance.

Elastic Stack 6.0 also brings a number of important security changes and enhancements and sets the stage for many more. This follows the needs of users deploying Elastic Stack in production environments, where complex security requirements are increasingly the standard.

In many ways, version 6.0 is a transitional release: a midpoint between traditional deployments of the stack, which clung to concepts of data organization belonging to relational database design, and more modern approaches.

That a major version of the stack comes with a need to re-index, changes to the index structure, and a number of configuration changes to various plugins should come as no surprise. Migration from previous versions will need to be planned carefully and, above all, tested.

Original Link

Creating a Local Development Environment With Docker Compose

Creating a simple and reliable development environment is essential to developer productivity as well as to on-boarding new team members. It’s far too common for companies to have extremely intricate and fragile development environments. Teams should constantly be improving their local development environments: small annoyances here and there may not seem like much, but remember that each one impacts every developer using that code base. Let’s take a look at one of the more difficult parts of a development environment, data stores, and see how to manage them cross-platform using Docker Compose.

What Is Docker?

According to their website, “Docker is an open platform for developers and sysadmins to build, ship, and run distributed applications, whether on laptops, data center VMs, or the cloud.” In layman’s terms, it’s basically an image containing a low-level OS and all software pre-configured: essentially a much more lightweight VM image. Multiple Docker containers can run on a single OS, unlike VMs, where each VM runs an OS of its own, eating up resources. Containers can be anything along the lines of a database, document store, message broker, search service, an existing application, or anything else you can think of. Our use case is quite simple: we want MySQL and Elasticsearch running in our development environment without the need to install them manually. This also gives us the flexibility to have different Docker Compose files for different applications, with no need to worry about naming conflicts or different versions of a dependency across projects.

Docker Hub

Docker Hub is a repository of shared docker containers you can readily import. We will be using it as the base for our MySQL and Elasticsearch containers.

Docker Compose

Docker Compose is a tool for defining and running multi-container Docker applications. Each application we have can have its own docker compose file for a single command boot up of our development environment. Below is our StubbornJava docker compose file. We will take a look at each container later.

version: '2'
services:
  elasticsearch:
    image:
    restart: always
    container_name: stubbornjava_elasticsearch
    environment:
      -
      -
      - http.port=9200
      - transport.tcp.port=9300
      - discovery.zen.minimum_master_nodes=1
      - discovery.type=single-node
      - http.cors.enabled=true
      -
      - node.master=true
      -
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - stubbornjava_esdata:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
  mysql:
    build: mysql
    restart: always
    container_name: stubbornjava_mysql
    environment:
      - MYSQL_ROOT_PASSWORD=
      - MYSQL_ALLOW_EMPTY_PASSWORD=yes
    ports:
      - "3306:3306"
    volumes:
      - stubbornjava_mysql:/var/lib/mysql
volumes:
  stubbornjava_esdata:
    driver: local
  stubbornjava_mysql:
    driver: local

View on GitHub

MySQL Container

We are starting with a base MySQL Docker image from Docker Hub and adding some customizations to the my.cnf file. Mainly, we want to support full Unicode using utf8mb4; see MySQL 8.0: When to use utf8mb3 over utf8mb4? for more info.

MySQL Dockerfile

Starting with our base Docker image we then apply our custom my.cnf file as well as run a script to help configure things such as creating databases or users.

FROM mysql:5.7
COPY mysqld.cnf /etc/mysql/mysql.conf.d/mysqld.cnf
COPY ./ /docker-entrypoint-initdb.d/
EXPOSE 3306
CMD ["mysqld"]

View on GitHub

MySQL My.cnf

Our custom MySQL config file.

pid-file = /var/run/mysqld/
socket = /var/run/mysqld/mysqld.sock
# Where the database files are stored inside the container
datadir = /var/lib/mysql
# My application special configuration
max_allowed_packet = 32M
sql-mode = 'STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION'
# Accept connections from any IP address
bind-address =
character-set-server = utf8mb4

View on GitHub

MySQL Setup Script

Create databases and users or anything else needed to bootstrap.

#!/bin/sh
echo 'creating databases'
mysql -u root -e 'CREATE DATABASE IF NOT EXISTS stubbornjava;'

View on GitHub

Elasticsearch Container

Since we are just using the default ES container, we don’t need a separate Dockerfile here; all of the configuration can be passed in directly from the Docker Compose file. Take note that this is a single-node setup, which is probably fine for local development or small projects, but you will want a more robust setup for non-local environments.

Running the Dev Environment

Simply change directory to where the Docker compose file lives and run docker-compose up. You should see logs coming from both containers and you should be up and running.

mysql -u root --host=localhost --protocol=tcp -e 'SHOW DATABASES';
| Database |
| information_schema |
| mysql |
| performance_schema |
| stubbornjava |
| sys |

You need to specify --protocol=tcp since MySQL is running in a container. You can also share a socket file between the volume and the local OS.

curl localhost:9200
{
  "name" : "stubbornjava",
  "cluster_name" : "stubbornjava-cluster",
  "cluster_uuid" : "-2ALiVFtTr6iJZkip8KXzA",
  "version" : {
    "number" : "5.6.1",
    "build_hash" : "667b497",
    "build_date" : "2017-09-14T19:22:05.189Z",
    "build_snapshot" : false,
    "lucene_version" : "6.6.1"
  },
  "tagline" : "You Know, for Search"
}

Both MySQL and Elasticsearch are up and running.

Original Link

InfluxDB vs. Elasticsearch for Time Series Analysis

Choosing which storage solution to use for time series data is not a straightforward task, to say the least. Today, one is almost spoilt for choice as there are some great alternatives out there, but this article attempts to shed some light on two of these solutions: Elasticsearch and InfluxDB.

Why this comparison? While InfluxDB was designed for time series data, Elasticsearch was designed for (and is mostly used for) document indexing. However, despite this design and functional disparity, Elasticsearch is used by many for time series data and we wanted to check how well these two databases perform.

Hopefully, this high-level comparison will help those searching for an answer to the dilemma stated above.

What Is Time Series Data?

Time series data can be defined as data points indexed by their temporal order, where the distance between two data points may or may not be equal. If the frequency at which data points are taken is constant (for example, sampling the data every 10 ms), then the series is called a discrete time data series.

In computer systems, all user data can potentially be represented as time series data, as all stored information has a time component that can provide different metrics in different scenarios. For example, Twitter, Facebook, and LinkedIn have data on the user’s registration date, as well as the dates and times at which various actions were performed (tweet or article posted, activity liked, etc.).

Having data arrive at a higher frequency can create challenges, including having to handle a greater number of write requests per second and needing to store all the data. One sensor sending a 1KB payload every second generates roughly 86MB of information each day, meaning 100 sensors would create a data load of over 8GB per day. Querying and aggregating such a large amount of data to extract useful information is another issue to be considered.
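
As a rough sanity check on these figures, assuming a 1KB payload written once per second and decimal units:

```python
# Back-of-the-envelope storage estimate for a fleet of sensors.
# Assumed: 1 KB payload, one sample per second, 100 sensors.
SECONDS_PER_DAY = 24 * 60 * 60          # 86,400 samples per sensor per day
payload_kb = 1

per_sensor_mb = payload_kb * SECONDS_PER_DAY / 1000   # KB -> MB
fleet_gb = per_sensor_mb * 100 / 1000                 # 100 sensors, MB -> GB

print(f"one sensor:  ~{per_sensor_mb:.1f} MB/day")    # ~86.4 MB/day
print(f"100 sensors: ~{fleet_gb:.1f} GB/day")         # ~8.6 GB/day
```

Payload size dominates this estimate, so compressing or trimming each data point pays off linearly across the whole fleet.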

And so, deciding on the right storage engine to use for time series data is one of the first challenges to overcome when designing a temporal data-generating system.

In principle, you can use almost any database engine to store your time series data, but issues can arise later when you want to perform analysis on the information collected. We are going to discuss two popular databases that are used to store and analyze time series data, mostly in combination with other tools.


InfluxDB

InfluxDB is a database whose general purpose is to store time series data. The storage and querying of data are optimized for data points with a time component.

When it comes to storing data, the InfluxDB team has developed a storage engine that follows the LSM-tree paradigm. The engine organizes its data into shards, each covering a block of time whose length depends on the retention policy; if the retention policy is infinite, each shard covers a seven-day block of time. Each shard is related to the database for which it is created. Besides the data shards, the storage engine consists of multiple other components, like the in-memory index, the WAL, a cache for data stored in the WAL, TSM files where the data is compressed and stored, the FileStore, and more.

Time series can be organized in databases, where you can logically group the kinds of data you are planning to store. Measurements can, in a way, be treated as tables are in SQL, where the primary key is always the time component. These “tables” have to contain at least one key that describes which data is stored (for example, core_1 or server_12) and its numeric value. When queried, the measurement table will return the timestamp together with the keys and value for the stored data.

Writing data to InfluxDB can be done in many ways. You can use the command line interface provided, the client libraries for your language, or the REST API made available for both reading and writing data to the database. This API also allows you to create or drop databases and tables.
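
For instance, a single point can be written over HTTP using InfluxDB’s line protocol (the database name, measurement, and tags here are illustrative):

```shell
curl -i -XPOST 'http://localhost:8086/write?db=mydb' \
  --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64'
```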

Here is an InfluxDB example returning values from the cpu_load_short table:

curl -G 'http://localhost:8086/query?pretty=true' --data-urlencode "db=mydb" --data-urlencode "q=SELECT \"value\" FROM \"cpu_load_short\" WHERE \"region\"='us-west'"

To facilitate the easy extraction of data from the database, InfluxDB provides a SQL-ish interface, so called because it does not make all SQL commands available. Data aggregation can also be performed on the database level without any need for external processing. Support for aggregation queries is built into InfluxDB and can be accessed through the SQL and REST interfaces. Some of the functions available for aggregation are COUNT(), DISTINCT(), INTEGRAL(), MEAN(), MEDIAN(), MODE(), SPREAD(), STDDEV(), and SUM().

Here is an example for an InfluxDB aggregation query:

SELECT COUNT("water_level") FROM "h2o_feet"

Also, InfluxDB has support for data transformation, selectors, and even prediction queries. By calling functions, you can easily transform data before returning it to the client that is going to consume it (see here for more info).

Check out this example of a data transformation query returning a derivative of water levels between each data point over a specified date interval:

SELECT DERIVATIVE("water_level") FROM "h2o_feet" WHERE "location" = 'santa_monica' AND time >= '2015-08-18T00:00:00Z' AND time <= '2015-08-18T00:30:00Z'

InfluxDB also provides a simple UI where you can run queries on the database, view data in tabular form, and also execute some DDL operations. To visualize your data, you can use other tools like Grafana, where you can configure access to your InfluxDB data and visualize it.

Installing InfluxDB is relatively simple. Depending on the operating system, you can either use a package manager or download the binaries and install it manually. It’s worth pointing out that even though InfluxDB is an open-source database, released under an MIT license, it has some features that are not open-sourced (such as those related to clustering) that are available in the InfluxEnterprise product offered by the maintainer, InfluxData.


Elasticsearch

Elasticsearch is a full-text search and analysis engine based on Apache Lucene. Since its initial release in 2010, Elasticsearch has gained popularity as a fast and scalable document indexing and search engine with millions of users worldwide.

Elasticsearch stores data in indices, similar to relational databases, where data is logically separated. A single index can contain data for users (personal information, hobbies, etc.), companies (for example, name, addresses, phone numbers), or other entities. The ability to split indices into one or many shards is a crucial feature that allows Elasticsearch to greatly outperform InfluxDB in the horizontal scaling, distribution, and parallelization of data. Elasticsearch is designed as a distributed system in which it is easy to add more instances to the cluster — it will move shards and replicate to new instances automatically in order to maximize the cluster’s availability.

Internally, Elasticsearch relies on Lucene’s implementation of inverted indices, which can be viewed as a map of terms and the documents in which these terms can be found. This is useful in that it can return a subset of documents containing terms specified in a search query.

Another data structure that significantly boosts performance, especially during aggregation queries, is the doc values structure. While inverted indices map terms to documents, doc values map the other way around, mapping documents to terms. Basically, a map consists of a list of documents and the terms that are contained in each document.
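
The duality between the two structures can be sketched with plain dictionaries (a toy illustration; the documents and terms are made up):

```python
# Three tiny "documents" keyed by ID.
docs = {
    1: "error connecting to database",
    2: "database connection restored",
    3: "error reading config",
}

# Inverted index: term -> set of documents containing it (powers search).
inverted = {}
for doc_id, text in docs.items():
    for term in text.split():
        inverted.setdefault(term, set()).add(doc_id)

# Doc values: document -> list of terms (the reverse mapping; this
# document-major layout is what makes aggregations cheap).
doc_values = {doc_id: text.split() for doc_id, text in docs.items()}

print(sorted(inverted["error"]))   # documents matching "error" -> [1, 3]
print(doc_values[2])               # the terms stored for document 2
```

A search walks the first map from terms to documents; an aggregation walks the second map from documents to terms, which is why maintaining both pays off.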

Similar to InfluxDB, Elasticsearch provides an HTTP REST API and Java API for communication and data manipulation. There are also language-specific libraries available, which are usually wrappers around the aforementioned APIs.

The storage of time series data begins with defining mappings. In Elasticsearch, this means telling the engine how it should store the data, and also the fields that we are going to send for indexing. Defining this in advance improves the subsequent performance when querying the data.

The timestamp should be one of the properties you index in your mappings. In previous versions, a _timestamp field was added by default to every inserted record, but this practice was deprecated in newer versions of Elasticsearch.
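
A minimal mapping for time series documents might look like the following (the index, type, and field names are placeholders; the single doc type matches Elasticsearch 6.x conventions):

```
PUT /sensor-data
{
  "mappings": {
    "doc": {
      "properties": {
        "timestamp": { "type": "date" },
        "value":     { "type": "float" }
      }
    }
  }
}
```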

Elasticsearch provides an aggregation framework that can be really useful during analysis, as it gives developers the option to perform aggregation over an entire set of documents, or time series data in our case. In contrast to InfluxDB, where you can only perform aggregation over numerical data, Elasticsearch can also handle textual data, which becomes very helpful when you have logs that contain messages, exceptions, and other text-based information.

In comparison with InfluxDB, Elasticsearch needs a little more configuration to run on large datasets. You need to define your mappings, which fields are indexed, what kind of data they contain (full text, numerical, etc.), the number of primary shards, and so on. Some of these settings — and others besides — cannot be changed once documents are added to the index, so they need to be considered carefully before you start indexing. The only way to remedy problems later is to create a new index with new settings, reindex all the documents, and then switch to the new index. Carrying this out on a live system is, of course, something you want to avoid.

Elasticsearch is usually used in combination with the stack’s other components, Logstash and Kibana (together called the ELK or Elastic Stack), for log aggregation, analysis, and monitoring. Logstash is an open-source processing pipeline that gives you the ability to import logs from different sources (files, message queues, databases, etc.), transform them, and output them to a defined endpoint or data store, usually Elasticsearch. Kibana is the stack’s UI, providing users with a visual representation of the stored log files and allowing them to easily configure customized dashboards, tables, and other aids to visualization.

Summing It Up

Storing a large amount of time series data can be a tough task, one that requires considerable effort and research on which storage engine to use. Both InfluxDB and Elasticsearch have their pros and cons, and there are no hard and fast rules on which is the right one for a particular use case.

Even though InfluxDB is a relatively young database, it seems to be more specialized for time series data. Benchmarking has shown that it can handle a higher number of writes than Elasticsearch, and further development should allow it to easily become the leader in time series data storage.

Also, if you plan on logging textual data in your database — log messages, exceptions, requests, or responses, for example — and later querying them by content, Elasticsearch is a better solution since it specializes in textual data searching. Using InfluxDB for this kind of logging and later querying might require the use of an additional search engine, which can cause further problems in terms of synchronizing the data between the two systems. As a final consideration, it is worth pointing out that the aggregation framework in Elasticsearch is more extendable, and is not limited to just numerical and textual data.

Original Link

Provisioning ElasticSearch Clusters on AWS With Infrastructor

Infrastructor is a server provisioning and automation framework written in Groovy. This article describes how to automate the provisioning of a small ElasticSearch cluster on AWS with Infrastructor. 

Step 1: Install the Required Dependencies

To run the example below, you need to install a couple of dependencies:

  1. Oracle Java Virtual Machine 8

  2. Infrastructor (the latest version is 0.1.3)

Infrastructor comes with a CLI, which we will use later to run a provisioning script. It can be downloaded from the project release page. To install the Infrastructor CLI, unpack the ZIP and add the bin directory to the PATH environment variable. Check the installation by launching the CLI:

infrastructor version

Step 2: Prepare AWS Instances

Before we start, make sure you have access to your private AWS account and create three t2.micro or t2.small instances. These instances will be used to deploy an ElasticSearch cluster. In the example below, I use instances based on Ubuntu 16.04. You can also use more powerful instances, and you can increase the instance count if you want to.

The easiest way to launch three EC2 instances might be to use the AWS web console, but you can also take a look at Terraform or Ansible to do so. Infrastructor also provides basic facilities to launch and manage EC2 instances, but this feature is still in beta and isn’t considered to be stable yet.

This is a set of basic requirements for the EC2 instances:

1. Instances have public IP addresses and accept SSH connections on port 22.

2. Instances are able to communicate with each other on TCP ports 9200 and 9300. You may also allow communication between your host and the instances on port 9200 so that it is possible to check cluster health status using the REST API without logging into one of the remote hosts over SSH.

3. In general, t2.micro instances should be good enough to run the example. However, I would recommend choosing t2.small if you can.

4. You also need to have a pair of AWS access keys (aws_access_key_id and aws_secret_access_key) with EC2 read permissions. Infrastructor will use them to retrieve instance information and build an inventory.

5. Give your instances the ‘elasticsearch:true’ tag so Infrastructor will update only them.

Step 3: Automate ElasticSearch Cluster Provisioning

Here is an Infrastructor provisioning script:

def AWS_ACCESS_KEY_ID = input message: 'AWS_ACCESS_KEY_ID: ', secret: true
def AWS_ACCESS_SECRET_KEY = input message: 'AWS_ACCESS_SECRET_KEY: ', secret: true

// Describe AWS Inventory:
// 1. How to connect to AWS to retrieve a list of instances: awsAccessKeyId, awsAccessSecretKey and awsRegion
// 2. Filter instances by tags: tags
// 3. How to connect to an instance: username and keyfile
awsInventory {
    awsAccessKeyId = AWS_ACCESS_KEY_ID
    awsAccessSecretKey = AWS_ACCESS_SECRET_KEY
    awsRegion = "eu-central-1"
    tags = [elasticsearch: true]
    username = "ubuntu"
    keyfile = "path/to/your_private_keyfile"
}.provision {
    /**
     * Install docker first using the official repository
     * Do it on 3 nodes in parallel
     */
    task name: "install docker", parallel: 3, actions: {
        shell sudo: true, command: """
            apt-key adv --keyserver hkp:// --recv-keys 58118E89F3A912897C070ADBF76221572C52609D
            apt-add-repository 'deb ubuntu-xenial main'
            apt-get update
            apt-get install -y docker-engine
            usermod -aG docker ubuntu
            sysctl -w vm.max_map_count=262144
        """
    }

    /**
     * Run ElasticSearch containers on each host
     * Here 'node' is a variable which represents currently provisioning EC2 instance
     * 'nodes' variable is a set of all nodes in the inventory
     */
    task name: "run elasticsearch nodes", actions: {
        shell """
            docker rm -f $(docker ps -aq) || true
            docker run -d -p 9200:9200 -p 9300:9300 \
              -e "${}" \
              -e "node.master=true" \
              -e "network.publish_host=${node.privateIp}" \
              -e "${nodes*.privateIp.join(',')}" \
              -e "ES_JAVA_OPTS=-Xmx512m -Xms512m" \
        """
    }
}

Save the script as provisioning.groovy. To run the script with the Infrastructor CLI, type:

infrastructor run -f provisioning.groovy

After some time, you should see a message that the execution has completed successfully. Then check the result by calling the  _cluster/health endpoint of ElasticSearch:

curl http://elastic:changeme@ES_NODE_PUBLIC_IP_HERE:9200/_cluster/health?pretty

You should see something like this:

{
  "cluster_name" : "docker-cluster",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 3,
  "number_of_data_nodes" : 3,
  "active_primary_shards" : 2,
  "active_shards" : 4,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}

This was a small demonstration of how to manage configurations and provision AWS nodes with Infrastructor. I hope you find it simple and neat! By the way, Infrastructor is an open source project and it is looking forward to your contributions, including pull requests, feature requests, and bug reports. Thank you for your interest in Infrastructor!


Original Link

What Is Elasticsearch and How Can It Be Useful?

Products that involve e-commerce and search engines with huge databases face issues such as product information retrieval taking too long. This leads to a poor user experience and, in turn, drives away potential customers.

Lag in search is attributed to the relational database used for the product design, where the data is scattered among multiple tables, and the successful retrieval of meaningful user information requires fetching data from several of them. Relational databases work comparatively slowly when it comes to huge data volumes and fetching search results through database queries. Businesses nowadays are looking for alternative data stores that promote quick retrieval. This can be achieved by adopting NoSQL rather than an RDBMS for storing data. Elasticsearch (ES) is one such NoSQL distributed database. Elasticsearch relies on flexible data models to build and update visitor profiles to meet the demanding workloads and low latency required for real-time engagement.

Let’s understand what is so significant about Elasticsearch. ES is a document-oriented database designed to store, retrieve, and manage document-oriented or semi-structured data. When you use Elasticsearch, you store data in JSON document form and then query it for retrieval. It is schema-less, using some defaults to index the data unless you provide a mapping as per your needs. Elasticsearch uses the Lucene StandardAnalyzer for indexing, providing automatic type guessing and high precision.

Every feature of Elasticsearch is exposed as a REST API:

  1. Index API: Used to index a document.

  2. Get API: Used to retrieve the document.

  3. Search API: Used to submit your query and get a result.

  4. Put Mapping API: Used to override default choices and define the mapping.

Elasticsearch has its own query domain-specific language, in which you specify the query in JSON format. You can also nest other queries based on your needs. Real-world projects require search on different fields, applying conditions, different weights, recent documents, values of some predefined fields, and so on. All such complexity can be expressed through a single query: the query DSL is powerful and designed to handle real-world query complexity. Elasticsearch APIs are directly related to Lucene and use the same names as Lucene operations; the query DSL, for instance, uses Lucene’s TermQuery to execute term lookups.
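
For example, a nested bool query combining a full-text match with a range filter might look like this (the index and field names are hypothetical):

```
POST /products/_search
{
  "query": {
    "bool": {
      "must":   { "match": { "title": "coffee" } },
      "filter": { "range": { "price": { "lte": 20 } } }
    }
  }
}
```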

This figure shows how the Elasticsearch query works:

Figure: Indexing and Searching in Elasticsearch

The Basic Concepts of Elasticsearch

Let’s take a look at the basic concepts of Elasticsearch: clusters, near real-time search, indexes, nodes, shards, mapping types, and more.


Cluster

A cluster is a collection of one or more servers that together hold the entire data set and provide federated indexing and search capabilities across all of them. In relational database terms, a node corresponds to a DB instance. There can be N nodes with the same cluster name.

Near-Real-Time (NRT)

Elasticsearch is a near-real-time search platform: there is a slight delay from the time you index a document until the time it becomes searchable.


Index

An index is a collection of documents that have similar characteristics. For example, we can have one index for customer data and another for product information. An index is identified by a unique name, which is used to refer to it when performing indexing, search, update, and delete operations. In a single cluster, we can define as many indexes as we want. An index is roughly the equivalent of a database or schema in an RDBMS (relational database management system): consider it a set of tables with some logical grouping. In Elasticsearch terms: index = database; type = table; document = row.


Node

A node is a single server that holds some data and participates in the cluster’s indexing and querying. A node can be configured to join a specific cluster by its cluster name, and a single cluster can have as many nodes as we want. A node is simply one Elasticsearch instance; consider it a running instance of MySQL. Just as multiple MySQL instances can run on one machine on different ports, multiple Elasticsearch instances can share a machine, but generally one Elasticsearch instance runs per machine. Elasticsearch uses distributed computing, so separate machines help, as they bring more hardware resources.


Shard

A shard is a subset of an index’s documents. An index can be divided into many shards.

Mapping Type

Mapping type = database table in an RDBMS.

Elasticsearch uses document definitions that act as tables. If you PUT (“index”) a document in Elasticsearch, you will notice that it automatically tries to determine the property types. This is like inserting a JSON blob in MySQL, and then MySQL determining the number of columns and column types as it creates the database table.
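That type guessing can be pictured with a toy sketch in Python. This is only an illustration of the idea, not Elasticsearch’s actual dynamic-mapping algorithm:

```python
def guess_type(value):
    # Toy dynamic-type guessing, illustrating the idea only.
    # (bool is checked before int because, in Python, bool is a subclass of int.)
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    return "string"  # the ES 2.x default for text

# A document PUT without an explicit mapping...
doc = {"title": "Taming Text", "num_reviews": 12, "price": 9.99}

# ...gets a property type guessed per field.
mapping = {field: guess_type(value) for field, value in doc.items()}
```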

Do you want to know more about what Elasticsearch is and when to use it? Some of the use cases of Elasticsearch can be found here. Elasticsearch users have delightfully diverse use cases, ranging from appending tiny log-line documents to indexing web-scale collections of large documents and maximizing indexing throughput.

Sometimes, we have more than one way to index or query documents. And with the help of Elasticsearch, we can do it better. Elasticsearch is not new, though it is evolving rapidly. Still, the core product is consistent and can help achieve faster performance with search results for your search engine.

Original Link

SQL Solution to Elasticsearch

Developers today have a lot of storage options for building strategic new apps, including JSON databases like Elasticsearch. These new technologies allow development teams to more easily and efficiently iterate on features.

Using agile methodologies, teams work in sprints that last just a few weeks, getting new features to market fast. Compared to relational databases, Elasticsearch is far less demanding in terms of modeling and structuring data, and this is a big advantage in terms of development speed.

If the data from these strategic apps has value, it will eventually be analyzed. The schema flexibility that is so compelling in Elasticsearch turns into a massive challenge when it comes to data analytics.

The basic data structure and APIs of Elasticsearch are fundamentally incompatible with most companies’ existing approach to data pipelines. And this lurking issue isn’t limited to Elasticsearch: it applies to third-party apps like Workday, MongoDB, Cassandra, Hadoop, Amazon S3, Azure Blob Store, and more. Companies are putting massive amounts of essential data in systems that are incompatible with how they make sense of their data. It’s a big data problem.

Tomer Shiran, CEO and co-founder of Dremio, took on the challenge of using business intelligence on Elasticsearch and the role of self-service data in helping unlock the value of data while preserving governance and security. He and his team built a SQL interface that neither compromises the data in Elasticsearch nor limits the queries that can be made. With sophisticated query execution, people are able to use the full breadth of SQL.

This unlocks the islands of data that reside in Elasticsearch and other third-party apps so that BI, data analysts, and data scientists can access the data and build the reports they need while developers are free to develop apps to drive the business forward.

Original Link

How to Set Up MapR-DB to Elasticsearch Replication

The automatic replication of MapR-DB data to Elasticsearch is useful for many environments. There are some great use cases I can think of for taking advantage of this feature:

  1. Full-text search of data in MapR-DB
  2. Geospatial searches for location data (think mobile user data here)
  3. Kibana visualization of the data, especially useful for time series data like sensor data or performance/network metrics
  4. ES as a secondary index for a MapR-DB table (won’t be needed from MapR 6.0 when JSON DB tables will support secondary indices)
  5. Change data capture (arguably)

The MapR Gateway replication feature makes it possible to get the data into Elasticsearch 2.2 without any code!

Let’s learn how we can do it using the latest MapR Sandbox version 5.2.1 (available for free). There is no better way to learn than to do, after all!

By and large, the MapR documentation of this feature is sufficient for an experienced MapR admin to get the replication working. However, the documentation isn’t task-focused. What I contribute in this post is a start-to-finish how-to using sample data, covering the whole process step by step.


Some important notes about limitations of the MapR-DB to Elasticsearch replication:

  1. Only Elasticsearch 2.2 is supported
    • Later versions of ES will not work
  2. No support for JSON DB tables

Sample Dataset

For this tutorial, we’ll use pump sensor data that is used in other training materials and blogs such as Real-Time Streaming With Kafka and HBase by Carol MacDonald. I have modified it a bit to add an ID column.

[mapr@maprdemo ~]$ head -n 3 /mapr/

The data columns include a date, a time, and some metrics related to sensor readings from a pump such as those used in the oil industry (psi, flow, etc.). There are 47,899 rows in this dataset. While this is tiny by the standards of production MapR-DB, it’s more than enough to demonstrate the technology working on the sandbox.
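To make the row shape concrete, here is a small Python sketch that parses one such CSV row. The sample values are invented, and the column names are the ones used in the ImportTsv command later in this post:

```python
import csv
import io

# Invented sample row: row key, then resid, date, time, and the pump metrics.
sample = "1,42,03/12/14,09:30:00,60.0,2.5,41.1,200.0,71.9,12.3"

columns = ["rowkey", "resid", "date", "time",
           "hz", "disp", "flow", "sedppm", "psi", "ch1ppm"]

row = next(csv.reader(io.StringIO(sample)))
record = dict(zip(columns, row))   # e.g. record["psi"] holds the psi reading
```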

Get the data here.

Send the data to the sandbox using the following command (while the sandbox is running, of course!):

$> scp -P 2222 sensordata.csv mapr@localhost:

You will be prompted for the password; it’s “mapr”.

Or else, you can wget the data directly from the sandbox. Just copy the dataset’s URL and paste it after wget directly while logged into the sandbox.

$> wget <URL to dataset>

Remember that to log into the sandbox from your favorite shell, just type:

$> ssh -p 2222 mapr@localhost

Finally, copy the data to MapR. This ensures the command to import data into MapR DB will run as-is:

$> cp sensordata.csv /mapr/

MapR-DB Replication Using the MapR Gateway Service

MapR-DB is a NoSQL database that follows in the footsteps of Google BigTable. More specifically, it started as a reimplementation of Apache HBase designed from the ground up to take advantage of the advanced inner workings of the killer distributed platform known as the MapR Converged Data Platform. It also now has native JSON support to more easily handle hierarchical, nested, and evolving data formats.

At its core, the MapR-DB replication feature was designed to enable a MapR-DB table to be replicated automatically to a MapR-DB instance running on another cluster. One primary use case is for a global enterprise to improve the speed of access and get multi-region HA automatically, with a guarantee of data consistency. This feature can get really fancy with bi-directional replication, where applications can read and write to/from either replica and still know both are always kept up to date.

More info can be found here and here.

Setup Guide

Getting set up is pretty quick.

Choices in Solution Design

If you just want to try this feature out, then the MapR Sandbox is a great way to get started quickly. I’ll make sure to cover that in this guide.

For those who may want to use this feature on a production cluster though, there are a couple of configurations to ponder:

  • Co-locate the ES cluster with the MapR cluster
  • Use an external ES cluster

Unsurprisingly, if you have plenty of hardware servers then the external ES cluster should be the preferred solution, to isolate services and reduce failure impact as well as reserve the cluster resources for actual big data processing.

While putting the ES cluster on separate nodes is the recommended solution for a production cluster, it is also possible to colocate part or all of an ES cluster with MapR nodes. Keep in mind that memory resources taken by ES are not available to the cluster.

For sizing of the ES cluster, the main factors are storage needs and incoming data throughput. The more data, the more nodes will be needed. The sizing issue is well explained in the MapR documentation.


Install ES (single node or cluster mode).

  1. Install Elasticsearch and run it (as root):
    $> wget
    $> rpm -i elasticsearch-2.2.0.rpm
    $> service elasticsearch start
  2. Check installation:
    $> curl localhost:9200
    {
      "name" : "D'Spayre",
      "cluster_name" : "elasticsearch",
      "version" : {
        "number" : "2.2.0",
        "build_hash" : "8ff36d139e16f8720f2947ef62c8167a888992fe",
        "build_timestamp" : "2016-01-27T13:32:39Z",
        "build_snapshot" : false,
        "lucene_version" : "5.4.1"
      },
      "tagline" : "You Know, for Search"
    }

    Note: It’s installed in /usr/share/elasticsearch/ and runs as user “elasticsearch” when installing with rpm.

  3. Update Elasticsearch config:
    $> vi /etc/elasticsearch/elasticsearch.yml
    cluster.name: mapr-elastic
    network.host: # <- IP of the sandbox (hostname -i)
  4. Verify config:
    $> curl maprdemo:9200
    {
      "name" : "Angelo Unuscione",
      "cluster_name" : "mapr-elastic",
      "version" : {
        "number" : "2.2.0",
        "build_hash" : "8ff36d139e16f8720f2947ef62c8167a888992fe",
        "build_timestamp" : "2016-01-27T13:32:39Z",
        "build_snapshot" : false,
        "lucene_version" : "5.4.1"
      },
      "tagline" : "You Know, for Search"
    }

Optional: Add port forwarding to access ES from your host. In VirtualBox, I added TCP port 9200 to the list of Port Forwarding Rules.


Keep in mind the hostname of the ES instances, and remember that the supported ES version is 2.2. This is important; otherwise, there is a good chance the replication will fail.

Create a MapR-DB Table

There are a variety of ways to create a MapR-DB table. We’ll use the command line, but it’s equally possible (and very easy!) to use MCS to do it visually.

$> maprcli table create -path /user/mapr/pumps
$> maprcli table cf create -path /user/mapr/pumps -cfname data

That’s it! Inserting the data will create the columns automatically. We don’t need to worry about data types, as MapR-DB only stores bytes and it’s up to the application to convert the data to/from bytes. This is a common pattern for NoSQL databases.
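A small Python sketch of that pattern, under assumed conventions (UTF-8 for strings, big-endian 8-byte doubles, as HBase-style byte utilities would produce):

```python
import struct

# MapR-DB only stores bytes, so the application owns the conversions.
def encode_str(s):
    return s.encode("utf-8")

def decode_str(b):
    return b.decode("utf-8")

def encode_double(x):
    return struct.pack(">d", x)      # 8 bytes, big-endian

def decode_double(b):
    return struct.unpack(">d", b)[0]

# Round-trip a couple of cell values the way an application would.
cell_id = encode_str("pump-1")
cell_psi = encode_double(71.9)
```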

Add Data to the MapR-DB Table

We’re using HBase’s ImportTsv tool to import the CSV-formatted dataset directly into the MapR-DB table we just created. Again, no code required. So much for “Hadoop is difficult,” right?

$> hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=, -Dimporttsv.columns="HBASE_ROW_KEY,data:resid,data:date,data:time,data:hz,data:disp,data:flow,data:sedppm,data:psi,data:ch1ppm" /user/mapr/pumps /user/mapr/sensordata.csv

This launches a YARN MapReduce application to bulk import the data, meaning it will scale up to any size of CSV, from megabytes to petabytes. The main point to check here is that the data: prefix denotes the column family. Adjust the columns to match your own use case; they can even span many column families and it will work just fine!
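The column-spec convention can be sketched in Python: each entry splits into a column family and a qualifier, with HBASE_ROW_KEY as the special case that marks the row key rather than a column:

```python
spec = ("HBASE_ROW_KEY,data:resid,data:date,data:time,data:hz,"
        "data:disp,data:flow,data:sedppm,data:psi,data:ch1ppm")

parsed = []
for col in spec.split(","):
    if col == "HBASE_ROW_KEY":
        parsed.append(("<row key>", None))     # special marker, not a column
    else:
        family, qualifier = col.split(":", 1)  # "data:psi" -> ("data", "psi")
        parsed.append((family, qualifier))

# All real columns in this example live in the single "data" family.
families = {family for family, qualifier in parsed if qualifier is not None}
```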

Note: The column names are meaningful. They must match up with the Elasticsearch type mappings we define later on. This is important to get everything working!

Install MapR Gateway Service

First, install the mapr-gateway package on one or more nodes. On a production cluster, it’s always recommended to have at least two gateways to enable high availability. The number of nodes running the gateway should be based on the network bandwidth requirement as well as cluster hardware and available resources.

To install the package, log in as root (su root after logging in as mapr, or just log in as root; the password is also “mapr”). Then, install the package using yum:

$> yum install -y mapr-gateway

After installing the package, still as root, configure the system again:

$> /opt/mapr/server/ -R
$> service mapr-warden restart

The details are all available on the MapR documentation site.

Register Elasticsearch

Next, we need to register ES with the MapR cluster. This basically means copying over some libraries for the gateway to use. An ES cluster needs to be registered only once per MapR cluster and can then be reused to replicate many tables to different indexes/types.

The registration is done by running the script /opt/mapr/bin/register-elasticsearch, again as root.


  • -c: A tag that will be used as the target in the replica setup command. The recommended name is the ES cluster name, but it could be anything; it is the name used in the replication command, so remember it!
  • -r: The host from which the ES files are copied (here, maprdemo, where ES is running on the sandbox).
  • -t: Use the transport client. This is the only client supported by MapR 5.2 and is required in conjunction with the -r parameter.
  • -e: The directory where ES is installed. Note that if ES is installed via the RPM/Deb package, this parameter is not necessary.
  • -y: Do not prompt for values. If following the steps here, it’s safe to use.

Using the sandbox, this command will register ES as the MapR user:

$> /opt/mapr/bin/register-elasticsearch -c elastic -r maprdemo -t -y
Copying ES files from maprdemo to /tmp/es_register_mapr...
The authenticity of host 'maprdemo (' can't be established.
RSA key fingerprint is 6a:24:76:81:7d:53:ab:4d:3e:b5:29:0a:cb:ab:dd:9a.
Are you sure you want to continue connecting (yes/no)? yes
Registering ES cluster elastic on local MapR cluster.
Your ES cluster elastic has been successfully registered on the local MapR cluster.

Doing this as root on a fresh sandbox, expect only the prompt for “Are you sure you want to continue connecting?” Answer yes, of course. If you run the command as user mapr, it will not work if Elasticsearch was installed from RPM because it requires access to the elasticsearch.yml file, which the RPM installs in the /etc/elasticsearch folder.

In practice, this adds some shared libraries and other required data to MapR FS under the folder /mapr/. You can verify that the clusters subfolder contains “elastic”.

To verify ES is registered properly, you can then enter this command (notice the -l parameter):

$> /opt/mapr/bin/register-elasticsearch -l
Found 1 items
drwxr-xr-x - mapr mapr 3 2016-10-27 21:28 /opt/external/elasticsearch/clusters/elastic

We are now done with registering the Elasticsearch cluster with the MapR cluster. This only needs to be done once for each Elasticsearch cluster, regardless of how many tables replicate to ES.

Add Elasticsearch Mappings

This part is critical and a source of most issues. Get the mappings wrong and the replication will fail.

$> curl -X PUT maprdemo:9200/pumps/ -d '
{
  "mappings" : {
    "pumpsdata" : {
      "properties" : {
        "pumpsdata" : {
          "dynamic" : "true",
          "properties" : {
            "resid"  : {"type" : "string"},
            "date"   : {"type" : "date", "format" : "MM/dd/yy"},
            "time"   : {"type" : "string"},
            "hz"     : {"type" : "double"},
            "disp"   : {"type" : "double"},
            "flow"   : {"type" : "double"},
            "sedppm" : {"type" : "double"},
            "psi"    : {"type" : "double"},
            "ch1ppm" : {"type" : "double"}
          }
        }
      }
    }
  }
}'

The mappings are really critical because MapR DB binary tables, just like HBase and many NoSQL databases, have no information about the data. They only store bytes. As such, the replication gateway needs to convert the data from bytes into whatever is in the mapping for the type defined in Elasticsearch. If a conversion fails, it throws an exception and the replication fails.
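A simplified sketch of that conversion step in Python, with a made-up two-field slice of the mapping (this is not the gateway’s actual code, just the shape of the problem):

```python
# Hypothetical slice of the ES mapping: field name -> declared type.
mapping = {"resid": "string", "psi": "double"}

def convert(field, raw):
    # Bytes carry no type information, so the mapping drives the conversion.
    text = raw.decode("utf-8")
    if mapping[field] == "double":
        return float(text)   # raises ValueError on a mismatch, which is
                             # what makes replication fail
    return text

value = convert("psi", b"71.9")

# A value that contradicts the mapping blows up instead of replicating.
try:
    convert("psi", b"not-a-number")
    conversion_failed = False
except ValueError:
    conversion_failed = True
```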

Check this page in the MapR documentation to validate that your mapping is indeed correct given your data.

Personally, the date/timestamp types caused me a lot of grief. It was fiddly to get it working properly until I got the hang of it. The mappings above are tested and work.

Set Up Replication

We are finally there! Time to start the actual replication. Related documentation is found here in MapRDocs.

This is done using the maprcli utility as user “mapr”:

$> maprcli table replica elasticsearch autosetup -path /user/mapr/pumps -target elastic -index pumps -type pumpsdata

Once this command is run, MapR will launch a MapReduce job to do an initial bulk replication of the data currently stored in the MapR-DB table. This could take a while if the table already holds a lot of data. With our very small test data (47,899 rows), it should take less than one minute, mostly because of the startup cost of a MapReduce job. You can see it running by opening the ResourceManager UI (http://maprdemo:8088/cluster/apps).

If you plan to use replication from the start, it’s probably a good idea to set it up when the table holds just a bit of data, so the initial bulk load runs quickly. While it’s possible to enable replication on an empty table, I wouldn’t recommend it, since there is no way to make sure the replication is set up properly until data is added, which could happen in production. I prefer to detect errors and fix issues as early as possible.

From there on out, as data is added to the MapR-DB table, the data will be automatically replicated to ES by the gateway. It’s magic!

Verifying the Replication

In MCS, we should now be able to see that the replication has indeed been successful.

In Elasticsearch, we can also check the count of documents replicated so far; it should match the number of rows in the table:

$> curl maprdemo:9200/pumps/pumpsdata/_count


Above is a screenshot of MCS showing the /user/mapr/pumps table’s Replicas tab, which clearly shows that Elasticsearch replication is on.

The first load is a bulk load, and all subsequent inserts/updates are added to ES as they are added to MapR-DB in a streaming fashion.

Potential Issues

Some sources of issues to be careful about:

  1. Make sure the user running the replication command has POSIX permissions to the MapR-DB table. In our case, we’re creating it with user ‘mapr’ and running the command as ‘mapr,’ so that’s OK. Permissions in MapR matter.
  2. Double check that your index is created and the mappings are well matched to the data. If you’re using our test data and mappings though, it should be smooth sailing!
  3. Finally, ensure that the input data, in this particular example, consists of UTF-8 strings. The gateway decodes the bytes stored in MapR-DB as UTF-8 strings, so if the input was raw binary rather than text, the decoded output will be garbage and ES will complain. UTF-8 is the default text encoding on modern systems, so this is usually fine, but it’s something to keep in mind.
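The decoding pitfall in item 3 can be demonstrated with a small Python sketch: number bytes stored as text decode cleanly as UTF-8, while the same number stored as a raw packed double does not:

```python
import struct

# A number stored as text decodes fine.
as_text = b"71.9".decode("utf-8")

# The same number stored as a raw 8-byte big-endian double is not valid UTF-8.
as_binary = struct.pack(">d", 71.9)
try:
    as_binary.decode("utf-8")
    decodes_cleanly = True
except UnicodeDecodeError:
    decodes_cleanly = False
```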

If the job fails, go to elasticsearch-2.2.0/conf and edit the logging.yml file to set the logging level to DEBUG. Tailing the log in elasticsearch-2.2.0/logs/elastic.log will give the most information about conversion errors.

Wrap Up

Replication to Elasticsearch can be a very useful feature, with a lot of great use cases as I described above. It’s pretty easy to set up and will work reliably in the background to keep your data synchronized. Hopefully it will encourage more MapR users to experiment with this feature and take advantage of it on their production clusters.

Additional Resources

Original Link