X For You algorithm, line by line · Part 12

X For You algorithm, line by line — Part 12: Side effects (part 1)

Part 12 of the deep dive into xai-org/x-algorithm. First half of the side-effect stage: MutualFollow stats, served-history truncation, past-request-timestamps write, Kafka impressions publish, Redis post-candidate cache with zstd, cross-DC Phoenix request cache, ads-injection logging, response-stats counters, 5%-sampled reranking Kafka publish.

May 15, 2026·19 min read

Side effects are the fire-and-forget tasks that run after the response goes back to the client. They write to Kafka for training data, update Manhattan/Redis state, emit metrics, log to debug streams. From Session 01 we know they're spawned into Tokio tasks via tokio::spawn — their failures don't affect the response.

The Phoenix and For You pipelines combine for 14 active side effects, which Sessions 12 and 13 split between them. This session covers 9 of them, the smaller, infrastructure-focused ones, plus one orphan file (~1,000 LOC in total). Session 13 covers the remaining 5 heavier ones (analytics and state writes).

Files covered:

home-mixer/side_effects/
├── mod.rs                                            (14)
├── mutual_follow_stats_side_effect.rs                (67)   stats only
├── truncate_served_history_side_effect.rs            (53)   Manhattan maintenance
├── update_past_request_timestamps_side_effect.rs     (61)   Manhattan write
├── publish_seen_ids_to_kafka_side_effect.rs          (79)   Kafka publish
├── redis_post_candidate_cache_side_effect.rs         (92)   Redis cache write
├── phoenix_request_cache_side_effect.rs              (131)  cross-DC Redis write
├── ads_injection_logging_side_effect.rs              (146)  Kafka ad logging
├── for_you_response_stats_side_effect.rs             (157)  response metrics
├── reranking_kafka_side_effect.rs                    (157)  Kafka reranking samples
└── cache_request_info_side_effect.rs                 (42)   [orphan]

The 14 modules declared in mod.rs are the ones wired into the pipelines. cache_request_info_side_effect.rs sits in the directory but is not declared in mod.rs, which makes it an orphan, most likely a pre-refactor leftover.


mod.rs (14 lines)

pub mod ads_injection_logging_side_effect;
pub mod client_events_kafka_side_effect;
pub mod for_you_response_stats_side_effect;
pub mod mutual_follow_stats_side_effect;
pub mod phoenix_experiments_side_effect;
pub mod phoenix_request_cache_side_effect;
pub mod publish_seen_ids_to_kafka_side_effect;
pub mod redis_post_candidate_cache_side_effect;
pub mod reranking_kafka_side_effect;
pub mod scored_stats_side_effect;
pub mod served_candidates_kafka_side_effect;
pub mod truncate_served_history_side_effect;
pub mod update_past_request_timestamps_side_effect;
pub mod update_served_history_side_effect;

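14 modules. There is no pub mod line for cache_request_info_side_effect, so rustc never compiles that file (unless a #[path] attribute or include! elsewhere pulls it in). That is what makes it an orphan.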


mutual_follow_stats_side_effect.rs (67 lines) — pure metrics

The simplest active side effect. Computes the average mutual-follow Jaccard across selected candidates and emits a histogram.

const METRIC_PREFIX: &str = "MutualFollowJaccard";

pub struct MutualFollowStatsSideEffect;

#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for MutualFollowStatsSideEffect {
    fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
        query.params.get(EnableMutualFollowJaccardHydration)
    }

    async fn side_effect(
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>,
    ) -> Result<(), String> {
        let Some(receiver) = global_stats_receiver() else {
            return Ok(());
        };

        let candidates = &input.selected_candidates;
        if candidates.is_empty() {
            return Ok(());
        }

Three guards: enabled flag, receiver exists, at least one candidate.

        let retrieval_cluster: String = input.query.params.get(PhoenixRetrievalInferenceClusterId);
        let moe_enabled: bool = input.query.params.get(EnablePhoenixMOESource);

        let mut scope: Vec<(&str, &str)> = vec![("retrieval_cluster", &retrieval_cluster)];
        let moe_cluster: String;
        if moe_enabled {
            moe_cluster = input
                .query
                .params
                .get(PhoenixRetrievalMOEInferenceClusterId);
            scope.push(("moe_cluster", &moe_cluster));
        }

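Build the metric scope (tags). retrieval_cluster is always included; if MoE is enabled, the MoE cluster is tagged too. The let moe_cluster: String; declaration sits outside the if so the String outlives every use of scope, which borrows &moe_cluster. Declared inside the if, it would be dropped at the end of that block while scope still held the borrow, and rustc would reject the code (E0597, borrowed value does not live long enough).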

        let avg_key = format!("{METRIC_PREFIX}.avgScore");

        let mut sum = 0.0f64;
        let mut present = 0u64;

        for candidate in candidates {
            if let Some(j) = candidate.mutual_follow_jaccard {
                present += 1;
                sum += j;
            }
        }

        if present > 0 {
            let avg = sum / present as f64;
            receiver.observe(&avg_key, &scope, avg, HistogramBuckets::Bucket0To1);
        }

        Ok(())
    }
}

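Average the present Jaccard values (skipping None) and emit the result as a histogram with [0, 1] buckets (HistogramBuckets::Bucket0To1); if no candidate has a value, nothing is emitted. The cluster tags let dashboards correlate the average mutual-follow score with the retrieval and MoE clusters, and so with any experiment arms that route to different clusters.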
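A std-only sketch of the two mechanics in this side effect, under simplifying assumptions: the params lookups become plain arguments (an Option<&str> stands in for the EnablePhoenixMOESource flag plus the MoE cluster param), and instead of calling receiver.observe, the hypothetical mutual_follow_metric returns the tags and the average it would record. It keeps the deferred let moe_cluster: String; declaration and the skip-None average that emits nothing when no candidate has a value.

```rust
/// Hypothetical, std-only stand-in for the side effect's body: returns the
/// tags (rendered as "k=v" pairs) and the average it would observe, or None
/// when no candidate carries a Jaccard value.
fn mutual_follow_metric(
    retrieval_cluster: String,
    moe_cluster_param: Option<&str>,
    jaccards: &[Option<f64>],
) -> Option<(String, f64)> {
    let mut scope: Vec<(&str, &str)> = vec![("retrieval_cluster", retrieval_cluster.as_str())];

    // Declared here but initialized only inside the `if`, so the String
    // outlives every later use of `scope`, which borrows it.
    let moe_cluster: String;
    if let Some(param) = moe_cluster_param {
        moe_cluster = param.to_string();
        scope.push(("moe_cluster", moe_cluster.as_str()));
    }

    // Sum and count only the present values; `flatten` skips the Nones.
    let (sum, present) = jaccards
        .iter()
        .flatten()
        .fold((0.0f64, 0u64), |(sum, n), j| (sum + j, n + 1));
    if present == 0 {
        return None; // mirrors the `present > 0` guard: nothing is observed
    }

    let tags = scope
        .iter()
        .map(|(k, v)| format!("{k}={v}"))
        .collect::<Vec<_>>()
        .join(",");
    Some((tags, sum / present as f64))
}
```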


truncate_served_history_side_effect.rs (53 lines) — Manhattan trim

Trims the user's served history to a bounded size. Old entries get deleted.

const MAX_RESPONSES: usize = 50;

pub struct TruncateServedHistorySideEffect {
    client: Arc<dyn ServedHistoryClient>,
}

#[async_trait]
impl SideEffect<ScoredPostsQuery, FeedItem> for TruncateServedHistorySideEffect {
    fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
        is_prod()
            && query.params.get(EnableUrtMigrationComponents)
            && query.served_history.len() > MAX_RESPONSES
    }

Fires only in prod, with EnableUrtMigrationComponents on, and when the history exceeds the threshold. Presumably most requests skip it; only requests whose history has grown past 50 entries trigger a trim.

    async fn side_effect(
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
    ) -> Result<(), String> {
        let query = &input.query;

        let to_delete: Vec<i64> = query
            .served_history
            .iter()
            .skip(MAX_RESPONSES)
            .filter_map(|sh| sh.served_time_ms)
            .collect();

        if to_delete.is_empty() {
            return Ok(());
        }

        let platform = client_platform::from_client_app_id(query.client_app_id);
        self.client
            .delete(query.user_id, TimelineType::Home, platform, &to_delete)
            .await
            .map_err(|e| e.to_string())
    }
}

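Skip the first 50 entries (the newest, assuming served_history is ordered newest first), extract the served_time_ms keys of the rest, and send a single delete call. Entries without a served_time_ms are silently skipped by filter_map, so this path never deletes them. Trimming is lazy, happening only once the history exceeds the limit, and saves Manhattan storage for users who request frequently.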
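A minimal sketch of the trim selection, assuming served_history is ordered newest first (so skip(50) keeps the 50 most recent entries). ServedEntry is a hypothetical stand-in that holds only the field the delete needs.

```rust
/// Hypothetical stand-in for a served-history entry: only the delete key.
struct ServedEntry {
    served_time_ms: Option<i64>,
}

const MAX_RESPONSES: usize = 50;

/// Delete keys for every entry past the first MAX_RESPONSES.
/// Entries without a served_time_ms yield no key (filter_map drops them).
fn keys_to_delete(history: &[ServedEntry]) -> Vec<i64> {
    history
        .iter()
        .skip(MAX_RESPONSES)
        .filter_map(|entry| entry.served_time_ms)
        .collect()
}
```

With 53 entries where the one at index 51 has no served_time_ms, only two keys come back (indices 50 and 52); the keyless entry stays in storage.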


update_past_request_timestamps_side_effect.rs (61 lines) — write request timestamp

Records "this user made a non-polling request at time X" — so the next request's PastRequestTimestampsQueryHydrator can read it.

const MAX_NON_POLLING_TIMES: usize = 10;

pub struct UpdatePastRequestTimestampsSideEffect {
    client: Arc<dyn PastRequestTimestampsClient>,
}

#[async_trait]
impl SideEffect<ScoredPostsQuery, FeedItem> for UpdatePastRequestTimestampsSideEffect {
    fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
        let is_background_fetch =
            RequestContext::parse(&query.request_context) == RequestContext::BackgroundFetch;
        is_prod()
            && query.params.get(EnableUrtMigrationComponents)
            && !query.is_polling
            && !is_background_fetch
    }

Skip if polling or background-fetch. We only record explicit user-initiated requests (foreground refresh).

    async fn side_effect(
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
    ) -> Result<(), String> {
        let query = &input.query;

        let now_ms = query.request_time_ms;

        let prior_timestamps = query
            .non_polling_timestamps
            .as_ref()
            .map(|npt| &npt.non_polling_timestamps_ms[..])
            .unwrap_or(&[]);
        let most_recent = query
            .non_polling_timestamps
            .as_ref()
            .and_then(|npt| npt.most_recent_home_latest_non_polling_timestamp_ms);

        let timestamps: Vec<i64> = std::iter::once(now_ms)
            .chain(prior_timestamps.iter().copied())
            .take(MAX_NON_POLLING_TIMES)
            .collect();

        let npt = NonPollingTimestamps::new(timestamps, most_recent);
        self.client.put(query.user_id, &npt).await
    }
}

Build the new list: [now] + prior[..], capped at 10. Preserve most_recent_home_latest_non_polling_timestamp_ms from the prior state. Write to Manhattan.

Sliding window of the last 10 user-initiated request times. Plausibly used downstream for "you've been gone for X" UX (e.g., "Catch up on what you missed" markers in URT).
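The window update in isolation, as a std-only sketch (next_window is a hypothetical name). Because every write puts the current time first, the stored list stays newest first as long as request times increase, which is what lets take(10) drop the oldest entry.

```rust
const MAX_NON_POLLING_TIMES: usize = 10;

/// New window: the current request time first, then the prior times,
/// capped at MAX_NON_POLLING_TIMES (the oldest fall off the end).
fn next_window(now_ms: i64, prior: &[i64]) -> Vec<i64> {
    std::iter::once(now_ms)
        .chain(prior.iter().copied())
        .take(MAX_NON_POLLING_TIMES)
        .collect()
}
```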


publish_seen_ids_to_kafka_side_effect.rs (79 lines) — Kafka impressions

Publishes the user's seen-IDs to Kafka. Downstream consumers update impression bloom filters, training data, etc.

pub struct PublishSeenIdsToKafkaSideEffect {
    kafka_client: Arc<dyn KafkaPublisherClient>,
}

impl PublishSeenIdsToKafkaSideEffect {
    pub fn new(kafka_client: Arc<dyn KafkaPublisherClient>) -> Self {
        Self { kafka_client }
    }

    pub async fn prod() -> Self {
        Self::new(Arc::new(
            ProdKafkaPublisherClient::new(CLIENT_SENT_IMPRESSIONS_TOPIC, KafkaCluster::Bluebird)
                .await,
        ))
    }
}

The production constructor builds a Kafka client for CLIENT_SENT_IMPRESSIONS_TOPIC on the Bluebird cluster (KafkaCluster variants are names of X's internal Kafka clusters).

#[async_trait]
impl SideEffect<ScoredPostsQuery, FeedItem> for PublishSeenIdsToKafkaSideEffect {
    fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
        is_prod() && !query.seen_ids.is_empty() && query.params.get(EnablePublishSeenIdsToKafka)
    }

Prod-only + non-empty seen_ids + flag.

    async fn side_effect(
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
    ) -> Result<(), String> {
        let query = &input.query;
        if query.seen_ids.is_empty() {
            return Ok(());
        }

        let current_time = query.request_time_ms;

        let surface_areas = Some(BTreeSet::from([SurfaceArea::HOME_TIMELINE]));

        let impressions: Vec<Impression> = query
            .seen_ids
            .iter()
            .map(|&tweet_id| Impression {
                tweet_id: tweet_id as i64,
                impression_time: Some(current_time),
                linger_time_ms: None,
                surface_areas: surface_areas.clone(),
                media_details: None,
            })
            .collect();

Build one Impression per seen ID, stamped with the request time (request_time_ms rather than a fresh clock read) and with the surface set to HOME_TIMELINE. linger_time_ms is None: we don't know how long the user looked at anything, only that the client reported these IDs as already seen.

        let published = PublishedImpressionList::new(
            query.user_id as i64,
            ImpressionList::new(impressions),
            current_time,
        );

        let bytes = serialize_binary(&published)
            .map_err(|e| format!("Thrift serialization failed: {e}"))?;

        self.kafka_client
            .send(&bytes)
            .await
            .map_err(|e| format!("Seen-IDs Kafka publish failed: {e}"))
    }
}

Wrap in PublishedImpressionList, serialize to Thrift binary, send to Kafka.

This is the data feed for the impression bloom filter store that ImpressionBloomFilterQueryHydrator (Session 09) reads back next time.


redis_post_candidate_cache_side_effect.rs (92 lines) — fill the read cache

Writes the top scored candidates to Redis so the next request's CachedPostsQueryHydrator (Session 09) can hit the cache and skip the entire scoring pipeline.

const REDIS_TTL_SECONDS: u64 = 180;
const ZSTD_COMPRESSION_LEVEL: i32 = 6;

pub struct RedisPostCandidateCacheSideEffect {
    redis_client: Arc<dyn RedisClient>,
}

180-second TTL: cached posts expire after 3 minutes. That's short because model output goes stale quickly, but presumably long enough to cover a typical user's quick second pull.

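ZSTD_COMPRESSION_LEVEL = 6: moderate compression. zstd's default level is 3 and regular levels go up to 22; higher levels produce smaller output more slowly, so 9 would be slower and 3 faster but bigger.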

    fn get_candidates_to_cache<'a>(
        selected: &'a [PostCandidate],
        non_selected: &'a [PostCandidate],
        max_posts_to_cache: usize,
    ) -> Vec<&'a PostCandidate> {
        let mut all_candidates: Vec<&PostCandidate> = selected
            .iter()
            .chain(non_selected.iter())
            .filter(|c| c.weighted_score.is_some_and(|s| s > 0.0))
            .collect();
        all_candidates.sort_by(|a, b| {
            b.weighted_score
                .unwrap()
                .partial_cmp(&a.weighted_score.unwrap())
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        all_candidates.truncate(max_posts_to_cache);
        all_candidates
    }

Cache more than was served: take both selected and non_selected, sort by weighted_score descending, truncate to max_posts_to_cache (e.g., 500). So the cache has the top N scored candidates — when the next request hits, the cached set can serve a different cut of the top-K.

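Filter: only strictly positive weighted scores survive. None, zero, and negative scores are dropped, since they're useless to cache.

b.weighted_score.unwrap() is safe because the filter already removed None values. The filter also rejects NaN (NaN > 0.0 is false), so partial_cmp always returns Some and the unwrap_or(Equal) fallback never fires. A common, if debated, pattern: the invariant lives in the filter rather than next to the unwrap.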
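A std-only sketch of get_candidates_to_cache with PostCandidate reduced to a hypothetical Candidate holding an id and a weighted_score. The test data mixes None, zero, negative, and NaN scores to show that the single is_some_and(|s| s > 0.0) filter removes all of them, which is what makes the later unwrap() calls safe.

```rust
/// Hypothetical stand-in for PostCandidate: only what the ranking reads.
struct Candidate {
    id: u64,
    weighted_score: Option<f64>,
}

fn get_candidates_to_cache<'a>(
    selected: &'a [Candidate],
    non_selected: &'a [Candidate],
    max_posts_to_cache: usize,
) -> Vec<&'a Candidate> {
    let mut all: Vec<&Candidate> = selected
        .iter()
        .chain(non_selected.iter())
        // Keeps Some(s) with s > 0.0; None, zero, negative and NaN all fail.
        .filter(|c| c.weighted_score.is_some_and(|s| s > 0.0))
        .collect();
    all.sort_by(|a, b| {
        // Safe: the filter guarantees Some(non-NaN), so partial_cmp is Some.
        b.weighted_score
            .unwrap()
            .partial_cmp(&a.weighted_score.unwrap())
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    all.truncate(max_posts_to_cache);
    all
}

/// Convenience wrapper: the ids that would be cached, best first.
fn cached_ids(selected: &[Candidate], non_selected: &[Candidate], max: usize) -> Vec<u64> {
    get_candidates_to_cache(selected, non_selected, max)
        .iter()
        .map(|c| c.id)
        .collect()
}
```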

#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for RedisPostCandidateCacheSideEffect {
    fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
        is_prod() && !query.has_cached_posts
    }

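Prod-only, and no double-caching: if this response was itself served from cached posts, the cache already holds them. Rewriting the key would also reset its 180-second TTL, keeping an older scoring pass alive longer.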

    async fn side_effect(
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>,
    ) -> Result<(), String> {
        let max_posts_to_cache = input.query.params.get(MaxPostsToCache);
        let user_id = input.query.user_id;

        let candidates_to_cache = Self::get_candidates_to_cache(
            &input.selected_candidates,
            &input.non_selected_candidates,
            max_posts_to_cache,
        );

        let cache_key = redis_client::cached_posts_key(
            user_id,
            &input.query.topic_ids,
            input.query.in_network_only,
            input.query.exclude_videos,
        );

Same cache key as the read side (Session 09) — (user, topics, in_network_only, exclude_videos). Read/write key must match.

        let json_payload =
            serde_json::to_vec(&candidates_to_cache).map_err(|err| err.to_string())?;
        let uncompressed_size = json_payload.len();
        let compressed_payload = tokio::task::spawn_blocking(move || {
            zstd::encode_all(json_payload.as_slice(), ZSTD_COMPRESSION_LEVEL)
        })
        .await
        .map_err(|err| err.to_string())?
        .map_err(|err| err.to_string())?;

Two-step serialization: JSON, then zstd. Compression runs on spawn_blocking because it's CPU-bound (at level 6 it can take a few ms for 500 candidates); this keeps it off the async runtime's worker threads.

Double ? again: the outer one handles tokio's JoinError, the inner one the std::io::Error that zstd::encode_all returns.

        tracing::debug!(
            user_id = user_id,
            count = candidates_to_cache.len(),
            cache_key = cache_key.clone(),
            uncompressed_size = uncompressed_size,
            compressed_size = compressed_payload.len(),
            "RedisPostCandidateCacheSideEffect caching candidates"
        );

        self.redis_client
            .set_ex(cache_key, compressed_payload, REDIS_TTL_SECONDS)
            .await
            .map_err(|err| err.to_string())
    }
}

Debug log with both sizes (useful for compression-ratio diagnostics), then SET EX to Redis.
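tokio and zstd aren't assumed here. This std-only analogy uses std::thread::spawn(...).join(), which has the same two-layer result shape as spawn_blocking(...).await: the outer Err means the worker didn't finish (a panic here, JoinError in tokio), the inner Err is the job's own failure (std::io::Error from zstd::encode_all). The checksum job is a hypothetical placeholder for the compression call.

```rust
use std::io;
use std::thread;

/// Hypothetical CPU-bound, fallible job standing in for zstd::encode_all.
fn checksum(data: Vec<u8>) -> io::Result<u32> {
    if data.is_empty() {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty payload"));
    }
    Ok(data.iter().map(|&b| u32::from(b)).sum())
}

/// Runs the job off the calling thread and flattens both error layers.
fn run_offloaded(data: Vec<u8>) -> Result<u32, String> {
    let value = thread::spawn(move || checksum(data))
        .join()
        // Outer layer: the worker never finished (tokio's analogue: JoinError).
        .map_err(|_| "worker thread panicked".to_string())?
        // Inner layer: the job itself failed (zstd's analogue: io::Error).
        .map_err(|err| err.to_string())?;
    Ok(value)
}
```

The main difference from the real code is scheduling: spawn_blocking reuses tokio's blocking thread pool, while thread::spawn creates a fresh OS thread per call.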


phoenix_request_cache_side_effect.rs (131 lines) — cross-datacenter cache

Caches the Phoenix model request (input data + per-candidate features) for replay analysis. Writes to two Redis clients — one in Atlanta, one in PDXA — for cross-DC analysis.

const KEY_PREFIX: &str = "phoenix_request_cache";

pub struct PhoenixRequestCacheSideEffect {
    phoenix_request_cache_redis_atla_client: Arc<dyn RedisClient>,
    phoenix_request_cache_redis_pdxa_client: Arc<dyn RedisClient>,
}

Two Redis clients in different DCs: atla (Atlanta) and pdxa ("PDX" is Portland; the trailing "a" likely identifies a specific cluster within that region).

pub fn request_level_cache_key(prediction_request_id: u64) -> String {
    format!("{KEY_PREFIX}_{prediction_request_id}")
}

pub fn candidate_cache_key(prediction_request_id: u64, post_id: u64) -> String {
    format!("{KEY_PREFIX}_{prediction_request_id}_{post_id}")
}

Two key schemes:

  • Request-level: one key per Phoenix request, holding the request envelope (without sequences/candidates).
  • Candidate-level: one key per (request_id, post_id), holding the per-candidate features.

Decomposed so a replay can fetch the envelope once + the candidates it cares about, instead of one giant blob.

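#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for PhoenixRequestCacheSideEffect {
    async fn side_effect(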
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>,
    ) -> Result<(), String> {
        let query = &input.query;
        let ttl = query.params.get(PhoenixRequestCacheSideEffectTtlSeconds);

        if input.selected_candidates.is_empty() && input.non_selected_candidates.is_empty() {
            return Ok(());
        }

        let prediction_request_id = input
            .selected_candidates
            .iter()
            .find_map(|c| c.prediction_request_id);
        let Some(prediction_request_id) = prediction_request_id else {
            return Ok(());
        };

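Find the prediction_request_id by scanning the selected candidates only, and bail if none carries one (which presumably means Phoenix didn't score). A request whose candidates are all non-selected passes the emptiness guard above but still bails here.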

        let product_surface = if query.in_network_only {
            ProductSurface::HomeTimelineRankedFollowing
        } else {
            ProductSurface::HomeTimelineRanking
        };

        let mut entries: Vec<(String, Vec<u8>)> = Vec::new();

        if !query.has_cached_posts {
            let phoenix_request =
                build_request_without_sequence_and_candidates(query, product_surface);
            entries.push((
                request_level_cache_key(prediction_request_id),
                phoenix_request.encode_to_vec(),
            ));
        }

        for candidate in build_tweet_infos(query, &input.selected_candidates) {
            entries.push((
                candidate_cache_key(prediction_request_id, candidate.tweet_id),
                candidate.encode_to_vec(),
            ));
        }

Build the entries list:

  • Request-level entry (only if not using cached posts — otherwise the request envelope was already cached).
  • One entry per selected candidate (the TweetInfo proto).

Note: build_request_without_sequence_and_candidates and build_tweet_infos are helpers in util::phoenix_request — they construct the data we'd send to Phoenix.

        let mut futures: Vec<
            std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send + '_>>,
        > = Vec::new();

        for (key, value) in entries {
            let pdxa_key = key.clone();
            let pdxa_value = value.clone();
            futures.push(Box::pin(async move {
                self.phoenix_request_cache_redis_atla_client
                    .set_ex(key, value, ttl)
                    .await
                    .map_err(|e| e.to_string())
            }));
            futures.push(Box::pin(async move {
                self.phoenix_request_cache_redis_pdxa_client
                    .set_ex(pdxa_key, pdxa_value, ttl)
                    .await
                    .map_err(|e| format!("xdc: {e}"))
            }));
        }

        let results = join_all(futures).await;

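For each entry, create two writes (atla and pdxa) that run concurrently once awaited. Every async block has its own anonymous type, so the two kinds of future can't share a Vec as-is; boxing them as Pin<Box<dyn Future + Send>> erases the type. The '_ lifetime is there because they borrow self to reach the clients. key and value are cloned first because each async move block needs its own owned copy.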

PDXA errors get prefixed with "xdc:" (cross-datacenter) so the failure mode is distinguishable.

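join_all (presumably futures::future::join_all) resolves once every future has completed and returns all results in order; unlike try_join_all, it doesn't stop at the first error. The writes run concurrently within this one task, not in parallel on separate threads.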

        let total = results.len();
        let mut first_error: Option<String> = None;
        let mut error_count = 0usize;
        for result in results {
            if let Err(e) = result {
                error_count += 1;
                if first_error.is_none() {
                    first_error = Some(e);
                }
            }
        }

        if total > 0 && error_count * 10 > total {
            return Err(first_error.unwrap_or_default());
        }
        Ok(())
    }
}

Tolerate up to 10% errors. If more than 10% of writes fail (error_count * 10 > total), surface the first error. Otherwise treat as success.

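The threshold absorbs sporadic failures, not a DC outage. Every entry produces one atla write and one pdxa write, so if either DC is fully down, half the writes fail (50% > 10%) and the side effect returns Err. What it tolerates is a scattering of timeouts or blips, up to 10% of all writes across both DCs. Either way, as with any side effect, the response itself is unaffected.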
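The tolerance rule, extracted into a std-only function so the arithmetic can be checked directly. summarize mirrors the counting loop and the error_count * 10 > total test; writes is a hypothetical helper that fabricates a batch of results. Integer math keeps the comparison exact, and the order of failures only decides which error gets surfaced.

```rust
/// Fail only if more than 10% of writes failed, surfacing the first error.
fn summarize(results: Vec<Result<(), String>>) -> Result<(), String> {
    let total = results.len();
    let mut first_error: Option<String> = None;
    let mut error_count = 0usize;
    for result in results {
        if let Err(e) = result {
            error_count += 1;
            if first_error.is_none() {
                first_error = Some(e);
            }
        }
    }
    // error_count / total > 1/10, rearranged to avoid floating point.
    if total > 0 && error_count * 10 > total {
        return Err(first_error.unwrap_or_default());
    }
    Ok(())
}

/// Hypothetical helper: `total` writes, the first `failures` of which fail.
fn writes(total: usize, failures: usize) -> Vec<Result<(), String>> {
    (0..total)
        .map(|i| {
            if i < failures {
                Err(format!("xdc: write {i} failed"))
            } else {
                Ok(())
            }
        })
        .collect()
}
```

With 20 writes (10 entries, two DCs), 2 failures are tolerated, 3 are not, and a full pdxa outage (10 failures) fails with the first error.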


ads_injection_logging_side_effect.rs (146 lines) — ad-decision Kafka log

Logs to Kafka the served timeline position by position (posts and ads), along with counts of what was fetched versus served. Drives ad attribution downstream.

pub struct AdsInjectionLoggingSideEffect {
    kafka_client: Arc<dyn KafkaPublisherClient>,
}

impl AdsInjectionLoggingSideEffect {
    pub async fn prod() -> Self {
        Self::new(Arc::new(
            ProdKafkaPublisherClient::new(ADS_INJECTION_TOPIC, KafkaCluster::Ads).await,
        ))
    }
}

The production constructor wires it to the Ads Kafka cluster, separate from Bluebird. Ad data presumably has its own cluster for security and isolation.

#[async_trait]
impl SideEffect<ScoredPostsQuery, FeedItem> for AdsInjectionLoggingSideEffect {
    fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
        is_prod() && query.params.get(EnableAdsInjectionLogging)
    }

    async fn side_effect(
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
    ) -> Result<(), String> {
        let query = &input.query;
        let items = &input.selected_candidates;

        let fetched_posts = count_posts(items) + count_posts(&input.non_selected_candidates);
        let fetched_ads = count_ads(items) + count_ads(&input.non_selected_candidates);

        let request_time_ms = query.request_time_ms;

        let entries: Vec<TimelineEntry> = items
            .iter()
            .enumerate()
            .map(|(pos, item)| build_timeline_entry(item, pos))
            .collect();

        if entries.is_empty() {
            return Ok(());
        }

Count what was fetched (selected plus non-selected), then build one entry per position from the selected list. If nothing was selected, there is nothing to log.

        let timeline = AdsInjectedTimeline {
            user_id: query.user_id,
            entries,
            request_time_ms,
            display_location: DisplayLocation::TimelineHome.into(),
            country_code: query.country_code.clone(),
            request_id: query.request_id,
            subscription_level: query
                .subscription_level
                .map(sub_level_to_proto)
                .unwrap_or(SubscriptionLevel::LevelUnspecified)
                .into(),
            client_app_id: query.client_app_id as i64,
            counts: Some(Counts {
                retrieved_posts: fetched_posts as i32,
                retrieved_ads: fetched_ads as i32,
                response_posts: count_posts(items) as i32,
                response_ads: count_ads(items) as i32,
            }),
            ip_address: query.ip_address.clone(),
            user_agent: query.user_agent.clone(),
        };

        let bytes = timeline.encode_to_vec();
        self.kafka_client
            .send(&bytes)
            .await
            .map_err(|e| format!("Ads Kafka publish failed: {e}"))?;
        Ok(())
    }
}

Build the AdsInjectedTimeline proto. Embeds:

  • User identity.
  • Per-position entries.
  • Display location.
  • Counts (retrieved vs. response, posts vs. ads).
  • Subscription level (ad eligibility differs by tier).
  • IP, user agent (presumably for compliance / fraud detection).

Note that subscription_level falls back to LevelUnspecified when the query doesn't carry one. The proto is encoded with encode_to_vec and sent.

fn build_timeline_entry(item: &FeedItem, position: usize) -> TimelineEntry {
    match &item.item {
        Some(feed_item::Item::Post(post)) => TimelineEntry {
            tweet_id: post.tweet_id,
            author_id: post.author_id,
            position: position as i32,
            promoted: false,
            impression_id: 0,
            brand_safety_verdict: post.brand_safety_verdict,
            ad_info: None,
            safety_labels: post.safety_label_types.clone(),
        },
        Some(feed_item::Item::Ad(ad)) => TimelineEntry {
            tweet_id: ad.post_id as u64,
            author_id: ad.author_id as u64,
            position: position as i32,
            promoted: true,
            impression_id: ad.impression_id as u64,
            brand_safety_verdict: 0,
            ad_info: Some(ad.clone()),
            safety_labels: vec![],
        },
        Some(feed_item::Item::WhoToFollow(_))
        | Some(feed_item::Item::Prompt(_))
        | Some(feed_item::Item::PushToHome(_))
        | None => TimelineEntry {
            position: position as i32,
            ..Default::default()
        },
    }
}

Item-type-specific entry building:

  • Post: full info including brand safety + safety labels.
  • Ad: promoted = true, includes ad_info for billing/attribution.
  • WhoToFollow / Prompt / PushToHome / None: an otherwise-empty entry holding just the position. These items carry nothing relevant to ad billing, so the log only records that the slot was occupied.

fn sub_level_to_proto(level: CoreSubscriptionLevel) -> SubscriptionLevel {
    match level {
        CoreSubscriptionLevel::Basic => SubscriptionLevel::Basic,
        CoreSubscriptionLevel::Premium => SubscriptionLevel::Premium,
        CoreSubscriptionLevel::PremiumPlus => SubscriptionLevel::PremiumPlus,
    }
}

fn count_posts(items: &[FeedItem]) -> usize {
    items
        .iter()
        .filter(|i| matches!(i.item, Some(feed_item::Item::Post(_))))
        .count()
}

fn count_ads(items: &[FeedItem]) -> usize {
    items
        .iter()
        .filter(|i| matches!(i.item, Some(feed_item::Item::Ad(_))))
        .count()
}

Helpers. The subscription-level mapping is from internal enum → proto enum (different crates).
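A trimmed-down sketch of build_timeline_entry's shape, with hypothetical Item and TimelineEntry types that keep only three fields (WhoToFollow stands in for the other non-post, non-ad variants). It shows how ..Default::default() fills every field except position. Deriving Default here is an assumption; if TimelineEntry is prost-generated (likely, given encode_to_vec), the Default impl comes with the generated code.

```rust
/// Hypothetical, trimmed-down log entry.
#[derive(Debug, Default, PartialEq)]
struct TimelineEntry {
    tweet_id: u64,
    position: i32,
    promoted: bool,
}

/// Hypothetical, trimmed-down feed item.
enum Item {
    Post { tweet_id: u64 },
    Ad { post_id: i64 },
    WhoToFollow,
}

fn build_timeline_entry(item: Option<&Item>, position: usize) -> TimelineEntry {
    match item {
        Some(Item::Post { tweet_id }) => TimelineEntry {
            tweet_id: *tweet_id,
            position: position as i32,
            promoted: false,
        },
        Some(Item::Ad { post_id }) => TimelineEntry {
            tweet_id: *post_id as u64,
            position: position as i32,
            promoted: true,
        },
        // Any other slot: record only the position; every other field
        // takes its Default value (0 / false).
        Some(Item::WhoToFollow) | None => TimelineEntry {
            position: position as i32,
            ..Default::default()
        },
    }
}
```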


for_you_response_stats_side_effect.rs (157 lines) — response counters

Emits counters about every For You response: post count, ad count, and "empty" and "sufficient" counts from which dashboards derive rates. Built for granular Prometheus dashboarding.

const RESPONSE_METRIC: &str = "ForYouFeed.response";
const TYPE_TOTAL: (&str, &str) = ("type", "total");
const TYPE_POSTS: (&str, &str) = ("type", "posts");
const TYPE_ADS: (&str, &str) = ("type", "ads");
const TYPE_EMPTY_ADS: (&str, &str) = ("type", "empty_ads");
const TYPE_EMPTY_POSTS: (&str, &str) = ("type", "empty_posts");
const TYPE_SUFFICIENT_ADS: (&str, &str) = ("type", "sufficient_ads");
const TYPE_SUFFICIENT_POSTS: (&str, &str) = ("type", "sufficient_posts");
const STAGE_RESPONSE: (&str, &str) = ("stage", "response");
const STAGE_FETCHED: (&str, &str) = ("stage", "fetched");

Const tag tuples. Two axes:

  • type: what's being counted (total / posts / ads / empty_ads / empty_posts / sufficient_ads / sufficient_posts).
  • stage: response (final) vs fetched (before filtering / blending).

pub struct ForYouResponseStatsSideEffect;

#[async_trait]
impl SideEffect<ScoredPostsQuery, FeedItem> for ForYouResponseStatsSideEffect {
    async fn side_effect(
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
    ) -> Result<(), String> {
        let fetched_posts =
            count_posts(&input.selected_candidates) + count_posts(&input.non_selected_candidates);
        let fetched_ads =
            count_ads(&input.selected_candidates) + count_ads(&input.non_selected_candidates);

        let query = &input.query;
        let blender_type = query.params.get(AdsBlenderType);

        stat_response(
            &input.selected_candidates,
            fetched_posts,
            fetched_ads,
            &query.country_code,
            query.subscription_level,
            &blender_type,
        );

        Ok(())
    }
}

Unit struct. Compute fetched counts (selected + non_selected). Delegate to stat_response.

No enable override, so (assuming the trait's default enable returns true) it runs on every For You response.

fn stat_response(
    items: &[FeedItem],
    fetched_posts: usize,
    fetched_ads: usize,
    country_code: &str,
    subscription_level: Option<SubscriptionLevel>,
    blender_type: &str,
) {
    let post_count = items
        .iter()
        .filter(|i| matches!(i.item, Some(feed_item::Item::Post(_))))
        .count();
    let ad_count = items
        .iter()
        .filter(|i| matches!(i.item, Some(feed_item::Item::Ad(_))))
        .count();

    let sub_status = subscription_level.map(|s| s.as_str()).unwrap_or("none");

    info!(
        "ForYouFeed response - {post_count} posts, {ad_count} ads \
         (fetched {fetched_posts} posts, {fetched_ads} ads), \
         country {country_code}, subscription {sub_status}, blender {blender_type}.",
    );

Compute the response post/ad counts, then log a human-readable summary line at info level, once per response.

    if let Some(receiver) = global_stats_receiver() {
        let sub = ("subscription", sub_status);
        let blender = ("blender", blender_type);
        receiver.incr(RESPONSE_METRIC, &[TYPE_TOTAL, sub], 1);
        receiver.incr(
            RESPONSE_METRIC,
            &[TYPE_TOTAL, sub, ("country", bucket_country(country_code))],
            1,
        );
        receiver.incr(
            RESPONSE_METRIC,
            &[TYPE_POSTS, sub, blender],
            post_count as u64,
        );
        receiver.incr(RESPONSE_METRIC, &[TYPE_ADS, sub, blender], ad_count as u64);

Per-request counters tagged with subscription. TYPE_TOTAL is incremented twice, as two separate series: once tagged by subscription alone and once by subscription plus country. Two extra dimensions:

  • country tag with bucketed country code. bucket_country probably groups small countries into "OTHER" to avoid label cardinality explosion in Prometheus.
  • blender tag: which ads-blending strategy was used.

        if ad_count == 0 {
            receiver.incr(
                RESPONSE_METRIC,
                &[TYPE_EMPTY_ADS, STAGE_RESPONSE, sub, blender],
                1,
            );
        }
        if post_count == 0 {
            receiver.incr(
                RESPONSE_METRIC,
                &[TYPE_EMPTY_POSTS, STAGE_RESPONSE, sub, blender],
                1,
            );
        }
        if ad_count >= 5 {
            receiver.incr(
                RESPONSE_METRIC,
                &[TYPE_SUFFICIENT_ADS, STAGE_RESPONSE, sub, blender],
                1,
            );
        }
        if post_count >= 20 {
            receiver.incr(
                RESPONSE_METRIC,
                &[TYPE_SUFFICIENT_POSTS, STAGE_RESPONSE, sub, blender],
                1,
            );
        }

Threshold-based counters for the response stage:

  • Empty ads / posts.
  • Sufficient ads (≥5) / sufficient posts (≥20).

The combination of total + empty + sufficient lets us compute rates like "% of responses with insufficient posts."

        if fetched_ads == 0 {
            receiver.incr(RESPONSE_METRIC, &[TYPE_EMPTY_ADS, STAGE_FETCHED, sub], 1);
        }
        if fetched_posts == 0 {
            receiver.incr(RESPONSE_METRIC, &[TYPE_EMPTY_POSTS, STAGE_FETCHED, sub], 1);
        }
        if fetched_ads >= 5 {
            receiver.incr(
                RESPONSE_METRIC,
                &[TYPE_SUFFICIENT_ADS, STAGE_FETCHED, sub],
                1,
            );
        }
        if fetched_posts >= 20 {
            receiver.incr(
                RESPONSE_METRIC,
                &[TYPE_SUFFICIENT_POSTS, STAGE_FETCHED, sub],
                1,
            );
        }
    }
}

Same set of counters for the fetched stage (pre-blender), tagged only by subscription since no blender has run at that point. This lets us distinguish:

  • "Empty response because blender dropped them" (fetched_posts > 0 but post_count == 0).
  • "Empty response because nothing was fetched" (both zero).

Two failure modes; both monitorable.
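The threshold logic as a std-only sketch (threshold_types and empty_posts_cause are hypothetical names). threshold_types returns which of the four threshold counters one stage would increment, using the same cut-offs as stat_response. empty_posts_cause shows, for a single request, the two failure modes that comparing fetched-stage and response-stage counters separates in aggregate.

```rust
/// Threshold counters one stage would increment, using stat_response's
/// cut-offs: 0 means empty; >= 5 ads or >= 20 posts means sufficient.
fn threshold_types(posts: usize, ads: usize) -> Vec<&'static str> {
    let mut types = Vec::new();
    if ads == 0 {
        types.push("empty_ads");
    }
    if posts == 0 {
        types.push("empty_posts");
    }
    if ads >= 5 {
        types.push("sufficient_ads");
    }
    if posts >= 20 {
        types.push("sufficient_posts");
    }
    types
}

/// Per-request version of the diagnosis the two stages enable in aggregate.
fn empty_posts_cause(fetched_posts: usize, response_posts: usize) -> Option<&'static str> {
    if response_posts > 0 {
        None
    } else if fetched_posts == 0 {
        Some("nothing fetched")
    } else {
        Some("dropped after fetch")
    }
}
```

A response with 10 posts and 2 ads fires none of the four, so it appears only in the total; "insufficient" has to be derived as total minus sufficient.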


reranking_kafka_side_effect.rs (157 lines) — 5% sampled Kafka log

Publishes a 5% sample of Phoenix requests (prod only) to Kafka for offline analysis (model A/B comparison, training-data augmentation).

const TOP_K: usize = 50;

pub struct RerankingKafkaSideEffect {
    kafka_client: Arc<dyn KafkaPublisherClient>,
}

impl RerankingKafkaSideEffect {
    pub fn new(kafka_client: Arc<dyn KafkaPublisherClient>) -> Self {
        Self { kafka_client }
    }
}

#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for RerankingKafkaSideEffect {
    fn enable(&self, _query: Arc<ScoredPostsQuery>) -> bool {
        is_prod() && random::<f64>() < 0.05
    }

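5% random sampling. random::<f64>() draws uniformly from [0, 1), so the check passes with probability 0.05. The draw is made independently for each request, and it is independent of B3 sampling and shadow-traffic flags.

_query: apart from is_prod(), the enable check never looks at the query. It just rolls the dice.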
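Here is a sketch of the same gate. The rand crate is not in std, so this swaps in a tiny xorshift64 generator to produce uniform f64 values in [0, 1). The generator is a hypothetical stand-in, not what home-mixer uses. Over many requests, roughly 5% pass.

```rust
/// Minimal xorshift64 PRNG (13, 7, 17 shifts); stand-in for rand::random.
struct XorShift64(u64);

impl XorShift64 {
    fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }

    /// Uniform f64 in [0, 1): top 53 bits scaled by 2^-53.
    fn next_f64(&mut self) -> f64 {
        (self.next_u64() >> 11) as f64 * (1.0 / (1u64 << 53) as f64)
    }
}

const SAMPLE_RATE: f64 = 0.05;

/// Same shape as RerankingKafkaSideEffect::enable: prod check AND a per-request coin flip.
/// `&&` short-circuits, so no random number is drawn outside prod.
fn enable(is_prod: bool, rng: &mut XorShift64) -> bool {
    is_prod && rng.next_f64() < SAMPLE_RATE
}
```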

    async fn side_effect(
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>,
    ) -> Result<(), String> {
        let mut candidates: Vec<&PostCandidate> = Vec::new();
        for c in &input.selected_candidates {
            candidates.push(c);
        }
        for c in &input.non_selected_candidates {
            candidates.push(c);
        }

        let total_count = candidates.len() as i32;

        candidates.sort_by(|a, b| {
            let sa = a.score.unwrap_or(f64::MIN);
            let sb = b.score.unwrap_or(f64::MIN);
            sb.partial_cmp(&sa).unwrap_or(std::cmp::Ordering::Equal)
        });

        candidates.truncate(TOP_K);

        if candidates.is_empty() {
            return Ok(());
        }

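Concatenate selected + non_selected, sort by score descending, keep the top 50, and bail if nothing is left. Unscored candidates map to f64::MIN, so they sink to the bottom. total_count is captured before truncation, so the batch records the size of the full pool, not just the 50 published. Since truncate never empties a non-empty vector, the early return only fires when both input lists were empty.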
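The sketch below runs the same select-and-truncate steps on a simplified Cand type. Cand is hypothetical and holds just tweet_id and an optional score. The two push loops become an iterator chain, which is equivalent, and the comparator is copied from the source.

```rust
const TOP_K: usize = 50;

/// Hypothetical stand-in for PostCandidate.
struct Cand {
    tweet_id: i64,
    score: Option<f64>,
}

/// Returns (total_count, top-K tweet ids by score, descending).
fn top_k(selected: &[Cand], non_selected: &[Cand]) -> (i32, Vec<i64>) {
    // Borrow both pools; nothing is cloned.
    let mut all: Vec<&Cand> = selected.iter().chain(non_selected.iter()).collect();
    // Captured before truncation, like total_candidates_count.
    let total = all.len() as i32;
    // Descending; unscored candidates become f64::MIN and sink to the end.
    all.sort_by(|a, b| {
        let sa = a.score.unwrap_or(f64::MIN);
        let sb = b.score.unwrap_or(f64::MIN);
        sb.partial_cmp(&sa).unwrap_or(std::cmp::Ordering::Equal)
    });
    all.truncate(TOP_K);
    (total, all.iter().map(|c| c.tweet_id).collect())
}
```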

        let scored_candidates: Vec<pb::ScoredCandidate> = candidates
            .iter()
            .enumerate()
            .map(|(position, candidate)| build_scored_candidate(candidate, position as i32))
            .collect();

        let request_time_ms = input.query.request_time_ms;

        let prediction_request_id = candidates
            .iter()
            .find_map(|c| c.prediction_request_id.map(|id| id as i64));

        let product_surface = if input.query.in_network_only {
            xai_recsys_proto::ProductSurface::HomeTimelineRankedFollowing
        } else {
            xai_recsys_proto::ProductSurface::HomeTimelineRanking
        };

        let batch = pb::ScoredCandidateBatch {
            candidates: scored_candidates,
            viewer_id: Some(input.query.user_id),
            request_time_ms: Some(request_time_ms),
            prediction_request_id,
            served_request_id: prediction_request_id,
            served_id: prediction_request_id,
            total_candidates_count: Some(total_count),
            request_join_id: Some(input.query.request_id),
            product_surface: product_surface.into(),
        };

        let bytes = batch.encode_to_vec();
        self.kafka_client
            .send(&bytes)
            .await
            .map_err(|e| format!("Kafka publish failed: {e}"))
    }
}

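Build a ScoredCandidateBatch proto. prediction_request_id is taken from the first candidate, in score order, that carries one (find_map). If no candidate carries one, it is None. That single value then fills three fields: prediction_request_id, served_request_id and served_id. The proto schema appears over-general; in this case all three point to the same thing. product_surface is HomeTimelineRankedFollowing for in_network_only queries and HomeTimelineRanking otherwise.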

Plus request_join_id = request_id — for joining against the request-level log.
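Here is a sketch of how the ID fields and product surface are derived, using hypothetical stand-in types. The u64 type for prediction_request_id is an assumption; the source only shows it being cast to i64.

```rust
/// Hypothetical stand-in for PostCandidate (only the field used here).
struct Cand {
    prediction_request_id: Option<u64>,
}

#[derive(Debug, PartialEq)]
enum ProductSurface {
    HomeTimelineRanking,
    HomeTimelineRankedFollowing,
}

#[derive(Debug, PartialEq)]
struct BatchIds {
    prediction_request_id: Option<i64>,
    served_request_id: Option<i64>,
    served_id: Option<i64>,
}

/// First candidate (in ranked order) carrying an ID wins; all three fields share it.
fn batch_ids(ranked: &[Cand]) -> BatchIds {
    let id = ranked
        .iter()
        .find_map(|c| c.prediction_request_id.map(|v| v as i64));
    BatchIds {
        prediction_request_id: id,
        served_request_id: id,
        served_id: id,
    }
}

fn product_surface(in_network_only: bool) -> ProductSurface {
    if in_network_only {
        ProductSurface::HomeTimelineRankedFollowing
    } else {
        ProductSurface::HomeTimelineRanking
    }
}
```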

fn build_scored_candidate(candidate: &PostCandidate, position: i32) -> pb::ScoredCandidate {
    let s = &candidate.phoenix_scores;

    let mut prediction_scores: HashMap<String, f64> = HashMap::new();
    insert_score(&mut prediction_scores, "favorite", s.favorite_score);
    insert_score(&mut prediction_scores, "reply", s.reply_score);
    insert_score(&mut prediction_scores, "retweet", s.retweet_score);
    insert_score(&mut prediction_scores, "photo_expand", s.photo_expand_score);
    insert_score(&mut prediction_scores, "click", s.click_score);
    insert_score(&mut prediction_scores, "profile_click", s.profile_click_score);
    insert_score(&mut prediction_scores, "vqv", s.vqv_score);
    insert_score(&mut prediction_scores, "share", s.share_score);
    insert_score(&mut prediction_scores, "share_via_dm", s.share_via_dm_score);
    insert_score(&mut prediction_scores, "share_via_copy_link", s.share_via_copy_link_score);
    insert_score(&mut prediction_scores, "dwell", s.dwell_score);
    insert_score(&mut prediction_scores, "quote", s.quote_score);
    insert_score(&mut prediction_scores, "quoted_click", s.quoted_click_score);
    insert_score(&mut prediction_scores, "quoted_vqv", s.quoted_vqv_score);
    insert_score(&mut prediction_scores, "follow_author", s.follow_author_score);
    insert_score(&mut prediction_scores, "not_interested", s.not_interested_score);
    insert_score(&mut prediction_scores, "block_author", s.block_author_score);
    insert_score(&mut prediction_scores, "mute_author", s.mute_author_score);
    insert_score(&mut prediction_scores, "report", s.report_score);
    insert_score(&mut prediction_scores, "dwell_time", s.dwell_time);

Build a string→float map of all 20 prediction scores. The proto uses a flexible map so new score types can be added without schema changes.

    let source_tweet_id = candidate.retweeted_tweet_id.unwrap_or(candidate.tweet_id);

    let served_type = candidate.served_type.map(|st| format!("{:?}", st));

    pb::ScoredCandidate {
        tweet_id: candidate.tweet_id,
        score: candidate.score,
        prediction_scores,
        weighted_model_score: candidate.weighted_score,
        author_id: Some(candidate.author_id),
        source_tweet_id: Some(source_tweet_id),
        served_type,
        is_cached: candidate.last_scored_at_ms.is_some(),
        in_network: candidate.in_network.unwrap_or(false),
        position,
    }
}

fn insert_score(map: &mut HashMap<String, f64>, name: &str, value: Option<f64>) {
    map.insert(name.to_string(), value.unwrap_or(0.0));
}

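Pack the candidate proto. Three views of the score are preserved: score, weighted_model_score, and the per-action prediction_scores map. source_tweet_id resolves a retweet to the original post (retweeted_tweet_id) and falls back to tweet_id for everything else.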

served_type is debug-formatted to a string ("ForYouInNetwork" etc.) — the proto field is a string for flexibility.
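Debug output of a fieldless enum variant is the variant's identifier. The logged string is therefore coupled to the Rust name: renaming a variant silently changes what downstream consumers see. The sketch uses a hypothetical ServedType, where only ForYouInNetwork comes from the text above and the second variant is invented. It also includes an explicit mapping that would decouple the two.

```rust
/// Hypothetical enum for illustration.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ServedType {
    ForYouInNetwork,
    ForYouOutOfNetwork,
}

/// Same conversion as build_scored_candidate: Debug of a fieldless variant is its name.
fn served_type_string(st: Option<ServedType>) -> Option<String> {
    st.map(|st| format!("{:?}", st))
}

/// Alternative: an explicit mapping keeps logged values stable across renames.
fn served_type_str(st: ServedType) -> &'static str {
    match st {
        ServedType::ForYouInNetwork => "ForYouInNetwork",
        ServedType::ForYouOutOfNetwork => "ForYouOutOfNetwork",
    }
}
```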

is_cached: candidate.last_scored_at_ms.is_some() is meant to answer "was this candidate served from the cache or freshly scored?" But last_scored_at_ms is set by PhoenixScorer, so its presence indicates fresh scoring. The flag then reads backwards: is_cached is true exactly when the candidate was recently scored. Either this is a bug, or "cached" here means "carries scoring metadata that can be cached."

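insert_score defaults None → 0.0. This is consistent with the weighted-score computation, which also treats None as 0. The trade-off is that downstream consumers of prediction_scores cannot distinguish a missing prediction from a genuine 0.0.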
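Here is a sketch of that lossiness, next to a hypothetical alternative (insert_score_if_present, not in the source) that omits the key instead:

```rust
use std::collections::HashMap;

/// As in the source: a missing score is written as 0.0.
fn insert_score(map: &mut HashMap<String, f64>, name: &str, value: Option<f64>) {
    map.insert(name.to_string(), value.unwrap_or(0.0));
}

/// Hypothetical alternative: skip the key when the model produced no score,
/// so "absent" and "predicted 0" stay distinguishable.
fn insert_score_if_present(map: &mut HashMap<String, f64>, name: &str, value: Option<f64>) {
    if let Some(v) = value {
        map.insert(name.to_string(), v);
    }
}
```

The sparse variant would require consumers to treat a missing key as "no prediction". Whether that is worth a schema-level convention change is a judgment call.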


Orphan: cache_request_info_side_effect.rs (42 lines)

pub struct CacheRequestInfoSideEffect {
    pub strato_client: Arc<dyn StratoClient + Send + Sync>,
}

#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for CacheRequestInfoSideEffect {
    fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
        env::var("APP_ENV").unwrap_or_default() == "prod" && !query.in_network_only
    }

    async fn run(
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>,
    ) -> Result<(), String> {
        let user_id: i64 = input.query.user_id;

        let post_ids: Vec<i64> = input
            .selected_candidates
            .iter()
            .map(|c| c.tweet_id)
            .collect();
        let client = &self.strato_client;
        let res = client
            .store_request_info(user_id, post_ids)
            .await
            .map_err(|e| e.to_string())?;
        let decoded: StratoResult<StratoValue<()>> = decode(&res);
        match decoded {
            StratoResult::Ok(_) => Ok(()),
            StratoResult::Err(_) => Err("error received from strato".to_string()),
        }
    }
}

Not in mod.rs — orphan. References the orphan crate::candidate_pipeline::query::ScoredPostsQuery from Session 06.

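The apparent intent: store per-request (user, post_ids) info in Strato, presumably for downstream lookup. It is almost certainly a pre-refactor version of RedisPostCandidateCacheSideEffect: a single Strato store call vs. compressed JSON in Redis with a TTL. It also stores only selected_candidates, whereas the Redis version caches a superset (selected + non_selected).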

Notable: uses env::var("APP_ENV") for the prod check instead of is_prod() — older pattern.
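The sketch below shows what the older check does on edge cases, with the variable name parameterized. env_var_equals is a hypothetical helper. A missing or non-UTF-8 APP_ENV silently evaluates to "not prod". The body of is_prod() isn't shown in this session, so no claim is made about how it differs.

```rust
use std::env;

/// Same expression as the orphan's prod check, with the variable name parameterized.
fn env_var_equals(name: &str, expected: &str) -> bool {
    // env::var fails with VarError::NotPresent or VarError::NotUnicode;
    // unwrap_or_default maps either failure to an empty String.
    env::var(name).unwrap_or_default() == expected
}
```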

It also overrides run (the outer trait method) instead of side_effect (the inner one). That is unusual: most active side-effects override side_effect. Overriding run bypasses the stats macro instrumentation that the default run provides.
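Here is a synchronous, hypothetical model of the outer/inner split. The real trait is async, and its instrumentation comes from a macro. A side effect that overrides only side_effect gets counted by the default run. One that overrides run itself never touches the counters, even when it fails.

```rust
use std::cell::Cell;

/// Counters the default `run` bumps (stand-in for the stats macro).
#[derive(Default)]
struct Stats {
    ok: Cell<u32>,
    err: Cell<u32>,
}

trait SideEffect {
    /// Inner hook; defaulted here only so an implementor can skip it.
    fn side_effect(&self) -> Result<(), String> {
        Ok(())
    }

    /// Outer method: the default wraps side_effect with instrumentation.
    fn run(&self, stats: &Stats) -> Result<(), String> {
        let res = self.side_effect();
        if res.is_ok() {
            stats.ok.set(stats.ok.get() + 1);
        } else {
            stats.err.set(stats.err.get() + 1);
        }
        res
    }
}

/// Typical active side effect: overrides only the inner hook.
struct Instrumented;
impl SideEffect for Instrumented {
    fn side_effect(&self) -> Result<(), String> {
        Err("kafka publish failed".to_string())
    }
}

/// Orphan-style side effect: overrides run, so the wrapper never executes.
struct OverridesRun;
impl SideEffect for OverridesRun {
    fn run(&self, _stats: &Stats) -> Result<(), String> {
        Err("error received from strato".to_string())
    }
}
```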


What we've learned

Side-effect anatomy: every side effect implements SideEffect<Q, C> with enable + side_effect (or run). Fires after the response. Failures don't break the response.

The Arc<Q> enable signature: side-effect enable takes Arc<Q> (not &Q like other stages) because side-effects are moved into Tokio tasks that may outlive the calling stack frame.
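Here is a sketch of the ownership constraint. tokio isn't in std, so this uses std::thread::spawn, which puts the same Send + 'static bound on what it runs. Query is a hypothetical stand-in for ScoredPostsQuery. Each spawned task gets its own Arc clone (a cheap refcount bump) rather than a borrow tied to the caller's stack frame.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for ScoredPostsQuery.
struct Query {
    user_id: i64,
}

/// The closure must own what it reads: moving an Arc<Query> in satisfies
/// 'static, whereas a &Query borrowed from the caller would be rejected.
fn spawn_side_effect(query: Arc<Query>) -> thread::JoinHandle<i64> {
    thread::spawn(move || {
        // May run after the caller returned; the Arc keeps the query alive.
        query.user_id
    })
}
```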

Common gates:

  • is_prod() — most side-effects skip in non-prod.
  • Feature flag (EnableXxx).
  • Per-request data check (e.g., !query.has_cached_posts, query.seen_ids.is_empty()).

Random sampling: RerankingKafkaSideEffect does random::<f64>() < 0.05 for 5% sampling. Independent from B3 traffic sampling.

Cross-DC writes: PhoenixRequestCacheSideEffect writes to two Redis clusters (atla + pdxa). Tolerates up to 10% errors before failing. Cross-DC analysis enabled.
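Here is a sketch of one way such an error budget could work. It assumes the 10% is measured over individual write results across both clusters, and that exactly 10% is still tolerated. The real accounting lives in PhoenixRequestCacheSideEffect and may differ. check_writes and MAX_ERROR_RATE are hypothetical names.

```rust
/// Hypothetical budget: fail only when strictly more than 10% of writes fail.
const MAX_ERROR_RATE: f64 = 0.10;

fn check_writes(results: &[Result<(), String>]) -> Result<(), String> {
    if results.is_empty() {
        return Ok(());
    }
    let errors = results.iter().filter(|r| r.is_err()).count();
    let rate = errors as f64 / results.len() as f64;
    if rate > MAX_ERROR_RATE {
        Err(format!("{}/{} cache writes failed", errors, results.len()))
    } else {
        Ok(())
    }
}
```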

Manhattan + Redis split:

  • Manhattan: update_past_request_timestamps, truncate_served_history. Durable per-user state.
  • Redis: redis_post_candidate_cache, phoenix_request_cache. Short-TTL cache.

Kafka cluster routing:

  • Bluebird — general (impressions).
  • Ads — ad-data only.
  • (Plus more we'll see in Session 13: Phoenix, Aiml.)

Two-stage metric tagging: for_you_response_stats_side_effect emits the same metric set with stage=response and stage=fetched tags. Lets us track blender attrition.

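Threshold counters: instead of histograms, count the "≥ N" and "= 0" cases as separate counters tagged by type (TYPE_EMPTY_* vs. TYPE_SUFFICIENT_*). The thresholds themselves (5 ads, 20 posts) are hard-coded. This is cheaper than histograms, and rates are computable by division.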

Country bucketing: bucket_country(...) collapses long-tail countries into "OTHER" before tagging — avoids Prometheus label cardinality explosion.
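Here is a sketch of the bucketing idea with a hypothetical five-country allowlist; the real list isn't shown in this session. Whatever the input, the tag takes at most TOP_COUNTRIES.len() + 1 distinct values.

```rust
/// Hypothetical allowlist for illustration.
const TOP_COUNTRIES: &[&str] = &["US", "JP", "GB", "IN", "BR"];

/// Collapse any country outside the allowlist into "OTHER".
fn bucket_country(country: &str) -> &'static str {
    TOP_COUNTRIES
        .iter()
        .find(|&&c| c == country)
        .copied()
        .unwrap_or("OTHER")
}
```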

Compression in cache writes: RedisPostCandidateCacheSideEffect compresses JSON-encoded candidates with zstd level 6 on spawn_blocking, since compression is CPU-bound. The debug log includes both sizes, which makes the compression ratio visible.

Caching pattern: write a superset of selected candidates (get_candidates_to_cache takes both selected + non_selected, sorts by weighted_score). The next request can pull a different top-K from the cached pool without re-running the model.
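Here is a sketch of the write-once, read-many pattern, with hypothetical types. How the next request picks its different top-K isn't shown in this session. Filtering out already-served IDs is an assumption made here for illustration.

```rust
use std::cmp::Ordering;
use std::collections::HashSet;

/// Hypothetical cached entry; the real cache holds compressed JSON PostCandidates.
struct Cached {
    tweet_id: u64,
    weighted_score: f64,
}

/// Write path: keep the whole pool, sorted once by weighted_score (descending).
fn build_pool(mut pool: Vec<Cached>) -> Vec<Cached> {
    pool.sort_by(|a, b| {
        b.weighted_score
            .partial_cmp(&a.weighted_score)
            .unwrap_or(Ordering::Equal)
    });
    pool
}

/// Read path (assumed): skip already-served ids and take the next k, no model call.
fn next_page(pool: &[Cached], served: &HashSet<u64>, k: usize) -> Vec<u64> {
    pool.iter()
        .filter(|c| !served.contains(&c.tweet_id))
        .take(k)
        .map(|c| c.tweet_id)
        .collect()
}
```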

One pre-refactor orphan: cache_request_info_side_effect.rs — older Strato-based version of post-candidate caching.


Next session

Session 13 — Side effects, part 2. The 5 heavier ones (~1,200 LOC):

  • served_candidates_kafka_side_effect.rs (207) — published served set
  • update_served_history_side_effect.rs (268) — Manhattan write of served history
  • scored_stats_side_effect.rs (281) — per-candidate scoring metrics
  • client_events_kafka_side_effect.rs (302) — client-event Kafka publish
  • phoenix_experiments_side_effect.rs (174) — shadow-mode Phoenix variant runs