X For You algorithm, line by line — Part 3: Thunder II (Kafka + gRPC)
Part 3 of the deep dive into xai-org/x-algorithm. Thunder's two Kafka listeners (legacy Thrift transformer + v2 proto sink), the catchup signal, and the InNetworkPostsService gRPC handler — semaphore-bounded backpressure, two-stage statistics, spawn_blocking for in-memory lookups.
This finishes Thunder. We've already covered (Session 02) how the binary starts up and how the PostStore works as an in-memory shard. Now we read the two halves that actually use it:
- The Kafka listeners: the consumer loops that read post events off Kafka, deserialize them, and write into the store.
- The gRPC service: what feed clients (home-mixer) actually call to fetch in-network candidates.
Files covered (1,029 LOC total):
thunder/
├── kafka/
│ ├── mod.rs (3) re-exports
│ ├── utils.rs (48) shared deserialization helpers
│ ├── tweet_events_listener.rs (390) v1: Thrift TweetEvent → produce v2 events
│ └── tweet_events_listener_v2.rs (249) v2: InNetworkEvent → PostStore
└── thunder_service.rs (339) InNetworkPostsService (gRPC)
There are two listener variants because Thunder is mid-migration between a legacy Thrift format (TweetEvent) and a new proto format (InNetworkEvent). Non-serving Thunder runs v1 (transformer mode: Thrift in, proto out). Serving Thunder runs v2 (proto in, into the store). The split was visible in kafka_utils.rs from Session 02.
kafka/mod.rs (3 lines)
pub mod tweet_events_listener;
pub mod tweet_events_listener_v2;
pub mod utils;
Submodule declarations. Nothing else.
kafka/utils.rs (48 lines)
Two helpers used by both listener variants: build a Kafka consumer, and deserialize a batch of KafkaMessages into typed structs.
use anyhow::{Context, Result};
use std::sync::Arc;
use tokio::sync::RwLock;
use xai_kafka::{KafkaMessage, config::KafkaConsumerConfig, consumer::KafkaConsumer};
use crate::metrics;
Imports:
- anyhow::{Context, Result}: Result is anyhow's Result<T, anyhow::Error> alias, and Context is the extension trait that adds .context("msg") for attaching string context to an error.
- Arc<RwLock<_>> (std::sync::Arc around tokio::sync::RwLock): the consumer is wrapped this way because multiple tasks (the polling loop and the lag-monitor task) need to access it.
- xai_kafka::KafkaConsumer: the internal Kafka consumer client. We never see its implementation; it exposes start(), poll(N), commit_offsets(), and get_partition_lags().
- crate::metrics: Prometheus metric statics (only BATCH_PROCESSING_TIME and KAFKA_MESSAGES_FAILED_PARSE are referenced here).
/// Create and start a Kafka consumer with the given configuration
pub async fn create_kafka_consumer(
config: KafkaConsumerConfig,
) -> Result<Arc<RwLock<KafkaConsumer>>> {
let mut consumer = KafkaConsumer::new(config);
consumer
.start()
.await
.context("Failed to start Kafka consumer")?;
Ok(Arc::new(RwLock::new(consumer)))
}
Three lines of substance:
- Build the consumer from config (cheap: no network yet).
- start(): opens connections, joins the consumer group, gets the partition assignment.
- Wrap in Arc<RwLock<…>> and return.
The start() is the only fallible step; if it fails (broker unreachable, auth bad), .context("Failed to start Kafka consumer") attaches a string to the error so the caller knows what happened.
/// Process a batch of Kafka messages and deserialize them using the provided deserializer function
pub fn deserialize_kafka_messages<T, F>(
messages: Vec<KafkaMessage>,
deserializer: F,
) -> Result<Vec<T>>
where
F: Fn(&[u8]) -> Result<T>,
{
let _timer = metrics::Timer::new(metrics::BATCH_PROCESSING_TIME.clone());
let mut kafka_data = Vec::with_capacity(messages.len());
for msg in messages.iter() {
if let Some(payload) = &msg.payload {
match deserializer(payload) {
Ok(deserialized_msg) => {
kafka_data.push(deserialized_msg);
}
Err(e) => {
log::error!("Failed to parse Kafka message: {}", e);
metrics::KAFKA_MESSAGES_FAILED_PARSE.inc();
}
}
}
}
Ok(kafka_data)
}
Generic batch deserializer. T is the target struct, F is a function from &[u8] to Result<T>. Both v1 and v2 listeners pass their respective deserializers (deserialize_tweet_event for v1, deserialize_tweet_event_v2 for v2).
Walking the body:
- let _timer = metrics::Timer::new(metrics::BATCH_PROCESSING_TIME.clone()); is an RAII-style timer. The Timer struct (defined in metrics, not shown) starts on new() and observes elapsed time on drop. The _timer binding keeps it alive until the function returns. BATCH_PROCESSING_TIME.clone() clones the Prometheus histogram handle (cheap, it's an Arc<…> underneath).
- Vec::with_capacity(messages.len()) preallocates, avoiding reallocations as we push.
- For each message:
  - If the message has a payload (msg.payload is Option<Vec<u8>>; Kafka allows null-payload messages, e.g. tombstones), call the deserializer. Null-payload messages are skipped silently, without touching the failure counter.
  - On success, push to the output vec.
  - On failure, log the error and increment KAFKA_MESSAGES_FAILED_PARSE. Critically: do not abort the whole batch. A single corrupt message shouldn't take down ingestion.
- Return the deserialized vec.
The return type is Result<Vec<T>> even though the function never returns Err. Probably an artifact of an older version, or future-proofing for a fatal-error case.
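The Timer type itself isn't shown. Here is a minimal, std-only sketch of the RAII pattern it presumably follows; Histogram is a hypothetical stand-in for the Prometheus histogram handle (a shared Vec<Duration> behind an Arc), not the real metrics API. The work_with helper also shows why the binding is named _timer rather than a bare _: let _ = … drops the value at the end of that statement, so the timer fires before the work it was meant to measure.

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a Prometheus histogram handle. Cloning it only
/// bumps an Arc refcount, so passing clones around is cheap.
#[derive(Clone, Default)]
struct Histogram(Arc<Mutex<Vec<Duration>>>);

impl Histogram {
    fn observe(&self, elapsed: Duration) {
        self.0.lock().unwrap().push(elapsed);
    }

    fn count(&self) -> usize {
        self.0.lock().unwrap().len()
    }
}

/// RAII timer: the clock starts in `new` and the observation happens in `drop`.
struct Timer {
    start: Instant,
    histogram: Histogram,
}

impl Timer {
    fn new(histogram: Histogram) -> Self {
        Timer { start: Instant::now(), histogram }
    }
}

impl Drop for Timer {
    // Runs on every exit path: normal return, early `?` return, or unwinding.
    fn drop(&mut self) {
        self.histogram.observe(self.start.elapsed());
    }
}

/// Returns how many observations the histogram held while the "work" ran.
fn work_with(histogram: &Histogram, bare_underscore: bool) -> usize {
    if bare_underscore {
        // `let _ = ...` drops the Timer at the end of this statement.
        let _ = Timer::new(histogram.clone());
        histogram.count() // the timer has already fired
    } else {
        // A named binding (even `_timer`) lives until the end of the block.
        let _timer = Timer::new(histogram.clone());
        histogram.count() // the timer is still running
    }
}
```

Because the observation happens in Drop, it is recorded on early returns too, which is what you want for a batch-latency histogram.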
End of file.
kafka/tweet_events_listener.rs (390 lines) — v1: legacy Thrift → proto
This listener runs in non-serving mode only. It reads the legacy TweetEvent (Thrift) topic, transforms each event into the new InNetworkEvent (proto), and produces it to the new topic. It's the migration shim.
We'll walk it top to bottom.
use anyhow::{Context, Result};
use log::{error, info, warn};
use prost::Message;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::sync::RwLock;
use xai_kafka::{KafkaMessage, config::KafkaConsumerConfig, consumer::KafkaConsumer};
use xai_kafka::{KafkaProducer, KafkaProducerConfig};
use xai_thunder_proto::{
InNetworkEvent, LightPost, TweetCreateEvent, TweetDeleteEvent, in_network_event,
};
use crate::{
args::Args,
crate::config::MIN_VIDEO_DURATION_MS,
deserializer::deserialize_tweet_event,
kafka::utils::{create_kafka_consumer, deserialize_kafka_messages},
metrics,
schema::{tweet::Tweet, tweet_events::TweetEventData},
};
Imports:
- prost::Message: the proto serialization trait, brought in for event.encode_to_vec().
- AtomicUsize + Ordering: for the cross-thread log counter (used with Ordering::Relaxed).
- KafkaProducer / KafkaProducerConfig: this listener also produces (it's the transformer).
- xai_thunder_proto::{InNetworkEvent, LightPost, TweetCreateEvent, TweetDeleteEvent, in_network_event}: the v2 proto types. in_network_event is the prost-generated module that holds the EventVariant enum.
- crate::config::MIN_VIDEO_DURATION_MS: the floor below which a video doesn't count as "video content."
- deserialize_tweet_event: the Thrift v1 decoder.
- crate::schema::{tweet::Tweet, tweet_events::TweetEventData}: the Thrift types for input.
Side note: crate::config::MIN_VIDEO_DURATION_MS inside the use crate::{ … } group doubles the crate:: prefix (crate:: outside the braces + crate:: inside). crate is only allowed at the start of a path, so rustc rejects this import and the file won't build verbatim. This is almost certainly an open-source release artifact; the real source presumably says config::MIN_VIDEO_DURATION_MS.
/// Counter for logging batch processing every Nth time
static BATCH_LOG_COUNTER: AtomicUsize = AtomicUsize::new(0);
Module-level AtomicUsize. Used to throttle the "milestone" log line so we don't log every batch. We'll increment + check below.
/// Monitor Kafka partition lag and update metrics
async fn monitor_partition_lag(
consumer: Arc<RwLock<KafkaConsumer>>,
topic: String,
interval_secs: u64,
) {
let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
loop {
interval.tick().await;
let consumer = consumer.read().await;
match consumer.get_partition_lags().await {
Ok(lag_info) => {
for partition_lag in lag_info {
let partition_str = partition_lag.partition_id.to_string();
metrics::KAFKA_PARTITION_LAG
.with_label_values(&[&topic, &partition_str])
.set(partition_lag.lag as f64);
}
}
Err(e) => {
warn!("Failed to get partition lag info: {}", e);
}
}
}
}
A periodic loop that reports per-partition consumer lag to Prometheus.
Mechanics:
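- tokio::time::interval(Duration::from_secs(interval_secs)): fires every interval_secs. The first tick completes immediately, so the first lag sample is taken right at startup.
- interval.tick().await: sleeps until the next firing.
- consumer.read().await: takes an RwLock read guard (shadowing the Arc named consumer). Multiple readers are allowed; the guard is held until it drops at the end of the iteration. Because it is held across the get_partition_lags().await call, the polling loop's consumer.write().await has to wait until the lag query returns.
- consumer.get_partition_lags().await: presumably a request to the brokers asking "for the partitions I'm subscribed to, how far behind am I?".
- For each partition: set a gauge with labels [topic, partition]. Assuming the registered metric and label names mirror the Rust identifiers, Grafana queries like kafka_partition_lag{topic="tweet_events", partition="3"} work.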
This runs forever. The caller spawns it into a Tokio task and never joins.
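The per-partition reporting step can be sketched without Prometheus. Below, LagGauge is a hypothetical stand-in for a gauge vector labeled [topic, partition], and PartitionLag guesses at the shape of what get_partition_lags() returns (only partition_id and lag are visible in the source; their integer types are assumptions). The key property is that a gauge is set, not incremented: each tick overwrites the previous value for that label pair.

```rust
use std::collections::HashMap;

/// Assumed shape of one entry from get_partition_lags(); only these two
/// field names appear in the source, and the integer types are guesses.
struct PartitionLag {
    partition_id: i32,
    lag: i64,
}

/// Stand-in for a gauge vector labeled [topic, partition]: each label pair
/// holds only its most recent value.
#[derive(Default)]
struct LagGauge {
    values: HashMap<(String, String), f64>,
}

impl LagGauge {
    /// Mirrors `.with_label_values(&[&topic, &partition_str]).set(lag as f64)`.
    fn record(&mut self, topic: &str, lags: &[PartitionLag]) {
        for p in lags {
            let partition_str = p.partition_id.to_string();
            // insert overwrites: a gauge is set, never accumulated.
            self.values.insert((topic.to_string(), partition_str), p.lag as f64);
        }
    }

    fn get(&self, topic: &str, partition: i32) -> Option<f64> {
        self.values.get(&(topic.to_string(), partition.to_string())).copied()
    }
}
```

A label pair that stops being reported keeps its last value until something removes it, as a Prometheus gauge series would unless explicitly removed.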
fn is_eligible_video(tweet: &Tweet) -> bool {
let Some(media) = tweet.media.as_ref() else {
return false;
};
let [first_media] = media.as_slice() else {
return false;
};
let Some(crate::schema::tweet_media::MediaInfo::VideoInfo(video_info)) =
first_media.media_info.as_ref()
else {
return false;
};
video_info
.duration_millis
.map(|d| d >= MIN_VIDEO_DURATION_MS)
.unwrap_or(false)
}
A predicate: "is this tweet a video-eligible tweet for the video-only feed?"
The logic, step by step:
- tweet.media.as_ref(): the tweet has a media: Option<Vec<TweetMedia>> field; as_ref() borrows it.
- let Some(media) = ... else { return false; }: Rust's let-else syntax. If None, return false.
- let [first_media] = media.as_slice(): a slice pattern that succeeds only if the slice has exactly one element. So a tweet with two media items doesn't count as a "video tweet." Presumably this is because the video feed only wants single-video posts; mixed media (e.g., one image + one video) doesn't fit the simple "video feed" abstraction.
- let Some(MediaInfo::VideoInfo(video_info)) = first_media.media_info.as_ref(): match the variant; it must be VideoInfo, not ImageInfo or AnimatedGifInfo.
- Check video_info.duration_millis >= MIN_VIDEO_DURATION_MS (the constant's actual value isn't visible from here). A missing duration counts as ineligible, via .unwrap_or(false).
The fully-qualified crate::schema::tweet_media::MediaInfo::VideoInfo(...) path is unusual style; usually you'd use crate::schema::tweet_media::MediaInfo; and write MediaInfo::VideoInfo(...). Probably a leftover artifact from auto-generated code.
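To see the let-else and slice-pattern chain in isolation, here is a self-contained sketch with trimmed-down, hypothetical versions of the Thrift types (only the fields the predicate touches) and a placeholder MIN_VIDEO_DURATION_MS of 1,000 ms, since the real value isn't visible. The tweet helper is likewise made up for the example.

```rust
/// Hypothetical, trimmed-down mirrors of the Thrift-generated types.
enum MediaInfo {
    VideoInfo(VideoInfo),
    ImageInfo,
}

struct VideoInfo {
    duration_millis: Option<i64>,
}

struct TweetMedia {
    media_info: Option<MediaInfo>,
}

struct Tweet {
    media: Option<Vec<TweetMedia>>,
}

/// Placeholder threshold: the real constant's value is not shown.
const MIN_VIDEO_DURATION_MS: i64 = 1_000;

fn is_eligible_video(tweet: &Tweet) -> bool {
    // No media at all: not a video.
    let Some(media) = tweet.media.as_ref() else {
        return false;
    };
    // Slice pattern: matches only when there is exactly one media item.
    let [first_media] = media.as_slice() else {
        return false;
    };
    // That single item must be a video, not an image or anything else.
    let Some(MediaInfo::VideoInfo(video_info)) = first_media.media_info.as_ref() else {
        return false;
    };
    // A missing duration counts as ineligible.
    video_info
        .duration_millis
        .map(|d| d >= MIN_VIDEO_DURATION_MS)
        .unwrap_or(false)
}

/// Hypothetical helper: builds a tweet from a list of media infos.
fn tweet(media: Vec<Option<MediaInfo>>) -> Tweet {
    Tweet {
        media: Some(media.into_iter().map(|media_info| TweetMedia { media_info }).collect()),
    }
}
```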
/// Start the partition lag monitoring task in the background
pub fn start_partition_lag_monitor(
consumer: Arc<RwLock<KafkaConsumer>>,
topic: String,
interval_secs: u64,
) {
tokio::spawn(async move {
info!(
"Starting partition lag monitoring task for topic '{}' (interval: {}s)",
topic, interval_secs
);
monitor_partition_lag(consumer, topic, interval_secs).await;
});
}
Wrapper that spawns monitor_partition_lag into a Tokio task. It's pub because the v2 listener calls it too (see below: it invokes crate::kafka::tweet_events_listener::start_partition_lag_monitor by its full path rather than importing it).
The async move { … } captures all three params by move into the spawned future.
/// Start the tweet event processing loop in the background with configurable number of threads
pub async fn start_tweet_event_processing(
base_config: KafkaConsumerConfig,
producer_config: KafkaProducerConfig,
args: &Args,
) {
let num_partitions = args.tweet_events_num_partitions as usize;
let kafka_num_threads = args.kafka_num_threads;
// Use all available partitions
let partitions_to_use: Vec<i32> = (0..num_partitions as i32).collect();
let partitions_per_thread = num_partitions.div_ceil(kafka_num_threads);
info!(
"Starting {} message processing threads for {} partitions ({} partitions per thread)",
kafka_num_threads, num_partitions, partitions_per_thread
);
The v1 entry point. Logs and computes partition distribution:
- num_partitions: total partitions on the legacy Kafka topic.
- kafka_num_threads: how many threads we want for processing.
- partitions_to_use: [0, 1, 2, …, num_partitions - 1]. We use all partitions (no shard skipping).
- partitions_per_thread: ceiling division. So if num_partitions=10 and kafka_num_threads=3, you get 4 partitions per thread (3×4=12 ≥ 10). This value only feeds the log line; spawn_processing_threads recomputes it.
div_ceil is the standard library's usize::div_ceil (stable since Rust 1.73). It panics if kafka_num_threads is 0.
let producer = if !args.is_serving {
info!("Kafka producer enabled, starting producer...");
let producer = Arc::new(RwLock::new(KafkaProducer::new(producer_config)));
if let Err(e) = producer.write().await.start().await {
panic!("Failed to start Kafka producer: {:#}", e);
}
Some(producer)
} else {
info!("Kafka producer disabled, skipping producer initialization");
None
};
spawn_processing_threads(base_config, partitions_to_use, producer, args);
}
Producer is conditional. Only the non-serving (transformer) role needs a producer — but since this whole function only runs in non-serving mode (if !args.is_serving block in kafka_utils.rs), the if !args.is_serving here is actually always true. It's defensive duplication.
panic! on producer start failure: there's no graceful degradation — without the producer, the transformer can't do its job, so crash and let the supervisor restart.
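{:#} formats the error with the alternate Display form. For anyhow errors, that prints the whole cause chain on one line, each cause separated by ": " (the multi-line "Caused by:" report is the {:?} Debug form).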
Then delegate to spawn_processing_threads.
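The one-line shape of anyhow's {:#} output can be emulated with nothing but std::error::Error: walk the source() chain and join the messages with ": ". The error types below are hypothetical; the point is only to show what a context-wrapped error such as the "Failed to start Kafka consumer" context in utils.rs looks like in that form.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical root cause.
#[derive(Debug)]
struct BrokerUnreachable;

impl fmt::Display for BrokerUnreachable {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "broker unreachable")
    }
}

impl Error for BrokerUnreachable {}

/// Hypothetical wrapper playing the role of `.context("...")`.
#[derive(Debug)]
struct WithContext {
    msg: &'static str,
    source: BrokerUnreachable,
}

impl fmt::Display for WithContext {
    // Plain Display shows only the outermost message, like anyhow's `{}`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.msg)
    }
}

impl Error for WithContext {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&self.source)
    }
}

/// Joins the error and all of its causes with ": ", the one-line shape
/// anyhow prints for `{:#}`.
fn format_chain(err: &dyn Error) -> String {
    let mut out = err.to_string();
    let mut current = err.source();
    while let Some(cause) = current {
        out.push_str(": ");
        out.push_str(&cause.to_string());
        current = cause.source();
    }
    out
}
```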
/// Spawn multiple processing threads, each handling a subset of partitions
fn spawn_processing_threads(
base_config: KafkaConsumerConfig,
partitions_to_use: Vec<i32>,
producer: Option<Arc<RwLock<KafkaProducer>>>,
args: &Args,
) {
let total_partitions = partitions_to_use.len();
let partitions_per_thread = total_partitions.div_ceil(args.kafka_num_threads);
for thread_id in 0..args.kafka_num_threads {
let start_idx = thread_id * partitions_per_thread;
let end_idx = ((thread_id + 1) * partitions_per_thread).min(total_partitions);
if start_idx >= total_partitions {
break;
}
Thread-per-partition-bucket loop. (Despite the naming, each "thread" is a task created with tokio::spawn, not an OS thread.)
For each thread 0..kafka_num_threads, compute the slice [start_idx, end_idx) of partitions_to_use for this thread.
if start_idx >= total_partitions { break; } handles the case where the ceiling division runs out of partitions before it runs out of threads, for example more threads than partitions. With 3 partitions and 4 threads, partitions_per_thread = 1: thread 0 gets [0..1), thread 1 gets [1..2), thread 2 gets [2..3), and thread 3 would get the empty [3..3), but start_idx = 3 >= total_partitions = 3 triggers the break first. So no empty thread is spawned.
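The bucketing arithmetic shared by both listeners, extracted into a pure function so the edge cases are easy to check. partition_buckets is a hypothetical name; the body mirrors the loop above, returning index ranges instead of spawning tasks.

```rust
use std::ops::Range;

/// Assigns contiguous ranges of partition indices to worker threads using the
/// listeners' ceiling-division rule. Panics if `num_threads` is 0.
fn partition_buckets(total_partitions: usize, num_threads: usize) -> Vec<Range<usize>> {
    let per_thread = total_partitions.div_ceil(num_threads);
    let mut buckets = Vec::new();
    for thread_id in 0..num_threads {
        let start_idx = thread_id * per_thread;
        let end_idx = ((thread_id + 1) * per_thread).min(total_partitions);
        // Partitions exhausted before threads: stop rather than emit an empty range.
        if start_idx >= total_partitions {
            break;
        }
        buckets.push(start_idx..end_idx);
    }
    buckets
}
```

Checking it against a few inputs: partition_buckets(10, 3) gives [0..4, 4..8, 8..10], and partition_buckets(3, 4) gives [0..1, 1..2, 2..3]. The case partition_buckets(5, 4) is worth noticing: ceiling division yields 2 partitions per thread, so only 3 buckets (and 3 tasks) exist even though 4 threads were requested.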
let thread_partitions = partitions_to_use[start_idx..end_idx].to_vec();
let mut thread_config = base_config.clone();
thread_config.partitions = Some(thread_partitions.clone());
let producer_clone = producer.as_ref().map(Arc::clone);
let topic = thread_config.base_config.topic.clone();
let lag_monitor_interval_secs = args.lag_monitor_interval_secs;
let batch_size = args.kafka_batch_size;
let post_retention_sec = args.post_retention_seconds;
Prepare per-thread state. Clone the base config and assign this thread's partitions. Clone the producer Arc (if any). Pull primitives out of args so the async block below doesn't borrow args: tokio::spawn requires a 'static future, and a borrowed &Args isn't 'static, while owned values moved in are.
tokio::spawn(async move {
info!(
"Starting message processing thread {} for partitions {:?}",
thread_id, thread_partitions
);
match create_kafka_consumer(thread_config).await {
Ok(consumer) => {
// Start partition lag monitoring for this thread's partitions
start_partition_lag_monitor(
Arc::clone(&consumer),
topic,
lag_monitor_interval_secs,
);
if let Err(e) = process_tweet_events(
consumer,
batch_size,
producer_clone,
post_retention_sec as i64,
)
.await
{
panic!(
"Tweet events processing thread {} exited unexpectedly: {:#}. This is a critical failure - the feeder cannot function without tweet event processing.",
thread_id, e
);
}
}
Err(e) => {
panic!(
"Failed to create consumer for thread {}: {:#}",
thread_id, e
);
}
}
});
}
}
Each spawned task:
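- Creates the consumer (panic if that fails).
- Spawns the lag monitor as its own sub-task.
- Enters process_tweet_events (the main loop). If that ever returns Err, panic; it's not supposed to.
Why panic in both places? process_tweet_events is an infinite loop, so the only way it returns is with an error (a failed offset commit, etc.), and a consumer that can't be created is equally unrecoverable. The intent is to exit hard so Kubernetes notices. One caveat: a panic inside a tokio::spawn'd task does not by itself terminate the process. Tokio catches it and reports it as a JoinError through the task's JoinHandle, which is discarded here, so by default the panic message is printed and only that task dies while the rest of the process keeps running. For these panics to actually bring the process down, the binary would need panic = "abort" in its Cargo profile, which isn't visible in these files.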
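The panic-containment point is easy to demonstrate with std::thread, whose join contract matches what Tokio does for tokio::spawn: a panic in the spawned work is captured and surfaces as an Err from the handle instead of ending the process (under the default panic = "unwind" strategy). This is an analogy sketch, not the listener's code; the function name is hypothetical.

```rust
use std::thread;

/// Spawns a worker that panics and reports whether the caller survived to
/// observe the panic as an error value.
fn panicking_worker_is_contained() -> bool {
    let handle = thread::spawn(|| -> usize {
        // Stand-in for a listener task hitting `panic!(...)`.
        panic!("processing thread exited unexpectedly")
    });
    // The panic was caught at the thread boundary; it only shows up here.
    // Had the handle been dropped (like the listener's JoinHandle), the
    // message would just be printed to stderr and the process would go on.
    handle.join().is_err()
}
```

Setting panic = "abort" in the Cargo profile changes this: any panic, in any thread or task, aborts the whole process, which is the behavior the listener's panics appear to rely on.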
/// Process a batch of messages: deserialize, extract posts, and store them
async fn process_message_batch(
messages: Vec<KafkaMessage>,
batch_num: usize,
producer: Option<Arc<RwLock<KafkaProducer>>>,
post_retention_sec: i64,
) -> Result<()> {
let results = deserialize_kafka_messages(messages, deserialize_tweet_event)?;
let mut create_tweets = Vec::new();
let mut delete_tweets = Vec::new();
let mut first_post_id = 0;
let mut first_user_id = 0;
let len_posts = results.len();
let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
The transformer's per-batch core. Takes a vec of raw KafkaMessages (the v1 Thrift payload), deserializes them, classifies them, and emits v2 events.
Set up:
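- results: the deserialized TweetEvents. Anything that failed to deserialize was logged and dropped inside the helper.
- create_tweets / delete_tweets: accumulators for the two event categories.
- first_post_id / first_user_id: for the every-1,000th-batch log line at the end. Despite the names, they're overwritten on every create event (including nullcast ones, which are assigned before being skipped), so the log actually reports the last create event in the batch.
- now_secs: Unix seconds, used for retention checks and as the delete timestamp.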
for tweet_event in results {
let data = tweet_event.data.unwrap();
match data {
TweetEventData::TweetCreateEvent(create_event) => {
first_post_id = create_event.tweet.as_ref().unwrap().id.unwrap();
first_user_id = create_event.user.as_ref().unwrap().id.unwrap();
let tweet = create_event.tweet.as_ref().unwrap();
let core_data = tweet.core_data.as_ref().unwrap();
if let Some(nullcast) = core_data.nullcast
&& nullcast
{
continue;
}
Classify each event. Thrift fields come through as Option (the closest analog to "field may be absent"), hence the .unwrap() chain: these are assertions that the publisher is well-formed. A missing field makes the .unwrap() panic inside the processing task. The bet is that this never happens because Kafka events are first-party traffic with strict producer-side validation.
The nullcast check is interesting. "Nullcast" is X's term for a tweet that's not supposed to appear in feeds, typically used for ad-only tweets, where the tweet exists as a content carrier for an ad but shouldn't surface organically. To skip, the outer Option must be Some and the bool inside it must be true.
The if let Some(nullcast) = core_data.nullcast && nullcast form is a let chain, stabilized in Rust 1.88 for the 2024 edition. It's equivalent to if core_data.nullcast == Some(true).
create_tweets.push(LightPost {
post_id: tweet.id.unwrap(),
author_id: create_event.user.as_ref().unwrap().id.unwrap(),
created_at: core_data.created_at_secs.unwrap(),
in_reply_to_post_id: core_data
.reply
.as_ref()
.and_then(|r| r.in_reply_to_status_id),
in_reply_to_user_id: core_data
.reply
.as_ref()
.and_then(|r| r.in_reply_to_user_id),
is_retweet: core_data.share.is_some(),
is_reply: core_data.reply.is_some(),
source_post_id: core_data.share.as_ref().and_then(|s| s.source_status_id),
source_user_id: core_data.share.as_ref().and_then(|s| s.source_user_id),
has_video: is_eligible_video(tweet),
conversation_id: core_data.conversation_id,
});
Project the Thrift Tweet into the proto LightPost. Note the mapping:
- is_retweet: core_data.share.is_some() (a "share" struct existing on the Thrift side ≡ "this is a retweet").
- is_reply: core_data.reply.is_some() (a "reply" struct existing ≡ "this is a reply").
- source_post_id / source_user_id: only meaningful for retweets; pulled from the share struct.
- in_reply_to_post_id / in_reply_to_user_id: only meaningful for replies; pulled from the reply struct.
- has_video: is_eligible_video(tweet), the predicate we read earlier.
- conversation_id: core_data.conversation_id, the root-of-thread ID, copied through.
So LightPost is the canonical-form representation that all downstream code uses. The Thrift Tweet is just an input format.
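The Option-to-flag mapping is the heart of the projection. A reduced sketch, with hypothetical, trimmed-down mirrors of the Thrift reply/share structs and only the link-related LightPost fields (the real structs carry more fields, and their integer types are assumed here):

```rust
/// Hypothetical, trimmed-down mirrors of the Thrift structs; every field is
/// optional, as in the generated code the listener unwraps.
struct Reply {
    in_reply_to_status_id: Option<i64>,
    in_reply_to_user_id: Option<i64>,
}

struct Share {
    source_status_id: Option<i64>,
    source_user_id: Option<i64>,
}

struct CoreData {
    reply: Option<Reply>,
    share: Option<Share>,
}

/// Only the reply/retweet fields of LightPost are modeled here.
#[derive(Debug, PartialEq)]
struct LightPostLinks {
    in_reply_to_post_id: Option<i64>,
    in_reply_to_user_id: Option<i64>,
    is_retweet: bool,
    is_reply: bool,
    source_post_id: Option<i64>,
    source_user_id: Option<i64>,
}

fn project_links(core: &CoreData) -> LightPostLinks {
    LightPostLinks {
        // Reply targets come from the optional `reply` sub-struct.
        in_reply_to_post_id: core.reply.as_ref().and_then(|r| r.in_reply_to_status_id),
        in_reply_to_user_id: core.reply.as_ref().and_then(|r| r.in_reply_to_user_id),
        // The flags only check whether each sub-struct is present at all.
        is_retweet: core.share.is_some(),
        is_reply: core.reply.is_some(),
        // Retweet sources come from the optional `share` sub-struct.
        source_post_id: core.share.as_ref().and_then(|s| s.source_status_id),
        source_user_id: core.share.as_ref().and_then(|s| s.source_user_id),
    }
}
```

The flags and the ID fields are derived independently: a present reply struct with a missing user ID still yields is_reply = true, with in_reply_to_user_id left as None.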
TweetEventData::TweetDeleteEvent(delete_event) => {
let created_at_secs = delete_event
.tweet
.as_ref()
.unwrap()
.core_data
.as_ref()
.unwrap()
.created_at_secs
.unwrap();
if now_secs - created_at_secs > post_retention_sec {
continue;
}
delete_tweets.push(delete_event.tweet.as_ref().unwrap().id.unwrap());
}
Delete event handling.
Interesting filter: if now_secs - created_at_secs > post_retention_sec { continue; }. If the deleted post is older than retention, we don't bother emitting a delete event — Thunder's store has already discarded the post via retention trimming, so the tombstone is irrelevant. Saves a v2 event for every delete of an old post.
delete_tweets is Vec<i64> — just IDs, not full delete events. We'll wrap them in TweetDeleteEvent protos below.
TweetEventData::QuotedTweetDeleteEvent(delete_event) => {
delete_tweets.push(delete_event.quoting_tweet_id.unwrap());
}
_ => {
log::info!("Other non post creation/deletion event")
}
}
}
Two more event types:
- QuotedTweetDeleteEvent: when a quoted tweet is deleted, the quoting tweet itself becomes broken (it points to nothing), so treat the quoting tweet (quoting_tweet_id) as deleted. The original quoted tweet's deletion has its own TweetDeleteEvent. Unlike plain deletes, this path applies no retention filter.
- The catch-all _ arm just logs.
This info! log catches the long tail of event types we don't care about (favorites, follows, etc.). The log volume could be high, but presumably production deploys with a log config that filters it.
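Putting the classification rules together: a self-contained sketch over a flattened, hypothetical EventSketch enum (the real TweetEventData variants carry full Thrift structs that are unwrapped field by field). It applies the same filters as process_message_batch: skip nullcast creates, drop deletes of posts older than retention, and turn a quoted-tweet delete into a delete of the quoting post.

```rust
/// Flattened, hypothetical stand-in for TweetEventData; only the fields the
/// filters read are kept.
enum EventSketch {
    Create { post_id: i64, nullcast: Option<bool> },
    Delete { post_id: i64, created_at_secs: i64 },
    QuotedTweetDelete { quoting_tweet_id: i64 },
    Other,
}

/// Splits a batch into post IDs to create and post IDs to delete.
fn classify(events: &[EventSketch], now_secs: i64, retention_secs: i64) -> (Vec<i64>, Vec<i64>) {
    let mut creates = Vec::new();
    let mut deletes = Vec::new();
    for event in events {
        match event {
            // Nullcast posts must never surface in feeds: skip them.
            EventSketch::Create { nullcast: Some(true), .. } => continue,
            EventSketch::Create { post_id, .. } => creates.push(*post_id),
            // Posts older than retention are already gone from the store.
            EventSketch::Delete { post_id, created_at_secs } => {
                if now_secs - created_at_secs <= retention_secs {
                    deletes.push(*post_id);
                }
            }
            // A deleted quoted post takes the quoting post down with it.
            EventSketch::QuotedTweetDelete { quoting_tweet_id } => deletes.push(*quoting_tweet_id),
            // The real code logs these at info level.
            EventSketch::Other => {}
        }
    }
    (creates, deletes)
}
```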
Now the producer half:
// Send each LightPost as an InNetworkEvent to the producer in separate tasks (only if producer is enabled)
if let Some(ref producer) = producer {
let mut send_tasks = Vec::with_capacity(create_tweets.len());
for light_post in &create_tweets {
let event = InNetworkEvent {
event_variant: Some(in_network_event::EventVariant::TweetCreateEvent(
TweetCreateEvent {
post_id: light_post.post_id,
author_id: light_post.author_id,
created_at: light_post.created_at,
in_reply_to_post_id: light_post.in_reply_to_post_id,
in_reply_to_user_id: light_post.in_reply_to_user_id,
is_retweet: light_post.is_retweet,
is_reply: light_post.is_reply,
source_post_id: light_post.source_post_id,
source_user_id: light_post.source_user_id,
has_video: light_post.has_video,
conversation_id: light_post.conversation_id,
},
)),
};
let payload = event.encode_to_vec();
let producer_clone = Arc::clone(producer);
send_tasks.push(tokio::spawn(async move {
let producer_lock = producer_clone.read().await;
if let Err(e) = producer_lock.send(&payload).await {
warn!("Failed to send InNetworkEvent to producer: {:#}", e);
}
}));
}
For each LightPost, build a TweetCreateEvent proto, wrap it in the InNetworkEvent envelope (event_variant is a proto oneof), encode it to bytes (encode_to_vec() comes from prost::Message), and spawn a task that:
- Takes a read lock on the shared producer (Arc<RwLock<…>>). A read lock is enough because send is called through a shared guard, which means it takes &self; concurrent sends don't exclude each other.
- Calls producer.send(&payload).await.
- Logs on failure.
Tasks are collected so we can join them below.
Why spawn one task per post (instead of looping serially)? Because Kafka producers can batch/pipeline internally. By submitting all the sends concurrently, the producer client can pipeline writes for higher throughput. Tasks are cheap.
for post_id in &delete_tweets {
let event = InNetworkEvent {
event_variant: Some(in_network_event::EventVariant::TweetDeleteEvent(
TweetDeleteEvent {
post_id: *post_id,
deleted_at: now_secs,
},
)),
};
let payload = event.encode_to_vec();
let producer_clone = Arc::clone(producer);
send_tasks.push(tokio::spawn(async move {
let producer_lock = producer_clone.read().await;
if let Err(e) = producer_lock.send(&payload).await {
warn!("Failed to send InNetworkEvent to producer: {:#}", e);
}
}));
}
Same loop but for deletes. deleted_at is set to now, not the original deletion timestamp from the source event. Reasonable — what matters is the order in which Thunder sees deletes.
// Wait for all send tasks to complete
for task in send_tasks {
if let Err(e) = task.await {
error!("Error writing to kafka {}", e);
}
}
}
Join all spawned send tasks. task.await on a Tokio JoinHandle returns Result<T, JoinError> — the error here is a join error (the task panicked or was cancelled), not a send error. Send errors were logged inside each task.
// Log every 100th batch
let batch_count = BATCH_LOG_COUNTER.fetch_add(1, Ordering::Relaxed);
if batch_count.is_multiple_of(1000) {
info!(
"Batch processing milestone: processed {} batches total, latest batch {} had {} posts (first: post_id={}, user_id={})",
batch_count + 1,
batch_num,
len_posts,
first_post_id,
first_user_id
);
}
Ok(())
}
The comment says "100th", but the code does is_multiple_of(1000): the comment lags reality, and the milestone log actually fires every 1,000 batches. Because fetch_add returns the value before the increment, the very first batch (count 0) logs too. Ordering::Relaxed is fine: we don't need cross-thread ordering guarantees for a counter that's only used as a log throttle.
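The throttle pattern on its own, as a small sketch. The counter mirrors BATCH_LOG_COUNTER; should_log_milestone is a hypothetical helper name. It uses % instead of is_multiple_of so it also builds on toolchains that predate that method; for a nonzero divisor the two agree.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Process-wide batch counter, shared by every processing task.
static BATCH_LOG_COUNTER: AtomicUsize = AtomicUsize::new(0);

/// Returns true on the calls that should emit the milestone log line.
fn should_log_milestone(every: usize) -> bool {
    // fetch_add returns the value *before* the increment, so the very first
    // call sees 0 and logs. Relaxed is enough: only the count itself matters,
    // not its ordering relative to other memory operations.
    let batch_count = BATCH_LOG_COUNTER.fetch_add(1, Ordering::Relaxed);
    batch_count % every == 0
}
```

Called 2,500 times in a fresh process with every = 1,000, it returns true on calls 0, 1,000, and 2,000.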
/// Main message processing loop that polls Kafka, batches messages, and stores posts
async fn process_tweet_events(
consumer: Arc<RwLock<KafkaConsumer>>,
batch_size: usize,
producer: Option<Arc<RwLock<KafkaProducer>>>,
post_retention_sec: i64,
) -> Result<()> {
let mut message_buffer = Vec::new();
let mut batch_num = 0;
loop {
let poll_result = {
let mut consumer_lock = consumer.write().await;
consumer_lock.poll(100).await
};
The polling loop. The doc-comment is wrong — this is v1, which doesn't store posts. It transforms and produces.
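consumer.write().await takes a write lock because poll mutates the consumer's state (its offsets). It sits inside a { … } block so the guard drops as soon as poll returns. Without the block, the guard would live until the end of the loop body: the consumer.write().await for commit_offsets() later in the same iteration would then wait on a lock this task already holds (tokio's RwLock isn't reentrant, so that's a deadlock), and the lag monitor's read() would be shut out for the whole batch.
The 100 is presumably the per-poll message limit (xai_kafka's poll signature isn't visible). We accumulate into message_buffer until we have ≥ batch_size messages, then process.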
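Why the braces matter, shown with std::sync::RwLock as a synchronous stand-in for tokio::sync::RwLock (the guard-scoping rules are the same). poll_then_commit is hypothetical: it takes the write lock to "poll" and then checks whether a second write lock, the one the commit needs, could be taken. try_write is used for that check so the unscoped case reports the conflict instead of hanging.

```rust
use std::sync::RwLock;

/// One loop iteration in miniature: take the write lock to "poll", then check
/// whether the write lock needed for the commit is available.
fn poll_then_commit(consumer: &RwLock<Vec<u64>>, scoped: bool) -> bool {
    if scoped {
        {
            let mut guard = consumer.write().unwrap();
            guard.push(1); // stand-in for poll() mutating consumer state
        } // guard dropped here, before the commit
        consumer.try_write().is_ok()
    } else {
        let mut guard = consumer.write().unwrap();
        guard.push(1);
        // The guard is still alive. A blocking write() here would never return
        // with tokio's RwLock (it is not reentrant), so probe with try_write.
        let can_commit = consumer.try_write().is_ok();
        drop(guard);
        can_commit
    }
}
```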
match poll_result {
Ok(messages) => {
message_buffer.extend(messages);
// Process batch when we have enough messages
if message_buffer.len() >= batch_size {
batch_num += 1;
let messages = std::mem::take(&mut message_buffer);
let producer_clone = producer.clone();
// Spawn batch processing in a blocking task
process_message_batch(messages, batch_num, producer_clone, post_retention_sec)
.await
.context("Error processing tweet event batch")?;
consumer.write().await.commit_offsets()?;
}
}
Err(e) => {
warn!("Error polling messages: {:#}", e);
metrics::KAFKA_POLL_ERRORS.inc();
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}
Walking the success path:
- message_buffer.extend(messages): append the polled messages to the buffer.
- Check the threshold. If we have enough, process.
- std::mem::take(&mut message_buffer): moves the buffered vec out and leaves an empty Vec (its Default) in its place. It's an ownership swap, not an atomic operation: the owned vec moves into process_message_batch, and message_buffer starts the next round empty.
- process_message_batch(...).await.context(...)?: call the worker. The ? propagates errors up, and as we saw above, the surrounding spawned task panics if this returns Err.
- consumer.write().await.commit_offsets()?: manual commit, issued only after the batch succeeds. This is at-least-once semantics: if the process crashes mid-batch, on restart we re-read those messages and may produce duplicate v2 events. Downstream consumers (the v2 listener) already dedupe by post ID, so duplicates are tolerable.
The comment "Spawn batch processing in a blocking task" is stale: there's no spawn_blocking here, and the batch is processed inline by awaiting the future. Likely a leftover from an older implementation.
Error path: log + increment counter + sleep 100ms. Don't crash — Kafka may have a transient hiccup; retry.
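The buffer-then-commit loop reduced to its bookkeeping. FakeConsumer is hypothetical (a queue of canned poll results plus a committed position); run mirrors the success path above: extend, check the threshold, std::mem::take the batch, process, and only then commit.

```rust
use std::mem;

/// Hypothetical consumer: replays canned poll results and tracks positions.
struct FakeConsumer {
    polls: Vec<Vec<u64>>, // queued poll results, oldest first
    consumed: u64,        // messages handed out by poll so far
    committed: u64,       // position recorded by the last commit
}

impl FakeConsumer {
    fn poll(&mut self) -> Vec<u64> {
        if self.polls.is_empty() {
            return Vec::new();
        }
        let msgs = self.polls.remove(0);
        self.consumed += msgs.len() as u64;
        msgs
    }

    fn commit_offsets(&mut self) {
        self.committed = self.consumed;
    }
}

/// Polls `rounds` times, processing whenever at least `batch_size` messages
/// are buffered, and commits only after a batch has been processed.
fn run(consumer: &mut FakeConsumer, batch_size: usize, rounds: usize) -> Vec<Vec<u64>> {
    let mut message_buffer = Vec::new();
    let mut processed = Vec::new();
    for _ in 0..rounds {
        message_buffer.extend(consumer.poll());
        if message_buffer.len() >= batch_size {
            // Move the buffered messages out, leaving an empty Vec behind.
            let batch = mem::take(&mut message_buffer);
            processed.push(batch); // stand-in for process_message_batch
            consumer.commit_offsets(); // commit strictly after processing
        }
    }
    processed
}
```

With polls [1, 2], [3], [4, 5, 6], [7] and a batch size of 3, two batches are processed and the committed position ends at 6 while 7 messages have been consumed. If the process died at that point, a restart would resume from position 6 and see message 7 again, which is the at-least-once behavior.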
End of v1 file. In summary: v1 is a per-batch transformer that reads Thrift events, projects them to LightPost shape, and emits v2 protos. It's the migration on-ramp.
kafka/tweet_events_listener_v2.rs (249 lines) — v2: proto → PostStore
The v2 listener runs in serving mode. It reads the proto topic (which v1 transforms into) and writes posts directly to the in-memory PostStore. No producer — this is a one-way sink.
use anyhow::Result;
use log::{info, warn};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, Semaphore};
use xai_kafka::{KafkaMessage, config::KafkaConsumerConfig, consumer::KafkaConsumer};
use xai_thunder_proto::{LightPost, TweetDeleteEvent, in_network_event};
use crate::{
args::Args,
deserializer::deserialize_tweet_event_v2,
kafka::utils::{create_kafka_consumer, deserialize_kafka_messages},
metrics,
posts::post_store::PostStore,
};
/// Counter for logging deserialization every Nth time
static DESER_LOG_COUNTER: AtomicUsize = AtomicUsize::new(0);
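Same shape of imports. New: Semaphore, Instant, PostStore. Gone: KafkaProducer and the Thrift schemas. LightPost is still imported, but the incoming TweetCreateEvent proto already carries the same fields, so building a LightPost is presumably a field-for-field copy rather than a projection from Thrift.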
The DESER_LOG_COUNTER is the same throttle pattern as before.
/// Start the tweet event processing loop in the background with configurable number of threads
pub async fn start_tweet_event_processing_v2(
base_config: KafkaConsumerConfig,
post_store: Arc<PostStore>,
args: &Args,
tx: tokio::sync::mpsc::Sender<i64>,
) {
let num_partitions = args.kafka_tweet_events_v2_num_partitions;
let kafka_num_threads = args.kafka_num_threads;
// Use all available partitions
let partitions_to_use: Vec<i32> = (0..num_partitions as i32).collect();
let partitions_per_thread = num_partitions.div_ceil(kafka_num_threads);
info!(
"Starting {} message processing threads for {} partitions ({} partitions per thread)",
kafka_num_threads, num_partitions, partitions_per_thread
);
spawn_processing_threads_v2(base_config, partitions_to_use, post_store, args, tx);
}
The v2 entry. Same partition-distribution math as v1.
The new parameter is tx: tokio::sync::mpsc::Sender<i64> — the catchup signal. From Session 02 we saw main.rs does:
let (tx, mut rx) = tokio::sync::mpsc::channel::<i64>(args.kafka_num_threads);
kafka_utils::start_kafka(&args, post_store.clone(), "", tx).await?;
if args.is_serving {
for _ in 0..args.kafka_num_threads {
rx.recv().await;
}
info!("Kafka init took {:?}", start.elapsed());
…
}
Each thread sends one message on tx once it's caught up to the head of its partitions. main.rs waits for all of them before flipping readiness.
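The catchup handshake is a counting barrier built on a channel. A std-only sketch of the same shape, with std::sync::mpsc and threads standing in for tokio's mpsc and tasks; wait_for_catchup is a hypothetical name, and sending the worker's index as the i64 payload is an assumption, since what the real listener sends isn't shown here.

```rust
use std::sync::mpsc;
use std::thread;

/// Starts `num_workers` workers, each of which sends exactly one "caught up"
/// signal, and returns once that many signals have been received.
fn wait_for_catchup(num_workers: usize) -> Vec<i64> {
    let (tx, rx) = mpsc::channel::<i64>();
    for worker_id in 0..num_workers {
        let tx = tx.clone();
        thread::spawn(move || {
            // ... replay this worker's partitions up to the head here ...
            tx.send(worker_id as i64).expect("receiver is still waiting");
            // A real worker would keep consuming; this one simply exits.
        });
    }
    // Drop the original sender so only the workers' clones keep the channel open.
    drop(tx);

    let mut signals = Vec::with_capacity(num_workers);
    for _ in 0..num_workers {
        match rx.recv() {
            Ok(worker_id) => signals.push(worker_id),
            // Err means every sender is gone: no more signals can arrive.
            Err(_) => break,
        }
    }
    // Arrival order depends on scheduling; sort for a stable result.
    signals.sort();
    signals
}
```

A fixed-count wait only finishes if that many signals actually arrive. With tokio's mpsc, rx.recv().await yields None only after every Sender has been dropped, so if fewer signals are ever sent than the loop expects while some sender is still alive, the wait never completes.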
/// Spawn multiple processing threads, each handling a subset of partitions
fn spawn_processing_threads_v2(
base_config: KafkaConsumerConfig,
partitions_to_use: Vec<i32>,
post_store: Arc<PostStore>,
args: &Args,
tx: tokio::sync::mpsc::Sender<i64>,
) {
let total_partitions = partitions_to_use.len();
let partitions_per_thread = total_partitions.div_ceil(args.kafka_num_threads);
// Create shared semaphore to prevent too many tweet_events partition updates at the same time
let semaphore = Arc::new(Semaphore::new(3));
The thread-spawner. Same partition-bucket math. The new thing: a shared Semaphore with 3 permits.
What's it for? "Prevent too many tweet_events partition updates at the same time." Once the listener has caught up, processing is limited to 3 batches in flight across all threads. That leaves CPU for serving gRPC requests, the system's primary workload after catchup. Without the semaphore, ingest CPU contention could drag out tail latency on the feed.
Note the choice: only after catchup do we throttle. During catchup we want to drink the firehose; the semaphore is bypassed (see below).
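std has no semaphore, so this sketch builds a minimal counting semaphore from Mutex and Condvar to show the effect of a shared 3-permit limit: however many workers there are, no more than 3 batches are ever in flight at once. CountingSemaphore and max_in_flight are hypothetical; in the listener, tokio::sync::Semaphore fills this role, and its acquire().await returns a permit that gives the slot back when dropped.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore built from a mutex-protected counter.
struct CountingSemaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl CountingSemaphore {
    fn new(permits: usize) -> Self {
        CountingSemaphore { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Blocks until a permit is free, then takes it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        // `while`, not `if`: condition variables can wake spuriously.
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Returns a permit and wakes one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Runs `workers` threads that each process `batches` batches under one
/// shared semaphore and returns the peak number of batches in flight.
fn max_in_flight(permits: usize, workers: usize, batches: usize) -> usize {
    let sem = Arc::new(CountingSemaphore::new(permits));
    // (currently in flight, peak observed)
    let stats = Arc::new(Mutex::new((0usize, 0usize)));
    let mut handles = Vec::new();
    for _ in 0..workers {
        let sem = Arc::clone(&sem);
        let stats = Arc::clone(&stats);
        handles.push(thread::spawn(move || {
            for _ in 0..batches {
                sem.acquire();
                {
                    let mut s = stats.lock().unwrap();
                    s.0 += 1;
                    if s.0 > s.1 {
                        s.1 = s.0;
                    }
                }
                // Stand-in for applying one batch to the store.
                thread::sleep(Duration::from_millis(2));
                stats.lock().unwrap().0 -= 1;
                sem.release();
            }
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    let peak = stats.lock().unwrap().1;
    peak
}
```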
for thread_id in 0..args.kafka_num_threads {
let start_idx = thread_id * partitions_per_thread;
let end_idx = ((thread_id + 1) * partitions_per_thread).min(total_partitions);
if start_idx >= total_partitions {
break;
}
let thread_partitions = partitions_to_use[start_idx..end_idx].to_vec();
let mut thread_config = base_config.clone();
thread_config.partitions = Some(thread_partitions.clone());
let post_store_clone = Arc::clone(&post_store);
let topic = thread_config.base_config.topic.clone();
let lag_monitor_interval_secs = args.lag_monitor_interval_secs;
let batch_size = args.kafka_batch_size;
let tx_clone = tx.clone();
let semaphore_clone = Arc::clone(&semaphore);
tokio::spawn(async move {
info!(
"Starting message processing thread {} for partitions {:?}",
thread_id, thread_partitions
);
match create_kafka_consumer(thread_config).await {
Ok(consumer) => {
// Start partition lag monitoring for this thread's partitions
crate::kafka::tweet_events_listener::start_partition_lag_monitor(
Arc::clone(&consumer),
topic,
lag_monitor_interval_secs,
);
if let Err(e) = process_tweet_events_v2(
consumer,
post_store_clone,
batch_size,
tx_clone,
semaphore_clone,
)
.await
{
panic!(
"Tweet events processing thread {} exited unexpectedly: {:#}. This is a critical failure - the feeder cannot function without tweet event processing.",
thread_id, e
);
}
}
Err(e) => {
panic!(
"Failed to create consumer for thread {}: {:#}",
thread_id, e
);
}
}
});
}
}
Identical structure to v1's spawn_processing_threads. The differences:
- Captures post_store_clone, tx_clone, semaphore_clone instead of producer.
- Calls process_tweet_events_v2 (instead of process_tweet_events).
- Re-uses v1's start_partition_lag_monitor (the lag-monitor logic is the same: read partition lags, emit gauges).
Good code-reuse: the lag monitor was defined in v1, made pub, and v2 calls it. No duplication.
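The partition-bucket math is easy to check in isolation. Below is a std-only sketch that lifts the loop's index arithmetic into a function; `bucket_partitions` and the use of plain `i32` partition IDs are assumptions for illustration, not code from the repo.

```rust
/// Split `partitions` into contiguous buckets, one per thread, mirroring the
/// div_ceil / start_idx / end_idx / break logic of the spawner loop.
/// Hypothetical helper: the repo inlines this arithmetic in the loop.
fn bucket_partitions(partitions: &[i32], num_threads: usize) -> Vec<Vec<i32>> {
    let total = partitions.len();
    // Round up so every partition lands in some bucket.
    let per_thread = total.div_ceil(num_threads);
    let mut buckets = Vec::new();
    for thread_id in 0..num_threads {
        let start_idx = thread_id * per_thread;
        let end_idx = ((thread_id + 1) * per_thread).min(total);
        // Later threads can end up with nothing to do; stop there.
        if start_idx >= total {
            break;
        }
        buckets.push(partitions[start_idx..end_idx].to_vec());
    }
    buckets
}
```

With 5 partitions and 4 threads, div_ceil gives 2 partitions per thread, so only three threads get work and the fourth iteration hits the break. A num_threads of 0 would make div_ceil panic, so the real code presumably relies on the flag being at least 1.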
/// Process a single batch of messages: deserialize, extract posts, and store them
fn deserialize_batch(
messages: Vec<KafkaMessage>,
) -> Result<(Vec<LightPost>, Vec<TweetDeleteEvent>)> {
let start_time = Instant::now();
let num_messages = messages.len();
let results = deserialize_kafka_messages(messages, deserialize_tweet_event_v2)?;
let deser_elapsed = start_time.elapsed();
if DESER_LOG_COUNTER
.fetch_add(1, Ordering::Relaxed)
.is_multiple_of(1000)
{
info!(
"Deserialized {} messages in {:?} ({:.2} msgs/sec)",
num_messages,
deser_elapsed,
num_messages as f64 / deser_elapsed.as_secs_f64()
);
}
The per-batch worker. Note: fn, not async fn. Pure CPU work. This is going to be called from spawn_blocking later.
Deserialize (same helper as before, but with deserialize_tweet_event_v2 — proto rather than Thrift). Time it. Every 1,000th batch, log throughput.
as_secs_f64() converts a Duration to seconds as an f64, so num_messages as f64 / deser_elapsed.as_secs_f64() gives msgs/sec.
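The log-every-Nth-batch pattern and the throughput arithmetic, as a standalone sketch. It assumes DESER_LOG_COUNTER is a static atomic counter (its declaration isn't shown); `should_log` and `msgs_per_sec` are hypothetical helper names. The sketch uses `% every == 0` where the repo calls `.is_multiple_of(every)`, a fairly recent standard-library method; for a non-zero divisor the two agree.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

/// True on the 1st, (every+1)th, (2*every+1)th, ... call.
/// Relaxed ordering is enough: each caller only needs its own ticket number,
/// not ordering with respect to other memory operations.
fn should_log(counter: &AtomicUsize, every: usize) -> bool {
    counter.fetch_add(1, Ordering::Relaxed) % every == 0
}

/// Messages per second for one batch, computed the same way as the log line.
fn msgs_per_sec(num_messages: usize, elapsed: Duration) -> f64 {
    num_messages as f64 / elapsed.as_secs_f64()
}
```

Because fetch_add returns the value *before* the increment, the very first batch is logged, then every 1,000th after it.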
let mut create_tweets = Vec::with_capacity(results.len());
let mut delete_tweets = Vec::with_capacity(10);
for tweet_event in results {
match tweet_event.event_variant.unwrap() {
in_network_event::EventVariant::TweetCreateEvent(create_event) => {
create_tweets.push(LightPost {
post_id: create_event.post_id,
author_id: create_event.author_id,
created_at: create_event.created_at,
in_reply_to_post_id: create_event.in_reply_to_post_id,
in_reply_to_user_id: create_event.in_reply_to_user_id,
is_retweet: create_event.is_retweet,
is_reply: create_event.is_reply
|| create_event.in_reply_to_post_id.is_some()
|| create_event.in_reply_to_user_id.is_some(),
source_post_id: create_event.source_post_id,
source_user_id: create_event.source_user_id,
has_video: create_event.has_video,
conversation_id: create_event.conversation_id,
});
}
in_network_event::EventVariant::TweetDeleteEvent(delete_event) => {
delete_tweets.push(delete_event);
}
}
}
Ok((create_tweets, delete_tweets))
}
Project the v2 TweetCreateEvent into a LightPost. Almost a 1:1 copy — but watch the is_reply field:
is_reply: create_event.is_reply
|| create_event.in_reply_to_post_id.is_some()
|| create_event.in_reply_to_user_id.is_some(),
Defensive: if any of is_reply, in_reply_to_post_id, or in_reply_to_user_id indicate this is a reply, mark it as a reply. This guards against upstream inconsistency — maybe an old v1 transformer set in_reply_to_post_id but forgot is_reply.
delete_tweets: Vec<TweetDeleteEvent> keeps the full delete event (proto struct), not just IDs, because PostStore::mark_as_deleted takes the full event. The capacity hint Vec::with_capacity(10) presumably reflects the expectation that deletes are rare relative to creates.
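The match is exhaustive over EventVariant (only two variants), so there's no _ => … arm. If a third variant is added, the compiler forces us to handle it. The .unwrap() in front of it is a different story: event_variant is an Option (a proto oneof that may be unset), so a message with no variant panics. Because deserialize_batch runs inside spawn_blocking and the caller discards the resulting JoinError, one such message loses its whole batch rather than crashing the thread.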
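A sketch of the create/delete split with the defensive is_reply rule, on hypothetical trimmed-down types (the real TweetCreateEvent, TweetDeleteEvent and LightPost have more fields, and deletes are full structs rather than bare IDs). One deliberate difference from the repo: it matches on the Option instead of unwrapping it, so an event with no variant set is counted and skipped rather than panicking.

```rust
/// Hypothetical stand-ins for the generated proto types.
#[derive(Debug, Clone, PartialEq)]
struct CreateEvent {
    post_id: i64,
    is_reply: bool,
    in_reply_to_post_id: Option<i64>,
    in_reply_to_user_id: Option<i64>,
}

#[derive(Debug, Clone, PartialEq)]
enum EventVariant {
    Create(CreateEvent),
    Delete(i64), // just the deleted post ID in this sketch
}

struct InNetworkEvent {
    event_variant: Option<EventVariant>, // a proto3 oneof surfaces as Option
}

#[derive(Debug, PartialEq)]
struct Post {
    post_id: i64,
    is_reply: bool,
}

/// Split events into creates and deletes, counting events with no variant.
fn split_events(events: Vec<InNetworkEvent>) -> (Vec<Post>, Vec<i64>, usize) {
    let mut creates = Vec::with_capacity(events.len());
    let mut deletes = Vec::new();
    let mut skipped = 0;
    for ev in events {
        match ev.event_variant {
            Some(EventVariant::Create(c)) => creates.push(Post {
                post_id: c.post_id,
                // Defensive: any of the three reply signals marks a reply.
                is_reply: c.is_reply
                    || c.in_reply_to_post_id.is_some()
                    || c.in_reply_to_user_id.is_some(),
            }),
            Some(EventVariant::Delete(id)) => deletes.push(id),
            None => skipped += 1,
        }
    }
    (creates, deletes, skipped)
}
```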
/// Main message processing loop that polls Kafka, batches messages, and stores posts
async fn process_tweet_events_v2(
consumer: Arc<RwLock<KafkaConsumer>>,
post_store: Arc<PostStore>,
batch_size: usize,
tx: tokio::sync::mpsc::Sender<i64>,
semaphore: Arc<Semaphore>,
) -> Result<()> {
let mut message_buffer = Vec::new();
let mut batch_count = 0_usize;
let mut init_data_downloaded = false;
The polling loop, v2. New state: init_data_downloaded — has this thread caught up yet?
loop {
let poll_result = {
let mut consumer_lock = consumer.write().await;
consumer_lock.poll(batch_size).await
};
match poll_result {
Ok(messages) => {
let catchup_sender = if !init_data_downloaded {
let consumer_lock = consumer.read().await;
if let Ok(lags) = consumer_lock.get_partition_lags().await {
let total_lag: i64 = lags.iter().map(|l| l.lag).sum();
if total_lag < (lags.len() * batch_size) as i64 {
init_data_downloaded = true;
Some((tx.clone(), total_lag))
} else {
None
}
} else {
None
}
} else {
None
};
After polling but before processing, check if we've caught up.
The condition total_lag < (lags.len() * batch_size) as i64 means "the sum of lags across our partitions is less than our partitions times the batch size." In other words: on average, less than one batch behind per partition. That's "close enough to live": keep ingesting, but be ready to throttle and signal main.rs.
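If we're close enough:
- Flip init_data_downloaded = true (won't check again).
- Build Some((sender, lag)), used a few lines down to actually send.
This sets up the signal but doesn't fire it yet. The signal fires only after the upcoming batch finishes (so initial catchup includes processing this last batch). One edge case, as far as the code shown goes: the send sits inside the if message_buffer.len() >= batch_size block. If the poll that trips the check leaves the buffer short of a full batch, catchup_sender is dropped at the end of the iteration, and since init_data_downloaded is already true, this thread never sends its signal. Whether that can happen depends on how many messages xai_kafka's poll(batch_size) returns near the head of the log, which the source doesn't show.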
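The catch-up test on its own, as a sketch. `PartitionLag` is a hypothetical stand-in for whatever get_partition_lags() returns; only the lag field is used, as in the loop above.

```rust
/// Hypothetical stand-in for one entry returned by get_partition_lags().
struct PartitionLag {
    lag: i64,
}

/// v2's catch-up test: the total lag across this thread's partitions is
/// below one batch per partition.
fn is_caught_up(lags: &[PartitionLag], batch_size: usize) -> bool {
    let total_lag: i64 = lags.iter().map(|l| l.lag).sum();
    total_lag < (lags.len() * batch_size) as i64
}
```

Because the bound is on the sum, one hot partition can be almost two batches behind (lag 190 with batch_size = 100) as long as its sibling has no lag.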
message_buffer.extend(messages);
// Process batch when we have enough messages
if message_buffer.len() >= batch_size {
batch_count += 1;
let messages = std::mem::take(&mut message_buffer);
let post_store_clone = Arc::clone(&post_store);
// Acquire semaphore permit if init data is downloaded to allow enough CPU for serving requests
let permit = if init_data_downloaded {
Some(semaphore.clone().acquire_owned().await.unwrap())
} else {
None
};
Buffer accumulation. Then, if we have enough:
- batch_count += 1.
- Take the buffer: std::mem::take moves the contents out and leaves an empty Vec behind (a cheap swap, not an atomic operation).
- If caught up, acquire a semaphore permit. Otherwise (during catchup), skip the semaphore: we want max throughput.
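semaphore.clone().acquire_owned() consumes an Arc<Semaphore> (hence the .clone()) and, once awaited, yields an OwnedSemaphorePermit. The permit holds that Arc, so it borrows nothing and can be moved into a task. (The non-owned acquire() yields a SemaphorePermit<'_> that borrows the semaphore, so it couldn't be moved into spawn_blocking, which requires a 'static closure.)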
The .unwrap() on acquire_owned() only fails if the semaphore was closed, which doesn't happen here.
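tokio's Semaphore needs an async runtime, so here is a std-only sketch of the same ownership story: a counting semaphore built on Mutex + Condvar (a stand-in, not tokio's implementation), an owned permit that carries its own Arc and releases on Drop, and the "only throttle after catchup" Option. All names here are hypothetical.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Minimal blocking counting semaphore on std primitives.
struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

/// Holds an Arc to its semaphore, so it is 'static and can move to another
/// thread, like tokio's OwnedSemaphorePermit.
struct OwnedPermit {
    sem: Arc<Semaphore>,
}

impl Semaphore {
    fn new(n: usize) -> Arc<Self> {
        Arc::new(Semaphore { permits: Mutex::new(n), cv: Condvar::new() })
    }

    /// Block until a permit is free, then take it.
    fn acquire_owned(self: Arc<Self>) -> OwnedPermit {
        let mut n = self.permits.lock().unwrap();
        while *n == 0 {
            n = self.cv.wait(n).unwrap();
        }
        *n -= 1;
        drop(n); // release the lock guard before moving `self`
        OwnedPermit { sem: self }
    }

    fn available(&self) -> usize {
        *self.permits.lock().unwrap()
    }
}

impl Drop for OwnedPermit {
    // Returning the slot on drop is what makes "hold the guard" idioms work.
    fn drop(&mut self) {
        *self.sem.permits.lock().unwrap() += 1;
        self.sem.cv.notify_one();
    }
}

/// Throttle only once caught up; during catchup, take no permit at all.
fn maybe_acquire(sem: &Arc<Semaphore>, caught_up: bool) -> Option<OwnedPermit> {
    if caught_up {
        Some(Arc::clone(sem).acquire_owned())
    } else {
        None
    }
}
```

Moving the returned Option into a worker thread and binding it there keeps the slot taken until that worker's closure returns.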
// Send batch to blocking thread pool for processing
let _ = tokio::task::spawn_blocking(move || {
let _permit = permit; // Hold permit until task completes
match deserialize_batch(messages) {
Err(e) => warn!("Error processing batch {}: {:#}", batch_count, e),
Ok((light_posts, delete_posts)) => {
post_store_clone.insert_posts(light_posts);
post_store_clone.mark_as_deleted(delete_posts);
}
};
})
.await;
The actual batch processing. Sent to spawn_blocking — runs on Tokio's blocking thread pool (default 512 threads). Why? Because deserialization + the per-post insertion into the (lock-protected) DashMaps is CPU-bound work. Putting it on the async pool would block whatever else was scheduled to that worker thread.
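let _permit = permit; moves the permit into the closure. The permit's Drop impl releases the semaphore slot, so the slot stays taken until the blocking work finishes. The line is needed because a move closure only captures the variables its body mentions: without it, permit would stay behind in the outer scope. The name matters too: let _ = permit; binds nothing and doesn't move permit, so it would not capture it either. (In this particular loop the outer scope awaits the blocking task before permit would go out of scope, so the effect happens to be the same today; the rebinding keeps it correct if the .await ever goes away.) The pattern is a common Rust idiom: "hold this guard until the end of scope."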
Inside the blocking closure:
- Deserialize. If it fails: log a warning (no panic, no error propagation; corrupt batch = log and skip).
- If it succeeds: insert into the store. Note insert_posts and mark_as_deleted are both synchronous methods on PostStore (we saw them in Session 02). DashMap operations are sync.
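The let _ = ... .await; waits for the blocking task to finish and then discards its Result, so a JoinError from a panicked task is ignored: the panic message reaches stderr via the default panic hook, but the loop doesn't act on it. Probably fine for ingest, since a single batch loss is recoverable. Because each thread awaits its batch before polling again, a thread never has more than one batch in flight; the semaphore caps the total across threads.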
if let Some((sender, lag)) = catchup_sender {
info!("Completed kafka init for a single thread");
if let Err(e) = sender.send(lag).await {
log::error!("error sending {}", e);
}
}
}
}
Err(e) => {
warn!("Error polling messages: {:#}", e);
metrics::KAFKA_POLL_ERRORS.inc();
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}
After processing (so the batch is in the store before we signal), send the catchup notification. If we set up the catchup_sender earlier in this iteration, send now. tx.send(lag).await — send the lag value through the mpsc channel. Sender error means receiver dropped — log and continue.
Then back to the top of the loop.
Error path same as v1: log + counter + 100ms sleep.
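The catch-up handshake reduced to std threads and std::sync::mpsc, as a sketch. The real code uses tokio tasks and tokio::sync::mpsc, and what main.rs does with the received lags isn't shown here; `wait_for_catchup` and the fixed lag values are made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Each worker sends its remaining lag once it considers itself caught up;
/// the coordinator returns only after hearing from every worker.
fn wait_for_catchup(worker_lags: Vec<i64>) -> Vec<i64> {
    let num_workers = worker_lags.len();
    let (tx, rx) = mpsc::channel::<i64>();
    for lag in worker_lags {
        let tx = tx.clone();
        thread::spawn(move || {
            // ... ingest until caught up, then signal exactly once ...
            if let Err(e) = tx.send(lag) {
                // Only fails if the receiver is gone: log and carry on.
                eprintln!("error sending {}", e);
            }
        });
    }
    drop(tx); // only the workers' clones keep the channel open now
    // Block until every worker has reported in.
    let mut reported: Vec<i64> = rx.iter().take(num_workers).collect();
    reported.sort_unstable();
    reported
}
```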
No offset commit in v2. v2's consumer keeps the default commit behaviour, which is auto-commit (see kafka_utils.rs from Session 02: v1 sets enable_auto_commit: false, while v2 leaves the default, which is true). Why the difference?
- v1 is a transformer with side-effects (producing to Kafka). At-least-once requires manual commit after producing.
- v2 is a sink into a recoverable in-memory store. On crash, we just re-read from the earliest offset — the in-memory store rebuilds. No need to track precise commit points.
End of v2.
thunder_service.rs (339 lines) — the gRPC service
The user-facing API. home-mixer calls GetInNetworkPosts(user_id, following_user_ids, exclude_tweet_ids, max_results, is_video_request, debug) and gets back a vec of LightPost.
use lazy_static::lazy_static;
use log::{debug, info, warn};
use std::cmp::Reverse;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::Semaphore;
use tonic::{Request, Response, Status};
use xai_thunder_proto::{
GetInNetworkPostsRequest, GetInNetworkPostsResponse, LightPost,
in_network_posts_service_server::{InNetworkPostsService, InNetworkPostsServiceServer},
};
Imports:
- lazy_static: looks unused below (no lazy_static! macro call). Probably leftover.
- std::cmp::Reverse: wrapper that inverts the Ord impl. Used for the descending sort below.
- tokio::sync::Semaphore: backpressure on concurrent requests.
- tonic::{Request, Response, Status}: gRPC request/response wrappers and the error type (gRPC Status).
- Generated proto types: the request/response, the trait InNetworkPostsService (which we implement), and the server wrapper InNetworkPostsServiceServer.
use crate::config::{
MAX_INPUT_LIST_SIZE, MAX_POSTS_TO_RETURN, MAX_VIDEOS_TO_RETURN,
};
use crate::metrics::{
GET_IN_NETWORK_POSTS_COUNT, GET_IN_NETWORK_POSTS_DURATION,
GET_IN_NETWORK_POSTS_DURATION_WITHOUT_STRATO, GET_IN_NETWORK_POSTS_EXCLUDED_SIZE,
GET_IN_NETWORK_POSTS_FOLLOWING_SIZE, GET_IN_NETWORK_POSTS_FOUND_FRESHNESS_SECONDS,
GET_IN_NETWORK_POSTS_FOUND_POSTS_PER_AUTHOR, GET_IN_NETWORK_POSTS_FOUND_REPLY_RATIO,
GET_IN_NETWORK_POSTS_FOUND_TIME_RANGE_SECONDS, GET_IN_NETWORK_POSTS_FOUND_UNIQUE_AUTHORS,
GET_IN_NETWORK_POSTS_MAX_RESULTS, IN_FLIGHT_REQUESTS, REJECTED_REQUESTS, Timer,
};
use crate::posts::post_store::PostStore;
use crate::strato_client::StratoClient;
The metrics list is long. Notice the structure:
- _FOUND_*: labelled by stage ("retrieved" vs "scored").
- _DURATION / _DURATION_WITHOUT_STRATO: two latency histograms. The "without strato" one excludes time spent fetching the follow list externally.
- IN_FLIGHT_REQUESTS / REJECTED_REQUESTS: backpressure signals.
- Timer: the RAII-style stopwatch from metrics.
pub struct ThunderServiceImpl {
/// PostStore for retrieving posts by user ID
post_store: Arc<PostStore>,
/// StratoClient for fetching following lists when not provided
strato_client: Arc<StratoClient>,
/// Semaphore to limit concurrent requests and prevent overload
request_semaphore: Arc<Semaphore>,
}
The service struct. Three fields:
- The post store (read-only access — we never mutate).
- The Strato client for fetching follow lists when the caller didn't pre-fetch them.
- A request semaphore for concurrent-request backpressure.
impl ThunderServiceImpl {
pub fn new(
post_store: Arc<PostStore>,
strato_client: Arc<StratoClient>,
max_concurrent_requests: usize,
) -> Self {
info!(
"Initializing ThunderService with max_concurrent_requests={}",
max_concurrent_requests
);
Self {
post_store,
strato_client,
request_semaphore: Arc::new(Semaphore::new(max_concurrent_requests)),
}
}
Constructor. Wraps max_concurrent_requests permits in a new semaphore.
/// Create a gRPC server for this service
pub fn server(self) -> InNetworkPostsServiceServer<Self> {
InNetworkPostsServiceServer::new(self)
.accept_compressed(tonic::codec::CompressionEncoding::Zstd)
.send_compressed(tonic::codec::CompressionEncoding::Zstd)
}
server() consumes self and wraps it in the tonic-generated server struct. The fluent calls turn on Zstandard compression in both directions. Smart for this workload — LightPost is tiny (~80 bytes), and a response can hold hundreds, so compression saves real bandwidth at modest CPU cost.
The self is consumed (not &self) because the gRPC server takes ownership.
/// Analyze found posts, calculate statistics, and report metrics
/// The `stage` parameter is used as a label to differentiate between stages (e.g., "post_store", "scored")
fn analyze_and_report_post_statistics(posts: &[LightPost], stage: &str) {
if posts.is_empty() {
debug!("[{}] No posts found for analysis", stage);
return;
}
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
// Time since most recent post
let time_since_most_recent = posts
.iter()
.map(|post| post.created_at)
.max()
.map(|most_recent| now - most_recent);
// Time since oldest post
let time_since_oldest = posts
.iter()
.map(|post| post.created_at)
.min()
.map(|oldest| now - oldest);
A static method (no &self) that computes per-batch statistics and emits them to Prometheus, labelled by stage.
Compute "time since most recent" (newest post in batch) and "time since oldest" (oldest post). Both as Option<i64> because .max()/.min() return None on empty (but we already handled empty up top).
// Count replies vs original posts
let reply_count = posts.iter().filter(|post| post.is_reply).count();
let original_count = posts.len() - reply_count;
// Unique authors
let unique_authors: HashSet<_> = posts.iter().map(|post| post.author_id).collect();
let unique_author_count = unique_authors.len();
Reply vs original counts; unique-author count.
// Report metrics with stage label
if let Some(freshness) = time_since_most_recent {
GET_IN_NETWORK_POSTS_FOUND_FRESHNESS_SECONDS
.with_label_values(&[stage])
.observe(freshness as f64);
}
if let (Some(oldest), Some(newest)) = (time_since_oldest, time_since_most_recent) {
let time_range = oldest - newest;
GET_IN_NETWORK_POSTS_FOUND_TIME_RANGE_SECONDS
.with_label_values(&[stage])
.observe(time_range as f64);
}
let reply_ratio = reply_count as f64 / posts.len() as f64;
GET_IN_NETWORK_POSTS_FOUND_REPLY_RATIO
.with_label_values(&[stage])
.observe(reply_ratio);
GET_IN_NETWORK_POSTS_FOUND_UNIQUE_AUTHORS
.with_label_values(&[stage])
.observe(unique_author_count as f64);
if unique_author_count > 0 {
let posts_per_author = posts.len() as f64 / unique_author_count as f64;
GET_IN_NETWORK_POSTS_FOUND_POSTS_PER_AUTHOR
.with_label_values(&[stage])
.observe(posts_per_author);
}
Emit five histogram observations (every call is .observe), all labelled by stage. with_label_values(&[stage]) is the Prometheus client's mechanism for variable-label metrics.
time_range = oldest - newest because oldest is further in the past (larger "seconds since"), so oldest - newest gives the span. If batch covers 5 minutes of posts, time_range = 300.
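The statistics as a pure function, so the arithmetic can be checked without Prometheus. A sketch: `Post`, `PostStats` and `post_stats` are hypothetical, and `now` is a parameter instead of SystemTime::now() so the numbers are deterministic.

```rust
use std::collections::HashSet;

/// Hypothetical subset of LightPost fields the statistics need.
struct Post {
    author_id: i64,
    created_at: i64, // Unix seconds
    is_reply: bool,
}

#[derive(Debug, PartialEq)]
struct PostStats {
    freshness_secs: i64,  // now - newest created_at
    time_range_secs: i64, // span between oldest and newest post
    reply_ratio: f64,
    unique_authors: usize,
    posts_per_author: f64,
}

/// Same arithmetic as analyze_and_report_post_statistics, returning the
/// values instead of observing histograms. None for an empty slice.
fn post_stats(posts: &[Post], now: i64) -> Option<PostStats> {
    let newest = posts.iter().map(|p| p.created_at).max()?;
    let oldest = posts.iter().map(|p| p.created_at).min()?;
    let replies = posts.iter().filter(|p| p.is_reply).count();
    let authors: HashSet<i64> = posts.iter().map(|p| p.author_id).collect();
    Some(PostStats {
        freshness_secs: now - newest,
        // (now - oldest) - (now - newest), i.e. newest - oldest
        time_range_secs: (now - oldest) - (now - newest),
        reply_ratio: replies as f64 / posts.len() as f64,
        unique_authors: authors.len(),
        // Non-empty input means at least one author, so no division by zero.
        posts_per_author: posts.len() as f64 / authors.len() as f64,
    })
}
```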
// Log statistics with stage label
debug!(
"[{}] Post statistics: total={}, original={}, replies={}, unique_authors={}, posts_per_author={:.2}, reply_ratio={:.2}, time_since_most_recent={:?}s, time_range={:?}s",
stage,
posts.len(),
original_count,
reply_count,
unique_author_count,
if unique_author_count > 0 {
posts.len() as f64 / unique_author_count as f64
} else {
0.0
},
reply_ratio,
time_since_most_recent,
if let (Some(o), Some(n)) = (time_since_oldest, time_since_most_recent) {
Some(o - n)
} else {
None
}
);
}
}
Debug log. Note debug! not info! — this only fires when log level is debug or below. Useful for digging in, off in production.
End of inherent impl. Now the trait impl:
#[tonic::async_trait]
impl InNetworkPostsService for ThunderServiceImpl {
/// Get posts from users in the network
async fn get_in_network_posts(
&self,
request: Request<GetInNetworkPostsRequest>,
) -> Result<Response<GetInNetworkPostsResponse>, Status> {
// Try to acquire semaphore permit without blocking
// If we're at capacity, reject immediately with RESOURCE_EXHAUSTED
let _permit = match self.request_semaphore.try_acquire() {
Ok(permit) => {
IN_FLIGHT_REQUESTS.inc();
permit
}
Err(_) => {
REJECTED_REQUESTS.inc();
return Err(Status::resource_exhausted(
"Server at capacity, please retry",
));
}
};
The one method on the trait. Step 1: try to acquire a semaphore permit, non-blocking.
try_acquire() returns Ok(permit) immediately if a permit is available, or Err(_) if not. It does not wait. This is the key to bounded latency: if we're at capacity, return RESOURCE_EXHAUSTED immediately rather than queueing. The client should retry (or fall back to a degraded experience).
In contrast, .acquire().await would wait — and a queue of waiting requests is a queue of requests racking up tail latency. Better to reject and let the load balancer retry on a less-loaded instance.
IN_FLIGHT_REQUESTS.inc() / REJECTED_REQUESTS.inc() — observe the decision.
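The reject-don't-queue rule, sketched without tokio: a compare-and-swap counter stands in for Semaphore::try_acquire. `Admission`, `Slot`, `try_admit` and `Rejected` are hypothetical names; tokio's semaphore is implemented differently, but the admission decision has the same shape: take a slot right now or fail immediately.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct Admission {
    in_use: AtomicUsize,
    capacity: usize,
}

/// A held slot; dropping it frees capacity for the next request.
struct Slot {
    adm: Arc<Admission>,
}

impl Drop for Slot {
    fn drop(&mut self) {
        self.adm.in_use.fetch_sub(1, Ordering::AcqRel);
    }
}

#[derive(Debug, PartialEq)]
enum Rejected {
    ResourceExhausted, // the handler maps this case to Status::resource_exhausted
}

/// Never waits: either claims a slot or returns an error at once.
fn try_admit(adm: &Arc<Admission>) -> Result<Slot, Rejected> {
    let mut cur = adm.in_use.load(Ordering::Acquire);
    loop {
        if cur >= adm.capacity {
            return Err(Rejected::ResourceExhausted);
        }
        // CAS so two racing requests can't both take the last slot.
        match adm.in_use.compare_exchange_weak(cur, cur + 1, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => return Ok(Slot { adm: Arc::clone(adm) }),
            Err(actual) => cur = actual,
        }
    }
}
```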
// Use a guard to decrement in_flight_requests when the request completes
struct InFlightGuard;
impl Drop for InFlightGuard {
fn drop(&mut self) {
IN_FLIGHT_REQUESTS.dec();
}
}
let _in_flight_guard = InFlightGuard;
An inline RAII guard. Local struct + Drop impl. When the function returns (success or error), _in_flight_guard drops, calling IN_FLIGHT_REQUESTS.dec(). So the gauge is always accurate regardless of which return path we take.
Alternative would be IN_FLIGHT_REQUESTS.dec() at each return — error-prone, easy to forget on a new return path. The guard is more robust.
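The inline-guard trick in isolation. A sketch: a plain static AtomicI64 stands in for the IN_FLIGHT_REQUESTS Prometheus gauge, and `handle` is a made-up function with two return paths. Unlike the real handler, the increment happens unconditionally rather than inside the semaphore's success branch.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

// Stand-in for the IN_FLIGHT_REQUESTS gauge (not Prometheus).
static IN_FLIGHT: AtomicI64 = AtomicI64::new(0);

/// Hypothetical handler with an early-error path and a success path.
fn handle(fail_early: bool) -> Result<u32, String> {
    IN_FLIGHT.fetch_add(1, Ordering::SeqCst);

    // Local struct + Drop impl, declared inline as in the handler.
    struct InFlightGuard;
    impl Drop for InFlightGuard {
        fn drop(&mut self) {
            IN_FLIGHT.fetch_sub(1, Ordering::SeqCst);
        }
    }
    let _in_flight_guard = InFlightGuard;

    let seen = IN_FLIGHT.load(Ordering::SeqCst);
    if fail_early {
        // The guard still drops here, so the gauge stays balanced.
        return Err(format!("early exit with {} in flight", seen));
    }
    Ok(seen as u32)
}
```

Name the binding. `let _in_flight_guard = InFlightGuard;` lives to the end of the function, but `let _ = InFlightGuard;` drops the value at the end of that statement, which would decrement the gauge immediately.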
// Start timer for total latency
let _total_timer = Timer::new(GET_IN_NETWORK_POSTS_DURATION.clone());
let req = request.into_inner();
if req.debug {
info!(
"Received GetInNetworkPosts request: user_id={}, following_count={}, exclude_tweet_ids={}",
req.user_id,
req.following_user_ids.len(),
req.exclude_tweet_ids.len(),
);
}
Start a total-latency timer (RAII — observes elapsed on drop). Extract the request payload.
req.debug — client-set flag for verbose logging. Only fires the info log if the client opted in. Common pattern for letting the requester (some specific service) request more logging without flooding global logs.
// If following_user_id list is empty, fetch it from Strato
let following_user_ids = if req.following_user_ids.is_empty() && req.debug {
info!(
"Following list is empty, fetching from Strato for user {}",
req.user_id
);
match self
.strato_client
.fetch_following_list(req.user_id as i64, MAX_INPUT_LIST_SIZE as i32)
.await
{
Ok(following_list) => {
info!(
"Fetched {} following users from Strato for user {}",
following_list.len(),
req.user_id
);
following_list.into_iter().map(|id| id as u64).collect()
}
Err(e) => {
warn!(
"Failed to fetch following list from Strato for user {}: {}",
req.user_id, e
);
return Err(Status::internal(format!(
"Failed to fetch following list: {}",
e
)));
}
}
} else {
req.following_user_ids
};
If the request didn't include a follow list, fetch from Strato. Wait — but the condition is req.following_user_ids.is_empty() && req.debug. The Strato path only triggers when both is_empty AND debug is set! Likely an unintended bug: the && req.debug should probably be removed. As-written, a non-debug request with empty follow list will just use the empty list (and return zero posts). This is almost certainly a quirk of the open-source release — maybe in production every real caller sends the follow list.
Or: the intent is "we want Strato fetches to be opt-in for debugging, real callers must provide the list." That's a defensible design — keeping Strato off the hot path. But the API surface is misleading then.
The into_iter().map(|id| id as u64).collect() converts Vec<i64> (Strato side) to Vec<u64> (proto side). The proto says repeated uint64.
Status::internal(format!(...)) returns an INTERNAL gRPC status with a human-readable message.
// Record metrics for request parameters
GET_IN_NETWORK_POSTS_FOLLOWING_SIZE.observe(following_user_ids.len() as f64);
GET_IN_NETWORK_POSTS_EXCLUDED_SIZE.observe(req.exclude_tweet_ids.len() as f64);
// Start timer for latency without strato call
let _processing_timer = Timer::new(GET_IN_NETWORK_POSTS_DURATION_WITHOUT_STRATO.clone());
Observe two request-shape histograms. Then start the processing timer — separate from the total timer. The processing timer doesn't include Strato (which already finished by this point).
So we get:
- Total latency = Strato fetch (if any) + processing.
- Processing latency = just the in-memory lookup + filter + score.
Two histograms = two p99 numbers in dashboards. Helpful for distinguishing "we're slow because Strato is slow" vs "we're slow because the store is overloaded."
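The two-timer layout, sketched with std only. `Histogram` here just collects durations, and `Timer` mimics the RAII behaviour described above; the real Timer in crate::metrics isn't shown, so its fields and constructor are assumptions, as is the sleep standing in for the Strato call.

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Hypothetical histogram: records every observation.
#[derive(Clone, Default)]
struct Histogram(Arc<Mutex<Vec<Duration>>>);

impl Histogram {
    fn observe(&self, d: Duration) {
        self.0.lock().unwrap().push(d);
    }
    fn count(&self) -> usize {
        self.0.lock().unwrap().len()
    }
}

/// RAII stopwatch: starts when created, observes elapsed time when dropped.
struct Timer {
    hist: Histogram,
    start: Instant,
}

impl Timer {
    fn new(hist: Histogram) -> Self {
        Timer { hist, start: Instant::now() }
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        self.hist.observe(self.start.elapsed());
    }
}

/// `total` spans the simulated Strato fetch plus processing;
/// `processing` starts only after the fetch.
fn handle(total: &Histogram, processing: &Histogram, strato: Duration) {
    let _total_timer = Timer::new(total.clone());
    std::thread::sleep(strato); // stand-in for fetch_following_list
    let _processing_timer = Timer::new(processing.clone());
    // ... lookup, filter, score ...
} // locals drop in reverse order: processing observes first, then total
```

Since the total timer starts earlier and stops later, its observation is always at least the processing one, which is what makes the gap between the two p99s attributable to Strato.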
// Default max_results if not specified
let max_results = if req.max_results > 0 {
req.max_results as usize
} else if req.is_video_request {
MAX_VIDEOS_TO_RETURN
} else {
MAX_POSTS_TO_RETURN
};
GET_IN_NETWORK_POSTS_MAX_RESULTS.observe(max_results as f64);
Resolve max_results:
- If caller specified > 0, honour it.
- Otherwise, use the appropriate default based on whether this is a video request.
Histogram-observe the resolved value so we can audit caller behaviour.
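The resolution rule as a function. `resolve_max_results` is hypothetical, `requested` is assumed to be a `u32`, and the two defaults are parameters because the real MAX_POSTS_TO_RETURN and MAX_VIDEOS_TO_RETURN values aren't shown.

```rust
/// Effective result limit: an explicit positive request wins; otherwise
/// fall back to the video or non-video default.
fn resolve_max_results(
    requested: u32,
    is_video_request: bool,
    max_posts: usize,
    max_videos: usize,
) -> usize {
    if requested > 0 {
        requested as usize // used as-is, not clamped to the defaults
    } else if is_video_request {
        max_videos
    } else {
        max_posts
    }
}
```

Note that an explicit max_results is used as-is: nothing in the handler shown clamps it to the defaults.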
// Limit following_user_ids and exclude_tweet_ids to first K entries
let following_count = following_user_ids.len();
if following_count > MAX_INPUT_LIST_SIZE {
warn!(
"Limiting following_user_ids from {} to {} entries for user {}",
following_count, MAX_INPUT_LIST_SIZE, req.user_id
);
}
let following_user_ids: Vec<u64> = following_user_ids
.into_iter()
.take(MAX_INPUT_LIST_SIZE)
.collect();
let exclude_count = req.exclude_tweet_ids.len();
if exclude_count > MAX_INPUT_LIST_SIZE {
warn!(
"Limiting exclude_tweet_ids from {} to {} entries for user {}",
exclude_count, MAX_INPUT_LIST_SIZE, req.user_id
);
}
let exclude_tweet_ids: Vec<u64> = req
.exclude_tweet_ids
.into_iter()
.take(MAX_INPUT_LIST_SIZE)
.collect();
Hard upper bounds on input list sizes. If a user follows 100k accounts, we don't scan all 100k in the request path — we scan the first MAX_INPUT_LIST_SIZE (probably 10k or similar). Log when truncation happens.
This is a critical safeguard: an adversarial client (or buggy one) sending huge lists could otherwise drag the whole service to a halt. Bound the work per request.
.into_iter().take(N).collect() keeps the first N entries; Vec::truncate(N) would express the same thing in place.
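The capping step as a sketch. `cap_list` is a hypothetical helper that also reports whether truncation happened, which is the condition the handler logs on; it uses Vec::truncate, which keeps the same first-N entries as the handler's into_iter().take(..).collect().

```rust
/// Keep at most `max` leading entries and say whether any were dropped.
fn cap_list(mut ids: Vec<u64>, max: usize) -> (Vec<u64>, bool) {
    let truncated = ids.len() > max;
    ids.truncate(max); // no-op when ids.len() <= max
    (ids, truncated)
}
```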
// Clone Arc references needed inside spawn_blocking
let post_store = Arc::clone(&self.post_store);
let request_user_id = req.user_id as i64;
// Use spawn_blocking to avoid blocking tokio's async runtime
let proto_posts = tokio::task::spawn_blocking(move || {
// Create exclude tweet IDs set for efficient filtering of previously seen posts
let exclude_tweet_ids: HashSet<i64> =
exclude_tweet_ids.iter().map(|&id| id as i64).collect();
let start_time = Instant::now();
// Fetch all posts (original + secondary) for the followed users
let all_posts: Vec<LightPost> = if req.is_video_request {
post_store.get_videos_by_users(
&following_user_ids,
&exclude_tweet_ids,
start_time,
request_user_id,
)
} else {
post_store.get_all_posts_by_users(
&following_user_ids,
&exclude_tweet_ids,
start_time,
request_user_id,
)
};
The heavy lifting on spawn_blocking. Why? PostStore::get_*_by_users is synchronous (DashMap ops, no async I/O). For a request that has to scan thousands of authors' deques, this can take significant CPU. Running on the blocking thread pool keeps the async pool responsive.
Pre-conversion: Vec<u64> (proto) → HashSet<i64> (store API). The HashSet enables O(1) contains checks during filtering.
Then call either get_videos_by_users or get_all_posts_by_users (we walked these in Session 02). Pass start_time to enforce the request timeout.
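The exclude-set conversion and membership test on a flat list. This is only a sketch: in Thunder the filtering happens inside PostStore::get_*_by_users (Session 02), and `Post` / `filter_excluded` are hypothetical.

```rust
use std::collections::HashSet;

/// Hypothetical minimal post type.
#[derive(Debug, PartialEq)]
struct Post {
    post_id: i64,
}

/// Convert the proto-side u64 IDs to the store's i64 keys once, then filter
/// with average O(1) membership checks.
fn filter_excluded(posts: Vec<Post>, exclude_ids: &[u64]) -> Vec<Post> {
    // `as i64` reinterprets the bits: a u64 above i64::MAX would wrap to a
    // negative number. Post IDs fit in i64, which is presumably why the
    // handler uses a plain cast.
    let exclude: HashSet<i64> = exclude_ids.iter().map(|&id| id as i64).collect();
    posts.into_iter().filter(|p| !exclude.contains(&p.post_id)).collect()
}
```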
// Analyze posts and report statistics after querying post_store
ThunderServiceImpl::analyze_and_report_post_statistics(&all_posts, "retrieved");
let scored_posts = score_recent(all_posts, max_results);
// Analyze posts and report statistics after scoring
ThunderServiceImpl::analyze_and_report_post_statistics(&scored_posts, "scored");
scored_posts
})
.await
.map_err(|e| Status::internal(format!("Failed to process posts: {}", e)))?;
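Two stats reports: before scoring ("retrieved") and after ("scored"). They let us monitor freshness, reply ratio, author diversity and so on both before and after the top-K cut. With the current recency-only scorer the comparison is predictable: the newest post always survives the cut, so freshness essentially matches, and the time range can only shrink. The reply-ratio and author numbers show what the cut does to the mix, and the pair becomes a real bias check if a non-recency scorer is ever plugged in.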
The "scoring" is score_recent (defined at the bottom of the file) — just a descending-recency sort, then take(max_results). So Thunder's score = recency. The whole heavy ML scoring happens downstream in home-mixer via the Phoenix model. Thunder is a fast in-network candidate source; downstream code handles ranking.
spawn_blocking(...).await returns Result<T, JoinError>. We convert a join error (panic / cancellation) to a gRPC Status::internal.
if req.debug {
info!(
"Returning {} posts for user {}",
proto_posts.len(),
req.user_id
);
}
// Record the number of posts returned
GET_IN_NETWORK_POSTS_COUNT.observe(proto_posts.len() as f64);
let response = GetInNetworkPostsResponse { posts: proto_posts };
Ok(Response::new(response))
}
}
Debug log + final histogram observation + wrap in proto response. Return.
/// Score posts by recency (created_at timestamp, newer posts first)
fn score_recent(mut light_posts: Vec<LightPost>, max_results: usize) -> Vec<LightPost> {
light_posts.sort_unstable_by_key(|post| Reverse(post.created_at));
// Limit to max results
light_posts.into_iter().take(max_results).collect()
}
The "scorer." Sort descending by created_at. Reverse(post.created_at) flips the Ord so the sort is descending without us writing a custom comparator.
sort_unstable_by_key is typically faster than sort_by_key and doesn't allocate, at the cost of no stability guarantee. For our purposes that's fine: if two posts have the same timestamp, either order is acceptable.
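score_recent on a trimmed-down post type, as a runnable sketch (the real LightPost has more fields). It uses truncate where the original uses into_iter().take(..).collect(); both keep the first max_results entries.

```rust
use std::cmp::Reverse;

/// Hypothetical minimal post type.
#[derive(Debug, Clone, PartialEq)]
struct Post {
    post_id: i64,
    created_at: i64,
}

/// Recency "scorer": newest first, keep the top `max_results`.
fn score_recent(mut posts: Vec<Post>, max_results: usize) -> Vec<Post> {
    // Reverse flips the key's ordering, so an ascending sort is newest-first.
    posts.sort_unstable_by_key(|p| Reverse(p.created_at));
    posts.truncate(max_results);
    posts
}
```

A full sort is O(n log n) in the number of retrieved posts. If retrieval ever returned far more posts than max_results, slice::select_nth_unstable_by_key could partition out the newest K first and then sort only those; whether that's worth it depends on sizes the source doesn't show.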
End of file.
What we've learned (Thunder, end to end)
After two sessions, Thunder makes sense as a coherent service:
A specialized in-memory index. Posts come in via Kafka, sit in DashMaps keyed by author for ~2 days, and a gRPC service returns them filtered by follow list.
Two roles, one binary. A --is-serving flag flips between:
- Transformer: read legacy Thrift TweetEvents, project into canonical LightPost, emit as v2 protos.
- Serving: read v2 protos, write to in-memory store, serve gRPC.
Multi-threaded Kafka consumption. Partitions are bucketed across N threads. Each thread owns a subset and a dedicated consumer. Backpressure comes from manual offset commits (v1) or from the throttling semaphore that activates after catchup (v2).
Catchup signal. Each v2 thread tracks its initial lag and signals main.rs when it's "close enough" to the head (total lag under one batch per partition). Only then does the binary flip readiness on the health endpoint.
Blocking-pool sync work. Every DashMap access from gRPC handlers goes through spawn_blocking. The async pool stays responsive; the blocking pool absorbs the lookup latency.
Three bounds keep tail latency in check:
- Request semaphore (max_concurrent_requests) → reject with RESOURCE_EXHAUSTED instead of queueing.
- Per-author scan cap (MAX_TINY_POSTS_PER_USER_SCAN) → don't scan a megaposter's entire history for every request.
- Input list cap (MAX_INPUT_LIST_SIZE) → truncate huge follow and exclude lists (with a warning log) instead of processing them in full.
Recency-only scoring. Thunder doesn't predict engagement. That's phoenix's job, downstream. Thunder's "ranking" is sort_by(created_at desc).take(K). The intuition: Thunder is the retrieval layer; ranking happens later.
Recovery model. v1 manual-commits offsets only after successful produce (at-least-once). v2 auto-commits because the in-memory store is rebuildable from earliest offsets on every cold start. The on-disk state is the Kafka offset log; everything else is derivable.
Two open-source artifacts we noted:
- tweet_events_listener.rs line ~16: crate::config::MIN_VIDEO_DURATION_MS is doubly-prefixed (use crate::{ ..., crate::config::MIN_VIDEO_DURATION_MS, ... }), so it won't compile verbatim.
- thunder_service.rs: the Strato fallback is gated on req.debug in addition to an empty list; likely an unintended && req.debug that should be removed in production code.
Next session
Session 04 — Home-Mixer core + models. We move out of thunder/ into the main feed orchestration service. We'll cover:
- home-mixer/lib.rs, main.rs, server.rs, for_you_server.rs, scored_posts_server.rs (the 803-LOC service shell)
- home-mixer/models/*: the Query and Candidate structs and their feature/result subtypes (~701 LOC)
About 1,504 LOC, the largest single session of the series. After that, we'll be ready to dive into the actual pipeline configuration (filters, hydrators, scorers, sources, side-effects) in Sessions 05–14.