Learning Elasticsearch

Featured


Packt: https://www.packtpub.com/big-data-and-business-intelligence/learning-elasticsearch

Amazon: https://www.amazon.com/Learning-Elasticsearch-Abhishek-Andhavarapu-ebook/dp/B01MURNWEB

This book will show you how to build highly scalable search applications using Elasticsearch. You can use Elasticsearch for a small application or a large application with billions of documents. It is built to scale horizontally and can handle both structured and unstructured data. You will install and set up Elasticsearch and Kibana, and handle documents using the Distributed Document Store. You will see how to query, search, and index your data, and perform aggregation-based analytics with ease. You will learn how to use Kibana to explore and visualize your data. Furthermore, you will also learn to handle document relationships, work with geospatial data, and much more, with this easy-to-follow guide. Finally, you will see how you can set up and scale your Elasticsearch clusters in production environments.

Packed with easy-to-follow examples, this book will ensure you have a firm understanding of the basics of Elasticsearch and know how to utilize its capabilities efficiently.


Beginners guide to querying Elasticsearch (Scoring vs Sorting)

Featured


Background

Elasticsearch is an open source, highly scalable search and analytics engine. The Search API in Elasticsearch is very flexible and can easily scale to petabytes of data. We will discuss how easy it is to query Elasticsearch and introduce the concept of relevance. We will cover the following here:

  • Different types of queries
  • Querying Elasticsearch
  • Relevance

Objective

In this article, Abhishek Andhavarapu, the author of the book Learning Elasticsearch, shows how to query Elasticsearch. You will learn how text search is different, and the difference between sorting and scoring. The query language is very expressive and can be used to define filters, queries, sorting, pagination, and aggregations in the same query.

Different types of queries

Elasticsearch queries are executed using the Search API. Like anything else in Elasticsearch, request and response are represented in JSON.

Queries in Elasticsearch at a high level are divided as follows:

  • Structured queries: Structured queries are used to query numbers, dates, statuses, and so on. These are similar to the queries supported by a SQL database: for example, checking whether a number or date falls within a range, or finding all the employees with John as the first name.
  • Full-text search queries: Full-text search queries are used to search text fields. When you send a full-text query to Elasticsearch, it first finds all the documents that match the query, and then the documents are ranked based on how relevant each document is to the query. We will discuss relevance in detail in the Relevance section.

Both structured and full-text search queries can be combined while querying. In the next section, we will describe the overall structure of request and response.

Querying Elasticsearch

One of the most powerful features of Elasticsearch is the Query DSL (Domain Specific Language), or the query language. If you are familiar with SQL, the following table shows the equivalent terms in Elasticsearch:

SQL        Elasticsearch
Database   Index
Table      Type
Row        Document
Column     Field

To execute a search query, an HTTP request should be sent to the _search endpoint. The index and type on which the query should be executed are specified in the URL. Both index and type are optional. If no index/type is specified, Elasticsearch executes the request across all the indexes in the cluster. A search query in Elasticsearch can be executed in two different ways:

  • By passing the search request as query parameters.
  • By passing the search request in the request body.

A simple search query using query parameters is shown here:

GET http://127.0.0.1:9200/chapter6/product/_search?q=product_name:jacket

Simple queries like the one in the preceding example can be executed using URL request parameters. Anything other than a simple query should be passed as the request body. The preceding query, when passed as a request body, looks like the following:

POST http://127.0.0.1:9200/chapter6/product/_search 
{
   "query": {
     "term": {
       "product_name" : "jacket"
     }
   }
 }

The preceding query is executed on the chapter6 index and type named product. The query can also be executed on multiple indexes/types at the same time, as shown here:

POST http://127.0.0.1:9200/chapter5,chapter6/product,product_reviews/_search 
{
   "query": {
     "term": {
       "product_name" : "jacket"
     }
   }
 }

The HTTP verb we used in the preceding example for the _search API is POST. You can also use GET instead of POST. Since most browsers will not support a request body when using GET, we used POST.

The basic structure of the request body is shown here:

{ 
   "size" : //The number of results in the response. Defaults to 10.
 
   "from" : // The offset of the results. For example, to get the third page for a page size of 20, you should set the size to 20 and from to 40.
 
   "timeout" : // A timeout can be specified after which partial results are sent back in the response. By default there is no timeout. If the request times out, the timed_out value in the response will be true. 
 
   "_source" : // The fields that should be included in the response. For example: "_source" : ["product_name", "description"].
  
   "query" : {
      // Query 
   }
 
   "aggs" : {
      // Aggregations
   }
 
   "sort" : {
      // How to sort the results
    }
 }
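
As an illustration of how these pieces fit together, the following request (a sketch against the chapter6 index used throughout this article, and assuming the numeric unit_price field from the sample data below) returns the second page of ten results, includes only the product name in the response, and sorts by price:

POST http://127.0.0.1:9200/chapter6/product/_search
{
  "from": 10,
  "size": 10,
  "_source": ["product_name"],
  "query": {
    "term": {
      "product_name": "jacket"
    }
  },
  "sort": [
    { "unit_price": { "order": "asc" } }
  ]
}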

The structure of the response body is shown here:

{
  "took": // Time Elasticsearch took to execute the query. 
 
  "timed_out": // Did the query time out. By default, there is no timeout.
 
 // Elasticsearch doesn't fail the request if some shards don't respond or aren't available. The response will contain partial results.
 
  "_shards": { 
    "total": // Number of shards the query was executed on.
    "successful": // Number of shards the query succeeded on.
    "failed": // Number of shards the query failed on.
  },
  
  "hits": {
   "total": // Total number of hits
   "max_score": // Maximum score of all the documents
   "hits": [
      // Actual documents.
    ]
   }
 }

Sample data

To better explain the various concepts in this article, we will use an e-commerce site as an example. We will create an index with a list of products. This will be a very simple index called chapter6 with a type called product. The mapping for the product type is shown here:

#Delete existing index if any
DELETE http://127.0.0.1:9200/chapter6
 
#Mapping
PUT http://127.0.0.1:9200/chapter6
{
  "settings": {},
  "mappings": {
    "product": {
      "properties": {
        "product_name": {
          "type": "text",
          "analyzer": "english"
        },
        "description": {
          "type": "text",
          "analyzer": "english"
        }
      }
    }
  }
}

Let’s index some product documents:

#Index Documents
PUT http://127.0.0.1:9200/chapter6/product/1
{
  "product_name": "Men's High Performance Fleece Jacket",
  "description": "Best Value. All season fleece jacket",
  "unit_price": 79.99,
  "reviews": 250,
  "release_date": "2016-08-16"
} 

PUT http://127.0.0.1:9200/chapter6/product/2
{
  "product_name": "Men's Water Resistant Jacket",
  "description": "Provides comfort during biking and hiking",
  "unit_price": 69.99,
  "reviews": 5,
  "release_date": "2017-03-02"
} 

PUT http://127.0.0.1:9200/chapter6/product/3
{
  "product_name": "Women's wool Jacket",
  "description": "Helps you stay warm in winter",
  "unit_price": 59.99,
  "reviews": 10,
  "release_date": "2016-12-15"
}

We will refer to the preceding three documents for the remainder of this article.

Basic query (finding the exact value)

The most basic query in Elasticsearch is the term query. It is very simple and can be used to query numbers, booleans, dates, and text. The term query is used to look up a single term in the inverted index.

A simple term query looks like the following:

POST http://127.0.0.1:9200/chapter6/product/_search 
{
   "query": {
     "term": {
       "product_name" : "jacket"
     }
   }
 }

The term query works great for a single term. To query more than one term, we have to use the terms query. It is similar to the IN clause in a relational database: if the document matches any one of the terms, it's a match. For example, suppose we want to find all the documents that contain jacket or fleece in the product name. The query will look like the following:

POST http://127.0.0.1:9200/chapter6/_search 
{
   "query": {
     "terms": {
       "product_name" : ["jacket","fleece"]
     }
   }
 }

The response of the query is as follows:

{
   ....
   "hits": {
     "total": 3,
     "max_score": 0.2876821,
     "hits": [
       {
         "_index": "chapter6",
         "_type": "product",
         "_id": "2",
         "_score": 0.2876821,
         "_source": {
           "product_name": "Men's Water Resistant Jacket",
           "description": "Provides comfort during biking and hiking",
           "unit_price": 69.99,
           "reviews": 5,
           "release_date": "2017-03-02"
         }
       },
       {
         "_index": "chapter6",
         "_type": "product",
         "_id": "1",
         "_score": 0.2824934,
         "_source": {
           "product_name": "Men's High Performance Fleece Jacket",
           "description": "Best Value. All season fleece jacket",
           "unit_price": 79.99,
           "reviews": 250,
           "release_date": "2016-08-16"
         }
       },
       {
         "_index": "chapter6",
         "_type": "product",
         "_id": "3",
         "_score": 0.25316024,
         "_source": {
           "product_name": "Women's wool Jacket",
           "description": "Helps you stay warm in winter",
           "unit_price": 59.99,
           "reviews": 10,
           "release_date": "2016-12-15"
         }
       }
     ]
   }
 }

Relevance

A traditional database usually contains structured data. A query on a database limits the data depending on different conditions specified by the user. Each condition in the query is evaluated as true/false, and the rows that don’t satisfy the conditions are eliminated. However, full-text search is much more complicated. The data is unstructured, or at least the queries are. We often need to search for the same text across one or more fields. The documents can be quite large, and the query word might appear multiple times in the same document and across several documents. Displaying all the results of the search will not help as there could be hundreds, if not more, and most documents might not even be relevant to the search.

To solve this problem, all the documents that match the query are assigned a score. The score is assigned based on how relevant each document is to the query. The results are then ranked based on the relevance score. The results on top are most likely what the user is looking for. In the next few sections, we will discuss how the relevance is calculated and how to tune the relevance score.

Let’s query the chapter6 index we created at the beginning of this article. We will use a simple term query to find jackets. The query is shown here:

POST http://127.0.0.1:9200/chapter6/_search
 {
   "query": {
     "term": {
       "product_name" : "jacket"
     }
   }
 }

The response of the query looks like the following:

{
   ....
   "hits": {
     "total": 3,
     "max_score": 0.2876821,
     "hits": [
       {
         "_index": "chapter6",
         "_type": "product",
         "_id": "2",
         "_score": 0.2876821,
         "_source": {
           "product_name": "Men's Water Resistant Jacket",
           "description": "Provides comfort during biking and hiking",
           "unit_price": 69.99,
           "reviews": 5,
           "release_date": "2017-03-02"
         }
       },
       {
         "_index": "chapter6",
         "_type": "product",
         "_id": "1",
         "_score": 0.2824934,
         "_source": {
           "product_name": "Men's High Performance Fleece Jacket",
           "description": "Best Value. All season fleece jacket",
           "unit_price": 79.99,
           "reviews": 250,
           "release_date": "2016-08-16"
         }
       },
       {
         "_index": "chapter6",
         "_type": "product",
         "_id": "3",
         "_score": 0.25316024,
         "_source": {
           "product_name": "Women's wool Jacket",
           "description": "Helps you stay warm in winter",
           "unit_price": 59.99,
           "reviews": 10,
           "release_date": "2016-12-15"
         }
       }
     ]
   }
 }

From the preceding response, we can see that each document contains a _score value. The scores of the three jackets are as follows:

ID   Product name                             Score
2    Men's Water Resistant Jacket             0.2876821
1    Men's High Performance Fleece Jacket     0.2824934
3    Women's wool Jacket                      0.25316024

We can see that the document with the ID 2 is scored slightly higher than documents 1 and 3. The score is calculated using the BM25 similarity algorithm. By default, the results are sorted using the _score values.

At a very high level, BM25 calculates the score based on the following:

  • How frequently the term appears in the document, known as term frequency (tf)
  • How common the term is across all the documents, known as inverse document frequency (idf)
  • Documents that contain all or most of the query terms are scored higher than documents that don't
  • The score is normalized based on document length; shorter documents are scored better than longer ones

To learn more about how the BM25 similarity algorithm works, please visit https://en.wikipedia.org/wiki/Okapi_BM25.
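
If you want to see exactly how these factors produce a given score, the Search API can return the score breakdown for each hit by setting explain to true. A minimal sketch against the chapter6 index:

POST http://127.0.0.1:9200/chapter6/_search
{
  "explain": true,
  "query": {
    "term": {
      "product_name": "jacket"
    }
  }
}

Each hit in the response then carries an _explanation section showing the term frequency, inverse document frequency, and normalization values that contributed to its _score.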

Not every query needs relevance. You can search for the documents that exactly match a value, such as status, or search for the documents within a given range. Elasticsearch allows combining both structured and full-text search in the same query. An Elasticsearch query can be executed in a query context or a filter context. In the query context, a relevance _score is calculated for each document matching the query. In a filter context, all the results that match the query are returned with a default relevancy score of 1.0; we will discuss more details in the next section.
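
When relevance ordering is not what you want (say, showing the most-reviewed jackets first), you can sort on a field instead of _score. A sketch using the sample products, assuming the numeric reviews field from the sample data:

POST http://127.0.0.1:9200/chapter6/_search
{
  "query": {
    "term": {
      "product_name": "jacket"
    }
  },
  "sort": [
    { "reviews": { "order": "desc" } }
  ]
}

With an explicit sort, the hits are ordered by the sort field rather than by the relevance score.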

Queries versus Filters

By default, when a query is executed, the relevance score is calculated for each result. When running a structured query (such as age equal to 50) or a term query on a non-analyzed field (such as gender equal to male), we do not need scoring, as these queries simply answer yes/no. Calculating the relevance score for each result can be an expensive operation. By running a query in the filter context, we are telling Elasticsearch not to score the results.

The relevance score calculated for a query only applies to the current query context and cannot be reused. As we discussed in the preceding section, the score is based on term frequency and inverse document frequency (idf), which is why query results are not cacheable. Filters, on the other hand, do not involve relevance and can be cached automatically. To run a query in the filter context, we have to wrap the query in a constant_score query, as shown here:

POST http://127.0.0.1:9200/chapter6/_search
 {
   "query": {
     "constant_score": {
       "filter": {
         "term" : {
           "product_name" : "wool"
         }
       }
     }
   }
 }

The results of the preceding query are not scored, and all the documents will have a score of 1. The query runs in the filter context and can be cached. We can also run the queries that need scoring in the query context and the others in the filter context, using the bool query to combine them, as shown in the following example:

POST http://127.0.0.1:9200/chapter6/_search
 {
   "query": {
     "bool": {
       "must": [
         {
           "match": { #Query context
             "product_name": "jacket"
           }
         },
         {
           "constant_score": { #Filter context
             "filter": {
               "range": {
                 "unit_price": {
                   "lt": "100"
                 }
               }
             }
           }
         }
       ]
     }
   }
 }

In the preceding query, the match query is executed in the query context, and the range query is executed in the filter context.
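
In recent Elasticsearch versions, the bool query also accepts a filter clause that runs in the filter context, so the same intent can be expressed without the constant_score wrapper. A sketch of the equivalent query:

POST http://127.0.0.1:9200/chapter6/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "product_name": "jacket" } }
      ],
      "filter": [
        { "range": { "unit_price": { "lt": "100" } } }
      ]
    }
  }
}

Clauses under filter are not scored and are eligible for caching, just like the constant_score variant above.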

How we optimized 100-second Elasticsearch queries to run in under a second

Featured


In the SQL world, query optimizers are mature and well understood; distributed systems, on the other hand, are newer and not as mature. Understanding how the queries work is very important.

Some of our Elasticsearch queries started taking more than 100 seconds, causing various timeouts, garbage collections, cache evictions, and so on. This blog post lists the interesting things we found in our analysis, how we optimized most of our queries to run in under a second, and the lessons learned along the way. We used Marvel to dig into the Elasticsearch metrics and pinpoint the root cause.

At the time of slowness, we were seeing about 150K requests per second.

Search Requests


The number of search requests itself is not a problem. The graph below shows the size of the filter cache. We currently have 11 Elasticsearch nodes in production with 30GB of memory for each instance, for a total of 330GB of memory for the entire cluster. Of that 330GB, about 40% is dedicated to the filter cache. When we query an index, the filter results of each request are cached here and can be reused for future requests, greatly improving query performance.

filterCache


This memory space is shared by all customers in production and is managed with a least recently used (LRU) policy. When the filter cache space fills up, the least recently used filters are evicted.

JVM usage before optimization

We can see from the above graph the thrashing of the memory space, where we are constantly building up and evicting filters over and over again for a continuous period of time. This led to some very long garbage collections.

gcUsage

All the evictions cause lots of old-generation garbage collection cycles. The problem with old GCs is that they can have a stop-the-world phase: for the duration of the garbage collection, that Elasticsearch node is dead to the rest of the cluster. It will not respond to requests or cluster pings. Ideally, old GCs should be rare and short-lived. What we are seeing here are frequent and long-lived GCs on many nodes.

We saw long-lived garbage collection cycles that took over 5 seconds each. A node could stop responding for five seconds at a time, four times over the course of a 2-minute request, which is the root cause of our 100-second query times.

Our CPU usage and disk I/O, on the other hand, are not stressed; we are mostly memory bound. Nodes in our cluster have 64GB of memory, 30GB of which is allocated to the Elasticsearch JVM and the remainder to the file system cache. We have seen that, due to the file system cache and SSDs, the 132GB of filter cache fills up in less than 4 seconds, causing out-of-memory exceptions which in turn crash the cluster.

1 : UPGRADE MEMORY 

Upgrading hardware is not always a solution, but in our case, since we are mostly memory bound, we could simply double the RAM on the nodes. However, it's not recommended to allocate more than 32GB to the JVM heap, so we decided to double our RAM and run two instances of Elasticsearch on each node; thanks to Elasticsearch rack awareness, the primary and replica shards don't live on the same box. Upgrading the RAM bought us time to figure out what was going on.

2 : CONTROL WHAT’S BEING CACHED

Our first intuition was to look at what we cache. When we looked at our queries, we realized that we were caching almost everything, which is a lot when you have thousands of queries per second. For example, one of our queries looked something like this:

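As a rough illustration of the general shape (the index and field names here are made up; the real queries carried many such filters), a filtered query that explicitly caches some filters and not others might look like:

POST http://127.0.0.1:9200/surveys/_search
{
  "query": {
    "filtered": {
      "query": { "match_all": {} },
      "filter": {
        "bool": {
          "must": [
            { "term": { "customer_id": "42", "_cache": true } },
            { "range": { "created_date": { "gte": "2014-01-01" }, "_cache": false } }
          ]
        }
      }
    }
  }
}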

We tried caching only some filters while avoiding others, which did help, but not as much as we expected.

3 : CHANGE THE QUERIES

At the time of slowness, we had about 64 billion documents in the cluster. Requests are executed in a map-reduce fashion: requests from the client are load balanced and distributed across all the nodes in the cluster, and the node receiving a request sends it to all the nodes containing the relevant shards. Each shard caches the filters using bit sets.

queryexecution

Since the filters are cached on the first request, all subsequent requests should be served from memory. The problem is that a request can be routed to either the primary or a replica shard, and due to the large number of requests, the filter cache is continuously torn down and rebuilt. In an effort to decrease the number of queries, we started looking into aggregations.

4 : AGGREGATIONS 

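As a rough illustration (the index and field names are made up; search_type=count suppresses the regular hits and returns only the aggregation results), a terms aggregation looks like this:

POST http://127.0.0.1:9200/surveys/_search?search_type=count
{
  "aggs": {
    "genders": {
      "terms": {
        "field": "gender"
      }
    }
  }
}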

There are lots of aggregations supported by Elasticsearch. The terms aggregation shown above, for example, is very similar to SQL GROUP BY. But in Elasticsearch the data is distributed among multiple shards, so when an aggregation query is executed, the request is sent to all the shards and each shard replies with its view of the data. The node collecting the results does the final aggregation and sends it back to the client. Due to this distributed nature, the results are not always accurate. Elasticsearch has some documentation on how this works:

http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-aggregations-bucket-terms-aggregation.html

Once the data is bucketed by a key, Elasticsearch also supports nested aggregations.

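For example, the following sketch groups documents by gender and computes the average height within each bucket (the field names match the demo later in this post; the index name is illustrative):

POST http://127.0.0.1:9200/surveys/_search?search_type=count
{
  "aggs": {
    "genders": {
      "terms": { "field": "gender" },
      "aggs": {
        "avg_height": {
          "avg": { "field": "height" }
        }
      }
    }
  }
}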

In the above query, the documents are first grouped by gender, and the average height for each bucket is calculated. All of this is done using just one query, and it is executed in parallel across several nodes in your cluster. This causes the number of queries to go down, in turn decreasing the stress on memory.

But to use aggregations, all the field data has to be loaded into memory. Elasticsearch stores the field data in the field data cache, which is by default 10% of the JVM heap. This raises more concerns: since we cannot predict the amount of memory we would need, loading all the field data into memory is likely to cause out-of-memory exceptions, which means more stress on memory, more old-generation garbage collections, slower query times, and even the risk of crashing the cluster.

To avoid this, Elasticsearch has circuit breakers to safeguard the amount of memory used by the field data cache. These can also be applied at the request level: when a request consumes more memory than a certain limit, the request is terminated. In effect, we are trading more CPU for RAM. By default, field data is lazily loaded, but there are various workarounds in the Elasticsearch documentation. There are also plans to use the file system cache for field data. The file system cache is managed by the OS, is much more efficient, and involves no garbage collection. Distributed systems are slowly moving away from relying on the JVM heap; Apache Kafka, for example, now depends completely on the file system cache.

http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/preload-fielddata.html

Also, aggregations can sometimes be 10x slower than regular queries. For example, when using the terms aggregation, if the number of keys is less than 25, querying each key individually is much faster than an aggregation query. Bottom line: the JVM heap can be a blessing and a curse. Memory is one of the reasons why Elasticsearch is so fast, but if you are not careful it can drastically affect performance. That doesn't necessarily mean you should avoid memory, but rather that you should decrease the stress on the JVM heap and use the file system cache or similar.

If you have any questions, please leave a comment below or reach out to me at abhishek376@gmail.com / @abhishek376.

Real time analytics using Hadoop and Elasticsearch (Big Mountain Data 2014)

Featured


I recently spoke at the Big Mountain Data 2014 conference. Here are the slides and the steps for the demo.

Link to the slides : http://www.slideshare.net/abhishek376/real-time-analytics-using-hadoop-and-elasticsearch

Hadoop 2.4.0
Hive 0.13.0
Elasticsearch 1.3.4 (http://www.elasticsearch.org/download/)
HDP 2.1 (http://hortonworks.com/products/hortonworks-sandbox/)

HDP doesn’t come with elasticsearch installed.

INSTALLING ELASTICSEARCH :
wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.3.4.noarch.rpm
rpm -Uvh elasticsearch-1.3.4.noarch.rpm
chkconfig --add elasticsearch
service elasticsearch start
cd /usr/share/elasticsearch/

INSTALLING HEAD AND DESK PLUGINS:
bin/plugin --install mobz/elasticsearch-head
bin/plugin --install lukas-vlcek/bigdesk

INSERTING DATA IN TO MYSQL:
drop database bigmountaindata;
create database bigmountaindata;

use bigmountaindata;
create table gender(id int, height int, gender varchar(50));
insert into gender values(1,6,'Male');
insert into gender values(2,5,'Male');
insert into gender values(3,6,'Female');

select * from gender;

USING SQOOP TO IMPORT DATA IN TO HIVE:
Hive:
create database bigmountaindata;

SQOOP :
sqoop import --connect 'jdbc:mysql://127.0.0.1:3306/bigmountaindata' --username=root --table gender -m 1 --class-name bigmountaindata.gender --outdir /tmp/src/generated --target-dir /tmp/sqoop-import/bigmountaindata.gender --hive-import --hive-drop-import-delims --hive-table bigmountaindata.gender

USING HIVE EXTERNAL TABLES TO PUSH DATA IN TO ELASTICSEARCH:
add jar /usr/lib/hive/lib/aux/elasticsearch-hadoop-hive-2.1.0.jar;
use bigmountaindata;
CREATE EXTERNAL TABLE if not exists es_gender (
id INT,
height INT,
gender STRING)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'bigmountaindata/gender');
insert overwrite table es_gender select id, height, gender from gender;

ELASTICSEARCH :

Number of male and Female
http://hortonworks:9200/bigmountaindata/gender/_search?search_type=count
{
  "aggs": {
    "genders": {
      "terms": {
        "field": "gender"
      }
    }
  }
}

Average height of males and females
http://hortonworks:9200/bigmountaindata/gender/_search?search_type=count
{
  "aggs": {
    "genders": {
      "terms": {
        "field": "gender"
      },
      "aggs": {
        "avg_height": {
          "avg": {
            "field": "height"
          }
        }
      }
    }
  }
}

Feel free to reach out to me if you have any questions: @abhishek376 / abhishek376@gmail.com

Elasticsearch – How to change the mappings without reindexing

Featured


We have a reporting platform based on Elasticsearch. I was often asked to add fields or modify the mapping of a field, but once data is indexed into ES, its mapping cannot be modified. So I developed a utility tool that can modify the mapping of an Elasticsearch index. The tool can also back up and restore an index. It was designed for small indexes and was never intended for larger ones.

More Info

 http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/

GitHub

https://github.com/abhishek376/ESUtlilty/

How it works

1) The utility uses the Search (Scan) API to get 100 documents at a time. Make sure the source is enabled.

2) Changes the mapping, adding or removing fields.

3) Pushes the documents back to Elasticsearch using the bulk API.

If you have dynamic mapping set to strict, make sure you set the mapping on the new index before you run the tool. The tool doesn't modify the source index.
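
As a rough sketch of the round trips involved (the tool itself goes through the Java client; oldindex is the placeholder index name from the usage example below), the Scan API works like this:

#Open a scan cursor; in scan mode the size applies per shard
POST http://127.0.0.1:9200/oldindex/_search?search_type=scan&scroll=5m
{
  "size": 100,
  "query": { "match_all": {} }
}

#Fetch the next batch using the _scroll_id returned by the previous call; repeat until no hits remain
GET http://127.0.0.1:9200/_search/scroll?scroll=5m&scroll_id=<_scroll_id>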

Credits

Allegiance Inc (the company I work for), for agreeing to open source this tool.

Building the tool

Built using Maven. You can specify the Elasticsearch version in the pom file.

mvn clean package

Usage

java -jar ESUtility.jar -changeMapping -clusterName ESVM -esHost es1 -field newfield -mappingType type -newFieldType string -newIndex newindex -oldIndex oldindex

Options

Changing an existing Elasticsearch index.

  1. Changes the mapping of an existing Elasticsearch index.
  2. Adds new fields to an existing Elasticsearch index. (Add the new mapping to the index and run the tool as with changeMapping.)
  3. Removes a field from an existing Elasticsearch index.

Help : java -jar ESUtility.jar -changeMapping will give you all the available properties

 -changeMapping        Change Mapping of an index
 -clusterName <arg>    Elasticsearch cluster name
 -esHost <arg>         Elasticsearch host name
 -field <arg>          Field to change the mapping for
 -mappingType <arg>    Mapping Type
 -newFieldType <arg>   New field type
 -newIndex <arg>       Index to restore to
 -oldIndex <arg>       Original Index
 -removeField <arg>    Remove a field (Optional)

Back up an index to a file

The utility reads the entire index into memory and writes it to a file, so beware of memory usage. In the next version I will try to write to multiple files. Help : java -jar ESUtility.jar -backup will give you all the available properties

 -backup              Backup index to a file
 -clusterName <arg>   Elasticsearch cluster name
 -esHost <arg>        Elasticsearch host name
 -index <arg>         Index to back up
 -mappingType <arg>   Mapping Type

Restore an index from a file

Reads from a file and restores to an Elasticsearch index. Help : java -jar ESUtility.jar -restore will give you all the available properties

 -restore             Restore index from a file
 -clusterName <arg>   Elasticsearch cluster name
 -esHost <arg>        Elasticsearch host name
 -file <arg>          File to restore backup from
 -index <arg>         Index to back up
 -mappingType <arg>   Mapping Type

You can write to me @abhishek376 or abhishek376@gmail.com if you have any questions.

Elasticsearch – Sorting and paging nested documents

Featured


We use Elasticsearch to support our reporting backend. One of the primary requirements is paging and sorting the results. Paging the documents is very easy and works out of the box, but sorting nested documents can get tricky. I was tasked with sorting the documents by properties that are not stored in Elasticsearch. Elasticsearch is essentially a key-value store: the documents are keyed by their ID, and the value is a nested document.

Document structure: let's suppose we have two documents. Given below are the two documents, with their properties represented as nested documents. We have to sort the two documents using their properties.

{ id : 123,
  data : [
   {dataid: 1, value: 2},
   {dataid: 2, value: 3}
  ]
}
{ id : 124,
  data : [
    {dataid: 1, value: 4},
    {dataid: 2, value: 5}
   ]
 }
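
For the nested sort and nested_filter below to work, the data field has to be mapped as a nested type. A minimal mapping sketch (the index and type names are hypothetical):

PUT http://127.0.0.1:9200/reports
{
  "mappings": {
    "report": {
      "properties": {
        "data": {
          "type": "nested",
          "properties": {
            "dataid": { "type": "integer" },
            "value":  { "type": "integer" }
          }
        }
      }
    }
  }
}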

Now suppose we have a properties table:

 dataid datatext property1 property2
   1      "One"     2.5     2.6
   2      "Two"     3.4     5.0

If we just want to sort the results using dataid, it is pretty straightforward. For more information, please refer to the Elasticsearch Guide.

{
 "from": 0,
 "size": 50,
 "sort": [
  {
    "data.value": {
      "order": "asc",
      "nested_filter": {
        "term": {
          "data.dataid": "1"
        }
      }
    }
  }
  ],
 "filter": {
     }
 }

But if you have to sort by something other than dataid, for example datatext or property1, we can use a custom score to sort. The script tag can be generated dynamically. For example, to sort on property1, dataid 2 should come before dataid 1. For paging, just specify from and size in the query. The query would look something like this:

{
 "from": 0,
 "size": 50,
 "sort": [
   {
      "_score": {
        "order": "asc"
      }
    }
 ],
"query": {
   "nested": {
      "path": "data",
      "query": {
      "custom_score": {
          "query": {
             "bool": {
              "should": [
                  {
                     "term": {
                        "data.dataid": 1
                       }
                  },
                  {
                    "term": {
                       "data.dataid": 2
                      }
                  }
              ]
          }
       },
        "script": "if(doc['data.dataid'].value=='1') { return 2 }
         else if(doc['data.dataid'].value=='2') { return 1 } else { return 0 }"
       }
     }
   }
 }

Similarly, if we had to sort using datatext, we could generate the script dynamically and swap in the necessary sort. This way we don't need to store all the properties in every document; we can just generate the script tag dynamically when needed.

Feel free to contact me @abhishek376 or leave a comment below.

Read and write data to/from Hive to Elasticsearch

Featured


Elasticsearch

Recently we started working with Elasticsearch. It's pretty amazing at what it does. We use Elasticsearch for real-time analytics. Their website describes Elasticsearch as:

flexible and powerful open source, distributed real-time
search and analytics engine for the cloud

Before ES, we evaluated at least a dozen NoSQL systems like MongoDB, Riak, etc., but nothing came close to Elasticsearch. We are an analytics company; we run all kinds of statistical analysis on data. Elasticsearch provides a REST API, and we developed a .NET wrapper with Thrift to talk to the ES REST API. If you are looking for a .NET API for Elasticsearch, drop me a line; I'm more than happy to share the code.

For a brief background, our system is backed by an MS-SQL database. We sqoop data from SQL to Hadoop, mash the data using Hive, and write the data to Elasticsearch.

There are a couple of ways to read/write data between Hive and Elasticsearch:

1) Elasticsearch has something called a "River" (http://www.elasticsearch.org/guide/reference/river/). A river is a service that runs on one of the nodes in the cluster and can read data from or write data to an external source. You can do pretty much anything with it; for example, you can spawn a thread that wakes up every hour to do some work. So I could write a Hive river that checks for new data every hour using the Hive Java API and indexes the new data in Elasticsearch, but the problem is that a big payload can make the node inaccessible for a while. If the node running the river fails for some reason, Elasticsearch starts the river on some other node in the cluster. Due to the load the river creates on the node running it, I was hesitant to go this route.

2) Hive provides StorageHandlers, which allow you to program custom storage handlers to read and write data from Hive. To implement a HiveStorageHandler, one has to implement the input format, output format, and SerDe interfaces of Hadoop. To use a storage handler, you create an external table specifying the storage handler, for example:

CREATE EXTERNAL TABLE artists (
    id      BIGINT, -- _id
    name    STRING)
STORED BY 'org.elasticsearch.hadoop.hive.ESStorageHandler'
TBLPROPERTIES('es.resource' = 'radio/artists/',
              'es.id.field' = 'id'); -- Overrides the ES _id with id, used when deleting the documents. Make sure id is unique.

While creating the external table, you can also specify various other properties; for example, we can specify the ES index to index the data into. Luckily, the Elasticsearch team has developed a HiveStorageHandler for Hadoop (https://github.com/elasticsearch/elasticsearch-hadoop), which had some bugs in the code. I reached out to the author @costinl and he fixed the bugs and patched the main branch immediately. Thanks @costinl.

How the Hive storage handler works:

When data is inserted into the Hive external table, the map/reduce jobs call the ESOutputFormat class, which calls ESRecordWriter to write the records. ESRecordWriter calls the Elasticsearch REST API to index the data. Our requirement is not only to insert data but also to delete data from Elasticsearch, and the Elasticsearch storage handler only supports indexing, so I had to change the storage handler to support deletes. To delete data from Elasticsearch I used the Bulk API (http://www.elasticsearch.org/guide/reference/api/bulk/), the same API used to index data into Elasticsearch.
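
For reference, a bulk delete request is just a newline-delimited list of delete actions; a minimal sketch against the radio/artists example used in this post (the IDs are made up):

POST http://127.0.0.1:9200/_bulk
{ "delete": { "_index": "radio", "_type": "artists", "_id": "123" } }
{ "delete": { "_index": "radio", "_type": "artists", "_id": "124" } }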

I added a property called es.operation.type to the ConfigurationOptions class, which can be specified using TBLPROPERTIES when creating the external table. The default is index, and you can also specify delete, which tells the ESRecordWriter to use the Bulk API to delete the data instead of indexing it. The modified StorageHandler with the delete operation type can be found here:

https://github.com/abhishek376/elasticsearch-hive

The Elasticsearch Bulk API requires the _id field to delete a document, so I had to modify the indexing code to insert the _id as id (it can be any id, specified in the properties). The handler then uses that same column as _id, which is passed to Elasticsearch to delete the document. So we create an external table:

CREATE EXTERNAL TABLE delete_artists (
    id      BIGINT) -- _id field
STORED BY 'org.elasticsearch.hadoop.hive.ESStorageHandler'
TBLPROPERTIES('es.resource' = 'radio/artists/',
              'es.operation.type' = 'delete',
              'es.id.field' = 'id');
-- The ES Bulk API requires the ES _id to delete the document. While inserting data we override _id with id; the es.id.field property tells the handler to use the id column as _id.

Any data passed to the table is then passed down to Elasticsearch to delete the corresponding documents.

 INSERT OVERWRITE TABLE delete_artists 
   SELECT id FROM source s;

I hope this helps someone. You can reach me at @abhishek376

Update operation in Hive (pseudo)


Like other Hadoop systems, Hive is effectively a read-only system: it only allows insert and insert-overwrite of an entire table or partition. Our backend system is MS SQL; we use Hive to mash the data and move it to Elasticsearch, and ES serves the analytics requests. But when rows are updated in SQL, we have to pass the update all the way to ES, which means we have to update the Hive data. This post describes how we achieved pseudo-updates in Hive. To start with, the SQL table, for example, is:

Id  Value
1     "hi"
2     "hellp"

We sqooped the data into Hive tables. Now suppose we update hellp to hello in SQL. To make the same update in Hive, we would have to delete the Hive table/partition and re-insert the data.

To solve this problem, we started by adding a modified-date column to the SQL table, which is updated by a SQL trigger. The incremental sqoop job appends the modified rows to the Hive table. Now the Hive table has both hello and hellp, with hello having a more recent modified date.

We started by grouping the rows by id and sorting them by modified date:

select id, value, modifieddatetime
   from table1
      distribute by id 
      sort by id, modifieddatetime desc

Next, we rank all the rows using a Hive UDF:

select
  id, value, modifieddatetime,
   rank(id) krank
from(
select id, value, modifieddatetime
   from table1
      distribute by id
      sort by id, modifieddatetime desc) t1

The Rank UDF can be downloaded from GitHub. To add the UDF permanently to Hive, follow this post. We then select the most recently updated row using where krank = 1:

select id
from
(select
  id, value, modifieddatetime,
   rank(id) krank
from(
select id, value, modifieddatetime
   from table1
      distribute by id
      sort by id, modifieddatetime desc) t1) t2 
where t2.krank = 1 group by id;

To get the data into Elasticsearch, the data is inserted into Hive external tables backed by the Elasticsearch storage handler, and the updated rows are passed along to ES. We could use the same approach to save the data to a new partition or to run MR jobs.

Oozie workflow .NET API


Oozie is a workflow engine for Hadoop. If you have never worked with Oozie, you should give it a shot. We do a lot of Hadoop work while most of our frontend is still .NET running on WCF services, so this post describes a simple .NET API built on top of the Oozie REST API to start and stop Oozie workflows.

Oozie has a REST API. We connect to the REST API using .NET HttpWebRequest. You can download the project from Github.

var connection = new OozieConnection("hadoop1", 11000);

We have to POST the XML config to the Oozie REST endpoint.

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- Map Reduce queue name -->
 <property> <name>mapred.job.queue.name</name> <value>default</value> </property>

<!-- User name to run the job under -->
 <property> <name>user.name</name> <value>root</value> </property>  

<!-- Mark this true if you are using map reduce or pig jobs -->
 <property> <name>oozie.use.system.libpath</name> <value>true</value> </property>

<!-- Properties for the job. The job creates the table {tableName} in the {databaseName} database -->
 <property> <name>tableName</name> <value>{0}</value> </property>
 <property> <name>databaseName</name> <value>{1}</value> </property>

<!-- Location where the job.properties and the workflow exist. The location should be in HDFS. -->
 <property> <name>oozie.wf.application.path</name>
 <value>hdfs://hadoop1.allegiance.local:8020/user/root/hiveoozie</value> </property>
 </configuration>

The query string action=start will start the workflow immediately:

var result = connection.Post("oozie/v1/jobs?action=start", String.Format(xmlData, tableName, databaseName));

Deserialize the response to get the job id.

var serializer = new JsonNetSerializer();
var id = serializer.Deserialize(result).id;

We get the job status and keep polling the API until the job is completed. Depending on the job, adjust your sleep time.

var statusinfo = connection.Get("oozie/v1/job/" + id + "?show=info");
var status = serializer.Deserialize(statusinfo).Status;

while (status != "SUCCEEDED")
{
 //TODO : Record the status in a shared dictionary for the web UI to poll the status
  statusinfo = connection.Get("oozie/v1/job/" + id + "?show=info");
  status = serializer.Deserialize(statusinfo).Status;
  Thread.Sleep(3000);
}

If you have questions or get stuck, you can reach me at @abhishek376 on Twitter.