All posts
Automating Data Pipelines : Python-Driven Ingestion into ClickHouse

Automating Data Pipelines : Python-Driven Ingestion into ClickHouse

January 30, 20265 min readReshma M
Share:

Modern data systems rely heavily on automating data pipelines to ensure fast, reliable, and repeatable data movement.That’s where ClickHouse + Python becomes a powerful combo. In this blog, we’ll walk through how to automate data ingestion into a distributed ClickHouse cluster using Python, making your pipelines reliable, repeatable, and production-ready.

Why ClickHouse for Data Pipelines?

ClickHouse is a high-performance columnar database designed for analytical workloads. It shines when you need:

  • Blazing-fast queries on large datasets.
  • Horizontal scalability using shards & replicas.
  • Efficient batch & streaming ingestion.
  • Efficient storage with column compression.

To Know more about ClickHouse, Check out my previous blog here, ClickHouse OLAP: Why Modern Companies Need OLAP Systems Like ClickHouse.

Python complements ClickHouse by enabling:

  • Easy automation.
  • Integration with ETL/ML workflows.
  • Better error handling and observability.

Architecture Overview

Cluster Setup

  • 2 shards × 2 replicas.
  • Local tables: employee_local (ReplicatedMergeTree).
  • Distributed table: employee_distributed.
  • Inserts always go through the Distributed table.
  • The distributed table automatically routes data across shards.
Python Script → Distributed TableLocal Tables (on each shard).                                     

Step 1: System Prerequisites (Linux)

Before creating a Python virtual environment, ensure your system supports it.

Install Python venv Support (Ubuntu / Debian)
sudo apt update
sudo apt install -y python3-venv

This step is required only once per system,
If skipped, python3 -m venv may fail.

Step 2: Project Setup

Create Project Directory

mkdir clickhouse-python-client
cd clickhouse-python-client

Create and Activate Virtual Environment

python3 -m venv venv
source venv/bin/activate

After activation, your prompt will look like:

(venv) user@machine:~/clickhouse-python-client$

Step 3: Install Python Dependencies

We use the official clickhouse-connect Python client:

pip install --upgrade pippip install clickhouse-connect

Verify installation:

pip list | grep clickhouse
Expected output: 
clickhouse-connect 0.x.x

Project Structure

clickhouse-python-client/
├── venv/
└── insert_employee.py

Step 4: Python Ingestion Script

Below is a production-style Python script with:

  • Data ingestion and validation
  • Logging
  • Error handling
  • Entry Point

#Imports

import sysimport loggingimport clickhouse_connect

#Logging Configuration

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(name)

#ClickHouse Configuration

CLICKHOUSE_CONFIG = {"host": "localhost","port": 8123,"username": "default","password": "","database": "company","table": "employee_distributed"}

#Create ClickHouse Client

def create_clickhouse_client():
  logger.info("Creating ClickHouse client connection")
  return clickhouse_connect.get_client(
    host=CLICKHOUSE_CONFIG["host"],
    port=CLICKHOUSE_CONFIG["port"],
    username=CLICKHOUSE_CONFIG["username"],
    password=CLICKHOUSE_CONFIG["password"]
)

#Insert and Validate Data

def insert_and_validate(client):
data = [
(21, "AB", "AI"),
(22, "BC", "ML"),
(23, "QA", "DS"),
(24, "LW", "AI"),
(25, "KR", "ML")
]
logger.info("Inserting data into Distributed table")
 
client.insert(
    table=f"{CLICKHOUSE_CONFIG['database']}.{CLICKHOUSE_CONFIG['table']}",
    data=data,
    column_names=["id", "name", "dept"]
)
 
logger.info("Insert completed. Validating data")
 
result = client.query(
    f"""
    SELECT count(*)
    FROM {CLICKHOUSE_CONFIG['database']}.{CLICKHOUSE_CONFIG['table']}
    """
)
 
row_count = result.result_rows[0][0]
logger.info(f"Total rows visible in Distributed table: {row_count}")

#Entry Point with Error Handling

def main():   
  try:
    logger.info("starting Clickhouse insert job")
    client = create_clickhouse_client()
    insert_and_validate(client)
 
    logger.info("Job completed successfully")
    sys.exit(0)
 
  except Exception:
    logger.error("Job failed", exc_info=True)
    sys.exit(1)
if __name__== "__main__":
    main()

Step 5: Run the Script

Activate the virtual environment and run:

source venv/bin/activatepython insert_employee.py

If everything works, you’ll see logs like:

Screenshot- Successful output

What Happens Behind the Scenes?

When you insert into the Distributed table:

  • ClickHouse determines the shard based on the sharding key.
  • Data is routed to the correct shard.
  • Each shard replicates data internally.
  • Queries on the distributed table fetch data from all shards.

This means your Python script doesn’t need to worry about cluster complexity, ClickHouse handles it.

Production Best Practices

  • Always insert via Distributed tables.
  • Use batch inserts for large volumes.
  • Store secrets using env variables.
  • Add retries for network failures.
  • Monitor using system.query_log.
  • Log every pipeline stage.

Note:

Cluster Not Mandatory

Although this blog demonstrates ingestion into a distributed ClickHouse cluster, the same Python-based ingestion approach works perfectly with a single-node ClickHouse setup as well. This example uses a cluster setup purely to illustrate how Python integrates with distributed environments, but the ingestion logic is identical for standalone deployments too.

Handling Mistakes in ClickHouse Inserts

ClickHouse does NOT support traditional ROLLBACK for INSERTs

Unlike transactional databases (like MySQL or PostgreSQL), ClickHouse is designed for analytical workloads, and inserts are append-only. Once data is written, you cannot simply run a ROLLBACK to undo it.

But don’t worry, you still have smart ways to handle mistakes and avoid duplicates.

  • Use a ReplacingMergeTree Table.
  • Use a Version/timestamp Column.
  • insert Using a Deduplication Key.
  • Use ALTER TABLE DELETE (For Rare Corrections).
  • Design idempotent ingestion logic in Python Script.

ClickHouse is built for speed and scale, not transactional rollbacks. Instead of undoing inserts, design your pipelines to be:

  • Idempotent
  • Version-aware
  • Deduplication-friendly

Conclusion

By Automating data pipelines (ingestion) into ClickHouse using Python gives you:

  • Reliable ingestion pipelines
  • Scalable distributed storage
  • High-performance analytics
  • Production-ready observability

This approach bridges the gap between data engineering and analytics, making your systems faster and more reliable.

Reference

Clickhouse Python Driver – https://clickhouse.com/docs/integrations/python

Python client examples – .https://clickhouse.com/docs/knowledgebase/python-clickhouse-connect-example

Advanced inserting with python – https://clickhouse.com/docs/integrations/language-clients/python/advanced-inserting

Share: