All posts
MongoDB to ClickHouse® CDC with Debezium 3.5 and Kafka 4: A Complete Beginner's Guide

MongoDB to ClickHouse® CDC with Debezium 3.5 and Kafka 4: A Complete Beginner's Guide

June 21, 202613 min readReshma M
Share:

This guide shows you how to stream every insert, update, and delete from MongoDB into the ClickHouse® database in near real time, using Debezium, Apache Kafka, and Docker. MongoDB is the most different source we cover, because it stores nested documents rather than flat rows, so we spend extra time on how those documents become columns in ClickHouse.

This article is self-contained. If Debezium and ClickHouse are brand new to you, the short overview in What is Debezium and how to offload analytics to ClickHouse is a good primer first.

No prior experience with Debezium, Kafka, or ClickHouse is assumed.

What you will build

A pipeline where MongoDB exposes changes through a feature called change streams, Debezium reads that stream and turns each change into an event, Apache Kafka stores the events durably, and the ClickHouse Kafka Connect Sink writes them into a ClickHouse table. Change a document in MongoDB, and the same change appears in ClickHouse a moment later. Everything runs locally in Docker.

What is Change Data Capture, in plain English

Your application stores data in MongoDB, and your analytics team wants to run heavy reports. Running them on the production database would slow it down for real users, and copying the whole database every night is stale and wasteful. Change Data Capture copies only what actually changed, as it happens, by reading the database's own change feed. It adds almost no load, because it does not poll your collections with queries.

How MongoDB CDC is different

With the relational databases in this series, Debezium reads a log file directly. MongoDB works differently in two ways that you need to understand up front.

First, MongoDB exposes changes through change streams, a built-in feature that emits an ordered feed of every change. Change streams only work on a replica set, which is MongoDB's mechanism for keeping copies of data on more than one server. This is not optional: a standalone MongoDB server has no change stream, so Debezium cannot read it. The good news is that you can run a replica set with a single member, which is exactly what we do for this tutorial.

Second, MongoDB stores documents, not rows. A single document can contain nested objects and arrays, for example an airport with a nested location object holding latitude and longitude. ClickHouse wants flat columns, so part of our job is to flatten those nested documents on the way through. Debezium and Kafka Connect give us the tools to do this cleanly.

Why MongoDB and ClickHouse are a great pair

MongoDB is flexible and fast for application data, but large analytical queries that scan whole collections are not its strength. ClickHouse is built precisely for that: scanning and aggregating enormous numbers of records in milliseconds. Keep MongoDB as your operational store, stream its changes into ClickHouse, and your analysts get a fast, columnar copy without ever touching production.

The tools and the exact versions

Pinning specific, compatible versions is what makes this run. Do not assume a different tag behaves the same way.

ComponentRoleImage and version
MongoDBSource databasemongo:8.0 (run as a single-member replica set)
Apache KafkaEvent log / transportapache/kafka:4.1.0 (KRaft mode, no ZooKeeper)
DebeziumMongoDB source connectorquay.io/debezium/connect:3.5
ClickHouse Kafka Connect SinkLoads events into ClickHousev1.3.7
ClickHouseAnalytics databaseclickhouse/clickhouse-server:26.3 (LTS)

Debezium 3.5 (specifically 3.5.2.Final, released 2026-06-02) is built and tested against Kafka 4.1 and supports MongoDB 6.0, 7.0, and 8.0, so mongo:8.0 is a good choice. ClickHouse 26.3 is the current Long Term Support release. Kafka 4 uses KRaft and has no ZooKeeper.

The dataset

We will use the OpenFlights airports dataset, published under the Open Database License, so it is free to use. We store each airport as a MongoDB document with a nested location object, which gives us a realistic example of flattening nested data into ClickHouse columns.

Prerequisites

You need Docker and Docker Compose, plus roughly 4 GB of free memory.

Step 1: Prepare a project folder

mkdir mongodb-to-clickhouse-cdc
cd mongodb-to-clickhouse-cdc
mkdir -p connect-plugins

Step 2: Download the ClickHouse Kafka Connect Sink

cd connect-plugins
curl -L -o clickhouse-kafka-connect.zip \
  https://github.com/ClickHouse/clickhouse-kafka-connect/releases/download/v1.3.7/clickhouse-kafka-connect-v1.3.7.zip
unzip clickhouse-kafka-connect.zip
rm clickhouse-kafka-connect.zip
cd ..

Step 3: The Docker Compose file

Create docker-compose.yml. Note that MongoDB is started with --replSet rs0, which enables replica set mode:

services:
  # MongoDB started as a replica set so change streams are available.
  mongodb:
    image: mongo:8.0
    command: ["--replSet", "rs0", "--bind_ip_all"]
    ports:
      - "27017:27017"
 
  # A single-node Kafka 4 broker in KRaft mode (no ZooKeeper).
  kafka:
    image: apache/kafka:4.1.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
 
  # Kafka Connect (Debezium image) with the ClickHouse sink mounted in.
  connect:
    image: quay.io/debezium/connect:3.5
    depends_on:
      - kafka
      - mongodb
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: cdc-connect
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
      KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
    volumes:
      - ./connect-plugins/clickhouse-kafka-connect-v1.3.7:/kafka/connect/clickhouse-kafka-connect
 
  # The analytics database.
  clickhouse:
    image: clickhouse/clickhouse-server:26.3
    ports:
      - "8123:8123"
      - "9000:9000"
    environment:
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD: clickhouse
      CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: "1"
    ulimits:
      nofile:
        soft: 262144
        hard: 262144

This setup runs MongoDB without authentication, which keeps the tutorial simple. In production you would enable authentication and create a dedicated Debezium user with read access to your databases plus the local and config databases.

Step 4: Start the stack and initialize the replica set

docker compose up -d

MongoDB is now running in replica set mode but the replica set is not yet initialized. Initialize it once. The host name must be mongodb:27017, the name other containers use, so that Debezium connects correctly:

docker compose exec mongodb mongosh --eval \
  'rs.initiate({_id: "rs0", members: [{_id: 0, host: "mongodb:27017"}]})'

Confirm Kafka Connect can see both connector plugins:

curl -s http://localhost:8083/connector-plugins | grep -o '"class":"[^"]*"'

You should see io.debezium.connector.mongodb.MongoDbConnector and com.clickhouse.kafka.connect.ClickHouseSinkConnector.

Step 5: Create a collection and load documents

Open a MongoDB shell:

docker compose exec mongodb mongosh

Switch to a database and insert some airport documents. Notice that each document has a nested location object:

use flights
 
db.airports.insertMany([
  { airport_id: 1, name: "Goroka Airport", city: "Goroka", country: "Papua New Guinea", iata: "GKA",
    location: { lat: -6.081689, lon: 145.391998, altitude: 5282 } },
  { airport_id: 2, name: "Madang Airport", city: "Madang", country: "Papua New Guinea", iata: "MAG",
    location: { lat: -5.207083, lon: 145.788700, altitude: 20 } },
  { airport_id: 3, name: "Mount Hagen Airport", city: "Mount Hagen", country: "Papua New Guinea", iata: "HGU",
    location: { lat: -5.826790, lon: 144.296005, altitude: 5388 } },
  { airport_id: 4, name: "Nadzab Airport", city: "Nadzab", country: "Papua New Guinea", iata: "LAE",
    location: { lat: -6.569803, lon: 146.725977, altitude: 239 } },
  { airport_id: 5, name: "Port Moresby Jacksons Intl", city: "Port Moresby", country: "Papua New Guinea", iata: "POM",
    location: { lat: -9.443383, lon: 147.220001, altitude: 146 } }
])

Type exit to leave the shell. Each document also has an automatic _id field, which MongoDB adds and which serves as the document's unique identity.

Step 6: Create the target table in ClickHouse

Now the interesting part: we design a flat ClickHouse table for our nested documents. The nested location object will be flattened so that location.lat becomes a column called location_lat, and so on.

ClickHouse is append-only at heart, so to reflect updates and deletes we use a ReplacingMergeTree, which keeps versions of a row and returns the newest per key when asked. We add a version column and a deleted flag.

docker compose exec clickhouse clickhouse-client --password clickhouse
CREATE DATABASE IF NOT EXISTS flights;
 
CREATE TABLE flights.airports
(
    id                String,
    airport_id        Int32,
    name              String,
    city              String,
    country           String,
    iata              String,
    location_lat      Float64,
    location_lon      Float64,
    location_altitude Int32,
    _version          UInt64,
    _deleted          UInt8
)
ENGINE = ReplacingMergeTree(_version, _deleted)
ORDER BY airport_id;

We order by airport_id, the stable business key for each airport, so ReplacingMergeTree collapses all versions of one airport to the newest. The id column will hold MongoDB's _id. The three flattened location_* columns will receive the nested fields.

Step 7: Register the Debezium MongoDB source connector

Create a file named mongodb-source.json:

{
  "name": "mongodb-source",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.connection.string": "mongodb://mongodb:27017/?replicaSet=rs0",
    "topic.prefix": "flights",
    "database.include.list": "flights",
    "collection.include.list": "flights.airports",
    "capture.mode": "change_streams_update_full",
    "snapshot.mode": "initial",
 
    "transforms": "unwrap,flatten",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.delete.tombstone.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,source.ts_ms",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "_"
  }
}

Send it to Kafka Connect:

curl -X POST -H "Content-Type: application/json" \
  --data @mongodb-source.json \
  http://localhost:8083/connectors

Several settings are MongoDB-specific and worth understanding. mongodb.connection.string points at our replica set; the ?replicaSet=rs0 part is essential. capture.mode of change_streams_update_full tells MongoDB to include the full document on every update, not just the changed fields, which is what we need to keep the ClickHouse copy complete. collection.include.list limits capture to one collection. Events land in a Kafka topic named flights.flights.airports, which is the topic prefix, the database, and the collection.

The transforms are the heart of the MongoDB handling, and there are two of them, applied in order. The first, unwrap, uses MongoDB's own flattening transformation, ExtractNewDocumentState. Note that this is different from the ExtractNewRecordState used for relational databases; MongoDB has its own version. It extracts the document itself from Debezium's change envelope and, with delete.tombstone.handling.mode set to rewrite, turns deletes into rows marked with a __deleted field. The add.fields setting attaches the operation type and the event timestamp, which we use as our version.

The second transform, flatten, is the standard Kafka Connect Flatten transformation. It takes the nested location object and flattens it into top-level fields joined by an underscore, so location.lat becomes location_lat. This is what lets a nested document land cleanly in flat ClickHouse columns.

Confirm it is running:

curl -s http://localhost:8083/connectors/mongodb-source/status

The state should read RUNNING. Debezium snapshots the existing documents, then streams changes.

Step 8: Register the ClickHouse sink connector

Create a file named clickhouse-sink.json:

{
  "name": "clickhouse-sink",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "1",
    "topics": "flights.flights.airports",
    "hostname": "clickhouse",
    "port": "8123",
    "database": "flights",
    "username": "default",
    "password": "clickhouse",
    "ssl": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
 
    "transforms": "renameFields",
    "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.renameFields.renames": "_id:id,__source_ts_ms:_version,__deleted:_deleted"
  }
}

Send it:

curl -X POST -H "Content-Type: application/json" \
  --data @clickhouse-sink.json \
  http://localhost:8083/connectors

The sink reads from the topic and inserts into flights.airports in ClickHouse, matching fields to columns by name. The ReplaceField transformation renames three fields to match our columns: MongoDB's _id becomes id, the event timestamp becomes _version, and the delete marker becomes _deleted. The flattened location_lat, location_lon, and location_altitude fields already match their columns, so they need no renaming.

Step 9: See it work

Query the snapshotted documents. The FINAL keyword collapses each airport to its newest version:

docker compose exec clickhouse clickhouse-client --password clickhouse \
  --query "SELECT airport_id, name, location_lat, location_lon FROM flights.airports FINAL ORDER BY airport_id"

You should see the five airports, with the nested coordinates now sitting in flat columns. Now make some changes in MongoDB:

docker compose exec mongodb mongosh
use flights
// Update a nested field
db.airports.updateOne({ airport_id: 1 }, { $set: { "location.altitude": 5300 } })
// Insert a new document
db.airports.insertOne({ airport_id: 6, name: "Wewak Intl", city: "Wewak", country: "Papua New Guinea", iata: "WWK",
  location: { lat: -3.583828, lon: 143.669006, altitude: 19 } })
// Delete a document
db.airports.deleteOne({ airport_id: 2 })

Wait a couple of seconds, then query ClickHouse again, hiding deleted rows:

docker compose exec clickhouse clickhouse-client --password clickhouse \
  --query "SELECT airport_id, name, location_altitude FROM flights.airports FINAL WHERE _deleted = 0 ORDER BY airport_id"

You should see airport 1 with its updated altitude, the new airport 6, and airport 2 gone. The update to a nested field flowed all the way through to the flattened location_altitude column, which is the whole point of the flatten transform.

You now have a working real-time CDC pipeline from MongoDB to ClickHouse.

A note on arrays and deeply nested data

Our example has one level of nesting, which the Flatten transform handles perfectly. Real MongoDB documents are sometimes deeper, and often contain arrays, for example a list of reviews. Arrays do not flatten into columns the way objects do. When you hit that, you have two good options: capture only the fields you need and ignore the rest, or store the whole document as a single JSON string column in ClickHouse and use ClickHouse's JSON functions to query inside it. For analytics, the first option is usually cleaner: decide which fields matter and flatten only those.

Production considerations

This tutorial runs a single MongoDB member with no authentication. In production you would run a real replica set with at least three members, enable authentication and create a dedicated Debezium user, run one connector task per collection, keep all events for a document in the same Kafka partition, use TLS everywhere, run at least three Kafka brokers, and enable the ClickHouse sink's exactly-once mode when correctness is critical.

Troubleshooting

If the source connector fails to start with a replica set error, the replica set was not initialized, or it was initialized with the wrong host name. Re-run the rs.initiate command using mongodb:27017 as the host.

If nested fields are missing in ClickHouse, the flatten transform is not configured, or the column names do not match the flattened field names. Remember the delimiter is an underscore, so location.lat becomes location_lat.

If rows never reach ClickHouse, confirm the topic has messages with docker compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic flights.flights.airports --from-beginning --max-messages 1.

If updates appear duplicated, you forgot FINAL, or your ClickHouse ORDER BY key does not match the document identity you are using.

Cleaning up

docker compose down -v

References

What is next

You have now streamed nested documents from MongoDB into flat ClickHouse columns, which is the trickiest mapping in this series. The Kafka and ClickHouse half is the same as every other source. To compare with the relational approach, see the MariaDB, PostgreSQL, MySQL, and Oracle guides.

If you would like help designing a production-grade CDC pipeline into ClickHouse, including how to model document data for analytics, the engineers at Quantrail Data do exactly this. Reach out through our services page and we will be glad to help.

Work with Quantrail

Expert ClickHouse services

We design, migrate, tune, and run ClickHouse for teams that own their data, from first architecture through day-two operations. Tell us what you are building and we will help.

Talk to an expert

Manage ClickHouse with CHOps

CHOps is our free, open-source ClickHouse admin tool: monitoring, query profiling, backups, visual access control, and alerting in one self-hosted interface, with zero agents on your servers.

Explore CHOps
Share: