Kafka Connect and Confluent: Modernizing our Data Pipelines and ETL for Microservices

Alex Silva
WorkMarket Engineering
8 min read · Apr 3, 2018


At WorkMarket, like many other growth-stage startups, we’re in the midst of a transition from a monolithic architecture to a microservices architecture.

Every team has had to plan for the transition in its own way. For us, the Data Team, this meant reevaluating our current ETL (Extract, Transform, Load) processes and pipelines in order to plan for the increasing complexity that comes with maintaining more data sources and repositories than can be handled manually.

Life in a Monolith: Where We Are Today

Currently, we use AWS Data Pipeline to move data from our MySQL monolith database to our Redshift warehouse. We have a GitHub repo that houses our Redshift CREATE TABLE DDL (Data Definition Language) and a pipeline initialization script. When there’s a new pipeline to add, you submit a PR that includes the DDL for the Redshift table you’d like added. Our DBA (Database Administrator) then runs the pipeline initialization script using your DDL. Finally, the pipeline goes live and your data begins publishing to the warehouse.

It’s worked “okay” for us so far. While it was the appropriate solution with a data engineering team of one, there are issues with this setup going forward:

  1. The pipeline task runners are always online, even when they’re not doing work. (Multiple EC2 instances add up cost-wise, even when running idle.)
  2. Our pipeline initialization script is a crufty set of bash commands and requires a non-trivial amount of work to swap in new data sources and sinks. (Esoteric knowledge is needed.)
  3. AWS is very configurable and familiar to our team, but there’s a growing desire for open source and to be more in control of critical components of our analytics stack. (Configurable, reliable components are great, but can’t be customized exactly how we need them anymore.)
  4. Data Pipeline is, in the main, oriented around batch processing and we’re starting to think about how to achieve real-time pipelines. (Real-time data is table stakes in the on-demand economy.)
  5. As our microservices multiply, having our one DBA as our pipeline gatekeeper is becoming a bottleneck to the timely creation of new pipelines. (Automation will set us free.)

Ultimately, given the chance to try something new with our ETL, we were curious to see what other tools were out there to help meet our needs going forward.

Life with Microservices: Our Path Forward

Monoliths mean singular databases, and for those our current pipelines are well and good. A microservices architecture, however, needn’t be limited to such strict engineering paradigms. Further, while we as an engineering team have standardized on Java + MySQL for our language and database for now, that’s not necessarily going to be true forever or for all microservices. (Parts of our data stack, for instance, rely on Python and Postgres.)

Our Platform team uses Kafka to publish logs and metrics from new microservices as they come online. As we rethought our approach, some questions came to mind:

  • Could we piggyback on that existing infrastructure to avoid duplicating effort?
  • Can our next-generation pipelining efforts work on that streaming platform? (And do we have the resources to support that?)
  • If we were able to develop a real-time, streaming data system available for any developer to tap into, might that open up radical new possibilities for our product?
  • Could we reimagine our ETL not just as a mass of configuration, GUIs, and opaque managed tools, but rather as the central nervous system for our entire development organization?

The last question led us to some serious existential debate; but, more practically, as a relatively small organization, could we do it?

Confluent and Streaming ETL

We started with the source: Confluent. Confluent was founded by the people who created Apache Kafka, and it now develops a popular streaming platform built on Kafka. It seemed like an obvious choice.

We found their vision of a “streaming data platform that… combine[s] event streams from both applications and databases” compelling, and appreciated the unity and simplicity of their design. Confluent’s take is that in a multi-producer, multi-consumer, real-time world, the Data Stream should be “a first class citizen”, meaning that it should be treated with as much care and attention as any of the product’s core functionality. In our monolithic setup, data is important, but is very much a second-class citizen from an architectural viewpoint.

While the majority of our needs today are perfectly satisfied by batch, daily ETL, requests for real-time data are cropping up more and more. Anticipating both the need for more real-time pipelines and an analytics stack more amenable to a microservice architecture, streaming our ETL seemed like an attractive option.

If we wanted to take advantage of this transition and prepare for real-time pipelines that could connect multiple in-house and third-party apps, a streaming solution built on Kafka certainly was worth checking out as it could get us from concept to production much more quickly than any other option available as of this writing.

Challenges

With the brain trust of our Platform team and armed with Kafka and Confluent’s excellent documentation, we were able to build out pipelines targeting all of the microservice databases in our development environment in 4 perfect development days with 2 engineers. This was far faster than we expected, leading to a round of handshakes amongst a pleased leadership team.

The first step was setting up our environment. Confluent makes it easy to install their Confluent Platform, and it can be done in a variety of supported ways. We used Docker since Confluent maintains their own Docker images and we were already comfortable using it to install and administer applications. In about a day we were able to piece together a one-node deployment, with ZooKeeper, one Kafka broker, Confluent Schema Registry, Kafka Connect, and Confluent Control Center all running on Docker.

From there we started to design our pipelines. Each pipeline breaks down into roughly three stages:

  1. The Confluent JDBC source connector writes source database table changes to a Kafka topic.
  2. The Confluent S3 sink* connector writes the Kafka topic to S3 partitions.
  3. Apache Airflow** writes the S3 partitions to a Redshift table.

*For Confluent, a “sink” is a repository or data store; we’ll use the terms interchangeably throughout the remainder of this post.

**Apache Airflow (or simply “Airflow”) is a component we already use for a variety of scheduled workflows.
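To make the three stages concrete, here is a minimal sketch of how one table might flow through them. It is not our production code: the topic prefix, bucket name, IAM role, and the choice of Avro as the file format are all assumptions made for illustration, and the helper names are hypothetical. It relies only on two documented behaviors: the JDBC source connector names each topic as its topic.prefix plus the table name, and the S3 sink connector writes under a top-level directory that defaults to "topics".

```python
def topic_for(table: str, topic_prefix: str = "mysql-") -> str:
    """Stage 1: the JDBC source connector publishes to <topic.prefix><table>."""
    return f"{topic_prefix}{table}"


def s3_prefix_for(topic: str, bucket: str, topics_dir: str = "topics") -> str:
    """Stage 2: the S3 sink connector writes objects under <topics.dir>/<topic>/."""
    return f"s3://{bucket}/{topics_dir}/{topic}/"


def redshift_copy_sql(table: str, s3_prefix: str, iam_role: str) -> str:
    """Stage 3: a COPY statement that a scheduled Airflow task could run.

    Assumes the sink wrote Avro files; adjust the FORMAT clause otherwise.
    """
    return (
        f"COPY {table} FROM '{s3_prefix}' "
        f"IAM_ROLE '{iam_role}' FORMAT AS AVRO 'auto';"
    )


# Hypothetical names, for illustration only.
topic = topic_for("company")
prefix = s3_prefix_for(topic, bucket="example-data-lake")
sql = redshift_copy_sql(
    "company", prefix, "arn:aws:iam::123456789012:role/example-redshift-copy"
)
print(topic)
print(prefix)
print(sql)
```

A real Airflow task would also need to track which S3 objects have already been loaded so that rows are not copied twice; that bookkeeping is left out of the sketch.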

In future posts we’ll provide a more step-by-step guide to setting up these pipelines. For now, we’ll review a few notable challenges we encountered:

Challenge #1

The first challenge was learning how to tune both Connect and our source databases to handle the initial load of rows for large tables, meaning tables with millions of rows and a few GB in size. The Connect documentation helped a bit, but there were a few gotchas that we needed to overcome on our own for this use case:

  1. The Connect worker’s default maximum heap size of 256MB caused connectors to throw OOM (Out of Memory) exceptions soon after initializing. Bumping up the Connect worker’s Java heap space to 8GB worked well for us.
  2. Confluent provides a connector configuration key called batch.max.rows, which allows you to control “how much memory is used to buffer data by the connector”. For example, a value of 10 instructs the connector to buffer at most 10 rows per batch. However, this doesn’t limit the query run against the source: when the query fetches the entire contents of a large table, the JDBC driver can still eat up too much memory. To solve this we added the useCursorFetch and defaultFetchSize MySQL JDBC options to our connection string, which let us control how many rows the driver fetches at a time before the connector starts processing them. Setting defaultFetchSize to 250,000 worked well for us. It looks like Confluent is working on allowing this type of behavior to be set in the connector configuration directly.
  3. Some of our large source databases would run out of memory too. We needed to ensure all of our source databases had enough memory to create sort indexes for all of their tables. 5GB was enough to avoid issues since most of our large tables are around that size.
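The first two fixes can be sketched as follows. This is an illustration rather than our exact setup: the host and database names are made up, the batch.max.rows value is just a placeholder, and passing the heap size through the KAFKA_HEAP_OPTS environment variable is an assumption about how the worker is launched.

```python
from urllib.parse import urlencode


def mysql_jdbc_url(host: str, database: str, port: int = 3306,
                   fetch_size: int = 250_000) -> str:
    """Build a MySQL JDBC URL that streams rows through a server-side cursor.

    useCursorFetch and defaultFetchSize are MySQL Connector/J options; together
    they make the driver fetch fetch_size rows at a time instead of the whole
    result set.
    """
    options = {"useCursorFetch": "true", "defaultFetchSize": str(fetch_size)}
    return f"jdbc:mysql://{host}:{port}/{database}?{urlencode(options)}"


# Gotcha 1: give the Connect worker an 8GB heap (assumed to be set via env var).
worker_env = {"KAFKA_HEAP_OPTS": "-Xmx8G"}

# Gotcha 2: cap connector-side buffering and driver-side fetching.
source_overrides = {
    "connection.url": mysql_jdbc_url("db.example.internal", "app"),  # hypothetical host/db
    "batch.max.rows": "100",  # illustrative value, not a recommendation
}

print(source_overrides["connection.url"])
```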

Challenge #2

The second challenge was understanding how to design a configuration taxonomy appropriate for the source connector. In the simplest case, Connect allows you to target whole databases and basically just set-it-and-forget-it: one source connector config can target every table in the database and track when new tables and columns are added. However, not every table is designed the same way nor requires the same configuration. Some reasons for custom configuration:

  • Some tables have data, like PII (Personally Identifiable Information), that would be inadvisable to store outside of the specific microservice.
  • Some tables are append-only rather than modify-in-place.
  • Some tables have differently named incrementing columns (id, seq, sequence, etc).

And so on. We ended up writing a script that outputs source connector configuration based on these variables:

generate_source_connector_config.py
  -n <connector_name>                     (required)
  -e <endpoint>                           (required)
  -p <password>                           (required)
  -t <tables>                             (required; ex. company,user_to_company)
  -i <incrementing_column_name>           (ex. id, seq, sequence; default id)
  -c <timestamp_column_name>              (ex. modified_on, modified_date; default modified_on)
  -m <mode>                               (ex. incrementing, timestamp+incrementing; default timestamp+incrementing)
  -q <query>                              (ex. select id,name from user; default null)
  -a <postfix_tables_to_connector_name>   (ex. true; default false)

This went a long way towards enforcing consistency and reproducibility in our configuration. It also makes it trivial to initialize our 50+ pipelines quickly.
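A stripped-down sketch of what such a generator could look like is shown below. It is not the actual script: the flags mirror the usage above, but the way the endpoint becomes a JDBC URL, the topic prefix, and the naming applied by -a are assumptions, and the database user is omitted because the original flags do not expose it. The configuration keys themselves are standard JDBC source connector settings.

```python
import argparse
import json


def build_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(description="Emit a JDBC source connector config as JSON.")
    p.add_argument("-n", dest="name", required=True)
    p.add_argument("-e", dest="endpoint", required=True)
    p.add_argument("-p", dest="password", required=True)
    p.add_argument("-t", dest="tables", required=True)
    p.add_argument("-i", dest="incrementing_column", default="id")
    p.add_argument("-c", dest="timestamp_column", default="modified_on")
    p.add_argument("-m", dest="mode", default="timestamp+incrementing")
    p.add_argument("-q", dest="query", default=None)
    p.add_argument("-a", dest="postfix_tables", default="false")
    return p


def build_config(args: argparse.Namespace) -> dict:
    name = args.name
    if args.postfix_tables.lower() == "true":
        # Assumed naming scheme: append the table list to the connector name.
        name = f"{name}-{args.tables.replace(',', '-')}"
    config = {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": f"jdbc:mysql://{args.endpoint}",  # assumed endpoint format
        "connection.password": args.password,
        "mode": args.mode,
        "topic.prefix": f"{args.name}-",  # assumed prefix convention
    }
    # Only emit the column settings that the chosen mode actually uses.
    if "incrementing" in args.mode:
        config["incrementing.column.name"] = args.incrementing_column
    if "timestamp" in args.mode:
        config["timestamp.column.name"] = args.timestamp_column
    # A custom query replaces table filtering rather than combining with it.
    if args.query:
        config["query"] = args.query
    else:
        config["table.whitelist"] = args.tables
    return {"name": name, "config": config}


# Example invocation with hypothetical values.
example_args = build_parser().parse_args(
    ["-n", "users", "-e", "db.example.internal:3306/app",
     "-p", "secret", "-t", "company,user_to_company"]
)
example = build_config(example_args)
print(json.dumps(example, indent=2))
```

The resulting JSON has the name/config shape that the Kafka Connect REST API accepts when creating a connector, so the output can be posted to a worker as-is.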

Challenge #3

The third challenge was tuning the S3 sink connectors to work properly with both low- and high-producing sources. Some source tables have very frequent writes, like our user login audit table; others have less frequent writes, like the tables that toggle experimental in-development features.

In the high-write case, we want to make sure we set our S3 sink flush size (the number of Kafka messages that get written to one S3 file) high enough that we get the advantage of batch writes and avoid overwhelming our sink connectors. (Setting the S3 connector’s flush.size to 10,000 messages per S3 file worked well for us.) In the low-write case, we may never reach a high flush size, but we still want to make sure those messages get over to the S3 sink in a timely manner. To achieve this, we set rotate.schedule.interval.ms to 1 hour. This ensures that topics still flush to S3 files every hour, regardless of whether they’ve hit the flush size.
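The two settings combine roughly as in the sketch below. The flush.size and rotate.schedule.interval.ms values are the ones described above; everything else (connector name, topic, bucket, region, Avro format, UTC timezone) is a placeholder. The should_commit_file helper is a simplified model of the rule, not the connector's actual implementation.

```python
from datetime import timedelta

FLUSH_SIZE = 10_000
ROTATE_SCHEDULE_MS = int(timedelta(hours=1).total_seconds() * 1000)


def s3_sink_config(name: str, topics: str, bucket: str, region: str = "us-east-1") -> dict:
    """Build an S3 sink connector config; bucket, region, and format are placeholders."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
            "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",  # assumed format
            "topics": topics,
            "s3.bucket.name": bucket,
            "s3.region": region,
            "flush.size": str(FLUSH_SIZE),
            "rotate.schedule.interval.ms": str(ROTATE_SCHEDULE_MS),
            "timezone": "UTC",  # scheduled rotation needs a timezone; UTC is an assumption
        },
    }


def should_commit_file(buffered_records: int, ms_since_last_rotation: int) -> bool:
    """Simplified model: commit when either the size or the time threshold is hit."""
    return (buffered_records >= FLUSH_SIZE
            or ms_since_last_rotation >= ROTATE_SCHEDULE_MS)


sink = s3_sink_config("s3-sink-login-audit", "mysql-login_audit", "example-data-lake")
print(sink["config"]["flush.size"], sink["config"]["rotate.schedule.interval.ms"])
print(should_commit_file(10_000, 5_000))        # busy table: size threshold reached
print(should_commit_file(12, ROTATE_SCHEDULE_MS))  # quiet table: hourly rotation
```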

Conclusion

In just a couple of months, our team was able to build a sophisticated data streaming architecture that could demonstrate a stream-first approach to ETL, shepherding our Data into the first-class citizen it needs to be. This was a way of doing things that went beyond the simple and reliable ETL we had used and loved.

Data-driven organizations should, tautologically, have first-class data-driven systems. This means these systems should not be a loosely related set of ETL tools that target database data and event data differently. Instead, they should be powered by a unified, central, managed data stream that conforms all of the organization’s data to the stream “primitive”, no matter the provenance of the data: database, application, third party, or otherwise.

For a growing company like ours with ever-increasing engineer headcount, microservices, third-party tools, complexity, and demand for real-time data, it made sense for us to start thinking about more robust, cleaner data management approaches. Using the Confluent Platform and Kafka allowed us to both see and reap the benefits of the stream-first approach quickly. It opened up all of our data to real-time consumption, empowering independent teams of data producers and consumers, centralizing management of metadata, and so on.

As of this writing, we have 50+ Confluent Platform-based pipelines running in our development environment, and we’re excited to start thinking about next steps: we expect to have three digits’ worth of pipelines running in production by year’s end, and possibly four digits in 2019. For now, we’ll be putting pipelines into our production environment, adding application and third-party stream data, enhancing our reporting with real-time metrics, and building real-time personal data monitoring apps for GDPR compliance efforts.
