Expand description
Kafka consumer module.
§Overview
Wraps [rdkafka
]’s [StreamConsumer
] with a small
convenience API that can be initialised from environment variables or
explicit parameters.
- Environment‑driven
bootstrap.servers
andgroup.id
. - [
ConsumerContext
] implementation that logs rebalances & commits via [tracing
]. stream()
exposing an async stream of [BorrowedMessage
].- Error handling with custom
ConsumerError
type.
§Example
use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
use ds_event_stream_rs_sdk::model::topics::Topic;
use ds_event_stream_rs_sdk::error::{Result, SDKError};
use ds_event_stream_rs_sdk::utils::{get_bootstrap_servers, Environment, ClientCredentials};
use tokio_stream::StreamExt;
use tracing::{info, error};
#[tokio::main]
async fn main() -> Result<(), SDKError> {
let bootstrap_servers = get_bootstrap_servers(Environment::Development, false);
let credentials = ClientCredentials { username: "username".to_string(), password: "password".to_string() };
let consumer = KafkaConsumer::default(&bootstrap_servers, &[Topic::DsPipelineJobRequested], "group-id", &credentials)?;
let mut event_stream = consumer.event_stream();
while let Some(next) = event_stream.next().await {
match next {
Ok(event) => info!("Received event: {:?}", event),
Err(err) => error!("Failed to deserialize event: {}", err),
}
}
Ok(())
}
Modules§
- error
- Consumer error module.
Structs§
- Kafka
Consumer - Thin wrapper around [
StreamConsumer
]. - Tracing
Context - Logs rebalance lifecycle & commit callbacks via [
tracing
].