Lambda Architecture with Druid at GumGum
In early 2014, we faced a challenge: as our data grew, our existing real-time data store could no longer keep up. We looked at other real-time ingestion systems, and that’s when one of our engineers, Maxime, came across Druid. Let’s backtrack for a minute. The “we” in this story is GumGum, the inventor of In-Image Advertising and a leading digital marketing platform based in Santa Monica, the heart of Silicon Beach. The company was founded in 2007, and its data volume has consistently quadrupled in recent years. We’ve welcomed and embraced this rapid growth, but it has also kept us in the market for a system that can handle a lot of data.
While we had a good run with MySQL, which we had been using exclusively until recently, we needed something much more scalable. We looked at Druid, which is designed to ingest and query large-scale real-time data and scales horizontally because it is distributed. Druid is especially well suited to our real-time use cases, in which we aggregate large numbers of counters for a known set of keys. Because Druid is open source and was created by another ad tech company with needs similar to ours, we decided to give it a try. So far, the results have been impressive.
We now use Druid as part of a wishbone-shaped data processing architecture called the Lambda architecture: a generic, fault-tolerant, and scalable design for data processing systems that can serve a wide range of workloads requiring low-latency reads and updates. A Lambda architecture scales out rather than up.
GumGum ad servers generate billions of events every day, and each event is sent to two destinations simultaneously: Kafka and S3. A Storm topology continuously consumes this stream of events from Kafka in real time, parses the JSON events, and extracts the metrics to send to Druid. In parallel, a MapReduce job reads the files from S3 every four hours and sends the data to Druid, which then overwrites the data previously sent through Storm.
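The parsing step inside the Storm topology can be pictured with a small sketch. The event layout below (a `publisher` object, a `geo` block, and an `ads` array) and the function name `extract_metrics` are hypothetical, chosen only for illustration; GumGum’s actual event schema is not described here. The point is how one nested JSON event turns into flat rows of dimensions and counters, and that a single event can produce more than one row.

```python
import json


def extract_metrics(raw_event: str) -> list:
    """Turn one raw ad-server event (hypothetical schema) into flat metric rows."""
    event = json.loads(raw_event)
    # Dimensions shared by every row derived from this event.
    base = {
        "timestamp": event["timestamp"],  # assumed to be epoch milliseconds
        "publisher_id": event["publisher"]["id"],
        "country": event.get("geo", {}).get("country", "unknown"),
    }
    rows = []
    # One row per ad in the event, each carrying its own counters.
    for ad in event.get("ads", []):
        rows.append({
            **base,
            "ad_id": ad["id"],
            "impressions": 1,
            "clicks": 1 if ad.get("clicked") else 0,
        })
    return rows


raw = json.dumps({
    "timestamp": 1420070400000,
    "publisher": {"id": "p1"},
    "geo": {"country": "US"},
    "ads": [{"id": "a1", "clicked": True}, {"id": "a2"}],
})
for row in extract_metrics(raw):
    print(row)
```

In a real topology this logic would live in a bolt that emits each row downstream; the sketch keeps only the transformation.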
Overwriting real-time data with batch data is one of the most important facets of the Lambda architecture, because today’s real-time processing technologies do not guarantee ‘exactly once’ processing one hundred percent of the time. In practice, some events may be processed more than once and others may be dropped. Various open source projects are working to improve this, but it is one of the reasons the Lambda architecture recommends replacing real-time data with batch data.
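Druid versions the segments that cover each time interval, and a segment with a newer version overshadows older segments for the same interval. The simplified model below illustrates why a batch re-index of an hour replaces, rather than adds to, real-time numbers that may contain duplicates. It assumes one segment per interval (real Druid segments can be partitioned), and the `Segment` type and the counts are made up for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Segment:
    interval: str      # hour bucket, e.g. "2015-01-01T00/2015-01-01T01"
    version: str       # ISO timestamp string; later strings sort higher
    source: str        # "realtime" or "batch"
    impressions: int


def visible_segments(segments):
    """Keep only the highest-version segment per interval (newer overshadows older)."""
    latest = {}
    for seg in segments:
        current = latest.get(seg.interval)
        if current is None or seg.version > current.version:
            latest[seg.interval] = seg
    return latest


segments = [
    # Real-time count for hour 00 includes some replayed (duplicate) events.
    Segment("2015-01-01T00/2015-01-01T01", "2015-01-01T00:00:00Z", "realtime", 1_000_050),
    # The batch job later re-indexes hour 00 with a newer version.
    Segment("2015-01-01T00/2015-01-01T01", "2015-01-01T04:10:00Z", "batch", 1_000_000),
    # Hour 01 has not been re-indexed yet, so real-time data is still served.
    Segment("2015-01-01T01/2015-01-01T02", "2015-01-01T01:00:00Z", "realtime", 980_000),
]
for interval, seg in visible_segments(segments).items():
    print(interval, seg.source, seg.impressions)
```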
Druid lets us overwrite our real-time data with batch data by providing an indexer MapReduce job. Because this job expects input files that conform to the schema of the Druid data source, we first run another MapReduce job that converts our complex, nested JSON into a format the data source accepts. These files are then passed to an indexing service (HadoopDruidIndexer), which creates Druid segments from them, uploads the segments to S3, and updates the segment metadata in MySQL. Within a minute, a Druid coordinator node detects the metadata change and loads the newly created segments onto the historical nodes while discarding the old segments created by the Storm topology. Because the batch pipeline requires two MapReduce jobs to run in succession, we use AWS Data Pipeline, which offers a workflow engine that orchestrates multiple tasks with complex dependencies.
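Here is a sketch of the conversion step, assuming that a format Druid accepts means flat records written one JSON object per line. The flattening rule (nested keys joined with `_`) and the names `flatten` and `to_json_lines` are hypothetical; lists are passed through unchanged, and a real job would have to decide how to handle them.

```python
import json


def flatten(obj: dict, prefix: str = "", sep: str = "_") -> dict:
    """Recursively flatten nested dicts into one level, joining keys with `sep`."""
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))
        else:
            # Scalars and lists are copied as-is; lists are not expanded here.
            flat[name] = value
    return flat


def to_json_lines(events) -> str:
    """Serialize each flattened event as one JSON object per line."""
    return "\n".join(json.dumps(flatten(e), sort_keys=True) for e in events)


event = {
    "timestamp": "2015-01-01T00:05:00Z",
    "publisher": {"id": "p1", "site": {"domain": "example.com"}},
    "impressions": 1,
}
print(to_json_lines([event]))
```

In the pipeline described above, this would run as the map side of the first MapReduce job, with the output files handed to the indexer job.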
We do not expose our Druid cluster to client applications directly; instead, we have built a reporting server that sits in front of Druid. The reporting server lets us join data from Druid with data from our MySQL database. It also insulates clients from backend changes and converts data into the Google Visualization format, which is easier for dashboard applications to consume. If you don’t want to go to the trouble of building your own API server, Druid does offer a REST API.
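A minimal sketch of the reporting server’s conversion step might look like the following. It assumes the input is the result of a Druid groupBy query (a list of objects with `timestamp` and `event` fields) and the output is the Google Visualization DataTable JSON literal (`cols` and `rows`, with each cell wrapped as `{"v": ...}`). The `lookups` argument is a hypothetical stand-in for the MySQL join: a plain dict mapping raw dimension IDs to display names.

```python
import json


def to_datatable(groupby_rows, dimensions, metrics, lookups=None):
    """Convert Druid groupBy results into a Google Visualization DataTable literal."""
    lookups = lookups or {}
    cols = [{"id": "timestamp", "label": "timestamp", "type": "string"}]
    cols += [{"id": d, "label": d, "type": "string"} for d in dimensions]
    cols += [{"id": m, "label": m, "type": "number"} for m in metrics]
    rows = []
    for result in groupby_rows:
        event = result["event"]
        cells = [{"v": result["timestamp"]}]
        for d in dimensions:
            value = event.get(d)
            # Swap raw IDs for display names when a lookup exists (stand-in for the MySQL join).
            cells.append({"v": lookups.get(d, {}).get(value, value)})
        cells += [{"v": event.get(m, 0)} for m in metrics]
        rows.append({"c": cells})
    return {"cols": cols, "rows": rows}


druid_result = [
    {"version": "v1", "timestamp": "2015-01-01T00:00:00.000Z",
     "event": {"publisher_id": "p1", "impressions": 120}},
]
names = {"publisher_id": {"p1": "Example Publisher"}}
print(json.dumps(to_datatable(druid_result, ["publisher_id"], ["impressions"], names)))
```

Keeping this translation in a separate server means a dashboard only ever sees the DataTable shape, which is the insulation from backend changes mentioned above.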
The architecture described above processes more than 3 billion events per day in real time, which amounts to 5 TB of new data per day. We are constantly trying to find the right instance types for our servers, but here is a list of what we currently use:
Brokers – 2 m4.xlarge (Round-robin DNS)
Coordinators – 2 c4.large
Historical (Cold) – 2 m4.2xlarge (1 x 1000GB EBS SSD)
Historical (Hot) – 4 m4.2xlarge (1 x 250GB EBS SSD)
Middle Managers – 15 c4.4xlarge (1 x 300GB EBS SSD)
Overlords – 2 c4.large
ZooKeeper – 3 c4.large
MySQL – RDS – db.m3.medium
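As a quick sanity check on the list above, the attached EBS volumes can be totaled and compared with the 5 TB of raw data arriving each day. The calculation treats 1 TB as 1,000 GB and counts only the roles that list an EBS volume.

```python
# Instance count and EBS volume size (GB) per role, copied from the list above.
fleet = {
    "Historical (Cold)": (2, 1000),
    "Historical (Hot)": (4, 250),
    "Middle Managers": (15, 300),
}

total_gb = sum(count * size_gb for count, size_gb in fleet.values())
historical_gb = sum(
    count * size_gb
    for role, (count, size_gb) in fleet.items()
    if role.startswith("Historical")
)
raw_tb_per_day = 5
# Hours of raw (not rolled-up) data that the total disk could hold.
hours_of_raw_data = total_gb / 1000 / raw_tb_per_day * 24

print(total_gb, historical_gb, hours_of_raw_data)  # 7500 3000 36.0
```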
Druid has proven to be extremely fast, flexible, and reliable. We love using Druid as our data access layer because it lets us ingest large amounts of data quickly and query that data through simple REST APIs. Druid is horizontally scalable, so as our needs grow, we can simply add more middle managers and historical nodes. Druid is also easy to upgrade and maintain because most of the data stays in Amazon S3. Next year, we aim to dedicate some time to contributing to the Druid codebase. We are looking forward to a long partnership with the Druid community and are enthusiastic about being part of the software’s evolution.
The disk storage you have amounts to approximately 7.5 TB. At 5 TB a day, that is only about 36 hours of data. Is that all you’re retaining and querying, or are you aggressively rolling up the data?
We are aggressively rolling up the data: we retain only hourly granularity, which reduces the data a lot. Furthermore, the 5 TB is the original log size. We parse the logs, extract metrics and dimensions, and send those to Druid. So although we process 5 TB of data with Storm, only a fraction of it ends up in Druid. We send approximately 10B events to Druid every day, but aggregation reduces them considerably.
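Druid can roll up rows at ingestion time by truncating timestamps to the configured query granularity and aggregating rows whose dimension values are identical. The sketch below imitates that idea in plain Python for hourly granularity; it is an illustration of the concept, not Druid’s implementation, and the dimension and metric names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timezone


def hour_bucket(ts_millis: int) -> str:
    """Truncate an epoch-millisecond timestamp to the start of its UTC hour."""
    dt = datetime.fromtimestamp(ts_millis // 1000, tz=timezone.utc)
    return dt.replace(minute=0, second=0, microsecond=0).isoformat()


def rollup(rows, dimensions, metrics):
    """Sum metrics for rows that share the same hour and dimension values."""
    totals = defaultdict(lambda: dict.fromkeys(metrics, 0))
    for row in rows:
        key = (hour_bucket(row["timestamp"]),) + tuple(row[d] for d in dimensions)
        for m in metrics:
            totals[key][m] += row[m]
    return dict(totals)


start = 1420070400000  # 2015-01-01T00:00:00Z in epoch milliseconds
rows = [
    {"timestamp": start, "publisher_id": "p1", "impressions": 1, "clicks": 0},
    {"timestamp": start + 59 * 60_000, "publisher_id": "p1", "impressions": 1, "clicks": 1},
    {"timestamp": start + 61 * 60_000, "publisher_id": "p1", "impressions": 1, "clicks": 0},
]
for key, agg in rollup(rows, ["publisher_id"], ["impressions", "clicks"]).items():
    print(key, agg)
```

Three input rows collapse into two stored rows here; with billions of events spread over a bounded set of dimension values per hour, the reduction is far larger.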
How often do you drop or retire old data in Druid? I noticed you have only 6 historical nodes. Can they support queries spanning more than 2 or 3 years?
At the moment, we don’t have 2 to 3 years of data, so we are not supporting queries over that span. We only have 10 months of data so far, and we do not drop any old data. As time passes, we will increase the number of historical nodes to support more data.
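The thread does not say how segments are split between the hot and cold historical nodes. One common Druid mechanism is coordinator load rules, which are evaluated in order, with the first matching rule deciding where a segment is loaded. The sketch below models that first-match behaviour for a hypothetical chain that keeps the last 30 days on a `hot` tier and everything older on a `cold` tier without dropping anything. Real rules express the window as an ISO 8601 `period` (for example `"P1M"`); the `period_days` field and the tier names are assumptions made to keep the example self-contained.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical rule chain: recent data on "hot", everything else on "cold", nothing dropped.
RULES = [
    {"type": "loadByPeriod", "period_days": 30, "tieredReplicants": {"hot": 2}},
    {"type": "loadForever", "tieredReplicants": {"cold": 1}},
]


def matching_rule(segment_end: datetime, now: datetime, rules=RULES):
    """Return the first rule that matches a segment, mirroring first-match evaluation."""
    for rule in rules:
        if rule["type"] == "loadForever":
            return rule
        if rule["type"] == "loadByPeriod":
            window_start = now - timedelta(days=rule["period_days"])
            if segment_end > window_start:
                return rule
    return None


now = datetime(2015, 11, 1, tzinfo=timezone.utc)
for age_days in (3, 200):
    rule = matching_rule(now - timedelta(days=age_days), now)
    print(age_days, rule["tieredReplicants"])
```

Under a scheme like this, retaining all data means the cold tier is the one that grows, so adding historical nodes to that tier is how capacity keeps pace.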
Now that some time has passed, have you made any significant changes to your architecture?
We haven’t made any significant changes to the architecture. We have scaled up the Druid cluster, and we are also investigating Avro instead of JSON for serialization.
Could you clarify whether you are using EBS GP2 or IO1 (Provisioned IOPS) volumes? Any thoughts on which you would prefer and why? We are looking at PIOPS mainly because GP2 is not giving us consistent IOPS.
We are not using PIOPS, mainly because we don’t need them. We use Druid as the backend for our dashboard, which doesn’t get much traffic: about 10 users actively use those reports. The IOPS provided by GP2 are sufficient for us. Note that once you provision IOPS, you lose the ability to occasionally burst above that level, which GP2 allows.
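To put GP2’s burst behaviour in numbers, here is a rough calculator based on AWS’s published gp2 model: a baseline of 3 IOPS per GiB (minimum 100), bursts up to 3,000 IOPS for volumes whose baseline is lower than that, and an I/O credit bucket of 5.4 million credits, giving a burst duration of credits / (3,000 - baseline). These constants come from AWS documentation rather than from this thread and may change, so check the current EBS docs before relying on them. The sizes used are the ones in the instance list, treated as GiB.

```python
GP2_IOPS_PER_GIB = 3
GP2_MIN_IOPS = 100
GP2_MAX_IOPS = 16_000          # documented gp2 baseline ceiling
GP2_BURST_IOPS = 3_000
GP2_CREDIT_BUCKET = 5_400_000  # I/O credits in a full bucket


def gp2_baseline(size_gib: int) -> int:
    """Baseline IOPS for a gp2 volume of the given size."""
    return min(max(GP2_IOPS_PER_GIB * size_gib, GP2_MIN_IOPS), GP2_MAX_IOPS)


def burst_seconds(size_gib: int):
    """Seconds a full credit bucket sustains 3,000 IOPS, or None if no burst applies."""
    baseline = gp2_baseline(size_gib)
    if baseline >= GP2_BURST_IOPS:
        return None  # baseline already meets the burst ceiling
    return GP2_CREDIT_BUCKET / (GP2_BURST_IOPS - baseline)


for size in (250, 300, 1000):
    print(size, gp2_baseline(size), burst_seconds(size))
```

For the 250 GB hot-tier volumes, this works out to a 750 IOPS baseline with about 40 minutes of full-speed burst, which is the headroom given up by moving to a fixed provisioned rate.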
Vaibhav, I wanted to clarify, if you don’t mind. My question was related to your recent Cassandra talk, which I saw (Cassandra doesn’t appear in the Lambda diagram above). You meant that you didn’t need PIOPS for Cassandra, right? Just making sure.
Subhash, even with Cassandra, we don’t use PIOPS. There we use 4 GP2 EBS drives in a RAID 0 configuration, and commit logs go to a separate disk.
Thanks very much.