kafkars
Rust-based, Arrow-powered Python Kafka client for high-throughput data pipelines.
Why kafkars?
Python's Global Interpreter Lock (GIL) and memory management create bottlenecks when consuming high-volume Kafka streams. Traditional Python Kafka clients hand messages to Python one at a time, so each message incurs its own conversion and deserialization overhead, which limits throughput.
kafkars solves this by:
- Rust core: All Kafka operations (polling, buffering, ordering) happen in Rust, bypassing the GIL
- Batch processing: Messages are accumulated and returned as Apache Arrow RecordBatches, not individual Python objects
- Zero-copy where possible: Arrow's columnar format enables efficient data transfer between Rust and Python
- Vectorized operations: Process thousands of messages at once with pandas, polars, or any Arrow-compatible library
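To make the batching idea concrete, here is a plain-Python illustration (not kafkars code) of turning per-message records into one list per field, the columnar layout that an Arrow RecordBatch uses. The field names `partition`, `offset`, and `value` are hypothetical and do not describe kafkars's actual batch schema.

```python
from typing import Any, Dict, List


def to_columns(messages: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Transpose row-oriented records into one list per field (a columnar layout).

    Conceptual illustration only; field names are hypothetical, not kafkars's schema.
    """
    if not messages:
        return {}
    fields = list(messages[0])  # assume every record has the same fields
    return {field: [msg[field] for msg in messages] for field in fields}


# Hypothetical per-message records, as a one-at-a-time client might yield them
rows = [
    {"partition": 0, "offset": 10, "value": 3.5},
    {"partition": 1, "offset": 7, "value": 1.5},
    {"partition": 0, "offset": 11, "value": 2.0},
]
columns = to_columns(rows)

# A column-wise aggregate reads a single field's list instead of visiting every record
total = sum(columns["value"])
print(columns["offset"], total)  # prints: [10, 7, 11] 7.0
```

With PyArrow installed, `pyarrow.RecordBatch.from_pylist` performs the equivalent row-to-column conversion into typed Arrow arrays, which libraries such as pandas and polars can then operate on column by column.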
Use Cases
This architecture is ideal for:
- Real-time analytics pipelines
- ML feature stores consuming from Kafka
- High-volume event processing
- Data lake ingestion
Important: Analytics-Focused Design
kafkars does not commit offsets. It is designed for analytics and high-throughput batch processing, not transactional workloads.
- No exactly-once semantics: Messages may be reprocessed if your application restarts
- No offset tracking: You control where to start reading via offset policies
- Stateless consumers: Each consumer instance starts fresh based on the configured policy
If you need exactly-once processing, transactional guarantees, or automatic offset management, use a traditional Kafka client like confluent-kafka-python.
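Because nothing is committed, each run derives its starting position from the offset policy alone. The minimal sketch below models that resolution for a single partition; it assumes offsets start at 0 and timestamps never decrease, and the function names and policy strings are hypothetical rather than the kafkars API. The timestamp case mirrors Kafka's offsets-for-times lookup, which returns the earliest offset whose timestamp is greater than or equal to the target.

```python
import bisect
from typing import List


def resolve_start_offset(timestamps: List[int], policy: str, target_ms: int = 0) -> int:
    """Pick the first offset to read from one partition (hypothetical helper).

    timestamps[i] is the timestamp (ms) of the message at offset i; assumed
    non-decreasing and starting at offset 0, which real Kafka logs need not satisfy.
    """
    if policy == "earliest":
        return 0  # oldest available message
    if policy == "latest":
        return len(timestamps)  # next offset to be written: only new messages are read
    if policy == "timestamp":
        # First offset whose timestamp is >= target_ms; end of log if none qualifies
        return bisect.bisect_left(timestamps, target_ms)
    raise ValueError(f"unknown policy: {policy!r}")


def relative_target_ms(now_ms: int, lookback_ms: int) -> int:
    """Convert a relative-time policy ("N ms ago") into an absolute timestamp."""
    return now_ms - lookback_ms


log = [1_000, 2_000, 3_000, 4_000]  # message timestamps for offsets 0..3
now_ms = 5_000
start = resolve_start_offset(log, "timestamp", relative_target_ms(now_ms, 2_000))
print(start)  # prints 2: the message stamped 3_000 is the first at or after 5_000 - 2_000
```

Rerunning with the same policy and clock value yields the same starting offset, which is why a restarted consumer can re-read messages it already processed.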
Features
- Ordered delivery: Messages are released in timestamp order across all partitions
- Flexible offset policies: Start from earliest, latest, or any timestamp
- Backpressure management: Automatically pauses fast partitions so that buffered messages cannot grow without bound
- Arrow-native output: Returns PyArrow RecordBatch for efficient downstream processing
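Timestamp ordering across partitions and backpressure can both be understood through per-partition buffers. The following is a minimal sketch, not the kafkars implementation: it assumes messages within each partition arrive in non-decreasing timestamp order, releases the oldest buffered message only while every partition has something buffered (so no partition can later deliver an older one), and flags partitions whose buffers reach a hypothetical `max_buffered` limit as candidates to pause. The `OrderedBuffer` class and its methods are hypothetical.

```python
from collections import deque
from typing import Any, Deque, Dict, Iterable, List, Tuple

Message = Tuple[int, Any]  # (timestamp_ms, payload)


class OrderedBuffer:
    """Hypothetical sketch of timestamp-ordered release with pause hints."""

    def __init__(self, partitions: Iterable[int], max_buffered: int) -> None:
        self.buffers: Dict[int, Deque[Message]] = {p: deque() for p in partitions}
        self.max_buffered = max_buffered

    def add(self, partition: int, messages: Iterable[Message]) -> None:
        # Assumes messages within one partition arrive in timestamp order
        self.buffers[partition].extend(messages)

    def partitions_to_pause(self) -> List[int]:
        # Partitions running ahead pile up in their buffers; pausing them bounds memory
        return sorted(p for p, buf in self.buffers.items() if len(buf) >= self.max_buffered)

    def release(self) -> List[Tuple[int, int, Any]]:
        out = []
        # Only safe while every partition has a buffered head: an empty partition
        # might still deliver a message older than the current minimum
        while self.buffers and all(self.buffers.values()):
            p = min(self.buffers, key=lambda k: self.buffers[k][0][0])
            ts, payload = self.buffers[p].popleft()
            out.append((p, ts, payload))
        return out


buf = OrderedBuffer(partitions=[0, 1], max_buffered=3)
buf.add(0, [(1, "a"), (4, "d"), (6, "f")])
buf.add(1, [(2, "b"), (3, "c")])
paused = buf.partitions_to_pause()
released = buf.release()
print(paused)    # prints [0]
print(released)  # prints [(0, 1, 'a'), (1, 2, 'b'), (1, 3, 'c')]
```

A real consumer must also treat idle or caught-up partitions specially, since under this rule a single quiet partition would block release indefinitely.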
Quick Example
```python
import time

from kafkars import ConsumerManager, SourceTopic

# Define source topics with offset policies
topics = [
    SourceTopic.from_earliest("events"),
    SourceTopic.from_relative_time("metrics", 3_600_000),  # 1 hour ago (3,600,000 ms)
]

# Create consumer
manager = ConsumerManager(
    config={
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-consumer-group",
    },
    topics=topics,
    cutoff_ms=time.time_ns() // 1_000_000,  # current wall-clock time in milliseconds
    batch_size=10_000,
)

# poll() returns a PyArrow RecordBatch
while True:
    batch = manager.poll(timeout_ms=1000)
    if batch.num_rows > 0:
        # Convert to pandas/polars for processing
        df = batch.to_pandas()
        print(f"Received {len(df)} messages")
    if manager.is_live():
        print("Caught up to real-time")
        break
```
Getting Started
Head over to the Installation guide to get started.