Read and write data between Hive and Elasticsearch


Recently we started working with Elasticsearch. It's pretty amazing at what it does, and we use it for real-time analytics. Its website describes Elasticsearch as a "flexible and powerful open source, distributed real-time search and analytics engine for the cloud."

Before ES, we evaluated at least a dozen NoSQL systems, such as MongoDB and Riak, but none came close to Elasticsearch. We are an analytics company, and we run all kinds of statistical analyses on our data. Elasticsearch exposes a REST API, and we developed a .NET wrapper, using Thrift, to talk to it. If you are looking for a .NET API for Elasticsearch, drop me a line; I'm more than happy to share the code.

For brief background: our system is backed by a Microsoft SQL Server database. We use Sqoop to move data from SQL Server into Hadoop, mash it up with Hive, and write the results to Elasticsearch.

There are a couple of ways to read and write data between Hive and Elasticsearch:

1) Elasticsearch has something called a "river". A river is a service that runs on one of the nodes in the cluster and can read data from an external source or write data out to it. You can do pretty much anything with it; for example, you can spawn a thread that wakes up every hour to do some work. So I could write a Hive river that checks for new data every hour using the Hive Java API and indexes the new data in Elasticsearch. The problem is that a big payload can make the node inaccessible for a while. If the node running the river fails for some reason, Elasticsearch restarts the river on another node in the cluster. But because of the load the river puts on whichever node runs it, I was hesitant to go this way.

2) Hive provides storage handlers, which let you plug in custom code to read and write a table's data. To implement a HiveStorageHandler you implement Hadoop's InputFormat and OutputFormat interfaces along with a Hive SerDe. To use a storage handler, you create an external table that names it, for example:

    CREATE EXTERNAL TABLE artists (
        id      BIGINT,  -- used as the Elasticsearch _id
        name    STRING)
    STORED BY 'org.elasticsearch.hadoop.hive.ESStorageHandler'
    TBLPROPERTIES('es.resource' = 'radio/artists/',
                  -- overrides the ES _id with the id column, which is also used
                  -- when deleting documents; make sure id is unique
                  '' = 'id');

When creating the external table you can also specify various other properties, for example the Elasticsearch index to write the data into (es.resource above). Luckily, the Elasticsearch team has developed a Hive storage handler for Hadoop, although it had some bugs. I reached out to the author, @costinl, and he fixed the bugs and patched the main branch immediately. Thanks, @costinl!

How the Hive storage handler works:

When data is inserted into the Hive external table, the MapReduce jobs call the ESOutputFormat class, which uses ESRecordWriter to write the records. ESRecordWriter calls the Elasticsearch REST API to index the data. Our requirements are not only to insert data into Elasticsearch but also to delete it, and the Elasticsearch storage handler only supports indexing, so I had to change it to support deletes. To delete data from Elasticsearch I used the Bulk API, the same API that is used to index data into Elasticsearch.
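To make the Bulk API concrete, here is a minimal Python sketch of the request body it accepts. The real ESRecordWriter is Java, so this only illustrates the wire format. It assumes the index/type layout used in this post ('radio/artists'). The body is newline-delimited JSON: one action line per document, followed by the document source for index actions. Delete actions have no source line. The helper name bulk_body and the sample rows are hypothetical.

```python
import json


def bulk_body(index, doc_type, actions):
    """Build a newline-delimited Bulk API request body (hypothetical helper).

    `actions` is a sequence of (operation, doc_id, source) tuples where
    operation is "index" or "delete"; source is ignored for deletes.
    """
    lines = []
    for operation, doc_id, source in actions:
        if operation not in ("index", "delete"):
            raise ValueError("unsupported bulk operation: %r" % operation)
        # Action/metadata line identifying the target document.
        meta = {"_index": index, "_type": doc_type, "_id": str(doc_id)}
        lines.append(json.dumps({operation: meta}))
        if operation == "index":
            # Index actions are followed by the document itself.
            lines.append(json.dumps(source))
    # Every line, including the last one, must end with a newline.
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(bulk_body("radio", "artists", [
        ("index", 1, {"id": 1, "name": "Pink Floyd"}),
        ("delete", 2, None),
    ]), end="")
```

The resulting string would be POSTed to the cluster's _bulk endpoint. The response reports a result for each action, so one failed delete does not abort the other actions in the same batch.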

I added a property called es.operation.type to the ConfigurationOptions class; it can be set through TBLPROPERTIES when creating the external table. The default is index, and you can also specify delete, which tells ESRecordWriter to use the Bulk API to delete the data instead of indexing it. The modified storage handler with the delete operation type can be found here.
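The configuration rule above amounts to reading one table property with a default. Here is a minimal Python sketch of that rule. The modified handler itself is Java, and the names resolve_operation, DEFAULT_OPERATION and SUPPORTED_OPERATIONS are hypothetical. A missing es.operation.type means index, and delete switches the writer to deletes. Rejecting any other value early is a choice of this sketch, not necessarily what the modified handler does.

```python
DEFAULT_OPERATION = "index"
SUPPORTED_OPERATIONS = ("index", "delete")


def resolve_operation(table_properties):
    """Return the bulk operation configured for an external table.

    `table_properties` stands in for the table's TBLPROPERTIES as a dict;
    when es.operation.type is absent, the writer falls back to indexing.
    """
    operation = table_properties.get("es.operation.type", DEFAULT_OPERATION)
    operation = operation.strip().lower()
    if operation not in SUPPORTED_OPERATIONS:
        raise ValueError("unsupported es.operation.type: %r" % operation)
    return operation


if __name__ == "__main__":
    print(resolve_operation({"es.resource": "radio/artists/"}))
    print(resolve_operation({"es.resource": "radio/artists/",
                             "es.operation.type": "delete"}))
```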

The Elasticsearch Bulk API requires a document's _id in order to delete it. So I also had to modify the indexing code to set _id from a column, id in our case (it can be any column, specified in the table properties). The delete path then uses the same column as the _id it passes to Elasticsearch to delete the document. So we create an external table:

    CREATE EXTERNAL TABLE delete_artists (
        id      BIGINT)  -- becomes the _id of the document to delete
    STORED BY 'org.elasticsearch.hadoop.hive.ESStorageHandler'
    TBLPROPERTIES('es.resource' = 'radio/artists/',
                  'es.operation.type' = 'delete',
                  -- the ES Bulk API needs the _id to delete a document; while
                  -- inserting we override _id with id, so the same column is used here
                  '' = 'id');

Any row inserted into this table is passed down to Elasticsearch, which deletes the document with that _id:

    INSERT OVERWRITE TABLE delete_artists
    SELECT id FROM source s;
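The rows selected into delete_artists become delete actions keyed by the same id column that supplied _id when the documents were indexed. Here is a minimal Python sketch of that id-to-_id mapping. The real logic lives in the Java record writer, and action_line and the sample rows are hypothetical. The sketch omits _index and _type from the action line, which the Bulk API allows when they are given in the URL, such as /radio/artists/_bulk.

```python
import json


def action_line(operation, row, id_field="id"):
    """Bulk API action line for one Hive row, keyed by the configured id column.

    Because the same column supplies _id on both the index and the delete path,
    a row indexed with id=42 is removed later by a delete row with id=42.
    """
    doc_id = row.get(id_field)
    if doc_id is None:
        # Without an id the document cannot be addressed later, so this
        # sketch requires it on both paths.
        raise ValueError("row has no value for id column %r" % id_field)
    return json.dumps({operation: {"_id": str(doc_id)}})


if __name__ == "__main__":
    indexed = {"id": 42, "name": "Pink Floyd"}  # row written to artists
    to_delete = {"id": 42}                      # row written to delete_artists
    print(action_line("index", indexed))
    print(action_line("delete", to_delete))
```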

I hope this helps someone. You can reach me at @abhishek376.



  1. Good stuff. We don’t support delete operations yet for two reasons:
    1. they are highly destructive and one gets much better control when working directly with ES
    2. dropping an external table in Hive, by default, drops only the table, not its view.

    That being said, it's on our roadmap to support as many Hive/Pig operators natively on ES as possible.

    Of course, feedback and contributions are welcome.


    1. Costin,
      We have exactly the same use case. Since we stream data from Kafka to Hadoop and don't really work incrementally, we accept that there is a window of time (10 minutes) during which the data is somewhat unstable, so we always re-read the data from the last 30 minutes to make sure everything is correct. Disasters happen: Kafka can go down and cause the aggregations to be replayed, etc. How can we guarantee that we are not duplicating data without the change @abhishek376 is proposing?
      Do you think working in a non-incremental way is a mistake?

