In an article for Java Magazin back in 2012 (only a small section of it seems to have survived online, although it is still available from the inovex website as a download) I toyed with the idea of using a search engine as a database (not such an unconventional idea, it turned out, since Elastic itself describes its search engine as a database from time to time), mainly for cost and usability reasons. The idea gained traction with the release of the aggregations framework in early 2015, and a few months later I was involved in a project where we decided to leverage elasticsearch aggregations for the analysis of internet statistics. In this article I want to share my experience.
The requirements can be summarised briefly as follows:
- load data into elasticsearch indices in near real time
- up to 50,000 writes per second
- aggregate data across 5-minute, hourly and daily intervals
- the retention period is dependent on aggregation level
- 5-minute aggregations => one day
- hourly aggregations => 7 days
- daily aggregations => 5 years
- data to be consumed in a web frontend with response times of < 10s
- elasticsearch acts as the primary data source and as such the cluster should guard against data loss, implement a failover strategy etc.
N.B. our use case is such that we know all searches will be discrete/exact, i.e. we can turn off all text-search settings such as stemming and analysis. This has a significant bearing on data storage.
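To give an idea of what this means in practice, a mapping for such exact-match data might look something like the following sketch (the type and field names are invented for this illustration; in the elasticsearch versions of the time, marking a string field as not_analyzed switches off analysis entirely, and doc_values keeps the field data needed for aggregations off the heap):

{
  "mappings": {
    "flows": {
      "properties": {
        "src_ip":    { "type": "string", "index": "not_analyzed", "doc_values": true },
        "dst_ip":    { "type": "string", "index": "not_analyzed", "doc_values": true },
        "bytes":     { "type": "long" },
        "timestamp": { "type": "date" }
      }
    }
  }
}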
I’ll be looking at some interesting technical aspects of the implementation in this and subsequent blog articles, with particular emphasis on features where we (intentionally and sometimes unintentionally!) push the envelope on performance – hence the title of this mini-series. We will start with bulk loading.
Setting the scene
Our data is provided by a separate component, pmacct, and can be read directly from memory or from files written to disk. We opted to persist the files to disk first, to preserve a clean interface between data collection (the details of which are outside the scope of this article) and bulk loading. We initially looked at logstash as an interface between these files and elasticsearch, but quickly realised that the amount of pre-processing/data transformation we needed was not possible with logstash configuration alone, and so decided to build our own bulk import component, a simple command line tool written in Java. In this we were helped greatly by the BulkProcessor class made available in the elasticsearch source code, which is described as follows:
/**
 * A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request
 * (either based on number of actions, based on the size, or time), and to easily control the number of concurrent bulk
 * requests allowed to be executed in parallel.
 * <p/>
 * In order to create a new bulk processor, use the {@link Builder}.
 */
public class BulkProcessor implements Closeable {
    ...
The BulkProcessor takes care of loading and flushing data from multiple threads and is simple to use. It can be configured to flush either by time or, as shown below, when a defined batch size is reached:
public static BulkProcessor getBulkProcessor(Client client, int flushSize, int concurrency, Listener listener) {
    LOG.info("Initializing BulkProcessor with [{}] concurrent requests.", concurrency);
    return BulkProcessor.builder(client, listener)
            .setConcurrentRequests(concurrency)
            .setBulkActions(flushSize)
            .build();
}
You can then easily pass objects (as IndexRequests) to your bulk processor instance and they will be executed against the search server as determined by your flush settings e.g.
bulkProcessor.add(client.prepareIndex(indexName, typeName).setSource(documentAsJson).request());
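Once the input has been processed it is worth flushing and closing the processor explicitly so that nothing left in the buffer is lost; something along these lines (the timeout value is arbitrary and just for illustration):

// push out any requests still sitting in the buffer ...
bulkProcessor.flush();
// ... and wait (up to a limit) for in-flight bulk requests to complete; throws InterruptedException
bulkProcessor.awaitClose(1, java.util.concurrent.TimeUnit.MINUTES);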
With judicious cluster settings we have been able to achieve 50k-60k inserts per second from a single instance of our bulk import tool.
Cluster settings/configuration
Apart from well-documented best practices we found the following settings to be useful:
index.number_of_replicas: 1
Explanation: we started with a single replica to keep the amount of background activity to a minimum. This number can and should of course be reviewed in the context of failover mechanisms and cluster stability.
index.merge.scheduler.max_thread_count: 3 (as we use SSDs)
From the elasticsearch documentation: "The maximum number of threads that may be merging at once. Defaults to Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors() / 2)) which works well for a good solid-state-disk (SSD). If your index is on spinning platter drives instead, decrease this to 1."
index.refresh_interval: 30s
Explanation: keep this well away from the default of 1s in write-heavy environments
index.translog.flush_threshold_size: 1gb
From the elasticsearch documentation: "Finally, you can increase index.translog.flush_threshold_size from the default 512 MB to something larger, such as 1 GB. This allows larger segments to accumulate in the translog before a flush occurs. By letting larger segments build, you flush less often, and the larger segments merge less often. All of this adds up to less disk I/O overhead and better indexing rates. Of course, you will need the corresponding amount of heap memory free to accumulate the extra buffering space, so keep that in mind when adjusting this setting."
threadpool.bulk.queue_size: 3000
Explanation: see this article on loggly, Tip #9
indices.memory.index_buffer_size: 30%
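For reference, the index-level settings in this list can be changed on a live index via the update settings API, along these lines (the index name is a placeholder); threadpool.bulk.queue_size and indices.memory.index_buffer_size are node-level settings and belong in elasticsearch.yml instead:

curl -XPUT 'http://localhost:9200/my_index/_settings' -d '
{
  "index": {
    "number_of_replicas": 1,
    "refresh_interval": "30s",
    "translog.flush_threshold_size": "1gb"
  }
}'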
Bulk import: lessons learned
- Elasticsearch offers the ability to join data across indices, although this is most appropriate when one source/index is small: for large-scale joins or complex lookups it is advisable to consider denormalising data as much as is practical before writing to indices. This reduces the heavy lifting needed by the server when performing reads.
- use solid-state disks in read-heavy environments (if possible!)
- eventually we used the built-in (since deprecated) feature of using a field as the document ID. Relying on an auto-generated ID means that, although the ID is logged back with any bulk-import error message, if it happens to be an ID about which the client knows nothing then it is difficult to isolate the cause! Our workaround was to construct a surrogate key using the line number of the source file as the prefix of the ID, so that we could identify and write out offending rows to "bad" files (see the sketch below).
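A minimal sketch of that idea, here setting the ID explicitly on the index request rather than via the deprecated _id path mapping (the variable names are invented for this illustration):

// build a surrogate key whose prefix is the line number of the source row, so that a
// failure reported in the bulk response can be traced back to the offending input line
String documentId = lineNumber + "_" + routerId + "_" + timestamp;
bulkProcessor.add(client.prepareIndex(indexName, typeName, documentId)
        .setSource(documentAsJson)
        .request());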
Read on …
So you’re interested in search-based applications, text analytics and enterprise search solutions? Have a look at our website and read about the services we offer to our customers.
Join us!
Are you looking for a job in big data processing or analytics? We’re currently hiring!