Module consumer

Kafka consumer module.

§Overview

Wraps [rdkafka]’s [StreamConsumer] with a small convenience API that can be initialised from environment variables or explicit parameters.

  • Environment‑driven bootstrap.servers and group.id.
  • [ConsumerContext] implementation that logs rebalances & commits via [tracing].
  • stream() exposing an async stream of [BorrowedMessage].
  • Error handling with a custom ConsumerError type.
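
The environment-driven setup in the first bullet can be sketched with the standard library alone. This is only an illustration under stated assumptions, not the SDK's implementation: the variable names KAFKA_BOOTSTRAP_SERVERS and KAFKA_GROUP_ID, the ConsumerSettings struct, and both helper functions are hypothetical. The lookup is passed in as a closure so the fallback logic can be exercised without touching the process environment.

```rust
use std::collections::HashMap;

/// Settings a Kafka consumer needs before it can connect (hypothetical type).
#[derive(Debug, PartialEq)]
struct ConsumerSettings {
    bootstrap_servers: String,
    group_id: String,
}

/// Resolves each setting through `lookup`, falling back to the explicit
/// default when the variable is missing or blank.
/// The variable names below are assumptions, not taken from the SDK.
fn resolve_settings<F>(lookup: F, default_servers: &str, default_group: &str) -> ConsumerSettings
where
    F: Fn(&str) -> Option<String>,
{
    let pick = |key: &str, fallback: &str| -> String {
        lookup(key)
            .filter(|value| !value.trim().is_empty())
            .unwrap_or_else(|| fallback.to_string())
    };
    ConsumerSettings {
        bootstrap_servers: pick("KAFKA_BOOTSTRAP_SERVERS", default_servers),
        group_id: pick("KAFKA_GROUP_ID", default_group),
    }
}

/// Maps the settings onto the Kafka property names from the overview.
fn to_client_config(settings: &ConsumerSettings) -> HashMap<&'static str, String> {
    HashMap::from([
        ("bootstrap.servers", settings.bootstrap_servers.clone()),
        ("group.id", settings.group_id.clone()),
    ])
}
```

To read from the real environment, one could call resolve_settings(|key| std::env::var(key).ok(), "localhost:9092", "group-id"), where both defaults are placeholders.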

§Example

use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
use ds_event_stream_rs_sdk::model::topics::Topic;
use ds_event_stream_rs_sdk::error::{Result, SDKError};
use ds_event_stream_rs_sdk::utils::{get_bootstrap_servers, Environment, ClientCredentials};

use tokio_stream::StreamExt;
use tracing::{info, error};

#[tokio::main]
async fn main() -> Result<(), SDKError> {
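    // Resolve the broker list for the development environment.
    let bootstrap_servers = get_bootstrap_servers(Environment::Development, false);

    // Placeholder credentials; replace them with real values.
    let credentials = ClientCredentials {
        username: "username".to_string(),
        password: "password".to_string(),
    };

    // Build a consumer for one topic under the consumer group "group-id".
    let consumer = KafkaConsumer::default(
        &bootstrap_servers,
        &[Topic::DsPipelineJobRequested],
        "group-id",
        &credentials,
    )?;

    // Each stream item is either a deserialized event or an error.
    let mut event_stream = consumer.event_stream();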

    while let Some(next) = event_stream.next().await {
        match next {
            Ok(event) => info!("Received event: {:?}", event),
            Err(err) => error!("Failed to deserialize event: {}", err),
        }
    }
    Ok(())
}

Modules§

error
Consumer error module.

Structs§

KafkaConsumer
Thin wrapper around [StreamConsumer].
TracingContext
Logs rebalance lifecycle & commit callbacks via [tracing].