ds_event_stream_rs_sdk/consumer/
mod.rs

//! Kafka consumer module.
//!
//! ## Overview
//! Wraps [`rdkafka`]'s [`StreamConsumer`] with a small
//! convenience API that can be initialised from environment variables or
//! explicit parameters.
//!
//! * Environment-driven `bootstrap.servers` and `group.id`.
//! * [`ConsumerContext`] implementation that logs rebalances & commits via
//!   [`tracing`].
//! * `stream()` exposing an async stream of [`BorrowedMessage`].
//! * Error handling with a custom `ConsumerError` type.
//!
//! ### Example
//! ```no_run
//! use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
//! use ds_event_stream_rs_sdk::model::topics::Topic;
//! use ds_event_stream_rs_sdk::error::Result;
//! use ds_event_stream_rs_sdk::utils::{get_bootstrap_servers, Environment, ClientCredentials};
//!
//! use tokio_stream::StreamExt;
//! use tracing::{info, error};
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     let bootstrap_servers = get_bootstrap_servers(Environment::Development, false);
//!     let credentials = ClientCredentials { username: "username".to_string(), password: "password".to_string() };
//!
//!     let consumer = KafkaConsumer::default(&bootstrap_servers, &[Topic::DsPipelineJobRequested], "group-id", &credentials)?;
//!     let mut event_stream = consumer.event_stream();
//!
//!     while let Some(next) = event_stream.next().await {
//!         match next {
//!             Ok(event) => info!("Received event: {:?}", event),
//!             Err(err) => error!("Failed to deserialize event: {}", err),
//!         }
//!     }
//!     Ok(())
//! }
//! ```
pub mod error;

use rdkafka::{
    client::ClientContext,
    config::{ClientConfig, RDKafkaLogLevel},
    consumer::{stream_consumer::StreamConsumer, BaseConsumer, CommitMode, Consumer, ConsumerContext, Rebalance},
    message::BorrowedMessage,
    Message,
};
use serde_json::Deserializer;
use serde_path_to_error::deserialize;
use tokio_stream::{Stream, StreamExt};
use tracing::{debug, error, info};

use crate::error::{Result, SDKError};
use crate::model::topics::Topic;
use crate::model::v1::EventStream;
use crate::utils::ClientCredentials;

use error::ConsumerError;
/* --------------------------------------------------------------------- */
/* Context                                                               */

/// [`ConsumerContext`] implementation that logs rebalance lifecycle and
/// offset-commit callbacks via [`tracing`]. The callbacks never fail:
/// commit errors are logged at `error` level rather than propagated.
///
/// # Examples
///
/// ```no_run
/// use ds_event_stream_rs_sdk::consumer::TracingContext;
/// use rdkafka::{config::ClientConfig, consumer::StreamConsumer};
///
/// let consumer: StreamConsumer<TracingContext> = ClientConfig::new().create_with_context(TracingContext).unwrap();
/// ```
pub struct TracingContext;
impl ClientContext for TracingContext {}
impl ConsumerContext for TracingContext {
    fn pre_rebalance<'a>(&self, _c: &BaseConsumer<TracingContext>, r: &Rebalance<'a>) {
        debug!(?r, "pre-rebalance");
    }
    fn post_rebalance<'a>(&self, _c: &BaseConsumer<TracingContext>, r: &Rebalance<'a>) {
        debug!(?r, "post-rebalance");
    }
    fn commit_callback(&self, res: rdkafka::error::KafkaResult<()>, offs: &rdkafka::TopicPartitionList) {
        match res {
            Ok(_) => debug!(?offs, "offsets committed"),
            Err(e) => error!(%e, "offset commit failed"),
        }
    }
}

/* --------------------------------------------------------------------- */
/* Wrapper                                                               */

/// Thin wrapper around [`StreamConsumer`] that subscribes to its topics at
/// construction time and logs consumer events through [`TracingContext`].
pub struct KafkaConsumer {
    inner: StreamConsumer<TracingContext>,
}

impl KafkaConsumer {
    /* ---- constructors ------------------------------------------------ */

    /// Build the default consumer [`ClientConfig`]: SASL/SCRAM-SHA-512 over
    /// plaintext, `auto.offset.reset = earliest`, and auto-commit disabled
    /// (commit explicitly via [`KafkaConsumer::commit_message`]).
    ///
    /// # Arguments
    ///
    /// * `bootstrap_servers` - The bootstrap servers to use for the consumer
    /// * `group_id` - The group id to use for the consumer
    /// * `credentials` - The credentials to use for authentication
    ///
    /// # Returns
    ///
    /// * `ClientConfig` - The configured client config
    ///
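    /// # Examples
    ///
    /// A minimal sketch (broker address, group id, and credentials are
    /// placeholders); the returned config can be tweaked before building a
    /// consumer with [`KafkaConsumer::new`]:
    ///
    /// ```no_run
    /// use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
    /// use ds_event_stream_rs_sdk::utils::ClientCredentials;
    ///
    /// let credentials = ClientCredentials {
    ///     username: "username".to_string(),
    ///     password: "password".to_string(),
    /// };
    /// let mut config = KafkaConsumer::get_default_config("localhost:9092", "group-id", &credentials);
    /// // Start from the latest offset instead of the default `earliest`.
    /// config.set("auto.offset.reset", "latest");
    /// ```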
    pub fn get_default_config(
        bootstrap_servers: &str,
        group_id: &str,
        credentials: &ClientCredentials,
    ) -> ClientConfig {
        let mut config = ClientConfig::new();
        config
            .set("group.id", group_id)
            .set("bootstrap.servers", bootstrap_servers)
            .set("session.timeout.ms", "6000")
            .set("enable.partition.eof", "false")
            .set("heartbeat.interval.ms", "3000")
            .set("max.poll.interval.ms", "300000")
            .set("auto.offset.reset", "earliest")
            .set("enable.auto.commit", "false")
            .set("fetch.min.bytes", "1")
            .set("max.partition.fetch.bytes", "1048576")
            .set("security.protocol", "SASL_PLAINTEXT")
            .set("sasl.mechanism", "SCRAM-SHA-512")
            .set("sasl.username", credentials.username.clone())
            .set("sasl.password", credentials.password.clone())
            .set_log_level(RDKafkaLogLevel::Info);
        config
    }

    /// Create a consumer from an explicit [`ClientConfig`] and subscribe it to
    /// the given topics.
    ///
    /// # Arguments
    ///
    /// * `topics` - The topics to subscribe to
    /// * `config` - The consumer configuration
    ///
    /// # Returns
    ///
    /// * `Result<Self, SDKError>` - The result of the operation
    ///
    /// # Errors
    ///
    /// * [`SDKError::Consumer`] - If the consumer cannot be created or the
    ///   subscription fails.
    ///
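    /// # Examples
    ///
    /// A sketch pairing a tweaked default config with `new` (connection
    /// details are placeholders):
    ///
    /// ```no_run
    /// use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
    /// use ds_event_stream_rs_sdk::model::topics::Topic;
    /// use ds_event_stream_rs_sdk::utils::ClientCredentials;
    ///
    /// # fn main() -> ds_event_stream_rs_sdk::error::Result<()> {
    /// let credentials = ClientCredentials {
    ///     username: "username".to_string(),
    ///     password: "password".to_string(),
    /// };
    /// let mut config = KafkaConsumer::get_default_config("localhost:9092", "group-id", &credentials);
    /// config.set("session.timeout.ms", "10000");
    /// let consumer = KafkaConsumer::new(&[Topic::DsPipelineJobRequested], config)?;
    /// # Ok(())
    /// # }
    /// ```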
    pub fn new(topics: &[Topic], config: ClientConfig) -> Result<Self> {
        let inner: StreamConsumer<_> = config
            .create_with_context(TracingContext)
            .map_err(ConsumerError::Kafka)?;

        let topic_refs: Vec<&str> = topics.iter().map(|t| t.as_ref()).collect();
        inner.subscribe(&topic_refs).map_err(ConsumerError::Kafka)?;

        info!(topics = ?topic_refs, "Kafka consumer initialised");
        Ok(Self { inner })
    }

    /// Create a consumer with the default configuration (see
    /// [`KafkaConsumer::get_default_config`]) and subscribe it to the given
    /// topics. See the module-level example for end-to-end usage.
    ///
    /// # Arguments
    ///
    /// * `bootstrap_servers` - The bootstrap servers to use for the consumer
    /// * `topics` - The topics to subscribe to
    /// * `group_id` - The group id to use for the consumer
    /// * `credentials` - The credentials to use for authentication
    ///
    /// # Returns
    ///
    /// * `Result<Self, SDKError>` - The result of the operation
    ///
    /// # Errors
    ///
    /// * [`SDKError::Consumer`] - If the consumer cannot be created or the
    ///   subscription fails.
    ///
    pub fn default(
        bootstrap_servers: &str,
        topics: &[Topic],
        group_id: &str,
        credentials: &ClientCredentials,
    ) -> Result<Self> {
        let config = Self::get_default_config(bootstrap_servers, group_id, credentials);
        Self::new(topics, config)
    }

    /// Create a consumer with the default configuration, subscribing to topics
    /// given as plain strings instead of [`Topic`] values.
    ///
    /// # Arguments
    ///
    /// * `bootstrap_servers` - The bootstrap servers to use for the consumer
    /// * `topics` - The topics to subscribe to
    /// * `group_id` - The group id to use for the consumer
    /// * `credentials` - The credentials to use for authentication
    ///
    /// # Returns
    ///
    /// * `Result<Self, SDKError>` - The result of the operation
    ///
    /// # Errors
    ///
    /// * [`SDKError::Consumer`] - If the consumer cannot be created or the
    ///   subscription fails.
    ///
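    /// # Examples
    ///
    /// A sketch with a placeholder topic name and connection details:
    ///
    /// ```no_run
    /// use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
    /// use ds_event_stream_rs_sdk::utils::ClientCredentials;
    ///
    /// # fn main() -> ds_event_stream_rs_sdk::error::Result<()> {
    /// let credentials = ClientCredentials {
    ///     username: "username".to_string(),
    ///     password: "password".to_string(),
    /// };
    /// let consumer = KafkaConsumer::default_with_strings(
    ///     "localhost:9092",
    ///     &["my-topic"],
    ///     "group-id",
    ///     &credentials,
    /// )?;
    /// # Ok(())
    /// # }
    /// ```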
    pub fn default_with_strings(
        bootstrap_servers: &str,
        topics: &[&str],
        group_id: &str,
        credentials: &ClientCredentials,
    ) -> Result<Self> {
        let config = Self::get_default_config(bootstrap_servers, group_id, credentials);
        let inner: StreamConsumer<_> = config
            .create_with_context(TracingContext)
            .map_err(ConsumerError::Kafka)?;

        inner.subscribe(topics).map_err(ConsumerError::Kafka)?;

        info!(topics = ?topics, "Kafka consumer initialised");
        Ok(Self { inner })
    }

    /* ---- helpers ----------------------------------------------------- */

    /// Async stream of messages (`KafkaResult<BorrowedMessage>`).
    ///
    /// # Returns
    ///
    /// * `impl Stream<Item = KafkaResult<BorrowedMessage<'_>>> + '_` - The stream of messages
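    ///
    /// # Examples
    ///
    /// A sketch of a manual consume-and-commit loop; error handling is kept
    /// minimal for brevity:
    ///
    /// ```no_run
    /// use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
    /// use rdkafka::consumer::CommitMode;
    /// use tokio_stream::StreamExt;
    ///
    /// # async fn run(consumer: KafkaConsumer) {
    /// let mut stream = consumer.stream();
    /// while let Some(next) = stream.next().await {
    ///     match next {
    ///         Ok(msg) => {
    ///             // Process the raw message, then commit its offset.
    ///             consumer.commit_message(&msg, CommitMode::Async);
    ///         }
    ///         Err(err) => eprintln!("Kafka error: {}", err),
    ///     }
    /// }
    /// # }
    /// ```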
    pub fn stream(&self) -> impl tokio_stream::Stream<Item = rdkafka::error::KafkaResult<BorrowedMessage<'_>>> + '_ {
        self.inner.stream()
    }

    /// Async stream of deserialized [`EventStream`] messages.
    ///
    /// This is a higher-level API that deserializes each Kafka message into an
    /// [`EventStream`] value, so application code does not need to call
    /// [`KafkaConsumer::deserialize_message`] itself.
    ///
    /// # Returns
    ///
    /// * `impl Stream<Item = Result<EventStream, SDKError>> + '_` - The stream of deserialized events
    ///
    /// # Errors
    ///
    /// * [`SDKError::Consumer`] - If a message cannot be read from Kafka or
    ///   fails to deserialize.
    ///
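    /// # Examples
    ///
    /// A sketch mirroring the module-level example:
    ///
    /// ```no_run
    /// use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
    /// use tokio_stream::StreamExt;
    ///
    /// # async fn run(consumer: KafkaConsumer) {
    /// let mut events = consumer.event_stream();
    /// while let Some(next) = events.next().await {
    ///     match next {
    ///         Ok(event) => println!("{:?}", event),
    ///         Err(err) => eprintln!("{}", err),
    ///     }
    /// }
    /// # }
    /// ```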
    pub fn event_stream(&self) -> impl Stream<Item = Result<EventStream, SDKError>> + '_ {
        self.inner.stream().map(|kafka_result| match kafka_result {
            Ok(msg) => self.deserialize_message(&msg),
            Err(kafka_error) => Err(SDKError::Consumer(ConsumerError::Kafka(kafka_error))),
        })
    }

    /// Get the underlying consumer for manual polling.
    ///
    /// # Returns
    ///
    /// * `&StreamConsumer<TracingContext>` - The underlying consumer
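    ///
    /// # Examples
    ///
    /// A sketch of one-off manual polling, assuming the rdkafka version in use
    /// exposes the async [`StreamConsumer::recv`] method:
    ///
    /// ```no_run
    /// use rdkafka::Message;
    ///
    /// # async fn run(consumer: ds_event_stream_rs_sdk::consumer::KafkaConsumer) {
    /// // Await the next message without constructing a stream.
    /// match consumer.inner().recv().await {
    ///     Ok(msg) => println!("{}@{}", msg.topic(), msg.offset()),
    ///     Err(err) => eprintln!("Kafka error: {}", err),
    /// }
    /// # }
    /// ```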
    pub fn inner(&self) -> &StreamConsumer<TracingContext> {
        &self.inner
    }

    /// Commits the offset of a message.
    ///
    /// # Arguments
    ///
    /// * `msg` - The message to commit
    /// * `mode` - The commit mode
    ///
    /// # Returns
    ///
    /// * `()` - Commit failures are logged rather than returned
    pub fn commit_message(&self, msg: &BorrowedMessage<'_>, mode: CommitMode) {
        if let Err(e) = self.inner.commit_message(msg, mode) {
            error!("Failed to commit message: {}", e);
        }
    }

    /// Deserializes a Kafka message payload into an [`EventStream`].
    ///
    /// The payload must be UTF-8 encoded JSON.
    ///
    /// # Arguments
    ///
    /// * `msg` - The message to deserialize
    ///
    /// # Returns
    ///
    /// * `Result<EventStream, SDKError>` - The result of the operation
    ///
    /// # Errors
    ///
    /// * [`SDKError::Consumer`] - If the payload is empty, not valid UTF-8, or
    ///   fails to deserialize.
    ///
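    /// # Examples
    ///
    /// A sketch combining [`KafkaConsumer::stream`] with manual
    /// deserialization, equivalent to what [`KafkaConsumer::event_stream`]
    /// does internally:
    ///
    /// ```no_run
    /// use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
    /// use tokio_stream::StreamExt;
    ///
    /// # async fn run(consumer: KafkaConsumer) {
    /// let mut stream = consumer.stream();
    /// while let Some(next) = stream.next().await {
    ///     match next {
    ///         Ok(msg) => match consumer.deserialize_message(&msg) {
    ///             Ok(event) => println!("{:?}", event),
    ///             Err(err) => eprintln!("failed to deserialize: {}", err),
    ///         },
    ///         Err(err) => eprintln!("Kafka error: {}", err),
    ///     }
    /// }
    /// # }
    /// ```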
    pub fn deserialize_message(&self, msg: &BorrowedMessage<'_>) -> Result<EventStream> {
        let payload = msg
            .payload()
            .ok_or_else(|| ConsumerError::InvalidPayload("Empty message payload".to_string()))?;
        let payload_str = std::str::from_utf8(payload).map_err(ConsumerError::Utf8)?;

        let de = &mut Deserializer::from_str(payload_str);
        match deserialize::<_, EventStream>(de) {
            Ok(event) => Ok(event),
            Err(e) => Err(SDKError::Consumer(ConsumerError::InvalidPayload(format!(
                "Deserialization error at {}: {}",
                e.path(),
                e
            )))),
        }
    }
}