ds_event_stream_rs_sdk/consumer/mod.rs
//! Kafka consumer module.
//!
//! ## Overview
//! Wraps [`rdkafka`]'s [`StreamConsumer`] with a small
//! convenience API that can be initialised from environment variables or
//! explicit parameters.
//!
//! * Environment-driven `bootstrap.servers` and `group.id`.
//! * [`ConsumerContext`] implementation that logs rebalances & commits via
//!   [`tracing`].
//! * `stream()` exposing an async stream of [`BorrowedMessage`].
//! * Error handling with a custom [`ConsumerError`] type.
//!
//! ### Example
//! ```no_run
//! use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
//! use ds_event_stream_rs_sdk::model::topics::Topic;
//! use ds_event_stream_rs_sdk::error::{Result, SDKError};
//! use ds_event_stream_rs_sdk::utils::{get_bootstrap_servers, Environment, ClientCredentials};
//!
//! use tokio_stream::StreamExt;
//! use tracing::{info, error};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), SDKError> {
//!     let bootstrap_servers = get_bootstrap_servers(Environment::Development, false);
//!     let credentials = ClientCredentials {
//!         username: "username".to_string(),
//!         password: "password".to_string(),
//!     };
//!
//!     let consumer = KafkaConsumer::default(
//!         &bootstrap_servers,
//!         &[Topic::DsPipelineJobRequested],
//!         "group-id",
//!         &credentials,
//!     )?;
//!     let mut event_stream = consumer.event_stream();
//!
//!     while let Some(next) = event_stream.next().await {
//!         match next {
//!             Ok(event) => info!("Received event: {:?}", event),
//!             Err(err) => error!("Failed to deserialize event: {}", err),
//!         }
//!     }
//!     Ok(())
//! }
//! ```
pub mod error;

use rdkafka::{
    client::ClientContext,
    config::{ClientConfig, RDKafkaLogLevel},
    consumer::{stream_consumer::StreamConsumer, BaseConsumer, CommitMode, Consumer, ConsumerContext, Rebalance},
    message::BorrowedMessage,
    Message,
};
use serde_json::Deserializer;
use serde_path_to_error::deserialize;
use tokio_stream::{Stream, StreamExt};
use tracing::{debug, error, info};

use crate::error::{Result, SDKError};
use crate::model::topics::Topic;
use crate::model::v1::EventStream;
use crate::utils::ClientCredentials;

use error::ConsumerError;

/* --------------------------------------------------------------------- */
/* Context */

/// [`ConsumerContext`] that logs rebalance lifecycle events and commit
/// callbacks via [`tracing`].
///
/// All callbacks are infallible: commit failures are logged at `error`
/// level rather than propagated.
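///
/// # Example
///
/// A minimal sketch of wiring this context into your own [`ClientConfig`]
/// (broker address and group id below are placeholders):
///
/// ```no_run
/// use ds_event_stream_rs_sdk::consumer::TracingContext;
/// use rdkafka::config::ClientConfig;
/// use rdkafka::consumer::stream_consumer::StreamConsumer;
///
/// let consumer: StreamConsumer<TracingContext> = ClientConfig::new()
///     .set("bootstrap.servers", "localhost:9092")
///     .set("group.id", "example-group")
///     .create_with_context(TracingContext)
///     .expect("consumer creation failed");
/// ```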
pub struct TracingContext;

impl ClientContext for TracingContext {}

impl ConsumerContext for TracingContext {
    fn pre_rebalance<'a>(&self, _c: &BaseConsumer<TracingContext>, r: &Rebalance<'a>) {
        debug!(?r, "pre-rebalance");
    }

    fn post_rebalance<'a>(&self, _c: &BaseConsumer<TracingContext>, r: &Rebalance<'a>) {
        debug!(?r, "post-rebalance");
    }

    fn commit_callback(&self, res: rdkafka::error::KafkaResult<()>, offs: &rdkafka::TopicPartitionList) {
        match res {
            Ok(_) => debug!(?offs, "offsets committed"),
            Err(e) => error!(%e, "offset commit failed"),
        }
    }
}

/* --------------------------------------------------------------------- */
/* Wrapper */

/// Thin wrapper around [`StreamConsumer`] that owns the configured consumer
/// and exposes streaming, commit, and deserialization helpers.
pub struct KafkaConsumer {
    inner: StreamConsumer<TracingContext>,
}

impl KafkaConsumer {
    /* ---- constructors ------------------------------------------------ */

    /// Build the default consumer configuration: SASL/SCRAM-SHA-512 over
    /// plaintext, manual offset commits, and `auto.offset.reset = earliest`.
    ///
    /// # Arguments
    ///
    /// * `bootstrap_servers` - The bootstrap servers to use for the consumer
    /// * `group_id` - The group id to use for the consumer
    /// * `credentials` - The credentials to use for authentication
    ///
    /// # Returns
    ///
    /// * `ClientConfig` - The configured client config
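    ///
    /// # Example
    ///
    /// A minimal sketch of tweaking a default setting before building the
    /// consumer (broker address and credentials are placeholders):
    ///
    /// ```no_run
    /// use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
    /// use ds_event_stream_rs_sdk::model::topics::Topic;
    /// use ds_event_stream_rs_sdk::utils::ClientCredentials;
    ///
    /// let credentials = ClientCredentials {
    ///     username: "username".to_string(),
    ///     password: "password".to_string(),
    /// };
    /// let mut config = KafkaConsumer::get_default_config("localhost:9092", "group-id", &credentials);
    /// // Override a default, e.g. start from the latest offset instead.
    /// config.set("auto.offset.reset", "latest");
    /// let consumer = KafkaConsumer::new(&[Topic::DsPipelineJobRequested], config)
    ///     .expect("failed to create consumer");
    /// ```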
    pub fn get_default_config(
        bootstrap_servers: &str,
        group_id: &str,
        credentials: &ClientCredentials,
    ) -> ClientConfig {
        let mut config = ClientConfig::new();
        config
            .set("group.id", group_id)
            .set("bootstrap.servers", bootstrap_servers)
            .set("session.timeout.ms", "6000")
            .set("enable.partition.eof", "false")
            .set("heartbeat.interval.ms", "3000")
            .set("max.poll.interval.ms", "300000")
            .set("auto.offset.reset", "earliest")
            .set("enable.auto.commit", "false")
            .set("fetch.min.bytes", "1")
            .set("max.partition.fetch.bytes", "1048576")
            .set("security.protocol", "SASL_PLAINTEXT")
            .set("sasl.mechanism", "SCRAM-SHA-512")
            .set("sasl.username", credentials.username.clone())
            .set("sasl.password", credentials.password.clone())
            .set_log_level(RDKafkaLogLevel::Info);
        config
    }

    /// Create a consumer from an explicit [`ClientConfig`] and subscribe to
    /// the given topics.
    ///
    /// # Arguments
    ///
    /// * `topics` - The topics to subscribe to
    /// * `config` - The consumer configuration
    ///
    /// # Returns
    ///
    /// * `Result<Self, SDKError>` - The result of the operation
    ///
    /// # Errors
    ///
    /// * [`SDKError::Consumer`] - If the consumer cannot be created or the
    ///   subscription fails.
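    ///
    /// # Example
    ///
    /// A minimal sketch with a hand-built configuration; the broker address,
    /// group id, and settings shown are placeholders:
    ///
    /// ```no_run
    /// use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
    /// use ds_event_stream_rs_sdk::model::topics::Topic;
    /// use rdkafka::config::ClientConfig;
    ///
    /// let mut config = ClientConfig::new();
    /// config
    ///     .set("bootstrap.servers", "localhost:9092")
    ///     .set("group.id", "group-id")
    ///     .set("enable.auto.commit", "false");
    /// let consumer = KafkaConsumer::new(&[Topic::DsPipelineJobRequested], config)
    ///     .expect("failed to create consumer");
    /// ```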
    pub fn new(topics: &[Topic], config: ClientConfig) -> Result<Self> {
        let inner: StreamConsumer<_> = config
            .create_with_context(TracingContext)
            .map_err(ConsumerError::Kafka)?;

        let topic_refs: Vec<&str> = topics.iter().map(|t| t.as_ref()).collect();
        inner.subscribe(&topic_refs).map_err(ConsumerError::Kafka)?;

        info!(topics = ?topic_refs, "Kafka consumer initialised");
        Ok(Self { inner })
    }

    /// Create a consumer with the default configuration
    /// (see [`Self::get_default_config`]).
    ///
    /// See the module-level example for usage.
    ///
    /// # Arguments
    ///
    /// * `bootstrap_servers` - The bootstrap servers to use for the consumer
    /// * `topics` - The topics to subscribe to
    /// * `group_id` - The group id to use for the consumer
    /// * `credentials` - The credentials to use for authentication
    ///
    /// # Returns
    ///
    /// * `Result<Self, SDKError>` - The result of the operation
    ///
    /// # Errors
    ///
    /// * [`SDKError::Consumer`] - If the consumer cannot be created or the
    ///   subscription fails.
    pub fn default(
        bootstrap_servers: &str,
        topics: &[Topic],
        group_id: &str,
        credentials: &ClientCredentials,
    ) -> Result<Self> {
        let config = Self::get_default_config(bootstrap_servers, group_id, credentials);
        Self::new(topics, config)
    }

    /// Create a consumer with the default configuration, subscribing to
    /// topics given as plain strings.
    ///
    /// # Arguments
    ///
    /// * `bootstrap_servers` - The bootstrap servers to use for the consumer
    /// * `topics` - The topics to subscribe to
    /// * `group_id` - The group id to use for the consumer
    /// * `credentials` - The credentials to use for authentication
    ///
    /// # Returns
    ///
    /// * `Result<Self, SDKError>` - The result of the operation
    ///
    /// # Errors
    ///
    /// * [`SDKError::Consumer`] - If the consumer cannot be created or the
    ///   subscription fails.
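    ///
    /// # Example
    ///
    /// A minimal sketch; the topic name, broker address, and credentials are
    /// placeholders:
    ///
    /// ```no_run
    /// use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
    /// use ds_event_stream_rs_sdk::utils::ClientCredentials;
    ///
    /// let credentials = ClientCredentials {
    ///     username: "username".to_string(),
    ///     password: "password".to_string(),
    /// };
    /// let consumer = KafkaConsumer::default_with_strings(
    ///     "localhost:9092",
    ///     &["some-topic"],
    ///     "group-id",
    ///     &credentials,
    /// )
    /// .expect("failed to create consumer");
    /// ```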
    pub fn default_with_strings(
        bootstrap_servers: &str,
        topics: &[&str],
        group_id: &str,
        credentials: &ClientCredentials,
    ) -> Result<Self> {
        let config = Self::get_default_config(bootstrap_servers, group_id, credentials);
        let inner: StreamConsumer<_> = config
            .create_with_context(TracingContext)
            .map_err(ConsumerError::Kafka)?;

        inner.subscribe(topics).map_err(ConsumerError::Kafka)?;

        info!(topics = ?topics, "Kafka consumer initialised");
        Ok(Self { inner })
    }

    /* ---- helpers ----------------------------------------------------- */

    /// Async stream of raw messages (`KafkaResult<BorrowedMessage>`).
    ///
    /// Offsets are not committed automatically (`enable.auto.commit` is
    /// `false` in the default config); call [`Self::commit_message`] as
    /// needed.
    ///
    /// # Returns
    ///
    /// * `impl Stream<Item = KafkaResult<BorrowedMessage<'_>>> + '_` - The stream of messages
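    ///
    /// # Example
    ///
    /// A minimal sketch that polls raw messages and commits each one;
    /// `consumer` is assumed to be an already-constructed [`KafkaConsumer`]:
    ///
    /// ```no_run
    /// # use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
    /// use rdkafka::consumer::CommitMode;
    /// use tokio_stream::StreamExt;
    ///
    /// # async fn example(consumer: KafkaConsumer) {
    /// let mut stream = consumer.stream();
    /// while let Some(next) = stream.next().await {
    ///     match next {
    ///         Ok(msg) => consumer.commit_message(&msg, CommitMode::Async),
    ///         Err(e) => tracing::error!("Kafka error: {}", e),
    ///     }
    /// }
    /// # }
    /// ```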
    pub fn stream(&self) -> impl Stream<Item = rdkafka::error::KafkaResult<BorrowedMessage<'_>>> + '_ {
        self.inner.stream()
    }

    /// Async stream of deserialized [`EventStream`] messages.
    ///
    /// This is a higher-level API that deserializes each Kafka message into
    /// an [`EventStream`], eliminating the need for manual deserialization
    /// in application code. See the module-level example for usage.
    ///
    /// # Returns
    ///
    /// * `impl Stream<Item = Result<EventStream, SDKError>> + '_` - The stream of deserialized events
    ///
    /// # Errors
    ///
    /// * [`SDKError::Consumer`] - If a message cannot be deserialized or the
    ///   underlying consumer reports an error.
    pub fn event_stream(&self) -> impl Stream<Item = Result<EventStream, SDKError>> + '_ {
        self.inner.stream().map(|kafka_result| match kafka_result {
            Ok(msg) => self.deserialize_message(&msg),
            Err(kafka_error) => Err(SDKError::Consumer(ConsumerError::Kafka(kafka_error))),
        })
    }

    /// Get the underlying [`StreamConsumer`] for manual polling or direct
    /// access to [`rdkafka`] APIs.
    ///
    /// # Returns
    ///
    /// * `&StreamConsumer<TracingContext>` - The underlying consumer
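    ///
    /// # Example
    ///
    /// A minimal sketch of manual polling via [`StreamConsumer::recv`];
    /// `consumer` is assumed to be an already-constructed [`KafkaConsumer`]:
    ///
    /// ```no_run
    /// # use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
    /// use rdkafka::Message;
    ///
    /// # async fn example(consumer: KafkaConsumer) {
    /// match consumer.inner().recv().await {
    ///     Ok(msg) => tracing::info!("received message at offset {}", msg.offset()),
    ///     Err(e) => tracing::error!("Kafka error: {}", e),
    /// }
    /// # }
    /// ```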
    pub fn inner(&self) -> &StreamConsumer<TracingContext> {
        &self.inner
    }

    /// Commits the offset of a message, logging (rather than returning) any
    /// commit error. See the example on [`Self::stream`] for typical usage.
    ///
    /// # Arguments
    ///
    /// * `msg` - The message to commit
    /// * `mode` - The commit mode
    ///
    /// # Returns
    ///
    /// * `()` - Always succeeds; errors are logged
    pub fn commit_message(&self, msg: &BorrowedMessage<'_>, mode: CommitMode) {
        if let Err(e) = self.inner.commit_message(msg, mode) {
            error!("Failed to commit message: {}", e);
        }
    }

    /// Deserializes a Kafka message payload into an [`EventStream`].
    ///
    /// # Arguments
    ///
    /// * `msg` - The message to deserialize
    ///
    /// # Returns
    ///
    /// * `Result<EventStream, SDKError>` - The deserialized event
    ///
    /// # Errors
    ///
    /// * [`SDKError::Consumer`] - If the payload is empty, not valid UTF-8,
    ///   or cannot be deserialized.
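    ///
    /// # Example
    ///
    /// A minimal sketch pairing the raw stream with manual deserialization;
    /// `consumer` is assumed to be an already-constructed [`KafkaConsumer`]:
    ///
    /// ```no_run
    /// # use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
    /// use tokio_stream::StreamExt;
    ///
    /// # async fn example(consumer: KafkaConsumer) {
    /// let mut stream = consumer.stream();
    /// while let Some(Ok(msg)) = stream.next().await {
    ///     match consumer.deserialize_message(&msg) {
    ///         Ok(event) => tracing::info!("Received event: {:?}", event),
    ///         Err(err) => tracing::error!("Failed to deserialize event: {}", err),
    ///     }
    /// }
    /// # }
    /// ```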
    pub fn deserialize_message(&self, msg: &BorrowedMessage<'_>) -> Result<EventStream> {
        let payload = msg
            .payload()
            .ok_or_else(|| ConsumerError::InvalidPayload("Empty message payload".to_string()))?;
        let payload_str = std::str::from_utf8(payload).map_err(ConsumerError::Utf8)?;

        let de = &mut Deserializer::from_str(payload_str);
        match deserialize::<_, EventStream>(de) {
            Ok(event) => Ok(event),
            Err(e) => Err(SDKError::Consumer(ConsumerError::InvalidPayload(format!(
                "Deserialization error at {}: {}",
                e.path(),
                e
            )))),
        }
    }
}