This guide shows you how to stream every insert, update, and delete from MongoDB into the ClickHouse® database in near real time, using Debezium, Apache Kafka, and Docker. MongoDB is the most different source we cover, because it stores nested documents rather than flat rows, so we spend extra time on how those documents become columns in ClickHouse.
This article is self-contained. If Debezium and ClickHouse are brand new to you, the short overview in "What is Debezium and how to offload analytics to ClickHouse" is a good primer to read first.
No prior experience with Debezium, Kafka, or ClickHouse is assumed.
What you will build
You will build a pipeline in which MongoDB exposes changes through a feature called change streams, Debezium reads that stream and turns each change into an event, Apache Kafka stores the events durably, and the ClickHouse Kafka Connect Sink writes them into a ClickHouse table. Change a document in MongoDB, and the same change appears in ClickHouse a moment later. Everything runs locally in Docker.
What is Change Data Capture, in plain English
Your application stores data in MongoDB, and your analytics team wants to run heavy reports. Running them on the production database would slow it down for real users, and copying the whole database every night is stale and wasteful. Change Data Capture copies only what actually changed, as it happens, by reading the database's own change feed. It adds almost no load, because it does not poll your collections with queries.
How MongoDB CDC is different
With the relational databases in this series, Debezium reads a log file directly. MongoDB works differently in two ways that you need to understand up front.
First, MongoDB exposes changes through change streams, a built-in feature that emits an ordered feed of every change. Change streams only work on a replica set (or a sharded cluster, which is built from replica sets). A replica set is MongoDB's mechanism for keeping copies of data on more than one server. This is not optional: a standalone MongoDB server has no change stream, so Debezium cannot read it. The good news is that you can run a replica set with a single member, which is exactly what we do for this tutorial.
Second, MongoDB stores documents, not rows. A single document can contain nested objects and arrays, for example an airport with a nested location object holding latitude and longitude. ClickHouse wants flat columns, so part of our job is to flatten those nested documents on the way through. Debezium and Kafka Connect give us the tools to do this cleanly.
Why MongoDB and ClickHouse are a great pair
MongoDB is flexible and fast for application data, but large analytical queries that scan whole collections are not its strength. ClickHouse is built precisely for that: scanning and aggregating enormous numbers of records in milliseconds. Keep MongoDB as your operational store, stream its changes into ClickHouse, and your analysts get a fast, columnar copy without ever touching production.
The tools and the exact versions
Pinning specific, compatible versions is what makes this run. Do not assume a different tag behaves the same way.
| Component | Role | Image and version |
|---|---|---|
| MongoDB | Source database | mongo:8.0 (run as a single-member replica set) |
| Apache Kafka | Event log / transport | apache/kafka:4.1.0 (KRaft mode, no ZooKeeper) |
| Debezium | MongoDB source connector | quay.io/debezium/connect:3.5 |
| ClickHouse Kafka Connect Sink | Loads events into ClickHouse | v1.3.7 |
| ClickHouse | Analytics database | clickhouse/clickhouse-server:26.3 (LTS) |
Debezium 3.5 (specifically 3.5.2.Final, released 2026-06-02) is built and tested against Kafka 4.1 and supports MongoDB 6.0, 7.0, and 8.0, so mongo:8.0 is a good choice. ClickHouse 26.3 is the current Long Term Support release. Kafka 4 uses KRaft and has no ZooKeeper.
The dataset
We will use the OpenFlights airports dataset, published under the Open Database License, so it is free to use. We store each airport as a MongoDB document with a nested location object, which gives us a realistic example of flattening nested data into ClickHouse columns.
Prerequisites
You need Docker and Docker Compose, plus roughly 4 GB of free memory.
Step 1: Prepare a project folder
mkdir mongodb-to-clickhouse-cdc
cd mongodb-to-clickhouse-cdc
mkdir -p connect-plugins
Step 2: Download the ClickHouse Kafka Connect Sink
cd connect-plugins
curl -L -o clickhouse-kafka-connect.zip \
https://github.com/ClickHouse/clickhouse-kafka-connect/releases/download/v1.3.7/clickhouse-kafka-connect-v1.3.7.zip
unzip clickhouse-kafka-connect.zip
rm clickhouse-kafka-connect.zip
cd ..
Step 3: The Docker Compose file
Create docker-compose.yml. Note that MongoDB is started with --replSet rs0, which enables replica set mode:
services:
  # MongoDB started as a replica set so change streams are available.
  mongodb:
    image: mongo:8.0
    command: ["--replSet", "rs0", "--bind_ip_all"]
    ports:
      - "27017:27017"
  # A single-node Kafka 4 broker in KRaft mode (no ZooKeeper).
  kafka:
    image: apache/kafka:4.1.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
  # Kafka Connect (Debezium image) with the ClickHouse sink mounted in.
  connect:
    image: quay.io/debezium/connect:3.5
    depends_on:
      - kafka
      - mongodb
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: cdc-connect
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
      KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
    volumes:
      - ./connect-plugins/clickhouse-kafka-connect-v1.3.7:/kafka/connect/clickhouse-kafka-connect
  # The analytics database.
  clickhouse:
    image: clickhouse/clickhouse-server:26.3
    ports:
      - "8123:8123"
      - "9000:9000"
    environment:
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD: clickhouse
      CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: "1"
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
This setup runs MongoDB without authentication, which keeps the tutorial simple. In production you would enable authentication and create a dedicated Debezium user with read access to your databases plus the local and config databases.
Step 4: Start the stack and initialize the replica set
docker compose up -d
MongoDB is now running in replica set mode, but the replica set is not yet initialized. Initialize it once. The host name must be mongodb:27017, the name other containers use, so that Debezium connects correctly:
docker compose exec mongodb mongosh --eval \
  'rs.initiate({_id: "rs0", members: [{_id: 0, host: "mongodb:27017"}]})'
Confirm Kafka Connect can see both connector plugins:
curl -s http://localhost:8083/connector-plugins | grep -o '"class":"[^"]*"'
You should see io.debezium.connector.mongodb.MongoDbConnector and com.clickhouse.kafka.connect.ClickHouseSinkConnector.
Step 5: Create a collection and load documents
Open a MongoDB shell:
docker compose exec mongodb mongosh
Switch to a database and insert some airport documents. Notice that each document has a nested location object:
use flights
db.airports.insertMany([
  { airport_id: 1, name: "Goroka Airport", city: "Goroka", country: "Papua New Guinea", iata: "GKA",
    location: { lat: -6.081689, lon: 145.391998, altitude: 5282 } },
  { airport_id: 2, name: "Madang Airport", city: "Madang", country: "Papua New Guinea", iata: "MAG",
    location: { lat: -5.207083, lon: 145.788700, altitude: 20 } },
  { airport_id: 3, name: "Mount Hagen Airport", city: "Mount Hagen", country: "Papua New Guinea", iata: "HGU",
    location: { lat: -5.826790, lon: 144.296005, altitude: 5388 } },
  { airport_id: 4, name: "Nadzab Airport", city: "Nadzab", country: "Papua New Guinea", iata: "LAE",
    location: { lat: -6.569803, lon: 146.725977, altitude: 239 } },
  { airport_id: 5, name: "Port Moresby Jacksons Intl", city: "Port Moresby", country: "Papua New Guinea", iata: "POM",
    location: { lat: -9.443383, lon: 147.220001, altitude: 146 } }
])
Type exit to leave the shell. Each document also has an automatic _id field, which MongoDB adds and which serves as the document's unique identity.
Step 6: Create the target table in ClickHouse
Now the interesting part: we design a flat ClickHouse table for our nested documents. The nested location object will be flattened so that location.lat becomes a column called location_lat, and so on.
ClickHouse is append-only at heart, so to reflect updates and deletes we use a ReplacingMergeTree. It stores every version of a row and, during background merges or when you query with FINAL, keeps only the newest version for each sorting key. To support this we add a version column and a deleted flag.
docker compose exec clickhouse clickhouse-client --password clickhouse
CREATE DATABASE IF NOT EXISTS flights;
CREATE TABLE flights.airports
(
    id                String,
    airport_id        Int32,
    name              String,
    city              String,
    country           String,
    iata              String,
    location_lat      Float64,
    location_lon      Float64,
    location_altitude Int32,
    _version          UInt64,
    _deleted          UInt8
)
ENGINE = ReplacingMergeTree(_version, _deleted)
ORDER BY airport_id;
We order by airport_id, the stable business key for each airport, so ReplacingMergeTree collapses all versions of one airport to the newest. The id column will hold MongoDB's _id. The three flattened location_* columns will receive the nested fields.
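To make the collapsing behaviour concrete, here is a minimal JavaScript sketch that imitates what the Step 9 query (FINAL plus WHERE _deleted = 0) returns for a table ordered by airport_id. It illustrates the semantics only and says nothing about how ClickHouse implements merges. The function name collapseFinal and the sample rows are invented for this illustration.

```javascript
// Illustration only: mimics "SELECT ... FINAL WHERE _deleted = 0" on a
// ReplacingMergeTree(_version, _deleted) table ordered by airport_id.
// collapseFinal is a hypothetical helper, not a ClickHouse API.
function collapseFinal(rows, keyColumn = "airport_id") {
  const newest = new Map();
  for (const row of rows) {
    const current = newest.get(row[keyColumn]);
    // Keep the row with the highest version; on a tie the later insert wins.
    if (current === undefined || row._version >= current._version) {
      newest.set(row[keyColumn], row);
    }
  }
  // Hide keys whose newest version is a delete marker.
  return [...newest.values()].filter((row) => row._deleted === 0);
}

// Invented sample rows, listed in insertion order.
const rows = [
  { airport_id: 1, location_altitude: 5282, _version: 100, _deleted: 0 },
  { airport_id: 2, location_altitude: 20, _version: 100, _deleted: 0 },
  { airport_id: 1, location_altitude: 5300, _version: 200, _deleted: 0 },
  { airport_id: 2, location_altitude: 0, _version: 300, _deleted: 1 },
];

const visible = collapseFinal(rows);
console.log(visible); // only airport 1 remains, with altitude 5300
```

The sketch also shows why the sorting key matters: rows collapse only when they share the same key value, so every event for a document has to carry that value.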
Step 7: Register the Debezium MongoDB source connector
Create a file named mongodb-source.json:
{
  "name": "mongodb-source",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.connection.string": "mongodb://mongodb:27017/?replicaSet=rs0",
    "topic.prefix": "flights",
    "database.include.list": "flights",
    "collection.include.list": "flights.airports",
    "capture.mode": "change_streams_update_full",
    "snapshot.mode": "initial",
    "transforms": "unwrap,flatten",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.delete.tombstone.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,source.ts_ms",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "_"
  }
}
Send it to Kafka Connect:
curl -X POST -H "Content-Type: application/json" \
  --data @mongodb-source.json \
  http://localhost:8083/connectors
Several settings are MongoDB-specific and worth understanding. mongodb.connection.string points at our replica set; the ?replicaSet=rs0 part is essential. capture.mode of change_streams_update_full tells MongoDB to include the full document on every update, not just the changed fields, which is what we need to keep the ClickHouse copy complete. collection.include.list limits capture to one collection. Events land in a Kafka topic named flights.flights.airports, which is the topic prefix, the database, and the collection.
The transforms are the heart of the MongoDB handling, and there are two of them, applied in order. The first, unwrap, uses MongoDB's own flattening transformation, ExtractNewDocumentState. Note that this is different from the ExtractNewRecordState used for relational databases; MongoDB has its own version. It extracts the document itself from Debezium's change envelope and, with delete.tombstone.handling.mode set to rewrite, turns deletes into rows marked with a __deleted field. The add.fields setting attaches the operation type and the event timestamp, which we use as our version.
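As a rough picture of what the unwrap step does, the following JavaScript sketch reduces a simplified change event to the document plus the added metadata fields. It is an imitation, not the Debezium code. The envelope is cut down to the fields this tutorial uses, and two details are assumptions you should check against a real message on the topic: that a delete event carries no document body, so only the identifier from the event key survives, and that __deleted is emitted as a boolean.

```javascript
// Simplified stand-in for ExtractNewDocumentState configured with
// delete.tombstone.handling.mode=rewrite and add.fields=op,source.ts_ms.
// unwrap is a hypothetical helper; the envelope shape is reduced for illustration.
function unwrap(key, envelope) {
  const isDelete = envelope.op === "d";
  // The MongoDB connector carries the document as a JSON string in "after".
  // Assumption: a delete has no document body, so only the key's id is kept.
  const doc = isDelete ? { _id: key.id } : JSON.parse(envelope.after);
  return {
    ...doc,
    __op: envelope.op,
    __source_ts_ms: envelope.source.ts_ms,
    __deleted: isDelete, // assumption: boolean; verify the real type on your topic
  };
}

// Invented update event for airport 1.
const updateEvent = {
  op: "u",
  after: JSON.stringify({
    _id: "abc123", // hypothetical identifier
    airport_id: 1,
    location: { lat: -6.081689, lon: 145.391998, altitude: 5300 },
  }),
  source: { ts_ms: 1700000000000 },
};

console.log(unwrap({ id: "abc123" }, updateEvent));
console.log(unwrap({ id: "abc123" }, { op: "d", after: null, source: { ts_ms: 1700000000500 } }));
```

Note that the location object is still nested after this step; flattening it is the job of the second transform.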
The second transform, flatten, is the standard Kafka Connect Flatten transformation. It takes the nested location object and flattens it into top-level fields joined by an underscore, so location.lat becomes location_lat. This is what lets a nested document land cleanly in flat ClickHouse columns.
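The effect of the flatten step can be imitated in a few lines of plain JavaScript. This is a sketch of the idea, not the Kafka Connect implementation: the function name flatten and its parameters are made up here, and in this sketch arrays and scalar values are simply copied through unchanged.

```javascript
// Plain-JavaScript imitation of a flatten step with delimiter "_".
// Nested objects are expanded into prefixed top-level fields;
// arrays and scalars are copied unchanged (a choice made for this sketch).
function flatten(value, delimiter = "_", prefix = "", out = {}) {
  for (const [field, fieldValue] of Object.entries(value)) {
    const name = prefix ? `${prefix}${delimiter}${field}` : field;
    const isPlainObject =
      fieldValue !== null && typeof fieldValue === "object" && !Array.isArray(fieldValue);
    if (isPlainObject) {
      flatten(fieldValue, delimiter, name, out);
    } else {
      out[name] = fieldValue;
    }
  }
  return out;
}

const airport = {
  airport_id: 1,
  name: "Goroka Airport",
  location: { lat: -6.081689, lon: 145.391998, altitude: 5282 },
};

console.log(flatten(airport));
// {
//   airport_id: 1,
//   name: 'Goroka Airport',
//   location_lat: -6.081689,
//   location_lon: 145.391998,
//   location_altitude: 5282
// }
```

The resulting field names are exactly the column names chosen for the ClickHouse table in Step 6, which is why no further mapping is needed for the location fields.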
Confirm it is running:
curl -s http://localhost:8083/connectors/mongodb-source/status
The state should read RUNNING. Debezium snapshots the existing documents, then streams changes.
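The status response reports one state for the connector and one for each task, and a connector can show RUNNING while its task has FAILED. A small helper like the one below checks both. It is a sketch: the function name isHealthy is made up, and the worker_id and trace values in the sample are placeholders.

```javascript
// Decide whether a Kafka Connect status response describes a healthy connector.
// Both the connector and every task must report RUNNING.
// isHealthy is a hypothetical helper for this tutorial.
function isHealthy(status) {
  const connectorOk = status.connector?.state === "RUNNING";
  const tasks = status.tasks ?? [];
  const tasksOk = tasks.length > 0 && tasks.every((task) => task.state === "RUNNING");
  return connectorOk && tasksOk;
}

// Sample shaped like the response of GET /connectors/mongodb-source/status.
// worker_id and trace are placeholder values.
const sampleStatus = {
  name: "mongodb-source",
  connector: { state: "RUNNING", worker_id: "connect:8083" },
  tasks: [{ id: 0, state: "FAILED", worker_id: "connect:8083", trace: "stack trace text" }],
  type: "source",
};

console.log(isHealthy(sampleStatus)); // false: the task failed although the connector is RUNNING
```

When a task is FAILED, the trace field of that task usually contains the exception that explains why.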
Step 8: Register the ClickHouse sink connector
Create a file named clickhouse-sink.json:
{
  "name": "clickhouse-sink",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "1",
    "topics": "flights.flights.airports",
    "hostname": "clickhouse",
    "port": "8123",
    "database": "flights",
    "username": "default",
    "password": "clickhouse",
    "ssl": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "renameFields",
    "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.renameFields.renames": "_id:id,__source_ts_ms:_version,__deleted:_deleted"
  }
}
Send it:
curl -X POST -H "Content-Type: application/json" \
  --data @clickhouse-sink.json \
  http://localhost:8083/connectors
The sink reads from the topic and inserts into flights.airports in ClickHouse, matching fields to columns by name. The ReplaceField transformation renames three fields to match our columns: MongoDB's _id becomes id, the event timestamp becomes _version, and the delete marker becomes _deleted. The flattened location_lat, location_lon, and location_altitude fields already match their columns, so they need no renaming.
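The renames setting is a comma-separated list of old:new pairs. The sketch below applies the same string to a sample record in plain JavaScript so you can see the before and after. It imitates the behaviour for illustration only; applyRenames and the sample values are invented.

```javascript
// Imitates a "renames" setting: each "old:new" pair renames one top-level field.
// Fields that are not listed keep their original names.
// applyRenames is a hypothetical helper, not part of Kafka Connect.
function applyRenames(record, renames) {
  const mapping = new Map(renames.split(",").map((pair) => pair.trim().split(":")));
  const renamed = {};
  for (const [field, value] of Object.entries(record)) {
    renamed[mapping.get(field) ?? field] = value;
  }
  return renamed;
}

// Invented record as it might look after the unwrap and flatten steps.
const flattenedEvent = {
  _id: "abc123", // hypothetical identifier
  airport_id: 1,
  location_lat: -6.081689,
  __op: "u",
  __source_ts_ms: 1700000000000,
  __deleted: false,
};

const sinkRecord = applyRenames(flattenedEvent, "_id:id,__source_ts_ms:_version,__deleted:_deleted");
console.log(sinkRecord);
// {
//   id: 'abc123',
//   airport_id: 1,
//   location_lat: -6.081689,
//   __op: 'u',
//   _version: 1700000000000,
//   _deleted: false
// }
```

The __op field is not in the rename list, so it keeps its name and has no matching column in the table from Step 6.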
Step 9: See it work
Query the snapshotted documents. The FINAL keyword collapses each airport to its newest version:
docker compose exec clickhouse clickhouse-client --password clickhouse \
  --query "SELECT airport_id, name, location_lat, location_lon FROM flights.airports FINAL ORDER BY airport_id"
You should see the five airports, with the nested coordinates now sitting in flat columns. Now make some changes in MongoDB:
docker compose exec mongodb mongosh
use flights
// Update a nested field
db.airports.updateOne({ airport_id: 1 }, { $set: { "location.altitude": 5300 } })
// Insert a new document
db.airports.insertOne({ airport_id: 6, name: "Wewak Intl", city: "Wewak", country: "Papua New Guinea", iata: "WWK",
  location: { lat: -3.583828, lon: 143.669006, altitude: 19 } })
// Delete a document
db.airports.deleteOne({ airport_id: 2 })
Wait a couple of seconds, then query ClickHouse again, hiding deleted rows:
docker compose exec clickhouse clickhouse-client --password clickhouse \
  --query "SELECT airport_id, name, location_altitude FROM flights.airports FINAL WHERE _deleted = 0 ORDER BY airport_id"
You should see airport 1 with its updated altitude, the new airport 6, and airport 2 gone. The update to a nested field flowed all the way through to the flattened location_altitude column, which is the whole point of the flatten transform.
You now have a working real-time CDC pipeline from MongoDB to ClickHouse.
A note on arrays and deeply nested data
Our example has one level of nesting, which the Flatten transform handles perfectly. Real MongoDB documents are sometimes deeper, and often contain arrays, for example a list of reviews. Arrays do not flatten into columns the way objects do. When you hit that, you have two good options: capture only the fields you need and ignore the rest, or store the whole document as a single JSON string column in ClickHouse and use ClickHouse's JSON functions to query inside it. For analytics, the first option is usually cleaner: decide which fields matter and flatten only those.
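The two options can be pictured with a short JavaScript sketch. It only shows the shape of the resulting records; where you apply such a step (a transform, a stream processor, or the application itself) is left open. The helper names, the doc_json column name, and the sample document with a reviews array are all invented for this illustration.

```javascript
// Option 1: keep only the fields analytics needs and ignore the rest.
// pickFields is a hypothetical helper.
function pickFields(doc, fields) {
  return Object.fromEntries(
    fields.filter((field) => field in doc).map((field) => [field, doc[field]])
  );
}

// Option 2: keep the identity as a column and store the whole document
// as one JSON string (doc_json is a made-up column name).
function toJsonColumn(doc) {
  return { id: String(doc._id), doc_json: JSON.stringify(doc) };
}

// Invented document that contains an array.
const reviewedAirport = {
  _id: "abc123", // hypothetical identifier
  airport_id: 1,
  name: "Goroka Airport",
  reviews: [
    { stars: 5, text: "Quick check-in" },
    { stars: 3, text: "Small terminal" },
  ],
};

console.log(pickFields(reviewedAirport, ["airport_id", "name"]));
// { airport_id: 1, name: 'Goroka Airport' }

console.log(toJsonColumn(reviewedAirport).doc_json);
```

With the second shape, the reviews stay queryable through JSON functions on the string column, at the cost of parsing the string at query time.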
Production considerations
This tutorial runs a single MongoDB member with no authentication. In production you would run a real replica set with at least three members, enable authentication and create a dedicated Debezium user, run one connector task per collection, keep all events for a document in the same Kafka partition, use TLS everywhere, run at least three Kafka brokers, and enable the ClickHouse sink's exactly-once mode when correctness is critical.
Troubleshooting
If the source connector fails to start with a replica set error, the replica set was not initialized, or it was initialized with the wrong host name. Re-run the rs.initiate command using mongodb:27017 as the host.
If nested fields are missing in ClickHouse, the flatten transform is not configured, or the column names do not match the flattened field names. Remember the delimiter is an underscore, so location.lat becomes location_lat.
If rows never reach ClickHouse, confirm the topic has messages with docker compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic flights.flights.airports --from-beginning --max-messages 1.
If updates appear duplicated, you forgot FINAL, or your ClickHouse ORDER BY key does not match the document identity you are using.
Cleaning up
docker compose down -v
References
- Debezium MongoDB connector documentation
- Debezium MongoDB New Document State Extraction SMT
- ClickHouse Kafka Connect Sink
- OpenFlights dataset (Open Database License)
What is next
You have now streamed nested documents from MongoDB into flat ClickHouse columns, which is the trickiest mapping in this series. The Kafka and ClickHouse half is the same as every other source. To compare with the relational approach, see the MariaDB, PostgreSQL, MySQL, and Oracle guides.
If you would like help designing a production-grade CDC pipeline into ClickHouse, including how to model document data for analytics, the engineers at Quantrail Data do exactly this. Reach out through our services page and we will be glad to help.



