Recently we started working with Elasticsearch, and it's pretty amazing at what it does. We use Elasticsearch for real-time analytics. Its website describes Elasticsearch as a
flexible and powerful open source, distributed real-time
search and analytics engine for the cloud
Before ES, we evaluated at least a dozen NoSQL systems such as MongoDB and Riak, but nothing came close to Elasticsearch. We are an analytics company, and we run all kinds of statistical analysis on our data. Elasticsearch provides a REST API, and we developed a .NET wrapper that uses Thrift to talk to it. If you are looking for a .NET API for Elasticsearch, drop me a line; I'm more than happy to share the code.
For a brief background: our system is backed by an MS-SQL database. We Sqoop data from SQL Server to Hadoop, mash it up using Hive, and write the results to Elasticsearch.
There are a couple of ways to read/write data between Hive and Elasticsearch:
1) Elasticsearch has something called a "River" (http://www.elasticsearch.org/guide/reference/river/). A river is a service that runs on one of the nodes in the cluster and can read data from, or write data to, an external source. You can do pretty much anything with it; for example, you can spawn a thread that wakes up every hour to do some work. So I could write a Hive river that checks for new data every hour using the Hive Java API and indexes it into Elasticsearch. The problem is that a big payload can make the node inaccessible for a while. If the node running the river fails for some reason, Elasticsearch starts the river on another node in the cluster, but because of the load the river puts on the node it runs on, I was hesitant to go this route.
2) Hive provides StorageHandlers, which allow you to program custom handlers to read and write data from Hive. To implement a HiveStorageHandler, one has to implement Hadoop's InputFormat, OutputFormat, and SerDe interfaces. To use a storage handler, you create an external table that specifies it, for example:
CREATE EXTERNAL TABLE artists (
    id BIGINT,    -- used as the ES _id; make sure it is unique
    name STRING)
STORED BY 'org.elasticsearch.hadoop.hive.ESStorageHandler'
TBLPROPERTIES('es.resource' = 'radio/artists/',
              'es.id.field' = 'id');  -- overrides the ES _id with id, used later when deleting documents
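The hourly-polling river described in option 1 above could be sketched like this. This is purely illustrative: it does not use the actual Elasticsearch River SPI or the Hive Java API, and the class and method names are my own placeholders.

```java
// Minimal sketch (assumption) of the hourly polling loop a custom
// "hive river" might run; not the real River SPI.
public class HiveRiverSketch implements Runnable {
    private static final long ONE_HOUR_MS = 60L * 60L * 1000L;
    private volatile boolean closed = false;

    @Override
    public void run() {
        while (!closed) {
            pollAndIndex();                    // check Hive for new rows and index them
            try {
                Thread.sleep(ONE_HOUR_MS);     // wake up every hour
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Placeholder for the Hive query + Elasticsearch indexing step.
    void pollAndIndex() { }

    // Called when the river is stopped or relocated to another node.
    public void close() { closed = true; }
}
```

The load concern above comes from `pollAndIndex()`: whatever it does runs entirely on the single node hosting the river.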
While creating the external table you can also specify various other properties; for example, es.resource specifies the index to write the data into. Luckily, the Elasticsearch team has developed a HiveStorageHandler for Hadoop (https://github.com/elasticsearch/elasticsearch-hadoop), which had some bugs in the code. I reached out to the author, @costinl, and he fixed the bugs and patched master immediately. Thanks @costinl!
How the Hive storage handler works:
When data is inserted into the Hive external table, the map/reduce jobs call the ESOutputFormat class, which calls ESRecordWriter to write the records. ESRecordWriter calls the Elasticsearch REST API to index the data. Our requirement is not only to insert data but also to delete data from Elasticsearch, and the Elasticsearch storage handler only supports indexing. I had to change the storage handler to support deletes. To delete data from Elasticsearch I used the Bulk API (http://www.elasticsearch.org/guide/reference/api/bulk/), the same API used to index data into Elasticsearch.
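For a sense of what the record writer ends up sending, here is a sketch of building a Bulk API delete payload. The `deleteAction` helper and class name are mine, not the actual elasticsearch-hadoop internals; the payload format (one newline-terminated action line per document) is the Bulk API's.

```java
// Sketch (assumption) of the bulk-delete body the modified
// ESRecordWriter would POST to /_bulk; one action line per record.
public class BulkDeleteSketch {
    // Build one Bulk API delete action line for the given document.
    static String deleteAction(String index, String type, String id) {
        return "{\"delete\":{\"_index\":\"" + index
                + "\",\"_type\":\"" + type
                + "\",\"_id\":\"" + id + "\"}}\n";
    }

    public static void main(String[] args) {
        StringBuilder body = new StringBuilder();
        for (String id : new String[] {"1", "2"}) {
            body.append(deleteAction("radio", "artists", id));
        }
        // The full body is POSTed to the _bulk endpoint.
        System.out.print(body);
    }
}
```

Unlike an index action, a delete action has no source document on the following line, which is why the _id is all the writer needs.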
I added a property called es.operation.type to the ConfigurationOptions class, which can be specified via TBLPROPERTIES when creating the external table. The default is index; you can also specify delete, which tells ESRecordWriter to use the Bulk API to delete the data instead of indexing it. The modified StorageHandler with the delete operation type can be found here
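The property handling itself is simple; a sketch of how the writer might resolve the operation type (the option name is from the post, everything else here is illustrative):

```java
// Sketch (assumption) of resolving the es.operation.type table
// property inside the record writer; defaults to "index".
public class OperationTypeSketch {
    static final String DEFAULT_OPERATION = "index";

    // Returns "index" when the property is absent, otherwise
    // validates that the value is one of the supported operations.
    static String resolveOperation(String tblProperty) {
        if (tblProperty == null || tblProperty.isEmpty()) {
            return DEFAULT_OPERATION;
        }
        if (!tblProperty.equals("index") && !tblProperty.equals("delete")) {
            throw new IllegalArgumentException(
                "unsupported es.operation.type: " + tblProperty);
        }
        return tblProperty;
    }
}
```

With the operation resolved once up front, the writer only has to pick which bulk action line to emit per record.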
The Elasticsearch Bulk API requires the _id field to delete a document, so I had to modify the indexing code to set _id from the id column (it can be any column, specified in the table properties). That same column is then passed to Elasticsearch as the _id when deleting the document. So we create an external table:
CREATE EXTERNAL TABLE delete_artists (
    id BIGINT)    -- the ES _id field
STORED BY 'org.elasticsearch.hadoop.hive.ESStorageHandler'
TBLPROPERTIES('es.resource' = 'radio/artists/',
              'es.operation.type' = 'delete',
              'es.id.field' = 'id');  -- the Bulk API needs the ES _id to delete a document; since we overrode _id with id when inserting, the es.id.field column is used as the _id here
Any data inserted into this table is passed down to Elasticsearch to delete the corresponding documents.
INSERT OVERWRITE TABLE delete_artists SELECT id FROM source s;
I hope this helps someone. You can reach me at @abhishek376