API Reference
ConsumerManager
The main class for consuming messages from Kafka.
Constructor
ConsumerManager(
    config: dict[str, str],
    topics: list[SourceTopic],
    cutoff_ms: int,
    batch_size: int,
)
Parameters:
config: Kafka consumer configuration (e.g., bootstrap.servers, group.id)
topics: List of SourceTopic objects defining topics and offset policies
cutoff_ms: Unix timestamp (milliseconds) defining the boundary between replay and live mode
batch_size: Maximum number of messages to return per poll() call
Example:
import time

from kafkars import ConsumerManager, SourceTopic

manager = ConsumerManager(
    config={
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-group",
    },
    topics=[SourceTopic.from_earliest("events")],
    # Use the current time, in Unix milliseconds, as the replay/live boundary.
    cutoff_ms=time.time_ns() // 1_000_000,
    batch_size=1000,
)
Methods
poll
Poll for messages from Kafka.
Parameters:
timeout_ms: Maximum time to wait for messages (milliseconds)
Returns: PyArrow RecordBatch with message data
is_live
Check if all partitions have caught up to their replay end offset or cutoff time.
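A replay loop built from poll() and is_live() might look like the sketch below. The helper name replay_batches and the max_polls guard are hypothetical. The code also assumes that poll() takes timeout_ms as its first positional argument, that an empty poll returns a batch with zero rows, and that is_live() returns a bool.

```python
def replay_batches(manager, timeout_ms=1000, max_polls=10_000):
    """Yield non-empty batches until the manager reports live mode.

    `manager` is any object exposing poll(timeout_ms) and is_live(), such as
    a ConsumerManager. `max_polls` is a hypothetical guard against looping
    forever if live mode is never reached.
    """
    for _ in range(max_polls):
        if manager.is_live():
            return
        batch = manager.poll(timeout_ms)
        # Assumption: the returned RecordBatch exposes num_rows (as PyArrow's does).
        if batch.num_rows > 0:
            yield batch
```

Yielding batches one at a time avoids holding the whole replay in memory; the caller decides what to do once the generator is exhausted and live consumption begins.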
partition_state
Get the current state of all partitions.
held_message_count
Get the number of messages currently buffered but not yet released.
paused_partition_count
Get the number of partitions currently paused due to backpressure.
get_priming_watermark
Get the current low water mark timestamp, or None if all partitions are live.
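The counters above lend themselves to periodic logging. The following is a minimal sketch, assuming each of them is a zero-argument method as the Methods heading suggests; the helper names backpressure_snapshot and format_snapshot are hypothetical, and the type of the watermark value is not specified here, so it is only passed through str().

```python
def backpressure_snapshot(manager):
    """Collect the manager's progress counters into a plain dict."""
    return {
        "is_live": manager.is_live(),
        "held_messages": manager.held_message_count(),
        "paused_partitions": manager.paused_partition_count(),
        # None once every partition is live, per the description above.
        "priming_watermark": manager.get_priming_watermark(),
    }


def format_snapshot(snapshot):
    """Render a snapshot as a single log-friendly line."""
    watermark = snapshot["priming_watermark"]
    watermark_text = "n/a (live)" if watermark is None else str(watermark)
    return (
        f"live={snapshot['is_live']} "
        f"held={snapshot['held_messages']} "
        f"paused={snapshot['paused_partitions']} "
        f"watermark={watermark_text}"
    )
```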
SourceTopic
Defines a topic with its offset policy.
Factory Methods
from_earliest
Create a SourceTopic starting from the earliest available offset.
from_latest
Create a SourceTopic starting from the latest offset (new messages only).
from_relative_time
Create a SourceTopic starting from a relative time offset.
Parameters:
name: Topic name
time_ms: Milliseconds before now
from_absolute_time
Create a SourceTopic starting from an absolute Unix timestamp.
Parameters:
name: Topic name
time_ms: Unix timestamp in milliseconds
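Both time-based factory methods take plain millisecond integers. The sketch below shows one way to derive them from standard-library datetime and timedelta values; the helper names to_unix_ms and to_relative_ms are hypothetical and not part of kafkars.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
_ONE_MS = timedelta(milliseconds=1)


def to_unix_ms(moment):
    """Convert a timezone-aware datetime to Unix milliseconds."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - _EPOCH) // _ONE_MS


def to_relative_ms(lookback):
    """Convert a non-negative timedelta to whole milliseconds."""
    if lookback < timedelta(0):
        raise ValueError("lookback must not be negative")
    return lookback // _ONE_MS
```

Assuming the positional order (name, time_ms) listed above, a call such as SourceTopic.from_relative_time("events", to_relative_ms(timedelta(hours=1))) would start one hour back, and SourceTopic.from_absolute_time("events", to_unix_ms(some_datetime)) would start at a fixed instant.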
Schemas
MESSAGE_SCHEMA
PyArrow schema for message batches:
from kafkars import MESSAGE_SCHEMA
# Fields:
# - key: binary (nullable)
# - value: binary (nullable)
# - topic: utf8
# - partition: int32
# - offset: int64
# - timestamp: timestamp[ms, tz=UTC]
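Because key and value are nullable binary fields, each row needs a null check before decoding. The sketch below works on plain dict rows, such as those produced by PyArrow's RecordBatch.to_pylist(). It assumes UTF-8 keys and JSON-encoded values, which is a property of the producer rather than of kafkars, and the helper name decode_row is hypothetical.

```python
import json


def decode_row(row):
    """Decode one message row (a dict keyed by MESSAGE_SCHEMA field names)."""
    key = row["key"]
    value = row["value"]
    return {
        "topic": row["topic"],
        "partition": row["partition"],
        "offset": row["offset"],
        # Assumption: keys are UTF-8 text; a null key stays None.
        "key": None if key is None else key.decode("utf-8"),
        # Assumption: values are JSON; a null value (e.g. a tombstone) stays None.
        "value": None if value is None else json.loads(value),
    }
```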
PARTITION_STATE_SCHEMA
PyArrow schema for partition state:
from kafkars import PARTITION_STATE_SCHEMA
# Fields:
# - topic: utf8
# - partition: int32
# - replay_start_offset: int64
# - replay_end_offset: int64
# - consumed_offset: int64
# - released_offset: int64
# - last_consumed_timestamp: timestamp[ms, tz=UTC] (nullable)
# - is_live: bool
# - is_paused: bool
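These fields are enough to estimate how much replay remains per partition. The sketch below operates on dict rows keyed by the field names above, for example after converting a partition state batch with PyArrow's RecordBatch.to_pylist(). The helper name remaining_replay is hypothetical, and the result is approximate because the exact offset semantics (inclusive or exclusive) are not specified here.

```python
def remaining_replay(rows):
    """Map (topic, partition) to the approximate number of messages left to replay."""
    remaining = {}
    for row in rows:
        if row["is_live"]:
            # Live partitions have finished replaying and are skipped.
            continue
        gap = row["replay_end_offset"] - row["consumed_offset"]
        # Clamp at zero in case the consumer has already passed the end offset.
        remaining[(row["topic"], row["partition"])] = max(gap, 0)
    return remaining
```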
Utility Functions
get_message_schema
Get the message schema from the Rust library.
get_partition_state_schema
Get the partition state schema from the Rust library.
validate_source_topic
Validate a SourceTopic object. Raises an exception if invalid.