X For You algorithm, line by line — Part 13: Side effects (part 2)

Part 13 of the deep dive into xai-org/x-algorithm. The five heaviest side-effects: shadow-mode multi-cluster Phoenix experiments, served-candidates Kafka publish, the multi-entry served-history Manhattan write, score-distribution + retrieval-position analytics, and the 302-line client-events firehose with cross-product event generation.

May 15, 2026·22 min read

The five remaining side-effects (~1,232 LOC). These are the heavy hitters — analytics deep-dives, the served-history write, the client-events firehose, and shadow-mode Phoenix experiment scoring.

Files covered:

home-mixer/side_effects/
├── phoenix_experiments_side_effect.rs        (174)  shadow-mode multi-cluster Phoenix
├── served_candidates_kafka_side_effect.rs    (207)  Phoenix-cluster Kafka publish
├── update_served_history_side_effect.rs      (268)  Manhattan write of served history
├── scored_stats_side_effect.rs               (281)  score-distribution + retrieval stats
└── client_events_kafka_side_effect.rs        (302)  client-event Kafka firehose

phoenix_experiments_side_effect.rs (174 lines) — shadow-mode multi-cluster scoring

The most expensive side-effect. Runs the candidate set against every shadow-eligible Phoenix cluster in parallel and publishes their scores to Kafka. This is how X measures "what would the alternative model have scored?" without affecting the live response.

pub struct PhoenixExperimentsSideEffect {
    phoenix_client: Arc<dyn PhoenixPredictionClient + Send + Sync>,
    egress_client: Arc<dyn PhoenixPredictionClient + Send + Sync>,
    kafka_client: Arc<dyn KafkaPublisherClient>,
}

Same two-client pattern as PhoenixScorer (Session 10) + a Kafka publisher.

#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for PhoenixExperimentsSideEffect {
    fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
        query.is_shadow_traffic
    }

Shadow traffic only. Recall from Session 04: is_shadow_traffic = is_sampled(request_id, 0.5) — deterministic 50% of requests. So half the production traffic runs this expensive side-effect.
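To make "deterministic 50%" concrete, here is a minimal sketch of hash-based request sampling. The body is an assumption: the real is_sampled is not shown in this part, and the function name and the multiplicative mixing constant are purely illustrative.

```rust
/// Hypothetical stand-in for `is_sampled`; the real implementation is not shown here.
/// Maps a request id to a fraction in [0, 1) and compares it against the rate.
fn is_sampled_sketch(request_id: u64, rate: f64) -> bool {
    // Multiplicative mixing so sequential ids do not map to neighbouring fractions.
    let mixed = request_id.wrapping_mul(0x9E37_79B9_7F4A_7C15);
    // Keep the top 53 bits so the value converts to f64 exactly.
    let fraction = (mixed >> 11) as f64 / (1u64 << 53) as f64;
    fraction < rate
}
```

The property that matters is that the same request_id always gives the same answer, so every component that sees the request agrees on whether it is shadow traffic without any shared state.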

    async fn side_effect(
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>,
    ) -> Result<(), String> {
        if input.query.scoring_sequence.is_none() {
            return Ok(());
        };

        let request_time_ms = input.query.request_time_ms;

        let product_surface = if input.query.in_network_only {
            ProductSurface::HomeTimelineRankedFollowing
        } else {
            ProductSurface::HomeTimelineRanking
        };

        let user_id = input.query.user_id;
        let use_egress: bool = input.query.params.get(UseEgressSidecar);

        let base_request =
            build_prediction_request(&input.query, &input.selected_candidates, product_surface);

Bail if no scoring sequence (no point shadow-scoring without input features).

Build the base request once — the same shape used by the primary scorer (Session 10). Reuse across multiple cluster calls.

        let futures = PhoenixCluster::VARIANTS
            .iter()
            .filter(|c| c.is_shadow_eligible())
            .map(|&cluster_id| {
                let client = if use_egress {
                    Arc::clone(&self.egress_client)
                } else {
                    Arc::clone(&self.phoenix_client)
                };
                let request = base_request.clone();
                async move {
                    let result = client.predict(cluster_id, request).await;
                    if let Err(ref err) = result {
                        tracing::error!(
                            "Phoenix experiment {:?} request failed: {}",
                            cluster_id,
                            err
                        );
                    }
                    (cluster_id, result)
                }
            });
        let experiment_results: Vec<_> = join_all(futures).await;

Iterate every Phoenix cluster variant (the proc-macro VariantArray exposes ::VARIANTS for the enum). Filter to shadow-eligible ones — some clusters are production-only and shouldn't be hit with shadow traffic (saves the cost).

For each, fan out a prediction call. Each clone of the request is ~50KB+ (the columnar sequence is the biggest piece), so the fan-out is expensive: memory grows with the number of shadow clusters times the request size. Acceptable because it's only 50% of traffic, and the shadow workload is throttled below production capacity.

join_all resolves once every cluster call has completed, successfully or not. Errors are logged but don't abort; we want to publish what we got.
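The "collect every result, keep the failures" shape can be sketched without an async runtime. This version swaps join_all for std::thread::scope, so it is a synchronous analogue rather than the side-effect's actual code; predict_sketch and its odd-ids-fail rule are made up for illustration.

```rust
use std::thread;

/// Hypothetical cluster call: fails for odd ids to simulate an unhealthy shadow cluster.
fn predict_sketch(cluster_id: u32) -> Result<f64, String> {
    if cluster_id % 2 == 1 {
        Err(format!("cluster {cluster_id} unavailable"))
    } else {
        Ok(cluster_id as f64 / 10.0)
    }
}

/// Calls every cluster concurrently and returns one (id, result) pair per cluster,
/// in input order, keeping failures instead of aborting on the first one.
fn fan_out(cluster_ids: &[u32]) -> Vec<(u32, Result<f64, String>)> {
    thread::scope(|scope| {
        // Spawn all workers first so the calls overlap.
        let handles: Vec<_> = cluster_ids
            .iter()
            .map(|&id| scope.spawn(move || (id, predict_sketch(id))))
            .collect();
        // Then wait for each one; a failed call is still a value, not an abort.
        handles
            .into_iter()
            .map(|h| h.join().expect("worker panicked"))
            .collect()
    })
}
```

The caller then does what the loop over experiment_results does: skip the Err entries and use whatever succeeded.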

        for candidate in &input.selected_candidates {
            let mut all_scores: BTreeSet<Box<PredictionScore>> = BTreeSet::new();

            for (cluster_id, result) in &experiment_results {
                let Ok(predictions) = result else {
                    continue;
                };
                let cluster_name = format!("{:?}", cluster_id);
                let s = predictions.candidate_scores(&candidate.get_original_tweet_id());
                let score_fields: &[(&str, Option<f64>)] = &[
                    ("favorite", s.favorite_score),
                    ("reply", s.reply_score),
                    ("retweet", s.retweet_score),
                    ("photo_expand", s.photo_expand_score),
                    ("click", s.click_score),
                    ("profile_click", s.profile_click_score),
                    ("vqv", s.vqv_score),
                    ("share", s.share_score),
                    ("share_via_dm", s.share_via_dm_score),
                    ("share_via_copy_link", s.share_via_copy_link_score),
                    ("dwell", s.dwell_score),
                    ("quote", s.quote_score),
                    ("quoted_click", s.quoted_click_score),
                    ("quoted_vqv", s.quoted_vqv_score),
                    ("follow_author", s.follow_author_score),
                    ("not_interested", s.not_interested_score),
                    ("block_author", s.block_author_score),
                    ("mute_author", s.mute_author_score),
                    ("report", s.report_score),
                    ("not_dwelled", s.not_dwelled_score),
                    ("dwell_time", s.dwell_time),
                ];
                all_scores.extend(score_fields.iter().filter_map(|(name, score)| {
                    score.map(|v| {
                        Box::new(PredictionScore::new(
                            Some(format!("phoenix.{cluster_name}.{name}")),
                            Some(OrderedFloat::from(v)),
                        ))
                    })
                }));
            }

For each candidate, build a set of PredictionScore entries spanning all clusters × all action types. The score name is phoenix.<cluster_name>.<action_name> — e.g., phoenix.Experiment1Fou.favorite, phoenix.Experiment1Lap7.reply.

BTreeSet<Box<PredictionScore>>: an ordered set, which requires its elements to implement Ord. f64 only implements PartialOrd (NaN is not equal to anything, itself included), so the score value is wrapped in OrderedFloat, which supplies a total order.

The score.map(|v| Box::new(PredictionScore::new(...))) pattern: only include scores that aren't None. Missing scores are dropped from the set.
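A minimal, dependency-free sketch of the same naming and filtering step. TotalF64 is a hypothetical stand-in for OrderedFloat built on f64::total_cmp, and named_scores is an illustrative helper, not a function from the repository.

```rust
use std::cmp::Ordering;
use std::collections::BTreeSet;

/// Hypothetical stand-in for `OrderedFloat`: an f64 with a total order.
#[derive(Debug, Clone, Copy)]
struct TotalF64(f64);

impl PartialEq for TotalF64 {
    fn eq(&self, other: &Self) -> bool {
        self.0.total_cmp(&other.0).is_eq()
    }
}
impl Eq for TotalF64 {}
impl PartialOrd for TotalF64 {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for TotalF64 {
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.total_cmp(&other.0)
    }
}

/// Builds `phoenix.<cluster>.<action>` entries, dropping actions with no score.
fn named_scores(cluster: &str, fields: &[(&str, Option<f64>)]) -> BTreeSet<(String, TotalF64)> {
    fields
        .iter()
        .filter_map(|(name, score)| {
            // `map` on the Option keeps Some(v) and lets None fall out of the set.
            score.map(|v| (format!("phoenix.{cluster}.{name}"), TotalF64(v)))
        })
        .collect()
}
```

Because the cluster name is part of the key, scores from different clusters for the same action never collide in the set.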

            let source_tweet_id = candidate.retweeted_tweet_id.unwrap_or(candidate.tweet_id);
            let scored = ScoredCandidate {
                tweet_id: candidate.tweet_id as i64,
                viewer_id: Some(user_id as i64),
                author_id: Some(candidate.author_id as i64),
                request_join_id: Some(input.query.request_id as i64),
                score: None,
                suggest_type: None,
                is_in_network: candidate.in_network,
                in_reply_to_tweet_id: candidate.in_reply_to_tweet_id.map(|id| id as i64),
                quoted_tweet_id: candidate.quoted_tweet_id.map(|id| id as i64),
                quoted_user_id: candidate.quoted_user_id.map(|id| id as i64),
                request_time_ms: Some(request_time_ms),
                source_tweet_id: Some(source_tweet_id as i64),
                prediction_scores: Some(all_scores),
                fav_count: candidate.fav_count,
                reply_count: candidate.reply_count,
                retweet_count: candidate.repost_count,
                quote_count: candidate.quote_count,
                has_media: candidate.has_media,
                language_code: candidate
                    .language_code
                    .as_deref()
                    .map(|lc| language_code_string_to_enum(lc) as i32),
                video_duration_ms: candidate.min_video_duration_ms,
            };

            match serialize_to_bytes_binary(&scored) {
                Ok(bytes) => {
                    if let Err(e) = self.kafka_client.send(&bytes).await {
                        tracing::error!("Failed to publish scored candidate to Kafka: {e}");
                    }
                }
                Err(e) => {
                    tracing::error!("Failed to serialize scored candidate: {e}");
                }
            }
        }

        Ok(())
    }
}

Build a ScoredCandidate Thrift struct with the per-candidate fields + all cluster scores. Serialize to binary Thrift, send to Kafka.

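One Kafka send per candidate, awaited one at a time inside the loop. Each message carries up to M clusters × 21 score fields, so a request logs up to N candidates × M clusters × 21 scores for offline analysis (training data, A/B model comparison).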

Errors logged but don't fail the whole side-effect — get what we can.


served_candidates_kafka_side_effect.rs (207 lines) — what we showed (Phoenix cluster)

Publishes the final served list to Kafka for downstream training data. Shadow-traffic only.

pub struct ServedCandidatesKafkaSideEffect {
    kafka_client: Arc<dyn KafkaPublisherClient>,
}

impl ServedCandidatesKafkaSideEffect {
    pub async fn prod() -> Self {
        Self::new(Arc::new(
            ProdKafkaPublisherClient::new(SERVED_CANDIDATES_TOPIC, KafkaCluster::Phoenix).await,
        ))
    }
}

Goes to the Phoenix Kafka cluster — different from the Bluebird (impressions) and Ads clusters. Training-data cluster.

    fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
        is_prod() && query.is_shadow_traffic && query.params.get(EnableUrtMigrationComponents)
    }

Three gates: prod, shadow traffic, URT-migration flag.

    async fn side_effect(
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
    ) -> Result<(), String> {
        let query = &input.query;
        let items = &input.selected_candidates;
        if items.is_empty() {
            return Ok(());
        }

        let now_ms = query.request_time_ms;

        let request_provenance = match RequestContext::parse(&query.request_context) {
            RequestContext::Foreground => Some(Box::new(RequestProvenance::FOREGROUND)),
            RequestContext::Launch => Some(Box::new(RequestProvenance::LAUNCH)),
            RequestContext::PullToRefresh => Some(Box::new(RequestProvenance::PTR)),
            RequestContext::ViewportAwareRefresh => {
                Some(Box::new(RequestProvenance::VIEWPORT_AWARE_REFRESH))
            }
            _ => Some(Box::new(RequestProvenance::OTHER)),
        };

        let query_type = if query.is_bottom_request {
            Some(Box::new(QueryType::GET_OLDER))
        } else if query.is_top_request {
            Some(Box::new(QueryType::GET_NEWER))
        } else if query.cursor.is_none() {
            Some(Box::new(QueryType::GET_INITIAL))
        } else {
            Some(Box::new(QueryType::OTHER))
        };

Classify the request:

  • Provenance: how the user got here (foreground / launch / pull-to-refresh / viewport-aware-refresh / other).
  • Query type: what kind of fetch (older / newer / initial / other).

Both important for training-data labeling — different request types have different engagement patterns.

        let request_info = Box::new(RequestInfo {
            request_time_ms: now_ms,
            trace_id: query.request_id as i64,
            user_id: Some(query.user_id as i64),
            client_app_id: Some(query.client_app_id as i64),
            timeline_type: Some(Box::new(TimelineType::HOME)),
            ip_address: Some(query.ip_address.clone()),
            user_agent: Some(query.user_agent.clone()),
            query_type,
            request_provenance,
            language_code: Some(query.language_code.clone()),
            country_code: Some(query.country_code.clone()),
            request_end_time_ms: Some(now_ms),
            request_join_id: Some(query.request_id as i64),
        });

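Build the shared per-request envelope. trace_id and request_join_id are both set to query.request_id: the same value is used for two purposes (Zipkin trace + downstream join key). request_end_time_ms is likewise set to now_ms, the same value as request_time_ms, so the envelope records no request duration.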

        let mut payloads = Vec::with_capacity(items.len());
        for item in items {
            if let Some(entry_info) = build_entry_info(item) {
                let served = ServedEntry {
                    request: request_info.clone(),
                    entry: Some(Box::new(entry_info)),
                };
                let bytes = serialize_compact(&served)
                    .map_err(|e| format!("Thrift serialization failed: {e}"))?;
                payloads.push(bytes);
            }
        }

        let futs: Vec<_> = payloads
            .iter()
            .map(|bytes| self.kafka_client.send(bytes))
            .collect();
        let results = futures::future::join_all(futs).await;

        if let Some(err) = results.into_iter().find_map(|r| r.err()) {
            return Err(format!("Served-candidates Kafka publish failed: {err}"));
        }

        Ok(())
    }
}

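One Kafka message per item. Each carries a clone of the same request_info plus the item-specific entry_info, serialized with Thrift's compact protocol.

All payloads are serialized first, then the sends run concurrently under join_all. A serialization error returns early through ? before anything is sent. A send error is reported only after every send has completed, and only the first error found is returned.

serialize_compact (vs. the serialize_to_bytes_binary call in the Phoenix experiments side-effect): Thrift's compact protocol writes integers as variable-length ZigZag varints and packs field headers more tightly, so small values take fewer bytes than in the binary protocol. The savings depend on the payload; ~30% is a rough figure.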
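To show where the compact protocol's savings come from, here is a sketch of the integer encoding it is built on: a ZigZag mapping followed by a base-128 varint. This is an illustration of the technique, not the Thrift library's code, and the function names are my own.

```rust
/// ZigZag-maps a signed value so that small magnitudes become small unsigned values
/// (0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...).
fn zigzag(n: i64) -> u64 {
    ((n << 1) ^ (n >> 63)) as u64
}

/// Encodes an i64 as a ZigZag varint: 7 payload bits per byte,
/// high bit set on every byte except the last.
fn encode_varint_i64(n: i64) -> Vec<u8> {
    let mut v = zigzag(n);
    let mut out = Vec::new();
    while v >= 0x80 {
        out.push((v as u8 & 0x7F) | 0x80);
        v >>= 7;
    }
    out.push(v as u8);
    out
}
```

A feed position like 3 fits in one byte instead of the fixed-width encoding the binary protocol uses. The flip side is that a large 64-bit ID can take nine bytes as a varint, one more than its fixed eight, so the win comes from the small fields and the tighter field headers rather than from the IDs.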

fn build_entry_info(item: &FeedItem) -> Option<EntryInfo> {
    match &item.item {
        Some(feed_item::Item::Post(post)) => {
            let source_tweet_id = if post.retweeted_tweet_id != 0 {
                Some(post.retweeted_tweet_id as i64)
            } else {
                None
            };
            Some(EntryInfo {
                id: post.tweet_id as i64,
                position: item.position as i16,
                entry_id: format!("tweet-{}", post.tweet_id),
                entry_type: Box::new(EntryType::TWEET),
                vertical_size: Some(1),
                sort_index: None,
                display_type: None,
                score: Some(OrderedFloat::from(post.score as f64)),
                details: Some(Box::new(ItemDetails::TweetDetails(Box::new(
                    TweetDetails {
                        source_tweet_id,
                        author_id: Some(post.author_id as i64),
                    },
                )))),
                prediction_scores: None,
            })
        }
        Some(feed_item::Item::Ad(ad)) => Some(EntryInfo {
            id: ad.post_id,
            position: item.position as i16,
            entry_id: format!("promotedTweet-{}", ad.post_id),
            entry_type: Box::new(EntryType::PROMOTED_TWEET),
            vertical_size: Some(1),
            sort_index: None,
            display_type: None,
            score: None,
            details: Some(Box::new(ItemDetails::PromotedTweetDetails(Box::new(
                PromotedTweetDetails {
                    advertiser_id: Some(ad.account_id),
                    insert_position: Some(ad.insert_position),
                    impression_id: Some(ad.impression_id.to_string()),
                },
            )))),
            prediction_scores: None,
        }),
        Some(feed_item::Item::WhoToFollow(_)) => Some(EntryInfo {
            id: 0,
            position: item.position as i16,
            entry_id: "who-to-follow".to_string(),
            entry_type: Box::new(EntryType::WHO_TO_FOLLOW_MODULE),
            vertical_size: None,
            sort_index: None,
            display_type: None,
            score: None,
            details: Some(Box::new(ItemDetails::WhoToFollowDetails(Box::new(
                WhoToFollowDetails {
                    advertiser_id: None,
                },
            )))),
            prediction_scores: None,
        }),
        Some(feed_item::Item::PushToHome(pth)) => Some(EntryInfo {
            id: pth.tweet_id as i64,
            position: item.position as i16,
            entry_id: format!("tweet-{}", pth.tweet_id),
            entry_type: Box::new(EntryType::TWEET),
            vertical_size: Some(1),
            sort_index: None,
            display_type: None,
            score: None,
            details: Some(Box::new(ItemDetails::TweetDetails(Box::new(
                TweetDetails {
                    source_tweet_id: None,
                    author_id: Some(pth.author_id as i64),
                },
            )))),
            prediction_scores: None,
        }),
        Some(feed_item::Item::Prompt(_)) => Some(EntryInfo {
            id: 0,
            position: item.position as i16,
            entry_id: "prompt".to_string(),
            entry_type: Box::new(EntryType::PROMPT),
            vertical_size: None,
            sort_index: None,
            display_type: None,
            score: None,
            details: None,
            prediction_scores: None,
        }),
        None => None,
    }
}

Big match on FeedItem variants. Each builds an EntryInfo with type-specific details:

  • Post: tweet_id, source_tweet_id (for retweets), author, score.
  • Ad: promoted_tweet_id, advertiser_id, insert_position, impression_id.
  • WhoToFollow: a module-level entry (no specific user).
  • PushToHome: like a post but with score = None (PTH doesn't have a Phoenix score).
  • Prompt: minimal entry.

entry_id prefixes — tweet-, promotedTweet-, etc. — for human-readable join keys.


update_served_history_side_effect.rs (268 lines) — Manhattan write

Writes the served items to Manhattan so the next request's ServedHistoryQueryHydrator can read them back (Session 09). Closes the loop on served-history tracking.

pub struct UpdateServedHistorySideEffect {
    client: Arc<dyn ServedHistoryClient>,
}

#[async_trait]
impl SideEffect<ScoredPostsQuery, FeedItem> for UpdateServedHistorySideEffect {
    fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
        is_prod() && query.params.get(EnableUrtMigrationComponents)
    }

Prod + URT flag. Same as TruncateServedHistorySideEffect from Session 12 (they share the same client).

    async fn side_effect(
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
    ) -> Result<(), String> {
        let items = &input.selected_candidates;
        if items.is_empty() {
            return Ok(());
        }

        let query = &input.query;
        let now_ms = query.request_time_ms;

        let entries: Vec<EntryWithItemIds> = items
            .iter()
            .enumerate()
            .flat_map(|(position, item)| build_entries(item, position as i64))
            .collect();

        if entries.is_empty() {
            return Ok(());
        }

Build entries with flat_map because each FeedItem can produce multiple entries — particularly for posts in conversation threads (one entry per ancestor).

        let history = ServedHistory {
            request_type: request_type(query),
            entries,
            served_id: None,
            served_time_ms: None,
        };

        let platform = client_platform::from_client_app_id(query.client_app_id);
        self.client
            .put(
                query.user_id,
                TimelineType::Home,
                platform,
                now_ms,
                &history,
            )
            .await
            .map_err(|e| e.to_string())
    }
}

Wrap in ServedHistory, leaving served_id and served_time_ms as None (presumably the client or storage layer fills them in on write). The put call takes (user, timeline type, platform, timestamp, history).

fn request_type(query: &ScoredPostsQuery) -> RequestType {
    if query.is_polling {
        return RequestType::POLLING;
    }
    match query.cursor.as_ref() {
        None => RequestType::INITIAL,
        Some(c) => match c.cursor_type.as_ref() {
            Some(ct) if *ct == CursorType::TOP => RequestType::NEWER,
            Some(ct) if *ct == CursorType::BOTTOM => RequestType::OLDER,
            Some(ct) if *ct == CursorType::GAP => RequestType::MIDDLE,
            _ => RequestType::OTHER,
        },
    }
}

Map query state → RequestType enum:

  • Polling → POLLING.
  • No cursor → INITIAL.
  • Cursor TOP → NEWER (loading newer posts).
  • Cursor BOTTOM → OLDER (loading older posts).
  • Cursor GAP → MIDDLE (filling a gap, e.g., when "Show new posts" reveals).
  • Anything else → OTHER.

fn build_entries(feed_item: &FeedItem, position: i64) -> Vec<EntryWithItemIds> {
    match feed_item.item.as_ref() {
        Some(FeedItemKind::Post(post)) => build_post_entries(post, position),
        Some(FeedItemKind::Ad(ad)) => vec![build_ad_entry(ad, position)],
        Some(FeedItemKind::WhoToFollow(wtf)) => {
            build_wtf_entry(wtf, position).into_iter().collect()
        }
        Some(FeedItemKind::Prompt(_)) => vec![build_prompt_entry(position)],
        Some(FeedItemKind::PushToHome(pth)) => build_push_to_home_entries(pth, position),
        None => vec![],
    }
}

Top-level dispatch. build_wtf_entry returns an Option<EntryWithItemIds>; .into_iter().collect() turns it into a Vec with zero or one element, so every arm yields the same Vec<EntryWithItemIds> type.
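The Option-to-Vec trick in isolation, as a tiny sketch (option_to_vec is an illustrative name, not a helper from the repository):

```rust
/// Turns an optional entry into a Vec with zero or one element.
/// Works because `Option<T>` implements `IntoIterator`, yielding its value at most once.
fn option_to_vec<T>(entry: Option<T>) -> Vec<T> {
    entry.into_iter().collect()
}
```

This keeps the match arms uniform: the caller's flat_map can treat "no entry", "one entry", and "many entries" identically.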

fn build_post_entries(post: &ScoredPost, position: i64) -> Vec<EntryWithItemIds> {
    if post.ancestors.is_empty() {
        vec![build_tweet_entry(post, post.tweet_id, position)]
    } else {
        let mut tweet_ids: Vec<u64> = post.ancestors.clone();
        tweet_ids.push(post.tweet_id);
        tweet_ids.sort_unstable();
        tweet_ids
            .into_iter()
            .map(|tweet_id| build_tweet_entry(post, tweet_id, position))
            .collect()
    }
}

Multi-entry for replies: if a post has ancestors (a reply), produce one entry per (ancestor + self), sorted. Each ancestor's served-history entry uses the same position — they're all at the same feed slot.

Why? Because served history is used by PreviouslySeenPostsFilter to mark posts as "user already saw this." If the user saw a reply chain, they effectively saw the root + all the in-between posts — so we mark all of them as served. Prevents showing the same conversation from a different angle.
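Stripped of the Thrift types, the multi-entry expansion looks like this. It is a sketch with plain tuples standing in for EntryWithItemIds, and both function names are illustrative.

```rust
/// Every tweet id to mark as served for one feed slot:
/// the post itself plus its ancestors, in ascending id order.
fn served_tweet_ids(ancestors: &[u64], tweet_id: u64) -> Vec<u64> {
    let mut ids = ancestors.to_vec();
    ids.push(tweet_id);
    ids.sort_unstable();
    ids
}

/// One (tweet_id, position) pair per id; all share the same feed position.
fn entries_for_slot(ancestors: &[u64], tweet_id: u64, position: i64) -> Vec<(u64, i64)> {
    served_tweet_ids(ancestors, tweet_id)
        .into_iter()
        .map(|id| (id, position))
        .collect()
}
```

For a reply 20 with ancestors [30, 10] served at slot 4, this yields (10, 4), (20, 4), (30, 4): three served-history entries for a single feed slot.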

fn nonzero(v: u64) -> Option<i64> {
    if v != 0 { Some(v as i64) } else { None }
}

fn build_tweet_entry(post: &ScoredPost, tweet_id: u64, position: i64) -> EntryWithItemIds {
    let source_tweet_id = nonzero(post.retweeted_tweet_id);
    let source_author_id = Some(nonzero(post.retweeted_user_id).unwrap_or(post.author_id as i64));
    let in_reply_to_tweet_id = nonzero(post.in_reply_to_tweet_id);
    let prediction_request_id = nonzero(post.prediction_request_id);

    let served_type_name = upper_snake_to_pascal(
        ServedType::try_from(post.served_type)
            .unwrap_or(ServedType::Undefined)
            .as_str_name(),
    );

    let tweet_score = TweetScore::TweetScoreV1(TweetScoreV1 {
        score: OrderedFloat::from(post.score as f64),
        served_type: Some(served_type_name),
        debug_info: None,
        prediction_request_id,
        topics: None,
        tags: None,
        predicted_scores: None,
    });

    EntryWithItemIds {
        entity_type: EntityIdType::TWEET,
        sort_index: Some(position),
        size: None,
        item_ids: Some(vec![ItemIds {
            tweet_id: Some(tweet_id as i64),
            source_tweet_id,
            quote_tweet_id: None,
            source_author_id,
            quote_author_id: None,
            in_reply_to_tweet_id,
            in_reply_to_author_id: None,
            article_id: None,
            tweet_score: Some(tweet_score),
            entry_id_to_replace: None,
            user_id: None,
            impression_id: None,
        }]),
    }
}

Build the served-history entry. nonzero helper: converts 0 sentinel to None. Same Proto3-pattern handling we've seen.

served_type_name: convert the enum to "PascalCase" for the served-history record. upper_snake_to_pascal("FOR_YOU_PHOENIX_RETRIEVAL") → "ForYouPhoenixRetrieval".
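The body of upper_snake_to_pascal is not shown in this file. A plausible reimplementation, written only from the input/output pair above and assuming ASCII names, might look like this:

```rust
/// Hypothetical reimplementation; the real helper's body is not shown in this part.
/// Splits on underscores and capitalises the first letter of each word.
fn upper_snake_to_pascal_sketch(s: &str) -> String {
    s.split('_')
        .filter(|word| !word.is_empty())
        .map(|word| {
            let mut chars = word.chars();
            match chars.next() {
                Some(first) => {
                    // Remaining characters of the word, lowercased.
                    let rest = chars.as_str().to_ascii_lowercase();
                    format!("{}{}", first.to_ascii_uppercase(), rest)
                }
                None => String::new(),
            }
        })
        .collect()
}
```

The fallback path in the original (unwrap_or(ServedType::Undefined)) means an unknown enum value would come out of this conversion as "Undefined" rather than failing the write.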

TweetScoreV1 wraps the actual score + metadata. The variant naming (V1) suggests there's a TweetScoreV2 planned (forward-compat schema).

fn build_ad_entry(ad: &AdIndexInfo, position: i64) -> EntryWithItemIds {
    let impression_string = format!("{:x}", ad.impression_id);
    EntryWithItemIds {
        entity_type: EntityIdType::PROMOTED_TWEET,
        sort_index: Some(position),
        size: None,
        item_ids: Some(vec![ItemIds {
            tweet_id: Some(ad.post_id),
            source_tweet_id: None,
            quote_tweet_id: None,
            source_author_id: Some(ad.author_id),
            quote_author_id: None,
            in_reply_to_tweet_id: None,
            in_reply_to_author_id: None,
            article_id: None,
            tweet_score: None,
            entry_id_to_replace: None,
            user_id: None,
            impression_id: Some(impression_string),
        }]),
    }
}

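Ad entry: impression_id is hex-encoded with {:x} (lowercase, no 0x prefix), while most other IDs are written as decimal, presumably to match the format the ads systems expect. Note that the served-candidates Kafka payload above writes the same field with to_string(), i.e. in decimal, so the two sinks do not share a string format for impression IDs.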
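A quick sketch of the two string forms side by side. The u64 type for the impression ID is an assumption (the field's type is not shown here), and the helper names are illustrative.

```rust
/// Lowercase hex without a prefix, as produced by `{:x}`.
fn impression_hex(id: u64) -> String {
    format!("{:x}", id)
}

/// Plain decimal, as produced by `to_string()`.
fn impression_decimal(id: u64) -> String {
    id.to_string()
}

/// Parses the hex form back into the numeric id, returning None on malformed input.
fn parse_impression_hex(s: &str) -> Option<u64> {
    u64::from_str_radix(s, 16).ok()
}
```

Anyone joining the two datasets on impression ID would need to normalise one side first, for example by parsing the hex string back to an integer.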

fn build_prompt_entry(position: i64) -> EntryWithItemIds {
    EntryWithItemIds {
        entity_type: EntityIdType::PROMPT,
        sort_index: Some(position),
        size: None,
        item_ids: None,
    }
}

Prompts get a minimal entry — just the type + position. No item_ids since prompts aren't keyed by tweet.

fn build_push_to_home_entries(pth: &PushToHomePost, position: i64) -> Vec<EntryWithItemIds> {
    let in_reply_to_tweet_id = nonzero(pth.in_reply_to_tweet_id);

    let served_type_name = upper_snake_to_pascal(
        ServedType::try_from(pth.served_type)
            .unwrap_or(ServedType::Undefined)
            .as_str_name(),
    );

    let tweet_score = TweetScore::TweetScoreV1(TweetScoreV1 {
        score: OrderedFloat::from(0.0),
        served_type: Some(served_type_name),
        debug_info: None,
        prediction_request_id: None,
        topics: None,
        tags: None,
        predicted_scores: None,
    });

    vec![EntryWithItemIds {
        entity_type: EntityIdType::TWEET,
        sort_index: Some(position),
        size: None,
        item_ids: Some(vec![ItemIds {
            tweet_id: Some(pth.tweet_id as i64),
            source_tweet_id: None,
            quote_tweet_id: None,
            source_author_id: Some(pth.author_id as i64),
            quote_author_id: None,
            in_reply_to_tweet_id,
            in_reply_to_author_id: None,
            article_id: None,
            tweet_score: Some(tweet_score),
            entry_id_to_replace: None,
            user_id: None,
            impression_id: None,
        }]),
    }]
}

PTH entry treated as a TWEET entity (so the next request's filter knows the user saw it). Score is 0.0 because PTH posts don't have Phoenix scores. served_type_name is derived from pth.served_type the same way as for regular posts (e.g. "ForYouPushToHome"), which preserves provenance.

fn build_wtf_entry(wtf: &WhoToFollowModule, position: i64) -> Option<EntryWithItemIds> {
    let resp = wtf.who_to_follow_response.as_ref()?;
    if resp.user_recommendations.is_empty() {
        return None;
    }
    let size = resp.user_recommendations.len() as i16;
    let item_ids: Vec<ItemIds> = resp
        .user_recommendations
        .iter()
        .map(|rec| ItemIds {
            tweet_id: None,
            source_tweet_id: None,
            quote_tweet_id: None,
            source_author_id: None,
            quote_author_id: None,
            in_reply_to_tweet_id: None,
            in_reply_to_author_id: None,
            article_id: None,
            tweet_score: None,
            entry_id_to_replace: None,
            user_id: Some(rec.user_id),
            impression_id: None,
        })
        .collect();
    Some(EntryWithItemIds {
        entity_type: EntityIdType::WHO_TO_FOLLOW,
        sort_index: Some(position),
        size: Some(size),
        item_ids: Some(item_ids),
    })
}

WTF entry has one ItemId per recommended user. The user IDs become the served-history of "we recommended these users to this viewer" — feeds into WhoToFollowSource's get_excluded_user_ids (Session 11) which filters them out of future recommendations.

size = resp.user_recommendations.len() — explicitly logs how many users were in the module.


scored_stats_side_effect.rs (281 lines) — Phoenix score analytics

Per-action score distributions + Phoenix retrieval position analytics. The richest stats side-effect.

const METRIC_PREFIX: &str = "ScoredStats";
const PRESENT_SCOPE: [(&str, &str); 1] = [("score_status", "present")];
const MISSING_SCOPE: [(&str, &str); 1] = [("score_status", "missing")];
const HEAVY_RANKER_TOP_K: &[usize] = &[1, 10, 35];
const DEFAULT_SAMPLING_RATE: f64 = 0.05;

5% sampling + three top-K positions to track (1 / 10 / 35).

pub struct ScoredStatsSideEffect;

#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for ScoredStatsSideEffect {
    fn enable(&self, _query: Arc<ScoredPostsQuery>) -> bool {
        true
    }

Always enabled. But the body samples down to 5% internally.

    async fn side_effect(
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>,
    ) -> Result<(), String> {
        let Some(receiver) = global_stats_receiver() else {
            return Ok(());
        };

        let candidates = &input.selected_candidates;
        if candidates.is_empty() {
            return Ok(());
        }

        if !input.query.has_cached_posts {
            let retrieval_cluster: String =
                input.query.params.get(PhoenixRetrievalInferenceClusterId);
            let moe_cluster: String = input
                .query
                .params
                .get(PhoenixRetrievalMOEInferenceClusterId);
            let experiment_buckets = input
                .query
                .params
                .experiment_buckets(EnablePhoenixRetrievalStatsExperimentBucket);

            if !experiment_buckets.is_empty() || random::<f64>() < DEFAULT_SAMPLING_RATE {
                record_score_distributions(receiver.as_ref(), candidates);
                record_phoenix_retrieval_stats(...);
                record_phoenix_retrieval_moe_stats(...);
            }
        } else {
            if random::<f64>() < DEFAULT_SAMPLING_RATE {
                record_score_distributions(receiver.as_ref(), candidates);
            }
        }

        Ok(())
    }
}

Two branches:

  • Fresh scoring (!has_cached_posts): if experiment_buckets(...) is non-empty OR a random 5% roll succeeds, record full stats (distributions + Phoenix retrieval + MoE retrieval).
  • Cached posts: random 5% only, and only score distributions are recorded (no retrieval to report on).

Experiment buckets always sample at 100%: experiment_buckets(EnablePhoenixRetrievalStatsExperimentBucket) returns a non-empty list when the request is bucketed for that parameter, and the || short-circuits past the random roll. Requests outside those buckets fall back to the 5% sample.
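The branching can be restated as a small pure function. This is a sketch: the random roll is passed in as a parameter so the logic is testable, and StatsToRecord is a hypothetical enum, not a type from the repository.

```rust
const DEFAULT_SAMPLING_RATE: f64 = 0.05;

/// Hypothetical summary of what the side-effect would record for one request.
#[derive(Debug, PartialEq)]
enum StatsToRecord {
    Nothing,
    DistributionsOnly,
    Full,
}

/// `roll` stands in for `random::<f64>()`, a uniform value in [0, 1).
fn stats_to_record(has_cached_posts: bool, in_experiment_bucket: bool, roll: f64) -> StatsToRecord {
    let sampled = roll < DEFAULT_SAMPLING_RATE;
    if has_cached_posts {
        // Cached path: experiment membership is not consulted at all.
        if sampled { StatsToRecord::DistributionsOnly } else { StatsToRecord::Nothing }
    } else if in_experiment_bucket || sampled {
        StatsToRecord::Full
    } else {
        StatsToRecord::Nothing
    }
}
```

One consequence worth noticing: on the cached path, experiment membership does not raise the sampling rate, so bucketed users still only contribute 5% of their cached-post requests.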

fn record_head(
    receiver: &dyn StatsReceiverExt,
    name: &str,
    scores: impl Iterator<Item = Option<f64>>,
) {
    let distribution_key = format!("{METRIC_PREFIX}.scoreDistribution.{name}");
    let missing_key = format!("{METRIC_PREFIX}.scoreMissing.{name}");
    let mut present = 0u64;
    let mut missing = 0u64;
    for score in scores {
        match score {
            Some(value) => {
                present += 1;
                receiver.observe(&distribution_key, &[], value, HistogramBuckets::Bucket0To1);
            }
            None => {
                missing += 1;
            }
        }
    }
    receiver.incr(&missing_key, &PRESENT_SCOPE, present);
    receiver.incr(&missing_key, &MISSING_SCOPE, missing);
}

For each score "head" (action type), emit:

  • Distribution histogram: each present score observed in [0, 1] bucket.
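  • Present/missing counters: how many candidates had / lacked this score. Both are emitted under the same scoreMissing key and distinguished by scope (PRESENT_SCOPE vs. MISSING_SCOPE).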

Lets dashboards plot e.g. "what's the distribution of favorite_score predictions today" + "what % of candidates have a favorite_score."
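
To make the present/missing bookkeeping concrete, here is a minimal in-memory sketch. HeadSummary, summarize_head, and coverage are hypothetical names introduced for illustration; the real code writes to a StatsReceiverExt instead of returning a value.

```rust
/// Hypothetical in-memory summary of one score head.
#[derive(Debug, Default, PartialEq)]
struct HeadSummary {
    present: u64,
    missing: u64,
    /// Values that would be fed to the histogram.
    observed: Vec<f64>,
}

/// Walks the scores once, splitting them into present and missing.
fn summarize_head(scores: impl Iterator<Item = Option<f64>>) -> HeadSummary {
    let mut summary = HeadSummary::default();
    for score in scores {
        match score {
            Some(value) => {
                summary.present += 1;
                summary.observed.push(value);
            }
            None => summary.missing += 1,
        }
    }
    summary
}

/// Fraction of candidates that carried this score, or None for an empty input.
fn coverage(summary: &HeadSummary) -> Option<f64> {
    let total = summary.present + summary.missing;
    if total == 0 {
        None
    } else {
        Some(summary.present as f64 / total as f64)
    }
}
```

The coverage ratio is the "what % of candidates have a favorite_score" number a dashboard would derive from the two counters.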

fn record_score_distributions(receiver: &dyn StatsReceiverExt, candidates: &[PostCandidate]) {
    record_head(
        receiver,
        "favorite",
        candidates.iter().map(|c| c.phoenix_scores.favorite_score),
    );
    record_head(
        receiver,
        "reply",
        candidates.iter().map(|c| c.phoenix_scores.reply_score),
    );
    record_head(
        receiver,
        "retweet",
        candidates.iter().map(|c| c.phoenix_scores.retweet_score),
    );
    record_head(
        receiver,
        "click",
        candidates.iter().map(|c| c.phoenix_scores.click_score),
    );
    record_head(
        receiver,
        "vqv",
        candidates.iter().map(|c| c.phoenix_scores.vqv_score),
    );
    record_head(
        receiver,
        "share",
        candidates.iter().map(|c| c.phoenix_scores.share_score),
    );
    record_head(
        receiver,
        "not_interested",
        candidates
            .iter()
            .map(|c| c.phoenix_scores.not_interested_score),
    );
    record_head(
        receiver,
        "weightedScore",
        candidates.iter().map(|c| c.weighted_score),
    );
    record_head(receiver, "finalScore", candidates.iter().map(|c| c.score));
}

Tracks 7 of the 22 Phoenix scores plus the two aggregated scores (weightedScore, finalScore). The seven selected are the actions that matter most for ranking; skipping the other heads keeps metric cost down.

fn record_retrieval_source_stats(
    receiver: &dyn StatsReceiverExt,
    selected_candidates: &[PostCandidate],
    non_selected_candidates: &[PostCandidate],
    retrieval_cluster: &str,
    experiment_buckets: &[&ExperimentBucket],
    served_type: ServedType,
    metric_name: &str,
) {
    let mut all_candidates: Vec<&PostCandidate> = selected_candidates
        .iter()
        .chain(non_selected_candidates.iter())
        .collect();
    all_candidates.sort_by(|a, b| {
        let score_a = a.weighted_score.unwrap_or(f64::NEG_INFINITY);
        let score_b = b.weighted_score.unwrap_or(f64::NEG_INFINITY);
        score_b
            .partial_cmp(&score_a)
            .unwrap_or(std::cmp::Ordering::Equal)
    });

Sort ALL candidates (selected + non_selected) by weighted_score descending. The Phoenix retrieval source's position in this ranked list is what we'll measure.
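
The comparator is worth isolating. A minimal sketch on bare Option<f64> values (sort_desc is a hypothetical helper, not part of the source):

```rust
use std::cmp::Ordering;

/// Sorts scores descending, treating missing scores as negative infinity
/// so they sink to the end, as in the comparator above.
fn sort_desc(scores: &mut [Option<f64>]) {
    scores.sort_by(|a, b| {
        let a = a.unwrap_or(f64::NEG_INFINITY);
        let b = b.unwrap_or(f64::NEG_INFINITY);
        // partial_cmp returns None only when a NaN is involved. Falling back
        // to Equal avoids unwrapping a None, but the position of NaN entries
        // is then unspecified.
        b.partial_cmp(&a).unwrap_or(Ordering::Equal)
    });
}
```

If NaN scores were a real possibility, f64::total_cmp (b.total_cmp(&a)) would give a well-defined total order instead of the Equal fallback.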

    let empty_bucket = ExperimentBucket::new("", "");
    let buckets: &[&ExperimentBucket] = if experiment_buckets.is_empty() {
        &[&empty_bucket]
    } else {
        experiment_buckets
    };

If no experiment buckets, use a single "empty" placeholder. Lets the loop below work uniformly.

    let topk_key = format!("{METRIC_PREFIX}.{metric_name}.RankedTopK");
    for &k in HEAVY_RANKER_TOP_K {
        let count = all_candidates
            .iter()
            .take(k)
            .filter(|c| c.served_type == Some(served_type))
            .count();
        let k_str = match k {
            1 => "1",
            10 => "10",
            35 => "35",
            _ => "unknown",
        };

        for b in buckets {
            let scopes: [(&str, &str); 5] = [
                ("type", "sum"),
                ("retrieval_cluster", retrieval_cluster),
                ("k", k_str),
                ("ddg", &b.experiment),
                ("bucket", &b.bucket),
            ];
            receiver.incr(&topk_key, &scopes, count as u64);
            let req_scopes: [(&str, &str); 5] = [
                ("type", "requests"),
                ("retrieval_cluster", retrieval_cluster),
                ("k", k_str),
                ("ddg", &b.experiment),
                ("bucket", &b.bucket),
            ];
            receiver.incr(&topk_key, &req_scopes, 1);
        }
    }

Position-of-source metric: for each k ∈ {1, 10, 35}, count how many of the top-k candidates after ranking came from this retrieval source. A second counter records the number of requests, so the dashboard can compute the per-request average as sum / requests.

Each metric is tagged with experiment + bucket, so dashboards can break down "Phoenix retrieval contribution to top-1 in experiment X bucket Y."
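
The top-k count itself is a take-then-filter. A minimal sketch, assuming a hypothetical two-variant Source enum in place of the real ServedType:

```rust
/// Hypothetical stand-in for the real ServedType enum.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Source {
    PhoenixRetrieval,
    Thunder,
}

/// Counts how many of the first `k` entries of an already-ranked list
/// came from `source`. Entries without a source never match.
fn count_in_top_k(ranked: &[Option<Source>], k: usize, source: Source) -> usize {
    ranked
        .iter()
        .take(k)
        .filter(|s| **s == Some(source))
        .count()
}
```

Because take(k) stops at the end of the slice, a request with fewer than k candidates simply counts over what is there.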

ddg = Duck Duck Goose, the legacy name of Twitter's A/B experimentation platform; here the scope carries the experiment name.

    let served_key = format!("{METRIC_PREFIX}.{metric_name}.Served");
    let count = selected_candidates
        .iter()
        .filter(|c| c.served_type == Some(served_type))
        .count();

    for b in buckets {
        receiver.incr(
            &served_key,
            &[
                ("type", "sum"),
                ("retrieval_cluster", retrieval_cluster),
                ("ddg", &b.experiment),
                ("bucket", &b.bucket),
            ],
            count as u64,
        );
        receiver.incr(
            &served_key,
            &[
                ("type", "requests"),
                ("retrieval_cluster", retrieval_cluster),
                ("ddg", &b.experiment),
                ("bucket", &b.bucket),
            ],
            1,
        );
    }
}

Same sum/requests pattern for the final served set (not top-K of all candidates — only what made the response). Lets dashboards compute "fraction of served items from this retrieval source."

Together the two metrics tell the story: the source's share of the top-k ranked candidates signals its strength, while its share of the served set signals its impact on what users actually see.
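
The sum/requests pairing can be sketched as a pair of counters. SumAndRequests is a hypothetical type for illustration; in the real system the two counters live in the metrics backend and the division happens on the dashboard.

```rust
/// Hypothetical pair of counters, as a dashboard would see them after aggregation.
#[derive(Default)]
struct SumAndRequests {
    sum: u64,
    requests: u64,
}

impl SumAndRequests {
    /// Mirrors the two `incr` calls: add the per-request count, then one request.
    fn record(&mut self, count: u64) {
        self.sum += count;
        self.requests += 1;
    }

    /// Mean count per request; None until at least one request is recorded.
    fn mean(&self) -> Option<f64> {
        if self.requests == 0 {
            None
        } else {
            Some(self.sum as f64 / self.requests as f64)
        }
    }
}
```

Recording a request with a count of 0 still increments requests, which is what keeps the mean honest for requests where the source contributed nothing.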


client_events_kafka_side_effect.rs (302 lines) — client-event firehose

The longest side-effect. Builds dozens of "client-event"-flavored log entries and publishes each to Kafka. Drives the cross-product analytics dashboards.

pub struct ClientEventsKafkaSideEffect {
    kafka_client: Arc<dyn KafkaPublisherClient>,
}

impl ClientEventsKafkaSideEffect {
    pub async fn prod() -> Self {
        Self::new(Arc::new(
            ProdKafkaPublisherClient::new(CLIENT_EVENT_TOPIC, KafkaCluster::ClientEvents).await,
        ))
    }
}

Yet another Kafka cluster: ClientEvents. Together with Bluebird, Ads, Phoenix, and Aiml, that makes five. Each has its own retention / fanout / downstream consumer set.

    async fn side_effect(
        &self,
        input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
    ) -> Result<(), String> {
        let query = &input.query;
        let items = &input.selected_candidates;

        let posts: Vec<&ScoredPost> = items
            .iter()
            .filter_map(|i| match &i.item {
                Some(feed_item::Item::Post(p)) => Some(p),
                _ => None,
            })
            .collect();
        let ad_count = items
            .iter()
            .filter(|i| matches!(i.item, Some(feed_item::Item::Ad(_))))
            .count() as i64;
        let wtf_count = items
            .iter()
            .filter(|i| matches!(i.item, Some(feed_item::Item::WhoToFollow(_))))
            .count() as i64;
        let post_count = posts.len() as i64;

Count posts, ads, WTF modules.

        let base = ClientEventParams {
            query,
            client_name: ClientPlatform::from_app_id(query.client_app_id).client_name(),
            section: "home",
            component: None,
            element: None,
            action: "served_tweets",
            value: 0,
        };

        let mut events = Vec::new();
        events.extend(build_served_events(&base, post_count, ad_count, wtf_count));
        events.extend(build_tweet_type_events(&base, &posts));
        events.extend(build_served_type_events(&base, &posts));
        events.extend(build_video_events(&base, &posts));
        events.extend(build_empty_timeline_events(&base, post_count));
        events.extend(build_query_events(&base, post_count));

Build a base ClientEventParams struct that all events share. Then build six categories of events, each producing several log lines. Total can be 50+ events per request.

        let payloads: Vec<Vec<u8>> = events
            .iter()
            .filter(|e| e.event_value.unwrap_or(0) > 0)
            .map(|e| serialize_binary(e).map_err(|e| format!("Thrift serialization failed: {e}")))
            .collect::<Result<_, _>>()?;

        let futs: Vec<_> = payloads
            .iter()
            .map(|bytes| self.kafka_client.send(bytes))
            .collect();
        let results = futures::future::join_all(futs).await;

        if let Some(err) = results.into_iter().find_map(|r| r.err()) {
            return Err(format!("Client-event Kafka publish failed: {err}"));
        }

        Ok(())
    }
}

Filter out zero-valued events (no point publishing "served 0 video posts"). Serialize each, send all in parallel, surface first error.

collect::<Result<_, _>>()? — collects an iterator of Results into a Result<Vec<_>, _>. Stops at first error.
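
A minimal sketch of the same filter, map, collect chain on plain integers. encode is a hypothetical stand-in for serialize_binary that fails on values above 255, chosen only so the error path can be exercised:

```rust
/// Hypothetical serializer: encodes a value as one byte, failing if it does not fit.
fn encode(value: i64) -> Result<Vec<u8>, String> {
    if value > 255 {
        Err(format!("value {value} does not fit in one byte"))
    } else {
        Ok(vec![value as u8])
    }
}

/// Drops non-positive values, encodes the rest, and stops at the first failure.
fn encode_all(values: &[i64]) -> Result<Vec<Vec<u8>>, String> {
    values
        .iter()
        .filter(|v| **v > 0)
        .map(|v| encode(*v).map_err(|e| format!("serialization failed: {e}")))
        // Collecting into Result<Vec<_>, _> short-circuits on the first Err.
        .collect()
}
```

One consequence of this shape: a single serialization failure discards every payload for the request, since the ? returns before any send is attempted.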

Event-category builders

fn build_served_events(
    base: &ClientEventParams,
    post_count: i64,
    ad_count: i64,
    wtf_count: i64,
) -> Vec<LogEvent> {
    vec![
        build_log_event(&ClientEventParams {
            value: post_count + ad_count,
            ..*base
        }),
        build_log_event(&ClientEventParams {
            component: Some("injected"),
            value: post_count,
            ..*base
        }),
        build_log_event(&ClientEventParams {
            component: Some("promoted"),
            value: ad_count,
            ..*base
        }),
        build_log_event(&ClientEventParams {
            component: Some("who_to_follow"),
            action: "served_users",
            value: wtf_count,
            ..*base
        }),
    ]
}

Four counter-events:

  • Total served (posts + ads).
  • "injected" posts (organic).
  • "promoted" ads.
  • "who_to_follow" users.

The ..*base is struct update syntax — copy all fields from base except those explicitly overridden.
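
A minimal sketch of the pattern on a trimmed-down struct. Params and with_component are hypothetical names; the field set is a subset of ClientEventParams chosen for illustration.

```rust
/// Hypothetical, trimmed-down version of the params struct.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Params<'a> {
    section: &'a str,
    component: Option<&'a str>,
    action: &'a str,
    value: i64,
}

/// Builds a variant of `base` with only `component` and `value` changed.
fn with_component<'a>(base: &Params<'a>, component: &'a str, value: i64) -> Params<'a> {
    Params {
        component: Some(component),
        value,
        // `..*base` fills every field not listed above from `base`.
        // This works through a shared reference because the remaining
        // fields (`section`, `action`) are Copy.
        ..*base
    }
}
```

The base value is left untouched, so the same base can seed any number of derived events.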

fn build_tweet_type_events(base: &ClientEventParams, posts: &[&ScoredPost]) -> Vec<LogEvent> {
    TWEET_TYPE_PREDICATES
        .iter()
        .map(|&(bit_idx, name)| {
            let count = posts
                .iter()
                .filter(|p| bitset_get(&p.tweet_type_metrics, bit_idx))
                .count();
            build_log_event(&ClientEventParams {
                component: Some("injected"),
                element: Some(name),
                value: count as i64,
                ..*base
            })
        })
        .collect()
}

For each tweet-type predicate from Session 08's TweetTypeMetricsHydrator (the bit indices like RETWEET, REPLY, AUTHOR_FOLLOWERS_0_100, etc.), count how many served posts have that bit set, emit as an event with element = predicate_name.

So we get e.g. an event named iphone:home:home:injected:RETWEET:served_tweets with value 5 if 5 retweets were served. Detailed slicing.
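
The per-predicate counting can be sketched with a plain u64 bitset. The predicate table, the bit layout, and bit_is_set are all assumptions for illustration; the real bitset_get signature and TWEET_TYPE_PREDICATES contents are not shown in this excerpt.

```rust
/// Hypothetical predicate table: (bit index, event element name).
const PREDICATES: &[(u32, &str)] = &[(0, "retweet"), (1, "reply"), (2, "video")];

/// Assumed bitset layout: a single u64, one bit per predicate.
fn bit_is_set(bits: u64, idx: u32) -> bool {
    idx < 64 && (bits >> idx) & 1 == 1
}

/// For each predicate, counts the posts whose bitset has that bit set.
fn count_by_predicate(posts: &[u64]) -> Vec<(&'static str, usize)> {
    PREDICATES
        .iter()
        .map(|&(idx, name)| {
            let count = posts.iter().filter(|&&bits| bit_is_set(bits, idx)).count();
            (name, count)
        })
        .collect()
}
```

Note that one event is built per predicate regardless of the count; predicates that match no served post produce a zero-valued event, which the value > 0 filter in side_effect later drops.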

fn build_served_type_events(base: &ClientEventParams, posts: &[&ScoredPost]) -> Vec<LogEvent> {
    let mut counts: HashMap<i32, i64> = HashMap::new();
    for post in posts {
        *counts.entry(post.served_type).or_default() += 1;
    }
    counts
        .into_iter()
        .map(|(st, count)| {
            let name = ServedType::try_from(st)
                .map(|t| t.as_str_name().to_ascii_lowercase())
                .unwrap_or_else(|_| format!("unknown_{st}"));
            base_build_log_event(base, Some(name), None, base.action, count)
        })
        .collect()
}

Group posts by served_type, emit one event per type. So "5 from thunder, 12 from phoenix_retrieval, 3 from tweet_mixer."

The enum-name → lowercase: "FOR_YOU_PHOENIX_RETRIEVAL" → "for_you_phoenix_retrieval". Different from the served-history upper_snake_to_pascal: a different downstream consumer convention.
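
A minimal sketch of the group-and-name step. enum_name is a hypothetical lookup standing in for ServedType::try_from plus as_str_name, and the ids and the FOR_YOU_THUNDER name are made up for the example. A BTreeMap replaces the HashMap only so the output order is deterministic.

```rust
use std::collections::BTreeMap;

/// Hypothetical name lookup; ids and the first name are invented for illustration.
fn enum_name(id: i32) -> Option<&'static str> {
    match id {
        1 => Some("FOR_YOU_THUNDER"),
        2 => Some("FOR_YOU_PHOENIX_RETRIEVAL"),
        _ => None,
    }
}

/// Groups raw served-type ids and returns (lowercased name, count) pairs,
/// ordered by id.
fn count_by_served_type(ids: &[i32]) -> Vec<(String, i64)> {
    let mut counts: BTreeMap<i32, i64> = BTreeMap::new();
    for &id in ids {
        *counts.entry(id).or_default() += 1;
    }
    counts
        .into_iter()
        .map(|(id, count)| {
            let name = enum_name(id)
                .map(|n| n.to_ascii_lowercase())
                // Unknown ids still get an event, under a synthetic name.
                .unwrap_or_else(|| format!("unknown_{id}"));
            (name, count)
        })
        .collect()
}
```

The unknown_{id} fallback means a newly added served type shows up in analytics right away, even before this code learns its name.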

fn build_video_events(base: &ClientEventParams, posts: &[&ScoredPost]) -> Vec<LogEvent> {
    let num_videos = posts
        .iter()
        .filter(|p| bitset_get(&p.tweet_type_metrics, VIDEO))
        .count();
    vec![build_log_event(&ClientEventParams {
        component: Some("with_video_duration"),
        element: Some("num_videos"),
        value: num_videos as i64,
        ..*base
    })]
}

Video-specific event. Could be folded into build_tweet_type_events but kept separate (presumably for backward-compat with a specific dashboard).

fn build_empty_timeline_events(base: &ClientEventParams, post_count: i64) -> Vec<LogEvent> {
    if post_count > 0 {
        return Vec::new();
    }
    vec![build_log_event(&ClientEventParams {
        action: "empty",
        element: Some("served_non_promoted_tweet"),
        ..*base
    })]
}

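Conditional event: built only when the post count is 0, to track empty-response rates separately. Note that, as excerpted, the event inherits value: 0 from base, so the event_value > 0 filter in side_effect would drop it before publishing; the empty-timeline signal that passes the filter is the size_is_empty event from build_query_events, which sets value: 1.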

fn build_query_events(base: &ClientEventParams, post_count: i64) -> Vec<LogEvent> {
    let query = base.query;
    let ctx = RequestContext::parse(&query.request_context);

    let query_predicates: Vec<(&str, bool)> = vec![
        ("request", true),
        ("get_older", query.is_bottom_request),
        ("get_newer", query.is_top_request),
        ("get_initial", query.cursor.is_none()),
        ("pull_to_refresh", ctx == RequestContext::PullToRefresh),
        ("request_context_launch", ctx == RequestContext::Launch),
        (
            "request_context_foreground",
            ctx == RequestContext::Foreground,
        ),
    ];

    let size_predicates: Vec<(&str, bool)> = vec![
        ("size_is_empty", post_count == 0),
        ("size_at_most_5", post_count <= 5),
        ("size_at_most_10", post_count <= 10),
        ("size_at_most_35", post_count <= 35),
    ];

    let mut events = Vec::new();
    for &(query_name, query_match) in &query_predicates {
        if !query_match {
            continue;
        }
        for &(size_name, size_match) in &size_predicates {
            if !size_match {
                continue;
            }
            events.push(build_log_event(&ClientEventParams {
                component: Some(size_name),
                action: query_name,
                element: None,
                value: 1,
                ..*base
            }));
        }
    }
    events
}

The cross-product: 7 query types × 4 size buckets = up to 28 events, filtered to the pairs where both predicates match. For example, a get_initial request with 7 posts:

  • (get_initial, size_at_most_10): emit
  • (get_initial, size_at_most_35): emit
  • (get_initial, size_at_most_5): skip (7 > 5)
  • (get_initial, size_is_empty): skip
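
The same walk-through as a minimal sketch. matching_pairs is a hypothetical helper that returns the (action, component) names instead of building LogEvents; size_predicates copies the four buckets from the excerpt.

```rust
/// Returns every (query_name, size_name) pair whose two predicates both hold.
fn matching_pairs<'a>(
    query_predicates: &[(&'a str, bool)],
    size_predicates: &[(&'a str, bool)],
) -> Vec<(&'a str, &'a str)> {
    let mut pairs = Vec::new();
    for &(query_name, query_match) in query_predicates {
        if !query_match {
            continue;
        }
        for &(size_name, size_match) in size_predicates {
            if size_match {
                pairs.push((query_name, size_name));
            }
        }
    }
    pairs
}

/// The four cumulative size buckets from the excerpt above.
fn size_predicates(post_count: i64) -> Vec<(&'static str, bool)> {
    vec![
        ("size_is_empty", post_count == 0),
        ("size_at_most_5", post_count <= 5),
        ("size_at_most_10", post_count <= 10),
        ("size_at_most_35", post_count <= 35),
    ]
}
```

The size buckets are cumulative, so an empty response matches all four of them, and a response with more than 35 posts matches none and emits no query events at all.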

("request", true) always fires — so we always get (request, size_xxx) events. The "request" predicate is the baseline counter.

#[derive(Clone, Copy)]
struct ClientEventParams<'a> {
    query: &'a ScoredPostsQuery,
    client_name: &'a str,
    section: &'a str,
    component: Option<&'a str>,
    element: Option<&'a str>,
    action: &'a str,
    value: i64,
}

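The shared params struct. #[derive(Clone, Copy)] works on a struct holding references because every field is either a Copy type or a shared reference (shared references are Copy). The same property is what makes ..*base legal through a &ClientEventParams: the fields not overridden are copied out of the borrow.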

fn build_log_event(p: &ClientEventParams) -> LogEvent {
    base_build_log_event(
        p,
        p.component.map(String::from),
        p.element.map(String::from),
        p.action,
        p.value,
    )
}

fn base_build_log_event(
    p: &ClientEventParams,
    component: Option<String>,
    element: Option<String>,
    action: &str,
    value: i64,
) -> LogEvent {
    let now_ms = p.query.request_time_ms;

    let event_name = [
        p.client_name,
        "home",
        p.section,
        component.as_deref().unwrap_or(""),
        element.as_deref().unwrap_or(""),
        action,
    ]
    .join(":");

    LogEvent {
        log_base: Some(LogBase {
            transaction_id: String::new(),
            ip_address: p.query.ip_address.clone(),
            user_id: Some(p.query.user_id as i64),
            timestamp: now_ms,
            language: Some(p.query.language_code.clone()),
            client_app_id: Some(p.query.client_app_id as i64),
            device_id: Some(p.query.device_id.clone()),
            country: Some(p.query.country_code.clone()),
            timezone: None,
        }),
        event_name: Some(event_name),
        event_value: Some(value),
        event_details: None,
        event_namespace: Some(EventNamespace {
            client: Some(p.client_name.to_string()),
            page: Some("home".to_string()),
            section: Some(p.section.to_string()),
            component,
            element,
            action: Some(action.to_string()),
        }),
        notification_details: None,
    }
}

Build the final LogEvent (the struct that side_effect serializes with Thrift). event_name is the canonical colon-separated string, e.g. "iphone:home:home:injected::served_tweets". event_namespace carries the same data in structured form. Both are populated: old consumers expect the string, new ones read the namespace.

Empty components/elements show up as empty segments in the joined name (:: for one missing field, ::: for two). The convention keeps every position present, so the name always splits into six fields.
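
A minimal sketch of the name construction, with the six parts passed explicitly (event_name here is a hypothetical free function; the real code builds the array inline):

```rust
/// Joins the six namespace parts with ':'; missing parts become empty segments.
fn event_name(
    client: &str,
    page: &str,
    section: &str,
    component: Option<&str>,
    element: Option<&str>,
    action: &str,
) -> String {
    [
        client,
        page,
        section,
        component.unwrap_or(""),
        element.unwrap_or(""),
        action,
    ]
    .join(":")
}
```

Splitting the result on ':' always yields six fields, which is what lets a consumer address component or element by position.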

The LogBase block contains user identification + locale. This is X's universal "who made this request" envelope.


What we've learned

Side-effect side of the pipeline — the data feeds:

  1. Training data: ServedCandidatesKafka (Phoenix cluster), RerankingKafka (5% sampled), PhoenixExperiments (all shadow-eligible clusters).
  2. Analytics: ScoredStatsSideEffect (score distributions + retrieval position), ForYouResponseStatsSideEffect, ClientEventsKafka (firehose).
  3. State writes for next request: UpdateServedHistory (Manhattan), UpdatePastRequestTimestamps (Manhattan), RedisPostCandidateCache (Redis), PhoenixRequestCache (cross-DC Redis).
  4. Compliance/billing: AdsInjectionLogging (Ads cluster), PublishSeenIdsToKafka (impressions).
  5. Maintenance: TruncateServedHistory, MutualFollowStats.

Five distinct Kafka clusters in use:

  • Bluebird — general impressions.
  • Ads — ad attribution / billing.
  • Phoenix — training data (served candidates, scored candidates).
  • Aiml — ML pipeline data (Phoenix predictions).
  • ClientEvents — analytics firehose.

Shadow-traffic patterns:

  • is_shadow_traffic = is_sampled(request_id, 0.5) from Session 04.
  • PhoenixExperiments, ServedCandidatesKafka, and some context hydrators (Session 08) gate on this.
  • Independent from B3 trace sampling.
  • Independent from random < 0.05 (5% sampling).

Sample-rate stacks:

  • Shadow traffic: 50% of requests.
  • RerankingKafka / ScoredStats: 5% of those.
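  • Combined: 0.5 × 0.05 = 0.025, so ~2.5% of total traffic gets full analytics. Requests in an experiment bucket skip the 5% draw in ScoredStats, so the real share is somewhat higher.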

Two ID conventions:

  • query.request_id → used as trace_id AND request_join_id in published events.
  • query.prediction_id → used as Phoenix prediction request ID. Different value, different downstream use.

Multi-entry served-history for replies: a single served reply produces N entries (one per ancestor + self) so that "user saw the conversation" is fully recorded for dedup.

Cross-product event emission: build_query_events emits up to 28 events from 7×4 predicates. Each event is a counter with value 1, so dashboards can slice request counts along both dimensions, e.g. "how many get_initial requests returned at most 5 posts."

Empty-bucket placeholder: record_retrieval_source_stats uses an ExperimentBucket::new("", "") placeholder when no experiments are active. Lets the same loop body work whether or not experiments are happening — uniform code path.

Sum/requests pattern: emit both ("type", "sum") and ("type", "requests") counters. Compute averages on the dashboard side via rate(sum) / rate(requests). Avoids histograms when only the mean is wanted.

Forward-compat schema markers: TweetScoreV1 enum variant. Implies V2 planned. Schema evolution in published events is a Thrift-style versioned-variant problem.

Per-message Kafka publish vs. batching: most side-effects send one Kafka message per item — N posts → N sends in parallel. No batching. Trade-off: lower latency, more network traffic. Works because the Kafka client likely has internal batching.


Next session

Session 14 — Ads (533 LOC). The home-mixer/ads/ module:

  • safe_gap_blender.rs (95) — keeps ads N positions apart from each other
  • partition_organic_blender.rs (190) — alternative blending strategy with brand-safety partitioning
  • util.rs (228) — shared ad-positioning helpers
  • mod.rs (20) — re-exports + the AdsBlender trait

After Session 14, we leave the Rust home-mixer and dive into the Python Phoenix model code.