Kafka producer module.
§Overview
A thin, opinionated wrapper around [rdkafka]’s [FutureProducer]. The
goal is to minimise boilerplate when publishing JSON‑serialisable payloads
to the DS Event Stream Kafka cluster while still giving full control over
configuration when needed.
Features
- Lazy, fallible construction via KafkaProducer::default.
- Emits structured [tracing] spans for each send operation.
- Transparently maps errors into your project’s SDKError enum.
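The fallible-construction and error-mapping features can be illustrated with a minimal, standard-library-only sketch. Every name in it (`ClientError`, `SdkError`, `SketchProducer`, `connect`, `deliver`) is a hypothetical stand-in invented for illustration. None of them is the SDK's or rdkafka's real type, and the real `SDKError` enum may be shaped differently. The sketch only shows the general pattern: a `From` implementation lets the `?` operator convert a lower-level error into the SDK-level enum without explicit mapping at each call site.

```rust
use std::fmt;

// Hypothetical stand-in for a lower-level client error (NOT rdkafka's real error type).
#[derive(Debug, PartialEq)]
enum ClientError {
    InvalidConfig(String),
    Delivery(String),
}

// Hypothetical stand-in for an SDK-level error enum; the real `SDKError` may differ.
#[derive(Debug, PartialEq)]
enum SdkError {
    Producer(String),
}

impl fmt::Display for SdkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SdkError::Producer(msg) => write!(f, "producer error: {}", msg),
        }
    }
}

impl std::error::Error for SdkError {}

// This conversion is what makes the mapping "transparent": `?` calls it implicitly.
impl From<ClientError> for SdkError {
    fn from(err: ClientError) -> Self {
        match err {
            ClientError::InvalidConfig(msg) => {
                SdkError::Producer(format!("invalid config: {}", msg))
            }
            ClientError::Delivery(msg) => {
                SdkError::Producer(format!("delivery failed: {}", msg))
            }
        }
    }
}

// Hypothetical low-level constructor that can fail on bad configuration.
fn connect(bootstrap_servers: &str) -> Result<String, ClientError> {
    if bootstrap_servers.is_empty() {
        return Err(ClientError::InvalidConfig(
            "bootstrap servers must not be empty".to_string(),
        ));
    }
    Ok(bootstrap_servers.to_string())
}

// Hypothetical low-level send that can fail at delivery time.
fn deliver(servers: &str, topic: &str) -> Result<(), ClientError> {
    if topic.is_empty() {
        return Err(ClientError::Delivery(format!("no topic given for {}", servers)));
    }
    Ok(())
}

// Hypothetical wrapper; it only mirrors the shape of a fallible constructor.
struct SketchProducer {
    servers: String,
}

impl SketchProducer {
    // Fallible construction: configuration errors surface as `SdkError`.
    fn new(bootstrap_servers: &str) -> Result<Self, SdkError> {
        let servers = connect(bootstrap_servers)?; // ClientError -> SdkError via From
        Ok(Self { servers })
    }

    // Send errors are mapped through the same `From` implementation.
    fn send(&self, topic: &str) -> Result<(), SdkError> {
        deliver(&self.servers, topic)?;
        Ok(())
    }
}
```

With this shape, callers handle a single error type and can propagate it with `?`, which is the style the example in this module uses.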
§Example
use ds_event_stream_rs_sdk::producer::KafkaProducer;
use ds_event_stream_rs_sdk::model::{EventStream, topics::Topic};
use ds_event_stream_rs_sdk::error::Result;
use ds_event_stream_rs_sdk::utils::{get_bootstrap_servers, Environment, ClientCredentials};
use uuid::Uuid;
use chrono::Utc;

#[tokio::main]
async fn main() -> Result<()> {
    let bootstrap_servers = get_bootstrap_servers(Environment::Development, false);
    let credentials = ClientCredentials {
        username: "username".to_string(),
        password: "password".to_string(),
    };
    let producer = KafkaProducer::default(&bootstrap_servers, &credentials)?;

    let payload = EventStream {
        id: Uuid::new_v4(),
        session_id: Uuid::new_v4(),
        tenant_id: Uuid::new_v4(),
        event_source: "test".to_string(),
        event_type: "test".to_string(),
        timestamp: Utc::now(),
        created_by: "test".to_string(),
        md5_hash: "test".to_string(),
        request_id: None,
        owner_id: None,
        product_id: None,
        product_schema_uri: None,
        event_source_uri: None,
        affected_entity_uri: None,
        message: Some("hello".to_string()),
        payload: None,
        payload_uri: None,
        context: None,
        context_uri: None,
        metadata: None,
        tags: None,
    };

    producer.send_event(&Topic::DsPipelineJobRequested, "user-42", &payload, None).await?;
    Ok(())
}
Modules§
- error: Producer error module.
Structs§
- KafkaProducer: Wrapper around an [rdkafka::producer::FutureProducer].