ds_event_stream_rs_sdk/producer/mod.rs

//! Kafka producer module.
//!
//! ## Overview
//! A thin, opinionated wrapper around [`rdkafka`]'s [`FutureProducer`]. The
//! goal is to minimise boilerplate when publishing JSON-serialisable payloads
//! to the DS Event Stream Kafka cluster while still giving full control over
//! configuration when needed.
//!
//! ## Features
//! * Lazy, fallible construction via [`KafkaProducer::default`].
//! * Emits structured [`tracing`] spans for each send operation.
//! * Transparently maps errors into the crate's [`SDKError`] enum.
//!
//! ## Example
//! ```no_run
//! use ds_event_stream_rs_sdk::producer::KafkaProducer;
//! use ds_event_stream_rs_sdk::model::{EventStream, topics::Topic};
//! use ds_event_stream_rs_sdk::error::Result;
//! use ds_event_stream_rs_sdk::utils::{get_bootstrap_servers, Environment, ClientCredentials};
//! use uuid::Uuid;
//! use chrono::Utc;
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     let bootstrap_servers = get_bootstrap_servers(Environment::Development, false);
//!     let credentials = ClientCredentials { username: "username".to_string(), password: "password".to_string() };
//!
//!     let producer = KafkaProducer::default(&bootstrap_servers, &credentials)?;
//!     let payload = EventStream {
//!         id: Uuid::new_v4(),
//!         session_id: Uuid::new_v4(),
//!         tenant_id: Uuid::new_v4(),
//!         event_source: "test".to_string(),
//!         event_type: "test".to_string(),
//!         timestamp: Utc::now(),
//!         created_by: "test".to_string(),
//!         md5_hash: "test".to_string(),
//!         request_id: None,
//!         owner_id: None,
//!         product_id: None,
//!         product_schema_uri: None,
//!         event_source_uri: None,
//!         affected_entity_uri: None,
//!         message: Some("hello".to_string()),
//!         payload: None,
//!         payload_uri: None,
//!         context: None,
//!         context_uri: None,
//!         metadata: None,
//!         tags: None,
//!     };
//!     producer.send_event(&Topic::DsPipelineJobRequested, "user-42", &payload, None).await?;
//!     Ok(())
//! }
//! ```
pub mod error;

use std::time::Duration;

use rdkafka::{
    config::ClientConfig,
    producer::{FutureProducer, FutureRecord},
};
use tracing::{error, info};

use crate::error::{Result, SDKError};
use crate::model::topics::Topic;
use crate::model::v1::EventStream;
use crate::utils::ClientCredentials;

use error::ProducerError;
// region: --> KafkaProducer

/// Wrapper around an [`rdkafka::producer::FutureProducer`].
///
/// The producer is configured once and can then be cheaply cloned thanks to
/// the internal `Arc` in *rdkafka*'s handle.
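///
/// For instance, a single handle can be shared across concurrent tasks by
/// cloning it. A minimal sketch; it assumes a pre-built `event`:
///
/// ```no_run
/// # use ds_event_stream_rs_sdk::producer::KafkaProducer;
/// # use ds_event_stream_rs_sdk::model::{EventStream, topics::Topic};
/// # async fn demo(producer: KafkaProducer, event: EventStream) {
/// let clone = producer.clone(); // cheap: copies the inner Arc, not the connection
/// tokio::spawn(async move {
///     let _ = clone.send_event(&Topic::DsPipelineJobRequested, "user-42", &event, None).await;
/// });
/// # }
/// ```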
#[derive(Clone)]
pub struct KafkaProducer {
    inner: FutureProducer,
}

impl KafkaProducer {
    /// Explicit configuration.
    ///
    /// # Arguments
    ///
    /// * `config` - The fully prepared [`ClientConfig`] (including any
    ///   authentication settings) used to build the producer.
    ///
    /// # Returns
    ///
    /// * `Result<Self, SDKError>` - The result of the operation
    ///
    /// # Errors
    ///
    /// * [`SDKError::Producer`] - If the producer fails to create.
    ///
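    /// # Example
    ///
    /// A minimal sketch; the broker address and settings are placeholders:
    ///
    /// ```no_run
    /// use ds_event_stream_rs_sdk::producer::KafkaProducer;
    /// use ds_event_stream_rs_sdk::error::Result;
    /// use rdkafka::config::ClientConfig;
    ///
    /// fn build() -> Result<KafkaProducer> {
    ///     let mut config = ClientConfig::new();
    ///     config
    ///         .set("bootstrap.servers", "localhost:9092")
    ///         .set("acks", "all");
    ///     KafkaProducer::new(config)
    /// }
    /// ```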
    pub fn new(config: ClientConfig) -> Result<Self> {
        let inner: FutureProducer = config.create().map_err(ProducerError::Kafka)?;
        Ok(Self { inner })
    }

    /// Default configuration.
    ///
    /// # Arguments
    ///
    /// * `bootstrap_servers` - The bootstrap servers to use for the producer.
    /// * `credentials` - The SASL username and password to authenticate with.
    ///
    /// # Returns
    ///
    /// * `Result<Self, SDKError>` - The result of the operation
    ///
    /// # Errors
    ///
    /// * [`SDKError::Producer`] - If the producer fails to create.
    ///
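    /// # Example
    ///
    /// A minimal sketch; the credentials are placeholders:
    ///
    /// ```no_run
    /// use ds_event_stream_rs_sdk::producer::KafkaProducer;
    /// use ds_event_stream_rs_sdk::error::Result;
    /// use ds_event_stream_rs_sdk::utils::{get_bootstrap_servers, Environment, ClientCredentials};
    ///
    /// fn build() -> Result<KafkaProducer> {
    ///     let bootstrap_servers = get_bootstrap_servers(Environment::Development, false);
    ///     let credentials = ClientCredentials {
    ///         username: "username".to_string(),
    ///         password: "password".to_string(),
    ///     };
    ///     KafkaProducer::default(&bootstrap_servers, &credentials)
    /// }
    /// ```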
    pub fn default(bootstrap_servers: &str, credentials: &ClientCredentials) -> Result<Self> {
        let inner: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", bootstrap_servers)
            // Require acknowledgement from all in-sync replicas.
            .set("acks", "all")
            // Retry transient delivery failures up to three times.
            .set("retries", "3")
            .set("request.timeout.ms", "30000")
            // Total local time allowed to deliver a message, including retries
            // (`delivery.timeout.ms` is an alias of this property in librdkafka).
            .set("message.timeout.ms", "5000")
            .set("compression.type", "snappy")
            // Batch up to 16 KiB per partition, lingering at most 5 ms.
            .set("batch.size", "16384")
            .set("linger.ms", "5")
            .set("max.in.flight.requests.per.connection", "5")
            .set("security.protocol", "SASL_PLAINTEXT")
            .set("sasl.mechanisms", "SCRAM-SHA-512")
            .set("sasl.username", credentials.username.clone())
            .set("sasl.password", credentials.password.clone())
            .create()
            .map_err(ProducerError::Kafka)?;

        info!(servers = %bootstrap_servers, "Kafka producer initialised");
        Ok(Self { inner })
    }

    /// Sends a keyed JSON message to **`topic`**.
    ///
    /// * `payload` is serialised to JSON via [`serde_json`] before sending.
    /// * `key` is used for partitioning; choose a deterministic key so that
    ///   events for the same entity land on the same partition, in order.
    /// * `queue_timeout` is optional; defaults to 5000 ms if not provided.
    ///
    /// The function is instrumented with [`tracing`]; any error bubbles up as
    /// [`SDKError::Producer`].
    ///
    /// # Arguments
    ///
    /// * `topic` - The topic to send the message to
    /// * `key` - The partitioning key for the message
    /// * `payload` - The event to serialise and send
    /// * `queue_timeout` - The maximum time to wait for delivery; defaults to 5000 ms
    ///
    /// # Returns
    ///
    /// * `Result<()>` - The result of the operation
    ///
    /// # Errors
    ///
    /// * [`SDKError::Producer`] - If the producer fails to send the message.
    ///
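    /// # Example
    ///
    /// A minimal sketch; it assumes an already constructed `producer` and
    /// `event` (see the module-level example):
    ///
    /// ```no_run
    /// use ds_event_stream_rs_sdk::producer::KafkaProducer;
    /// use ds_event_stream_rs_sdk::model::{EventStream, topics::Topic};
    /// use ds_event_stream_rs_sdk::error::Result;
    /// use std::time::Duration;
    ///
    /// async fn publish(producer: &KafkaProducer, event: &EventStream) -> Result<()> {
    ///     // Wait up to 10 s for broker acknowledgement instead of the 5 s default.
    ///     producer
    ///         .send_event(&Topic::DsPipelineJobRequested, "user-42", event, Some(Duration::from_secs(10)))
    ///         .await
    /// }
    /// ```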
    pub async fn send_event(
        &self,
        topic: &Topic,
        key: &str,
        payload: &EventStream,
        queue_timeout: Option<Duration>,
    ) -> Result<()> {
        let topic_name = topic.to_string();
        let payload_json = self.serialize_message(payload)?;

        let record = FutureRecord::to(&topic_name).payload(&payload_json).key(key);
        let timeout = queue_timeout.unwrap_or(Duration::from_millis(5000));

        match self.inner.send(record, timeout).await {
            Ok(delivery) => {
                info!(
                    partition = delivery.partition,
                    offset = delivery.offset,
                    "message produced to topic: {}",
                    topic_name
                );
                Ok(())
            }
            Err((err, _msg)) => {
                error!(error = %err, "failed to produce message to topic: {}", topic_name);
                Err(SDKError::Producer(ProducerError::Kafka(err)))
            }
        }
    }

    /// Serializes an [`EventStream`] into a JSON byte payload for Kafka.
    ///
    /// * Relies on [`EventStream`]'s [`serde::Serialize`] implementation.
    ///
    /// # Arguments
    ///
    /// * `msg` - The event to serialize
    ///
    /// # Returns
    ///
    /// * `Result<Vec<u8>, SDKError>` - The result of the operation
    ///
    /// # Errors
    ///
    /// * [`SDKError::Producer`] - If the message fails to serialize.
    ///
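    /// # Example
    ///
    /// A minimal sketch; it assumes an already constructed `producer` and
    /// `event`:
    ///
    /// ```no_run
    /// # use ds_event_stream_rs_sdk::producer::KafkaProducer;
    /// # use ds_event_stream_rs_sdk::model::EventStream;
    /// # fn demo(producer: &KafkaProducer, event: &EventStream) {
    /// let bytes = producer.serialize_message(event).expect("event should serialize to JSON");
    /// assert!(!bytes.is_empty());
    /// # }
    /// ```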
    pub fn serialize_message(&self, msg: &EventStream) -> Result<Vec<u8>> {
        Ok(serde_json::to_vec(msg).map_err(ProducerError::Json)?)
    }
}

// endregion: --> KafkaProducer