ds_event_stream_rs_sdk/producer/mod.rs
1//! Kafka producer module.
2//!
3//! ## Overview
4//! A thin, opinionated wrapper around [`rdkafka`]'s [`FutureProducer`]. The
5//! goal is to minimise boiler‑plate when publishing JSON‑serialisable payloads
6//! to the DS Event Stream Kafka cluster while still giving full control over
7//! configuration when needed.
8//!
9//! Features
10//! * Lazy, fallible construction via [`KafkaProducer::default`].
11//! * Emits structured [`tracing`] spans for each send operation.
12//! * Transparently maps errors into your project's [`SDKError`] enum.
13//!
14//! ### Example
15//! ```no_run
16//! use ds_event_stream_rs_sdk::producer::KafkaProducer;
17//! use ds_event_stream_rs_sdk::model::{EventStream, topics::Topic};
18//! use ds_event_stream_rs_sdk::error::Result;
19//! use ds_event_stream_rs_sdk::utils::{get_bootstrap_servers, Environment, ClientCredentials};
20//! use uuid::Uuid;
21//! use chrono::Utc;
22//!
23//! #[tokio::main]
24//! async fn main() -> Result<()> {
25//! let bootstrap_servers = get_bootstrap_servers(Environment::Development, false);
26//! let credentials = ClientCredentials { username: "username".to_string(), password: "password".to_string() };
27//!
28//! let producer = KafkaProducer::default(&bootstrap_servers, &credentials)?;
29//! let payload = EventStream {
30//! id: Uuid::new_v4(),
31//! session_id: Uuid::new_v4(),
32//! tenant_id: Uuid::new_v4(),
33//! event_source: "test".to_string(),
34//! event_type: "test".to_string(),
35//! timestamp: Utc::now(),
36//! created_by: "test".to_string(),
37//! md5_hash: "test".to_string(),
38//! request_id: None,
39//! owner_id: None,
40//! product_id: None,
41//! product_schema_uri: None,
42//! event_source_uri: None,
43//! affected_entity_uri: None,
44//! message: Some("hello".to_string()),
45//! payload: None,
46//! payload_uri: None,
47//! context: None,
48//! context_uri: None,
49//! metadata: None,
50//! tags: None,
51//! };
52//! producer.send_event(&Topic::DsPipelineJobRequested, "user-42", &payload, None).await?;
53//! Ok(())
54//! }
55//! ```
56pub mod error;
57
58use std::time::Duration;
59
60use rdkafka::{
61 config::ClientConfig,
62 producer::{FutureProducer, FutureRecord},
63};
64use tracing::{error, info};
65
66use crate::error::{Result, SDKError};
67use crate::model::topics::Topic;
68use crate::model::v1::EventStream;
69use crate::utils::ClientCredentials;
70
71use error::ProducerError;
72
73// region: --> KafkaProducer
74
75/// Wrapper around an [`rdkafka::producer::FutureProducer`].
76///
77/// The producer is configured once and can then be cheaply cloned thanks to
78/// the internal `Arc` in *rdkafka*'s handle.
79#[derive(Clone)]
80pub struct KafkaProducer {
81 inner: FutureProducer,
82}
83
84impl KafkaProducer {
85 /// Explicit configuration.
86 ///
87 /// # Arguments
88 ///
89 /// * `config` - The configuration to use for the producer.
90 /// * `username` - The username to use for authentication
91 /// * `password` - The password to use for authentication
92 ///
93 /// # Returns
94 ///
95 /// * `Result<Self, SDKError>` - The result of the operation
96 ///
97 /// # Errors
98 ///
99 /// * [`SDKError::Producer`] - If the producer fails to create.
100 ///
101 pub fn new(config: ClientConfig) -> Result<Self> {
102 let inner: FutureProducer = config.create().map_err(ProducerError::Kafka)?;
103 Ok(Self { inner })
104 }
105
106 /// Default configuration.
107 ///
108 /// # Arguments
109 ///
110 /// * `bootstrap_servers` - The bootstrap servers to use for the producer.
111 /// * `username` - The username to use for authentication
112 /// * `password` - The password to use for authentication
113 ///
114 /// # Returns
115 ///
116 /// * `Result<Self, SDKError>` - The result of the operation
117 ///
118 /// # Errors
119 ///
120 /// * [`SDKError::Producer`] - If the producer fails to create.
121 ///
122 pub fn default(bootstrap_servers: &str, credentials: &ClientCredentials) -> Result<Self> {
123 let inner: FutureProducer = ClientConfig::new()
124 .set("bootstrap.servers", bootstrap_servers)
125 .set("acks", "all")
126 .set("retries", "3")
127 .set("delivery.timeout.ms", "120000")
128 .set("request.timeout.ms", "30000")
129 .set("message.timeout.ms", "5000")
130 .set("compression.type", "snappy")
131 .set("batch.size", "16384")
132 .set("linger.ms", "5")
133 .set("max.in.flight.requests.per.connection", "5")
134 .set("security.protocol", "SASL_PLAINTEXT")
135 .set("sasl.mechanisms", "SCRAM-SHA-512")
136 .set("sasl.username", credentials.username.clone())
137 .set("sasl.password", credentials.password.clone())
138 .create()
139 .map_err(ProducerError::Kafka)?;
140
141 info!(servers = %bootstrap_servers, "Kafka producer initialised");
142 Ok(Self { inner })
143 }
144
145 /// Sends a key‑ed JSON message to **`topic`**.
146 ///
147 /// * `payload` must be an [`EventStream`] object that implements [`serde::Serialize`].
148 /// * `key` is used for partitioning; choose a deterministic key for *exactly
149 /// once‑per‑key* semantics.
150 /// * `queue_timeout` is optional; defaults to 5000ms if not provided.
151 ///
152 /// The function is instrumented with [`tracing`]; any error bubbles up as
153 ///
154 /// Arguments
155 ///
156 /// * `topic` - The topic to send the message to
157 /// * `key` - The key to send the message to
158 /// * `payload` - The payload to send the message to
159 /// * `queue_timeout` - The timeout to send the message to
160 ///
161 /// Returns
162 ///
163 /// * `Result<()>` - The result of the operation
164 ///
165 /// Errors
166 ///
167 /// * [`SDKError::Producer`] - If the producer fails to send the message.
168 ///
169 pub async fn send_event(
170 &self,
171 topic: &Topic,
172 key: &str,
173 payload: &EventStream,
174 queue_timeout: Option<Duration>,
175 ) -> Result<()> {
176 let topic_name = topic.to_string();
177 let payload_json = self.serialize_message(payload)?;
178
179 let record = FutureRecord::to(&topic_name).payload(&payload_json).key(key);
180 let timeout = queue_timeout.unwrap_or(Duration::from_millis(5000));
181
182 match self.inner.send(record, timeout).await {
183 Ok(delivery) => {
184 info!(
185 partition = delivery.partition,
186 offset = delivery.offset,
187 "message produced to topic: {}",
188 topic_name
189 );
190 Ok(())
191 }
192 Err((err, _msg)) => {
193 error!(error = %err, "failed to produce message to topic: {}", topic_name);
194 Err(SDKError::Producer(ProducerError::Kafka(err)))
195 }
196 }
197 }
198
199 /// Serializes an `EventStream<T>` into a Kafka message.
200 ///
201 /// * `T` must implement [`serde::Serialize`].
202 ///
203 /// # Arguments
204 ///
205 /// * `msg` - The message to serialize
206 ///
207 /// # Returns
208 ///
209 /// * `Result<Vec<u8>, SDKError>` - The result of the operation
210 ///
211 /// # Errors
212 ///
213 /// * [`SDKError::Producer`] - If the producer fails to serialize the message.
214 ///
215 pub fn serialize_message(&self, msg: &EventStream) -> Result<Vec<u8>> {
216 Ok(serde_json::to_vec(msg).map_err(ProducerError::Json)?)
217 }
218}
219
220// endregion: --> KafkaProducer