X For You algorithm, line by line — Part 13: Side effects (part 2)
Part 13 of the deep dive into xai-org/x-algorithm. The five heaviest side-effects: shadow-mode multi-cluster Phoenix experiments, served-candidates Kafka publish, the multi-entry served-history Manhattan write, score-distribution + retrieval-position analytics, and the 302-line client-events firehose with cross-product event generation.
The five remaining side-effects (~1,232 LOC). These are the heavy hitters — analytics deep-dives, the served-history write, the client-events firehose, and shadow-mode Phoenix experiment scoring.
Files covered:
home-mixer/side_effects/
├── phoenix_experiments_side_effect.rs (174) shadow-mode multi-cluster Phoenix
├── served_candidates_kafka_side_effect.rs (207) Phoenix-cluster Kafka publish
├── update_served_history_side_effect.rs (268) Manhattan write of served history
├── scored_stats_side_effect.rs (281) score-distribution + retrieval stats
└── client_events_kafka_side_effect.rs (302) client-event Kafka firehose
phoenix_experiments_side_effect.rs (174 lines) — shadow-mode multi-cluster scoring
The most expensive side-effect. Runs the candidate set against every shadow-eligible Phoenix cluster in parallel and publishes their scores to Kafka. This is how X measures "what would the alternative model have scored?" without affecting the live response.
pub struct PhoenixExperimentsSideEffect {
phoenix_client: Arc<dyn PhoenixPredictionClient + Send + Sync>,
egress_client: Arc<dyn PhoenixPredictionClient + Send + Sync>,
kafka_client: Arc<dyn KafkaPublisherClient>,
}
Same two-client pattern as PhoenixScorer (Session 10) + a Kafka publisher.
#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for PhoenixExperimentsSideEffect {
fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
query.is_shadow_traffic
}
Shadow traffic only. Recall from Session 04: is_shadow_traffic = is_sampled(request_id, 0.5) — deterministic 50% of requests. So half the production traffic runs this expensive side-effect.
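To make "deterministic 50%" concrete, here is a minimal sketch of a hash-based sampler. The real is_sampled is not shown in this part, so the mixing function (a SplitMix64-style finalizer) and the signature are assumptions; the point is only that the same request_id always lands on the same side of the threshold.

```rust
/// Hypothetical mixer (SplitMix64-style finalizer). The actual hash used by
/// `is_sampled` is not shown in this part of the walkthrough.
fn mix64(mut x: u64) -> u64 {
    x = x.wrapping_add(0x9E37_79B9_7F4A_7C15);
    x = (x ^ (x >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    x = (x ^ (x >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    x ^ (x >> 31)
}

/// Assumed shape of `is_sampled`: no RNG involved, so retries of the same
/// request make the same shadow/non-shadow decision.
fn is_sampled(request_id: u64, rate: f64) -> bool {
    // Map the top 53 bits of the hash to a float in [0, 1).
    let unit = (mix64(request_id) >> 11) as f64 / (1u64 << 53) as f64;
    unit < rate
}
```

Because the decision depends only on request_id, every side-effect that checks is_shadow_traffic agrees on which requests are shadow traffic.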
async fn side_effect(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>,
) -> Result<(), String> {
if input.query.scoring_sequence.is_none() {
return Ok(());
};
let request_time_ms = input.query.request_time_ms;
let product_surface = if input.query.in_network_only {
ProductSurface::HomeTimelineRankedFollowing
} else {
ProductSurface::HomeTimelineRanking
};
let user_id = input.query.user_id;
let use_egress: bool = input.query.params.get(UseEgressSidecar);
let base_request =
build_prediction_request(&input.query, &input.selected_candidates, product_surface);
Bail if no scoring sequence (no point shadow-scoring without input features).
Build the base request once — the same shape used by the primary scorer (Session 10). Reuse across multiple cluster calls.
let futures = PhoenixCluster::VARIANTS
.iter()
.filter(|c| c.is_shadow_eligible())
.map(|&cluster_id| {
let client = if use_egress {
Arc::clone(&self.egress_client)
} else {
Arc::clone(&self.phoenix_client)
};
let request = base_request.clone();
async move {
let result = client.predict(cluster_id, request).await;
if let Err(ref err) = result {
tracing::error!(
"Phoenix experiment {:?} request failed: {}",
cluster_id,
err
);
}
(cluster_id, result)
}
});
let experiment_results: Vec<_> = join_all(futures).await;
Iterate every Phoenix cluster variant (the proc-macro VariantArray exposes ::VARIANTS for the enum). Filter to shadow-eligible ones — some clusters are production-only and shouldn't be hit with shadow traffic (saves the cost).
For each, fan out a prediction call. Each clone of the request is ~50KB+ (the columnar sequence is the biggest piece), so this fan-out is expensive — N clusters × candidate-set-size of memory. Acceptable because it's only 50% of traffic, and the shadow workload is throttled below production capacity.
join_all resolves once every cluster call has completed. Errors are logged but don't abort the batch, so whatever succeeded still gets published.
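The fan-out shape can be sketched without an async runtime. This version swaps join_all for std::thread::scope (a different mechanism, used here only to keep the example dependency-free); the Cluster enum, its VARIANTS list, and predict are all hypothetical stand-ins.

```rust
use std::thread;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Cluster {
    Prod,
    ExperimentA,
    ExperimentB,
}

impl Cluster {
    // Hand-written stand-in for the derived `VARIANTS` array.
    const VARIANTS: &'static [Cluster] =
        &[Cluster::Prod, Cluster::ExperimentA, Cluster::ExperimentB];

    fn is_shadow_eligible(self) -> bool {
        !matches!(self, Cluster::Prod)
    }
}

// Hypothetical prediction call: one cluster always fails.
fn predict(cluster: Cluster, request: &str) -> Result<usize, String> {
    match cluster {
        Cluster::ExperimentB => Err("timeout".to_string()),
        _ => Ok(request.len()),
    }
}

/// Calls every shadow-eligible cluster concurrently and keeps each result,
/// success or failure, paired with its cluster.
fn fan_out(request: &str) -> Vec<(Cluster, Result<usize, String>)> {
    thread::scope(|s| {
        let handles: Vec<_> = Cluster::VARIANTS
            .iter()
            .copied()
            .filter(|c| c.is_shadow_eligible())
            .map(|c| {
                let req = request.to_string(); // one clone per cluster call
                s.spawn(move || (c, predict(c, &req)))
            })
            .collect(); // spawn everything before joining anything
        handles
            .into_iter()
            .map(|h| h.join().expect("worker panicked"))
            .collect()
    })
}
```

A failed cluster shows up as an Err entry next to the successful ones, which is what lets the publishing loop skip it with continue.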
for candidate in &input.selected_candidates {
let mut all_scores: BTreeSet<Box<PredictionScore>> = BTreeSet::new();
for (cluster_id, result) in &experiment_results {
let Ok(predictions) = result else {
continue;
};
let cluster_name = format!("{:?}", cluster_id);
let s = predictions.candidate_scores(&candidate.get_original_tweet_id());
let score_fields: &[(&str, Option<f64>)] = &[
("favorite", s.favorite_score),
("reply", s.reply_score),
("retweet", s.retweet_score),
("photo_expand", s.photo_expand_score),
("click", s.click_score),
("profile_click", s.profile_click_score),
("vqv", s.vqv_score),
("share", s.share_score),
("share_via_dm", s.share_via_dm_score),
("share_via_copy_link", s.share_via_copy_link_score),
("dwell", s.dwell_score),
("quote", s.quote_score),
("quoted_click", s.quoted_click_score),
("quoted_vqv", s.quoted_vqv_score),
("follow_author", s.follow_author_score),
("not_interested", s.not_interested_score),
("block_author", s.block_author_score),
("mute_author", s.mute_author_score),
("report", s.report_score),
("not_dwelled", s.not_dwelled_score),
("dwell_time", s.dwell_time),
];
all_scores.extend(score_fields.iter().filter_map(|(name, score)| {
score.map(|v| {
Box::new(PredictionScore::new(
Some(format!("phoenix.{cluster_name}.{name}")),
Some(OrderedFloat::from(v)),
))
})
}));
}
For each candidate, build a set of PredictionScore entries spanning all clusters × all action types. The score name is phoenix.<cluster_name>.<action_name> — e.g., phoenix.Experiment1Fou.favorite, phoenix.Experiment1Lap7.reply.
BTreeSet<Box<PredictionScore>> is an ordered set, so PredictionScore must implement Ord. OrderedFloat wraps f64 to make that possible: f64 itself only implements PartialOrd, because NaN has no place in a total order.
The score.map(|v| Box::new(PredictionScore::new(...))) pattern: only include scores that aren't None. Missing scores are dropped from the set.
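A dependency-free sketch of the same idea: a wrapper that gives f64 a total order, plus the filter_map that drops missing scores. OrdF64 and NamedScore are hypothetical stand-ins for OrderedFloat and PredictionScore, and OrdF64 uses f64::total_cmp, which may order NaN differently from the real crate.

```rust
use std::cmp::Ordering;
use std::collections::BTreeSet;

/// Stand-in for `OrderedFloat`: a total order over f64.
#[derive(Debug, Clone, Copy)]
struct OrdF64(f64);

impl PartialEq for OrdF64 {
    fn eq(&self, other: &Self) -> bool {
        self.0.total_cmp(&other.0) == Ordering::Equal
    }
}
impl Eq for OrdF64 {}
impl PartialOrd for OrdF64 {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for OrdF64 {
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.total_cmp(&other.0)
    }
}

/// Stand-in for `PredictionScore`; deriving Ord works only because
/// every field is itself Ord.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct NamedScore {
    name: String,
    value: OrdF64,
}

/// Keeps only the scores that are present, named `phoenix.<cluster>.<head>`.
fn collect_scores(cluster: &str, fields: &[(&str, Option<f64>)]) -> BTreeSet<NamedScore> {
    fields
        .iter()
        .filter_map(|(name, score)| {
            score.map(|v| NamedScore {
                name: format!("phoenix.{cluster}.{name}"),
                value: OrdF64(v),
            })
        })
        .collect()
}
```

The set iterates in name order, so a consumer sees a stable field order regardless of the order in which clusters responded.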
let source_tweet_id = candidate.retweeted_tweet_id.unwrap_or(candidate.tweet_id);
let scored = ScoredCandidate {
tweet_id: candidate.tweet_id as i64,
viewer_id: Some(user_id as i64),
author_id: Some(candidate.author_id as i64),
request_join_id: Some(input.query.request_id as i64),
score: None,
suggest_type: None,
is_in_network: candidate.in_network,
in_reply_to_tweet_id: candidate.in_reply_to_tweet_id.map(|id| id as i64),
quoted_tweet_id: candidate.quoted_tweet_id.map(|id| id as i64),
quoted_user_id: candidate.quoted_user_id.map(|id| id as i64),
request_time_ms: Some(request_time_ms),
source_tweet_id: Some(source_tweet_id as i64),
prediction_scores: Some(all_scores),
fav_count: candidate.fav_count,
reply_count: candidate.reply_count,
retweet_count: candidate.repost_count,
quote_count: candidate.quote_count,
has_media: candidate.has_media,
language_code: candidate
.language_code
.as_deref()
.map(|lc| language_code_string_to_enum(lc) as i32),
video_duration_ms: candidate.min_video_duration_ms,
};
match serialize_to_bytes_binary(&scored) {
Ok(bytes) => {
if let Err(e) = self.kafka_client.send(&bytes).await {
tracing::error!("Failed to publish scored candidate to Kafka: {e}");
}
}
Err(e) => {
tracing::error!("Failed to serialize scored candidate: {e}");
}
}
}
Ok(())
}
}
Build a ScoredCandidate Thrift struct with the per-candidate fields + all cluster scores. Serialize to binary Thrift, send to Kafka.
One Kafka send per candidate. Each message carries up to M clusters × 21 score fields, so N candidates × M × 21 scores get logged for offline analysis (training data, A/B model comparison).
Errors logged but don't fail the whole side-effect — get what we can.
served_candidates_kafka_side_effect.rs (207 lines) — what we showed (Phoenix cluster)
Publishes the final served list to Kafka for downstream training data. Shadow-traffic only.
pub struct ServedCandidatesKafkaSideEffect {
kafka_client: Arc<dyn KafkaPublisherClient>,
}
impl ServedCandidatesKafkaSideEffect {
pub async fn prod() -> Self {
Self::new(Arc::new(
ProdKafkaPublisherClient::new(SERVED_CANDIDATES_TOPIC, KafkaCluster::Phoenix).await,
))
}
}
Goes to the Phoenix Kafka cluster — different from the Bluebird (impressions) and Ads clusters. Training-data cluster.
fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
is_prod() && query.is_shadow_traffic && query.params.get(EnableUrtMigrationComponents)
}
Three gates: prod, shadow traffic, URT-migration flag.
async fn side_effect(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
) -> Result<(), String> {
let query = &input.query;
let items = &input.selected_candidates;
if items.is_empty() {
return Ok(());
}
let now_ms = query.request_time_ms;
let request_provenance = match RequestContext::parse(&query.request_context) {
RequestContext::Foreground => Some(Box::new(RequestProvenance::FOREGROUND)),
RequestContext::Launch => Some(Box::new(RequestProvenance::LAUNCH)),
RequestContext::PullToRefresh => Some(Box::new(RequestProvenance::PTR)),
RequestContext::ViewportAwareRefresh => {
Some(Box::new(RequestProvenance::VIEWPORT_AWARE_REFRESH))
}
_ => Some(Box::new(RequestProvenance::OTHER)),
};
let query_type = if query.is_bottom_request {
Some(Box::new(QueryType::GET_OLDER))
} else if query.is_top_request {
Some(Box::new(QueryType::GET_NEWER))
} else if query.cursor.is_none() {
Some(Box::new(QueryType::GET_INITIAL))
} else {
Some(Box::new(QueryType::OTHER))
};
Classify the request:
- Provenance: how the user got here (foreground / launch / pull-to-refresh / viewport-aware-refresh / other).
- Query type: what kind of fetch (older / newer / initial / other).
Both important for training-data labeling — different request types have different engagement patterns.
let request_info = Box::new(RequestInfo {
request_time_ms: now_ms,
trace_id: query.request_id as i64,
user_id: Some(query.user_id as i64),
client_app_id: Some(query.client_app_id as i64),
timeline_type: Some(Box::new(TimelineType::HOME)),
ip_address: Some(query.ip_address.clone()),
user_agent: Some(query.user_agent.clone()),
query_type,
request_provenance,
language_code: Some(query.language_code.clone()),
country_code: Some(query.country_code.clone()),
request_end_time_ms: Some(now_ms),
request_join_id: Some(query.request_id as i64),
});
Build the shared per-request envelope. trace_id and request_join_id both = query.request_id — same value used for two purposes (Zipkin trace + downstream join key).
let mut payloads = Vec::with_capacity(items.len());
for item in items {
if let Some(entry_info) = build_entry_info(item) {
let served = ServedEntry {
request: request_info.clone(),
entry: Some(Box::new(entry_info)),
};
let bytes = serialize_compact(&served)
.map_err(|e| format!("Thrift serialization failed: {e}"))?;
payloads.push(bytes);
}
}
let futs: Vec<_> = payloads
.iter()
.map(|bytes| self.kafka_client.send(bytes))
.collect();
let results = futures::future::join_all(futs).await;
if let Some(err) = results.into_iter().find_map(|r| r.err()) {
return Err(format!("Served-candidates Kafka publish failed: {err}"));
}
Ok(())
}
}
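One Kafka message per item. Each carries the same request_info plus item-specific entry_info, encoded with Thrift's compact protocol.
Serialization happens up front, so a serialization error returns before anything is sent. The sends then fire in parallel, and any send error fails the whole side-effect.
serialize_compact (vs. the serialize_to_bytes_binary used for the shadow scores above): Thrift's compact protocol uses variable-length integer encoding and packed field headers, so messages dominated by small integers and optional fields come out smaller than with the binary protocol. Large 64-bit IDs gain little.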
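The variable-length integer encoding can be sketched in a few lines. This is not the Thrift library's code, just the zigzag-plus-varint scheme the compact protocol uses for integers, next to the fixed 8-byte big-endian form of the binary protocol; the function names are made up for the example.

```rust
/// Zigzag maps signed to unsigned so small magnitudes stay small:
/// 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
fn zigzag(n: i64) -> u64 {
    ((n << 1) ^ (n >> 63)) as u64
}

/// Base-128 varint: 7 payload bits per byte, high bit set on all but the last.
fn write_varint(mut v: u64, out: &mut Vec<u8>) {
    while v >= 0x80 {
        out.push(((v as u8) & 0x7F) | 0x80);
        v >>= 7;
    }
    out.push(v as u8);
}

/// Integer as the compact protocol would write it.
fn compact_i64(n: i64) -> Vec<u8> {
    let mut out = Vec::new();
    write_varint(zigzag(n), &mut out);
    out
}

/// Integer as the binary protocol would write it: always 8 bytes.
fn fixed_i64(n: i64) -> Vec<u8> {
    n.to_be_bytes().to_vec()
}
```

A small value like 300 shrinks from 8 bytes to 2, while an ID around 1.7e18 needs 9 varint bytes, one more than the fixed form. The savings come from positions, counts, and enum values, not from tweet IDs.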
fn build_entry_info(item: &FeedItem) -> Option<EntryInfo> {
match &item.item {
Some(feed_item::Item::Post(post)) => {
let source_tweet_id = if post.retweeted_tweet_id != 0 {
Some(post.retweeted_tweet_id as i64)
} else {
None
};
Some(EntryInfo {
id: post.tweet_id as i64,
position: item.position as i16,
entry_id: format!("tweet-{}", post.tweet_id),
entry_type: Box::new(EntryType::TWEET),
vertical_size: Some(1),
sort_index: None,
display_type: None,
score: Some(OrderedFloat::from(post.score as f64)),
details: Some(Box::new(ItemDetails::TweetDetails(Box::new(
TweetDetails {
source_tweet_id,
author_id: Some(post.author_id as i64),
},
)))),
prediction_scores: None,
})
}
Some(feed_item::Item::Ad(ad)) => Some(EntryInfo {
id: ad.post_id,
position: item.position as i16,
entry_id: format!("promotedTweet-{}", ad.post_id),
entry_type: Box::new(EntryType::PROMOTED_TWEET),
vertical_size: Some(1),
sort_index: None,
display_type: None,
score: None,
details: Some(Box::new(ItemDetails::PromotedTweetDetails(Box::new(
PromotedTweetDetails {
advertiser_id: Some(ad.account_id),
insert_position: Some(ad.insert_position),
impression_id: Some(ad.impression_id.to_string()),
},
)))),
prediction_scores: None,
}),
Some(feed_item::Item::WhoToFollow(_)) => Some(EntryInfo {
id: 0,
position: item.position as i16,
entry_id: "who-to-follow".to_string(),
entry_type: Box::new(EntryType::WHO_TO_FOLLOW_MODULE),
vertical_size: None,
sort_index: None,
display_type: None,
score: None,
details: Some(Box::new(ItemDetails::WhoToFollowDetails(Box::new(
WhoToFollowDetails {
advertiser_id: None,
},
)))),
prediction_scores: None,
}),
Some(feed_item::Item::PushToHome(pth)) => Some(EntryInfo {
id: pth.tweet_id as i64,
position: item.position as i16,
entry_id: format!("tweet-{}", pth.tweet_id),
entry_type: Box::new(EntryType::TWEET),
vertical_size: Some(1),
sort_index: None,
display_type: None,
score: None,
details: Some(Box::new(ItemDetails::TweetDetails(Box::new(
TweetDetails {
source_tweet_id: None,
author_id: Some(pth.author_id as i64),
},
)))),
prediction_scores: None,
}),
Some(feed_item::Item::Prompt(_)) => Some(EntryInfo {
id: 0,
position: item.position as i16,
entry_id: "prompt".to_string(),
entry_type: Box::new(EntryType::PROMPT),
vertical_size: None,
sort_index: None,
display_type: None,
score: None,
details: None,
prediction_scores: None,
}),
None => None,
}
}
Big match on FeedItem variants. Each builds an EntryInfo with type-specific details:
- Post: tweet_id, source_tweet_id (for retweets), author, score.
- Ad: the promoted post's ID (ad.post_id), advertiser_id, insert_position, impression_id.
- WhoToFollow: a module-level entry (no specific user).
- PushToHome: like a post but with score = None (PTH doesn't have a Phoenix score).
- Prompt: minimal entry.
entry_id prefixes (tweet-, promotedTweet-, etc.) give human-readable join keys.
update_served_history_side_effect.rs (268 lines) — Manhattan write
Writes the served items to Manhattan so the next request's ServedHistoryQueryHydrator can read them back (Session 09). Closes the loop on served-history tracking.
pub struct UpdateServedHistorySideEffect {
client: Arc<dyn ServedHistoryClient>,
}
#[async_trait]
impl SideEffect<ScoredPostsQuery, FeedItem> for UpdateServedHistorySideEffect {
fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
is_prod() && query.params.get(EnableUrtMigrationComponents)
}
Prod + URT flag. Same as TruncateServedHistorySideEffect from Session 12 (they share the same client).
async fn side_effect(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
) -> Result<(), String> {
let items = &input.selected_candidates;
if items.is_empty() {
return Ok(());
}
let query = &input.query;
let now_ms = query.request_time_ms;
let entries: Vec<EntryWithItemIds> = items
.iter()
.enumerate()
.flat_map(|(position, item)| build_entries(item, position as i64))
.collect();
if entries.is_empty() {
return Ok(());
}
Build entries with flat_map because each FeedItem can produce multiple entries — particularly for posts in conversation threads (one entry per ancestor).
let history = ServedHistory {
request_type: request_type(query),
entries,
served_id: None,
served_time_ms: None,
};
let platform = client_platform::from_client_app_id(query.client_app_id);
self.client
.put(
query.user_id,
TimelineType::Home,
platform,
now_ms,
&history,
)
.await
.map_err(|e| e.to_string())
}
}
Wrap in ServedHistory, leaving served_id and served_time_ms as None (presumably populated by the client or the store on write; this file doesn't show where). The write is keyed by (user, timeline type, platform, time).
fn request_type(query: &ScoredPostsQuery) -> RequestType {
if query.is_polling {
return RequestType::POLLING;
}
match query.cursor.as_ref() {
None => RequestType::INITIAL,
Some(c) => match c.cursor_type.as_ref() {
Some(ct) if *ct == CursorType::TOP => RequestType::NEWER,
Some(ct) if *ct == CursorType::BOTTOM => RequestType::OLDER,
Some(ct) if *ct == CursorType::GAP => RequestType::MIDDLE,
_ => RequestType::OTHER,
},
}
}
Map query state → RequestType enum:
- Polling → POLLING.
- No cursor → INITIAL.
- Cursor TOP → NEWER (loading newer posts).
- Cursor BOTTOM → OLDER (loading older posts).
- Cursor GAP → MIDDLE (filling a gap, e.g., one revealed by "Show new posts").
- Anything else → OTHER.
fn build_entries(feed_item: &FeedItem, position: i64) -> Vec<EntryWithItemIds> {
match feed_item.item.as_ref() {
Some(FeedItemKind::Post(post)) => build_post_entries(post, position),
Some(FeedItemKind::Ad(ad)) => vec![build_ad_entry(ad, position)],
Some(FeedItemKind::WhoToFollow(wtf)) => {
build_wtf_entry(wtf, position).into_iter().collect()
}
Some(FeedItemKind::Prompt(_)) => vec![build_prompt_entry(position)],
Some(FeedItemKind::PushToHome(pth)) => build_push_to_home_entries(pth, position),
None => vec![],
}
}
Top-level dispatch. build_wtf_entry(wtf, position).into_iter().collect() turns an Option<EntryWithItemIds> into a Vec: one element for Some, empty for None.
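The Option-to-Vec step and the flat_map that consumes it are plain standard-library behaviour. A small sketch with strings standing in for feed items and entries (both function names are made up):

```rust
/// An Option is an iterator of zero or one items, so collecting it
/// gives an empty Vec for None and a one-element Vec for Some.
fn entries_for(item: Option<&str>) -> Vec<String> {
    item.map(|s| s.to_string()).into_iter().collect()
}

/// flat_map concatenates the per-item Vecs, so items that produce
/// nothing simply vanish from the output.
fn all_entries(items: &[Option<&str>]) -> Vec<String> {
    items.iter().flat_map(|item| entries_for(*item)).collect()
}
```

This is why the side-effect needs a second is_empty check after building entries: a non-empty item list can still flatten to nothing.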
fn build_post_entries(post: &ScoredPost, position: i64) -> Vec<EntryWithItemIds> {
if post.ancestors.is_empty() {
vec![build_tweet_entry(post, post.tweet_id, position)]
} else {
let mut tweet_ids: Vec<u64> = post.ancestors.clone();
tweet_ids.push(post.tweet_id);
tweet_ids.sort_unstable();
tweet_ids
.into_iter()
.map(|tweet_id| build_tweet_entry(post, tweet_id, position))
.collect()
}
}
Multi-entry for replies: if a post has ancestors (a reply), produce one entry per (ancestor + self), sorted. Each ancestor's served-history entry uses the same position — they're all at the same feed slot.
Why? Because served history is used by PreviouslySeenPostsFilter to mark posts as "user already saw this." If the user saw a reply chain, they effectively saw the root + all the in-between posts — so we mark all of them as served. Prevents showing the same conversation from a different angle.
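A stripped-down sketch of the multi-entry expansion, with (tweet_id, position) pairs standing in for EntryWithItemIds. The function name is hypothetical; the logic mirrors build_post_entries.

```rust
/// One served-history record per tweet in the conversation, all sharing
/// the feed slot of the post that was actually shown.
fn served_entries(tweet_id: u64, ancestors: &[u64], position: i64) -> Vec<(u64, i64)> {
    if ancestors.is_empty() {
        return vec![(tweet_id, position)];
    }
    let mut ids = ancestors.to_vec();
    ids.push(tweet_id);
    ids.sort_unstable(); // ascending ID order
    ids.into_iter().map(|id| (id, position)).collect()
}
```

For a reply 30 with ancestors [20, 10] at slot 4, the history gets (10, 4), (20, 4), (30, 4): three tweets marked as seen, one feed position consumed.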
fn nonzero(v: u64) -> Option<i64> {
if v != 0 { Some(v as i64) } else { None }
}
fn build_tweet_entry(post: &ScoredPost, tweet_id: u64, position: i64) -> EntryWithItemIds {
let source_tweet_id = nonzero(post.retweeted_tweet_id);
let source_author_id = Some(nonzero(post.retweeted_user_id).unwrap_or(post.author_id as i64));
let in_reply_to_tweet_id = nonzero(post.in_reply_to_tweet_id);
let prediction_request_id = nonzero(post.prediction_request_id);
let served_type_name = upper_snake_to_pascal(
ServedType::try_from(post.served_type)
.unwrap_or(ServedType::Undefined)
.as_str_name(),
);
let tweet_score = TweetScore::TweetScoreV1(TweetScoreV1 {
score: OrderedFloat::from(post.score as f64),
served_type: Some(served_type_name),
debug_info: None,
prediction_request_id,
topics: None,
tags: None,
predicted_scores: None,
});
EntryWithItemIds {
entity_type: EntityIdType::TWEET,
sort_index: Some(position),
size: None,
item_ids: Some(vec![ItemIds {
tweet_id: Some(tweet_id as i64),
source_tweet_id,
quote_tweet_id: None,
source_author_id,
quote_author_id: None,
in_reply_to_tweet_id,
in_reply_to_author_id: None,
article_id: None,
tweet_score: Some(tweet_score),
entry_id_to_replace: None,
user_id: None,
impression_id: None,
}]),
}
}
Build the served-history entry. nonzero helper: converts 0 sentinel to None. Same Proto3-pattern handling we've seen.
served_type_name — convert enum to "PascalCase" for the served-history record. upper_snake_to_pascal("FOR_YOU_PHOENIX_RETRIEVAL") → "ForYouPhoenixRetrieval".
TweetScoreV1 wraps the actual score + metadata. The variant naming (V1) suggests there's a TweetScoreV2 planned (forward-compat schema).
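The body of upper_snake_to_pascal is not shown in this file. One plausible implementation, assuming ASCII enum names, looks like this:

```rust
/// Hypothetical implementation: split on underscores, capitalise the first
/// letter of each part, lowercase the rest, and join without separators.
fn upper_snake_to_pascal(s: &str) -> String {
    s.split('_')
        .filter(|part| !part.is_empty())
        .map(|part| {
            let mut chars = part.chars();
            match chars.next() {
                Some(first) => {
                    let mut word = first.to_ascii_uppercase().to_string();
                    word.push_str(&chars.as_str().to_ascii_lowercase());
                    word
                }
                None => String::new(),
            }
        })
        .collect()
}
```

With this version the fallback ServedType::Undefined would be written as "Undefined" if its as_str_name() is "UNDEFINED".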
fn build_ad_entry(ad: &AdIndexInfo, position: i64) -> EntryWithItemIds {
let impression_string = format!("{:x}", ad.impression_id);
EntryWithItemIds {
entity_type: EntityIdType::PROMOTED_TWEET,
sort_index: Some(position),
size: None,
item_ids: Some(vec![ItemIds {
tweet_id: Some(ad.post_id),
source_tweet_id: None,
quote_tweet_id: None,
source_author_id: Some(ad.author_id),
quote_author_id: None,
in_reply_to_tweet_id: None,
in_reply_to_author_id: None,
article_id: None,
tweet_score: None,
entry_id_to_replace: None,
user_id: None,
impression_id: Some(impression_string),
}]),
}
}
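Ad entry: impression_id is hex-encoded ({:x} format), while most other IDs here are decimal. Note that build_entry_info in the served-candidates side-effect writes the same field with to_string() (decimal), so the two sinks store different string forms of one impression ID.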
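The difference between {:x} here and to_string() in build_entry_info matters to anyone joining the two datasets on impression ID. A small sketch, assuming the ID is an unsigned 64-bit integer (its real type is not shown) and using made-up function names:

```rust
/// Served-history form: lowercase hex, no prefix.
fn impression_hex(id: u64) -> String {
    format!("{:x}", id)
}

/// Served-candidates form: plain decimal.
fn impression_decimal(id: u64) -> String {
    id.to_string()
}

/// What a consumer has to do to get back from the hex form.
fn parse_hex(s: &str) -> Option<u64> {
    u64::from_str_radix(s, 16).ok()
}
```

A decimal string made only of digits is also valid hex, so feeding the wrong form into parse_hex yields a wrong ID rather than an error.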
fn build_prompt_entry(position: i64) -> EntryWithItemIds {
EntryWithItemIds {
entity_type: EntityIdType::PROMPT,
sort_index: Some(position),
size: None,
item_ids: None,
}
}
Prompts get a minimal entry — just the type + position. No item_ids since prompts aren't keyed by tweet.
fn build_push_to_home_entries(pth: &PushToHomePost, position: i64) -> Vec<EntryWithItemIds> {
let in_reply_to_tweet_id = nonzero(pth.in_reply_to_tweet_id);
let served_type_name = upper_snake_to_pascal(
ServedType::try_from(pth.served_type)
.unwrap_or(ServedType::Undefined)
.as_str_name(),
);
let tweet_score = TweetScore::TweetScoreV1(TweetScoreV1 {
score: OrderedFloat::from(0.0),
served_type: Some(served_type_name),
debug_info: None,
prediction_request_id: None,
topics: None,
tags: None,
predicted_scores: None,
});
vec![EntryWithItemIds {
entity_type: EntityIdType::TWEET,
sort_index: Some(position),
size: None,
item_ids: Some(vec![ItemIds {
tweet_id: Some(pth.tweet_id as i64),
source_tweet_id: None,
quote_tweet_id: None,
source_author_id: Some(pth.author_id as i64),
quote_author_id: None,
in_reply_to_tweet_id,
in_reply_to_author_id: None,
article_id: None,
tweet_score: Some(tweet_score),
entry_id_to_replace: None,
user_id: None,
impression_id: None,
}]),
}]
}
PTH entry treated as a TWEET entity (so the next request's filter knows the user saw it). Score is 0.0 — PTH posts don't have Phoenix scores. served_type_name is "ForYouPushToHome" to preserve provenance.
fn build_wtf_entry(wtf: &WhoToFollowModule, position: i64) -> Option<EntryWithItemIds> {
let resp = wtf.who_to_follow_response.as_ref()?;
if resp.user_recommendations.is_empty() {
return None;
}
let size = resp.user_recommendations.len() as i16;
let item_ids: Vec<ItemIds> = resp
.user_recommendations
.iter()
.map(|rec| ItemIds {
tweet_id: None,
source_tweet_id: None,
quote_tweet_id: None,
source_author_id: None,
quote_author_id: None,
in_reply_to_tweet_id: None,
in_reply_to_author_id: None,
article_id: None,
tweet_score: None,
entry_id_to_replace: None,
user_id: Some(rec.user_id),
impression_id: None,
})
.collect();
Some(EntryWithItemIds {
entity_type: EntityIdType::WHO_TO_FOLLOW,
sort_index: Some(position),
size: Some(size),
item_ids: Some(item_ids),
})
}
WTF entry has one ItemId per recommended user. The user IDs become the served-history of "we recommended these users to this viewer" — feeds into WhoToFollowSource's get_excluded_user_ids (Session 11) which filters them out of future recommendations.
size = resp.user_recommendations.len() — explicitly logs how many users were in the module.
scored_stats_side_effect.rs (281 lines) — Phoenix score analytics
Per-action score distributions + Phoenix retrieval position analytics. The richest stats side-effect.
const METRIC_PREFIX: &str = "ScoredStats";
const PRESENT_SCOPE: [(&str, &str); 1] = [("score_status", "present")];
const MISSING_SCOPE: [(&str, &str); 1] = [("score_status", "missing")];
const HEAVY_RANKER_TOP_K: &[usize] = &[1, 10, 35];
const DEFAULT_SAMPLING_RATE: f64 = 0.05;
5% sampling + three top-K positions to track (1 / 10 / 35).
pub struct ScoredStatsSideEffect;
#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for ScoredStatsSideEffect {
fn enable(&self, _query: Arc<ScoredPostsQuery>) -> bool {
true
}
Always enabled. But the body samples down to 5% internally.
async fn side_effect(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>,
) -> Result<(), String> {
let Some(receiver) = global_stats_receiver() else {
return Ok(());
};
let candidates = &input.selected_candidates;
if candidates.is_empty() {
return Ok(());
}
if !input.query.has_cached_posts {
let retrieval_cluster: String =
input.query.params.get(PhoenixRetrievalInferenceClusterId);
let moe_cluster: String = input
.query
.params
.get(PhoenixRetrievalMOEInferenceClusterId);
let experiment_buckets = input
.query
.params
.experiment_buckets(EnablePhoenixRetrievalStatsExperimentBucket);
if !experiment_buckets.is_empty() || random::<f64>() < DEFAULT_SAMPLING_RATE {
record_score_distributions(receiver.as_ref(), candidates);
record_phoenix_retrieval_stats(...);
record_phoenix_retrieval_moe_stats(...);
}
} else {
if random::<f64>() < DEFAULT_SAMPLING_RATE {
record_score_distributions(receiver.as_ref(), candidates);
}
}
Ok(())
}
}
Two branches:
- Fresh scoring (!has_cached_posts): if experiment_buckets(...) is non-empty OR the request falls in the random 5%, record full stats (distributions + Phoenix retrieval + MoE retrieval).
- Cached posts: random 5%, and only score distributions are recorded (no retrieval to report on).
Experiment traffic is logged at 100%: when experiment_buckets(EnablePhoenixRetrievalStatsExperimentBucket) returns a non-empty list, the || short-circuits and the 5% draw never happens.
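The gating logic reduces to a small decision table. In this sketch the random draw is passed in as a parameter so the function is testable; StatsPlan and plan_stats are hypothetical names, not part of the source.

```rust
const DEFAULT_SAMPLING_RATE: f64 = 0.05;

#[derive(Debug, PartialEq)]
enum StatsPlan {
    Full,              // distributions + retrieval + MoE retrieval
    DistributionsOnly, // cached posts: nothing to say about retrieval
    Skip,
}

/// `draw` is a uniform value in [0, 1), supplied by the caller.
fn plan_stats(has_cached_posts: bool, in_experiment_bucket: bool, draw: f64) -> StatsPlan {
    let sampled = draw < DEFAULT_SAMPLING_RATE;
    if has_cached_posts {
        if sampled {
            StatsPlan::DistributionsOnly
        } else {
            StatsPlan::Skip
        }
    } else if in_experiment_bucket || sampled {
        StatsPlan::Full
    } else {
        StatsPlan::Skip
    }
}
```

Note the asymmetry: being in an experiment bucket does not help on the cached path, which only ever uses the 5% draw.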
fn record_head(
receiver: &dyn StatsReceiverExt,
name: &str,
scores: impl Iterator<Item = Option<f64>>,
) {
let distribution_key = format!("{METRIC_PREFIX}.scoreDistribution.{name}");
let missing_key = format!("{METRIC_PREFIX}.scoreMissing.{name}");
let mut present = 0u64;
let mut missing = 0u64;
for score in scores {
match score {
Some(value) => {
present += 1;
receiver.observe(&distribution_key, &[], value, HistogramBuckets::Bucket0To1);
}
None => {
missing += 1;
}
}
}
receiver.incr(&missing_key, &PRESENT_SCOPE, present);
receiver.incr(&missing_key, &MISSING_SCOPE, missing);
}
For each score "head" (action type), emit:
- Distribution histogram: each present score is observed into a [0, 1] bucketed histogram.
- Present/missing counters: how many candidates had / lacked this score. Both counters share the scoreMissing.<name> key and are told apart by the score_status scope (present vs. missing).
Lets dashboards plot e.g. "what's the distribution of favorite_score predictions today" + "what % of candidates have a favorite_score."
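To see what the counters buy you, here is an in-memory version of the same loop. It accumulates into a struct instead of calling a stats receiver; HeadStats and coverage are hypothetical names for the example.

```rust
#[derive(Debug, Default, PartialEq)]
struct HeadStats {
    observed: Vec<f64>, // values that would go into the histogram
    present: u64,
    missing: u64,
}

/// Same traversal as `record_head`, but collecting locally.
fn record_head(scores: impl Iterator<Item = Option<f64>>) -> HeadStats {
    let mut stats = HeadStats::default();
    for score in scores {
        match score {
            Some(value) => {
                stats.present += 1;
                stats.observed.push(value);
            }
            None => stats.missing += 1,
        }
    }
    stats
}

/// The dashboard-side ratio: share of candidates that had this score.
fn coverage(stats: &HeadStats) -> Option<f64> {
    let total = stats.present + stats.missing;
    if total == 0 {
        None
    } else {
        Some(stats.present as f64 / total as f64)
    }
}
```

Emitting present and missing as separate counters, rather than a precomputed ratio, keeps the metric additive across requests and hosts.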
fn record_score_distributions(receiver: &dyn StatsReceiverExt, candidates: &[PostCandidate]) {
record_head(
receiver,
"favorite",
candidates.iter().map(|c| c.phoenix_scores.favorite_score),
);
record_head(
receiver,
"reply",
candidates.iter().map(|c| c.phoenix_scores.reply_score),
);
record_head(
receiver,
"retweet",
candidates.iter().map(|c| c.phoenix_scores.retweet_score),
);
record_head(
receiver,
"click",
candidates.iter().map(|c| c.phoenix_scores.click_score),
);
record_head(
receiver,
"vqv",
candidates.iter().map(|c| c.phoenix_scores.vqv_score),
);
record_head(
receiver,
"share",
candidates.iter().map(|c| c.phoenix_scores.share_score),
);
record_head(
receiver,
"not_interested",
candidates
.iter()
.map(|c| c.phoenix_scores.not_interested_score),
);
record_head(
receiver,
"weightedScore",
candidates.iter().map(|c| c.weighted_score),
);
record_head(receiver, "finalScore", candidates.iter().map(|c| c.score));
}
Only seven of the Phoenix heads are tracked (the shadow-experiment side-effect above enumerates 21), plus the two aggregated scores (weightedScore, finalScore). The seven are presumably the actions that matter most for ranking; skipping the rest keeps metric volume and cost down.
fn record_retrieval_source_stats(
receiver: &dyn StatsReceiverExt,
selected_candidates: &[PostCandidate],
non_selected_candidates: &[PostCandidate],
retrieval_cluster: &str,
experiment_buckets: &[&ExperimentBucket],
served_type: ServedType,
metric_name: &str,
) {
let mut all_candidates: Vec<&PostCandidate> = selected_candidates
.iter()
.chain(non_selected_candidates.iter())
.collect();
all_candidates.sort_by(|a, b| {
let score_a = a.weighted_score.unwrap_or(f64::NEG_INFINITY);
let score_b = b.weighted_score.unwrap_or(f64::NEG_INFINITY);
score_b
.partial_cmp(&score_a)
.unwrap_or(std::cmp::Ordering::Equal)
});
Sort ALL candidates (selected + non_selected) by weighted_score descending. The Phoenix retrieval source's position in this ranked list is what we'll measure.
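The comparator is worth isolating: missing scores become negative infinity so they sink to the bottom, and incomparable pairs (NaN) are treated as equal. A sketch over (id, score) pairs, with rank_desc as a made-up name:

```rust
use std::cmp::Ordering;

/// Returns candidate IDs ordered by score, highest first, unscored last.
fn rank_desc(mut candidates: Vec<(u64, Option<f64>)>) -> Vec<u64> {
    candidates.sort_by(|a, b| {
        let score_a = a.1.unwrap_or(f64::NEG_INFINITY);
        let score_b = b.1.unwrap_or(f64::NEG_INFINITY);
        // b compared to a (not a to b) gives descending order.
        score_b.partial_cmp(&score_a).unwrap_or(Ordering::Equal)
    });
    candidates.into_iter().map(|(id, _)| id).collect()
}
```

Mapping NaN comparisons to Equal avoids a panic, but it is not a consistent total order if NaN scores actually occur; f64::total_cmp is the standard-library alternative when that matters.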
let empty_bucket = ExperimentBucket::new("", "");
let buckets: &[&ExperimentBucket] = if experiment_buckets.is_empty() {
&[&empty_bucket]
} else {
experiment_buckets
};
If no experiment buckets, use a single "empty" placeholder. Lets the loop below work uniformly.
let topk_key = format!("{METRIC_PREFIX}.{metric_name}.RankedTopK");
for &k in HEAVY_RANKER_TOP_K {
let count = all_candidates
.iter()
.take(k)
.filter(|c| c.served_type == Some(served_type))
.count();
let k_str = match k {
1 => "1",
10 => "10",
35 => "35",
_ => "unknown",
};
for b in buckets {
let scopes: [(&str, &str); 5] = [
("type", "sum"),
("retrieval_cluster", retrieval_cluster),
("k", k_str),
("ddg", &b.experiment),
("bucket", &b.bucket),
];
receiver.incr(&topk_key, &scopes, count as u64);
let req_scopes: [(&str, &str); 5] = [
("type", "requests"),
("retrieval_cluster", retrieval_cluster),
("k", k_str),
("ddg", &b.experiment),
("bucket", &b.bucket),
];
receiver.incr(&topk_key, &req_scopes, 1);
}
}
Position-of-source metric: for each k ∈ {1, 10, 35}, count how many of the top-K post-ranking candidates came from this retrieval source. Plus a counter for the number of requests (used to compute the average via sum/requests on the dashboard side).
Each metric is tagged with experiment + bucket, so dashboards can break down "Phoenix retrieval contribution to top-1 in experiment X bucket Y."
ddg = Duck Duck Goose, Twitter's long-standing A/B experimentation framework; the scope carries the experiment name.
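The sum/requests pairing is easy to check by hand. A minimal sketch with a toy Source enum in place of ServedType and an in-memory accumulator in place of the stats receiver (all names hypothetical):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Source {
    PhoenixRetrieval,
    InNetwork,
}

/// How many of the top `k` ranked candidates came from `wanted`.
/// `take(k)` is safe when fewer than `k` candidates exist.
fn top_k_count(ranked: &[Source], k: usize, wanted: Source) -> usize {
    ranked.iter().take(k).filter(|s| **s == wanted).count()
}

/// The two counters emitted per (k, experiment, bucket) combination.
#[derive(Debug, Default)]
struct SumRequests {
    sum: u64,
    requests: u64,
}

impl SumRequests {
    fn record(&mut self, count: usize) {
        self.sum += count as u64;
        self.requests += 1;
    }

    /// What a dashboard computes: average count per request.
    fn mean(&self) -> Option<f64> {
        if self.requests == 0 {
            None
        } else {
            Some(self.sum as f64 / self.requests as f64)
        }
    }
}
```

For k = 1 the mean is simply the fraction of requests whose top-ranked candidate came from the source; for k = 10 or 35 it is the average number of top-k slots the source holds.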
let served_key = format!("{METRIC_PREFIX}.{metric_name}.Served");
let count = selected_candidates
.iter()
.filter(|c| c.served_type == Some(served_type))
.count();
for b in buckets {
receiver.incr(
&served_key,
&[
("type", "sum"),
("retrieval_cluster", retrieval_cluster),
("ddg", &b.experiment),
("bucket", &b.bucket),
],
count as u64,
);
receiver.incr(
&served_key,
&[
("type", "requests"),
("retrieval_cluster", retrieval_cluster),
("ddg", &b.experiment),
("bucket", &b.bucket),
],
1,
);
}
}
Same sum/requests pattern for the final served set (not top-K of all candidates — only what made the response). Lets dashboards compute "fraction of served items from this retrieval source."
Together these tell the story: how often the retrieval source reaches the top of the ranking (signal of strength) vs. how much of the served set it supplies (signal of impact).
client_events_kafka_side_effect.rs (302 lines) — client-event firehose
The longest side-effect. Builds dozens of "client-event"-flavored log entries and publishes each to Kafka. Drives the cross-product analytics dashboards.
pub struct ClientEventsKafkaSideEffect {
kafka_client: Arc<dyn KafkaPublisherClient>,
}
impl ClientEventsKafkaSideEffect {
pub async fn prod() -> Self {
Self::new(Arc::new(
ProdKafkaPublisherClient::new(CLIENT_EVENT_TOPIC, KafkaCluster::ClientEvents).await,
))
}
}
Yet another Kafka cluster: ClientEvents. So far we've seen Bluebird, Ads, Phoenix, ClientEvents. Each has its own retention / fanout / downstream consumer set.
async fn side_effect(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
) -> Result<(), String> {
let query = &input.query;
let items = &input.selected_candidates;
let posts: Vec<&ScoredPost> = items
.iter()
.filter_map(|i| match &i.item {
Some(feed_item::Item::Post(p)) => Some(p),
_ => None,
})
.collect();
let ad_count = items
.iter()
.filter(|i| matches!(i.item, Some(feed_item::Item::Ad(_))))
.count() as i64;
let wtf_count = items
.iter()
.filter(|i| matches!(i.item, Some(feed_item::Item::WhoToFollow(_))))
.count() as i64;
let post_count = posts.len() as i64;
Count posts, ads, WTF modules.
let base = ClientEventParams {
query,
client_name: ClientPlatform::from_app_id(query.client_app_id).client_name(),
section: "home",
component: None,
element: None,
action: "served_tweets",
value: 0,
};
let mut events = Vec::new();
events.extend(build_served_events(&base, post_count, ad_count, wtf_count));
events.extend(build_tweet_type_events(&base, &posts));
events.extend(build_served_type_events(&base, &posts));
events.extend(build_video_events(&base, &posts));
events.extend(build_empty_timeline_events(&base, post_count));
events.extend(build_query_events(&base, post_count));
Build a base ClientEventParams struct that all events share. Then build six categories of events, each producing several log lines. Total can be 50+ events per request.
let payloads: Vec<Vec<u8>> = events
.iter()
.filter(|e| e.event_value.unwrap_or(0) > 0)
.map(|e| serialize_binary(e).map_err(|e| format!("Thrift serialization failed: {e}")))
.collect::<Result<_, _>>()?;
let futs: Vec<_> = payloads
.iter()
.map(|bytes| self.kafka_client.send(bytes))
.collect();
let results = futures::future::join_all(futs).await;
if let Some(err) = results.into_iter().find_map(|r| r.err()) {
return Err(format!("Client-event Kafka publish failed: {err}"));
}
Ok(())
}
}
Filter out zero-valued events (no point publishing "served 0 video posts"). Serialize each, send all in parallel, surface first error.
collect::<Result<_, _>>()? — collects an iterator of Results into a Result<Vec<_>, _>. Stops at first error.
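The filter, short-circuiting collect, and first-error reporting can be tried in isolation. This is a minimal sketch, not the real code: Event, serialize, build_payloads, and first_error are hypothetical stand-ins, and the "empty name fails" rule exists only to exercise the error path.

```rust
// Hypothetical stand-in for a log event; the real LogEvent has many more fields.
struct Event {
    name: &'static str,
    value: Option<i64>,
}

// Hypothetical serializer: an empty name is treated as a serialization failure.
fn serialize(e: &Event) -> Result<Vec<u8>, String> {
    if e.name.is_empty() {
        return Err("empty event name".to_string());
    }
    Ok(format!("{}={}", e.name, e.value.unwrap_or(0)).into_bytes())
}

// Drop events whose value is missing or not positive, then serialize the rest.
// Collecting into Result<Vec<_>, _> stops at the first Err and returns it.
fn build_payloads(events: &[Event]) -> Result<Vec<Vec<u8>>, String> {
    events
        .iter()
        .filter(|e| e.value.unwrap_or(0) > 0)
        .map(|e| serialize(e).map_err(|err| format!("serialization failed: {}", err)))
        .collect()
}

// Given one result per send, report only the first failure, if any.
fn first_error(results: Vec<Result<(), String>>) -> Result<(), String> {
    match results.into_iter().find_map(|r| r.err()) {
        Some(err) => Err(format!("publish failed: {}", err)),
        None => Ok(()),
    }
}
```

Note that later errors are silently discarded by find_map: with two failed sends, only the first one reaches the caller.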
Event-category builders
fn build_served_events(
base: &ClientEventParams,
post_count: i64,
ad_count: i64,
wtf_count: i64,
) -> Vec<LogEvent> {
vec![
build_log_event(&ClientEventParams {
value: post_count + ad_count,
..*base
}),
build_log_event(&ClientEventParams {
component: Some("injected"),
value: post_count,
..*base
}),
build_log_event(&ClientEventParams {
component: Some("promoted"),
value: ad_count,
..*base
}),
build_log_event(&ClientEventParams {
component: Some("who_to_follow"),
action: "served_users",
value: wtf_count,
..*base
}),
]
}
Four counter-events:
- Total served (posts + ads).
- "injected" posts (organic).
- "promoted" ads.
- "who_to_follow" users.
The ..*base is struct update syntax — copy all fields from base except those explicitly overridden.
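A minimal sketch of the same pattern on a trimmed-down struct. Params and promoted are hypothetical names, and the field set is deliberately smaller than the real ClientEventParams.

```rust
// Hypothetical, reduced version of the shared params struct.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Params<'a> {
    section: &'a str,
    component: Option<&'a str>,
    action: &'a str,
    value: i64,
}

// Override two fields; `..*base` fills in the remaining ones from the borrowed base.
// Copying fields out through a shared reference is allowed here because every
// remaining field (string slices, an integer) is a Copy type.
fn promoted<'a>(base: &Params<'a>, ad_count: i64) -> Params<'a> {
    Params {
        component: Some("promoted"),
        value: ad_count,
        ..*base
    }
}
```

The base value is left untouched, so the same base can seed any number of derived events.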
fn build_tweet_type_events(base: &ClientEventParams, posts: &[&ScoredPost]) -> Vec<LogEvent> {
TWEET_TYPE_PREDICATES
.iter()
.map(|&(bit_idx, name)| {
let count = posts
.iter()
.filter(|p| bitset_get(&p.tweet_type_metrics, bit_idx))
.count();
build_log_event(&ClientEventParams {
component: Some("injected"),
element: Some(name),
value: count as i64,
..*base
})
})
.collect()
}
For each tweet-type predicate from Session 08's TweetTypeMetricsHydrator (the bit indices like RETWEET, REPLY, AUTHOR_FOLLOWERS_0_100, etc.), count how many served posts have that bit set, emit as an event with element = predicate_name.
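So if 5 retweets were served, we get an event with component = injected, element = RETWEET, action = served_tweets, and value 5 (event name <client>:home:home:injected:RETWEET:served_tweets). Detailed slicing.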
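The per-predicate counting can be sketched as follows. Everything here is an assumption for illustration: the byte-backed bitset layout, this bitset_get implementation, and the bit indices in PREDICATES are made up, and the real helper may store bits differently.

```rust
// Hypothetical bit lookup over a byte-backed bitset, least-significant bit first.
// Out-of-range indices are treated as "bit not set".
fn bitset_get(bits: &[u8], idx: usize) -> bool {
    bits.get(idx / 8)
        .map_or(false, |&byte| ((byte >> (idx % 8)) & 1) == 1)
}

// Illustrative predicate table: (bit index, element name). Indices are invented.
const PREDICATES: &[(usize, &str)] = &[(0, "RETWEET"), (1, "REPLY"), (9, "VIDEO")];

// One (name, count) pair per predicate, counting posts whose bit is set.
// A predicate that matches nothing still produces a pair with count 0.
fn count_by_predicate(posts: &[Vec<u8>]) -> Vec<(&'static str, i64)> {
    PREDICATES
        .iter()
        .map(|&(idx, name)| {
            let count = posts.iter().filter(|p| bitset_get(p, idx)).count();
            (name, count as i64)
        })
        .collect()
}
```

Zero counts are produced here just as in build_tweet_type_events; in the real pipeline they are dropped later by the event_value > 0 filter in side_effect.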
fn build_served_type_events(base: &ClientEventParams, posts: &[&ScoredPost]) -> Vec<LogEvent> {
let mut counts: HashMap<i32, i64> = HashMap::new();
for post in posts {
*counts.entry(post.served_type).or_default() += 1;
}
counts
.into_iter()
.map(|(st, count)| {
let name = ServedType::try_from(st)
.map(|t| t.as_str_name().to_ascii_lowercase())
.unwrap_or_else(|_| format!("unknown_{st}"));
base_build_log_event(base, Some(name), None, base.action, count)
})
.collect()
}
Group posts by served_type, emit one event per type. So "5 from thunder, 12 from phoenix_retrieval, 3 from tweet_mixer."
The enum-name → lowercase: "FOR_YOU_PHOENIX_RETRIEVAL" → "for_you_phoenix_retrieval". Different from the served-history upper_snake_to_pascal — different downstream consumer convention.
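A small sketch of the group-then-name step. served_type_name is a hypothetical replacement for ServedType::try_from plus as_str_name, and the numeric values and the EXAMPLE_OTHER name are invented for the example.

```rust
use std::collections::HashMap;

// Hypothetical raw-value-to-name lookup; the numeric values are invented.
fn served_type_name(raw: i32) -> Option<&'static str> {
    match raw {
        1 => Some("FOR_YOU_PHOENIX_RETRIEVAL"),
        2 => Some("EXAMPLE_OTHER"),
        _ => None,
    }
}

// Count posts per raw served type, then key the counts by lowercase name.
// Unrecognized raw values fall back to "unknown_<raw>" instead of being dropped.
fn count_by_served_type(served_types: &[i32]) -> HashMap<String, i64> {
    let mut counts: HashMap<i32, i64> = HashMap::new();
    for &st in served_types {
        *counts.entry(st).or_default() += 1;
    }
    counts
        .into_iter()
        .map(|(st, n)| {
            let name = served_type_name(st)
                .map(|s| s.to_ascii_lowercase())
                .unwrap_or_else(|| format!("unknown_{}", st));
            (name, n)
        })
        .collect()
}
```

The fallback matters for forward compatibility: a new enum value added upstream still shows up in the counts, just under a generic name.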
fn build_video_events(base: &ClientEventParams, posts: &[&ScoredPost]) -> Vec<LogEvent> {
let num_videos = posts
.iter()
.filter(|p| bitset_get(&p.tweet_type_metrics, VIDEO))
.count();
vec![build_log_event(&ClientEventParams {
component: Some("with_video_duration"),
element: Some("num_videos"),
value: num_videos as i64,
..*base
})]
}
Video-specific event. Could be folded into build_tweet_type_events but kept separate (presumably for backward-compat with a specific dashboard).
fn build_empty_timeline_events(base: &ClientEventParams, post_count: i64) -> Vec<LogEvent> {
if post_count > 0 {
return Vec::new();
}
vec![build_log_event(&ClientEventParams {
action: "empty",
element: Some("served_non_promoted_tweet"),
..*base
})]
}
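Conditional event: built only when the post count is 0, so empty-response rates can be tracked separately. One caveat: as the code is shown here, this event inherits value: 0 from base and never overrides it, so the event_value > 0 filter in side_effect would drop it before publishing.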
fn build_query_events(base: &ClientEventParams, post_count: i64) -> Vec<LogEvent> {
let query = base.query;
let ctx = RequestContext::parse(&query.request_context);
let query_predicates: Vec<(&str, bool)> = vec![
("request", true),
("get_older", query.is_bottom_request),
("get_newer", query.is_top_request),
("get_initial", query.cursor.is_none()),
("pull_to_refresh", ctx == RequestContext::PullToRefresh),
("request_context_launch", ctx == RequestContext::Launch),
(
"request_context_foreground",
ctx == RequestContext::Foreground,
),
];
let size_predicates: Vec<(&str, bool)> = vec![
("size_is_empty", post_count == 0),
("size_at_most_5", post_count <= 5),
("size_at_most_10", post_count <= 10),
("size_at_most_35", post_count <= 35),
];
let mut events = Vec::new();
for &(query_name, query_match) in &query_predicates {
if !query_match {
continue;
}
for &(size_name, size_match) in &size_predicates {
if !size_match {
continue;
}
events.push(build_log_event(&ClientEventParams {
component: Some(size_name),
action: query_name,
element: None,
value: 1,
..*base
}));
}
}
events
}
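The cross-product: 7 query types × 4 size buckets = 28 possible combinations, but only matching pairs are emitted. Since ctx can equal at most one RequestContext variant, at most 5 query predicates match a single request, so the practical ceiling is 20 events. For example, a get_initial request with 7 posts:
- (get_initial, size_at_most_10): emit
- (get_initial, size_at_most_35): emit
- (get_initial, size_at_most_5): skip (7 > 5)
- (get_initial, size_is_empty): skip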
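The worked example can be checked with a small sketch of the nested loop. matching_pairs is a hypothetical helper that returns (action, component) pairs instead of building LogEvents; the size thresholds are the ones from build_query_events.

```rust
// Returns the (action, component) pairs that would be emitted for the given
// query predicates and post count. Only pairs where both sides match are kept.
fn matching_pairs(
    query_predicates: &[(&'static str, bool)],
    post_count: i64,
) -> Vec<(&'static str, &'static str)> {
    // Size buckets are cumulative: a count of 0 matches all four.
    let size_predicates = [
        ("size_is_empty", post_count == 0),
        ("size_at_most_5", post_count <= 5),
        ("size_at_most_10", post_count <= 10),
        ("size_at_most_35", post_count <= 35),
    ];
    let mut out = Vec::new();
    for &(query_name, query_match) in query_predicates {
        if !query_match {
            continue;
        }
        for &(size_name, size_match) in &size_predicates {
            if size_match {
                out.push((query_name, size_name));
            }
        }
    }
    out
}
```

With the predicates request = true, get_older = false, get_initial = true and 7 posts, this yields four pairs: the two size buckets that match (size_at_most_10, size_at_most_35), once for request and once for get_initial.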
("request", true) always fires — so we always get (request, size_xxx) events. The "request" predicate is the baseline counter.
#[derive(Clone, Copy)]
struct ClientEventParams<'a> {
query: &'a ScoredPostsQuery,
client_name: &'a str,
section: &'a str,
component: Option<&'a str>,
element: Option<&'a str>,
action: &'a str,
value: i64,
}
The shared params struct. #[derive(Clone, Copy)] on a struct with references — works because all fields are either Copy types or references (which are Copy). Lets us do ..*base everywhere.
fn build_log_event(p: &ClientEventParams) -> LogEvent {
base_build_log_event(
p,
p.component.map(String::from),
p.element.map(String::from),
p.action,
p.value,
)
}
fn base_build_log_event(
p: &ClientEventParams,
component: Option<String>,
element: Option<String>,
action: &str,
value: i64,
) -> LogEvent {
let now_ms = p.query.request_time_ms;
let event_name = [
p.client_name,
"home",
p.section,
component.as_deref().unwrap_or(""),
element.as_deref().unwrap_or(""),
action,
]
.join(":");
LogEvent {
log_base: Some(LogBase {
transaction_id: String::new(),
ip_address: p.query.ip_address.clone(),
user_id: Some(p.query.user_id as i64),
timestamp: now_ms,
language: Some(p.query.language_code.clone()),
client_app_id: Some(p.query.client_app_id as i64),
device_id: Some(p.query.device_id.clone()),
country: Some(p.query.country_code.clone()),
timezone: None,
}),
event_name: Some(event_name),
event_value: Some(value),
event_details: None,
event_namespace: Some(EventNamespace {
client: Some(p.client_name.to_string()),
page: Some("home".to_string()),
section: Some(p.section.to_string()),
component,
element,
action: Some(action.to_string()),
}),
notification_details: None,
}
}
Build the final LogEvent proto. event_name is the canonical colon-separated string: "iphone:home:home:injected::served_tweets". event_namespace carries the same data in structured form. Both are populated, presumably because older consumers expect the string and newer ones read the namespace.
A missing component or element becomes an empty string in the joined name, so a missing element yields :: and a missing component plus element yields :::. The name always has six segments, which keeps it parsable by position.
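The joining rule is easy to verify on its own. event_name below is a hypothetical free function that mirrors only the join logic from base_build_log_event, with the page segment hard-coded to "home" as in the original.

```rust
// Joins the six namespace segments with ':'; missing optional segments
// become empty strings so the segment count never changes.
fn event_name(
    client: &str,
    section: &str,
    component: Option<&str>,
    element: Option<&str>,
    action: &str,
) -> String {
    [
        client,
        "home",
        section,
        component.unwrap_or(""),
        element.unwrap_or(""),
        action,
    ]
    .join(":")
}
```

Calling event_name("iphone", "home", Some("injected"), None, "served_tweets") returns "iphone:home:home:injected::served_tweets", and with both optional segments missing the result is "iphone:home:home:::served_tweets".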
The LogBase block contains user identification + locale. This is X's universal "who made this request" envelope.
What we've learned
Side-effect side of the pipeline — the data feeds:
- Training data: ServedCandidatesKafka (Phoenix cluster), RerankingKafka (5% sampled), PhoenixExperiments (all shadow-eligible clusters).
- Analytics: ScoredStatsSideEffect (score distributions + retrieval position), ForYouResponseStatsSideEffect, ClientEventsKafka (firehose).
- State writes for next request: UpdateServedHistory (Manhattan), UpdatePastRequestTimestamps (Manhattan), RedisPostCandidateCache (Redis), PhoenixRequestCache (cross-DC Redis).
- Compliance/billing: AdsInjectionLogging (Ads cluster), PublishSeenIdsToKafka (impressions).
- Maintenance: TruncateServedHistory, MutualFollowStats.
Five distinct Kafka clusters in use:
- Bluebird: general impressions.
- Ads: ad attribution / billing.
- Phoenix: training data (served candidates, scored candidates).
- Aiml: ML pipeline data (Phoenix predictions).
- ClientEvents: analytics firehose.
Shadow-traffic patterns:
- is_shadow_traffic = is_sampled(request_id, 0.5) from Session 04.
- PhoenixExperiments, ServedCandidatesKafka, and (Session 08) some context hydrators gate on this.
- Independent from B3 trace sampling.
- Independent from the random < 0.05 check (5% sampling).
Sample-rate stacks:
- Shadow traffic: 50% of requests.
- RerankingKafka / ScoredStats: 5% of those.
- Combined: 0.5 × 0.05 = 0.025, so ~2.5% of total traffic gets full analytics.
Two ID conventions:
- query.request_id: used as trace_id AND request_join_id in published events.
- query.prediction_id: used as Phoenix prediction request ID. Different value, different downstream use.
Multi-entry served-history for replies: a single served reply produces N entries (one per ancestor + self) so that "user saw the conversation" is fully recorded for dedup.
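Cross-product event emission: build_query_events walks a 7×4 predicate grid and emits one event, with value 1, per matching (query type, size bucket) pair. Lets dashboards slice request counts along two dimensions, e.g. "get_initial requests with size_at_most_5", by filtering on action and component. These events are counters; they carry no latency data.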
Empty-bucket placeholder: record_retrieval_source_stats uses an ExperimentBucket::new("", "") placeholder when no experiments are active. Lets the same loop body work whether or not experiments are happening — uniform code path.
Sum/requests pattern: emit both ("type", "sum") and ("type", "requests") counters. Compute averages on the dashboard side via rate(sum) / rate(requests). Avoids histograms when only the mean is wanted.
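A minimal sketch of the sum/requests idea, assuming nothing about the real metrics client. MeanCounter is a hypothetical in-memory model of the two counters, with mean() playing the role of the dashboard-side division.

```rust
// Hypothetical model of a ("type", "sum") / ("type", "requests") counter pair.
#[derive(Default)]
struct MeanCounter {
    sum: f64,
    requests: u64,
}

impl MeanCounter {
    // Each request adds its value to `sum` and bumps `requests` by one.
    fn record(&mut self, value: f64) {
        self.sum += value;
        self.requests += 1;
    }

    // The division a dashboard would perform; None avoids dividing by zero.
    fn mean(&self) -> Option<f64> {
        if self.requests == 0 {
            None
        } else {
            Some(self.sum / self.requests as f64)
        }
    }
}
```

Two monotonically increasing counters are cheap to emit and aggregate across hosts, which is the appeal over a histogram when the mean is all that is needed. The cost is that percentiles cannot be recovered from them.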
Forward-compat schema markers: TweetScoreV1 enum variant. The V1 suffix suggests a V2 is anticipated. Schema evolution in published events is a Thrift-style versioned-variant problem.
Per-message Kafka publish vs. batching: most side-effects send one Kafka message per item — N posts → N sends in parallel. No batching. Trade-off: lower latency, more network traffic. Works because the Kafka client likely has internal batching.
Next session
Session 14 — Ads (533 LOC). The home-mixer/ads/ module:
- safe_gap_blender.rs (95): keeps ads N positions apart from each other
- partition_organic_blender.rs (190): alternative blending strategy with brand-safety partitioning
- util.rs (228): shared ad-positioning helpers
- mod.rs (20): re-exports + the AdsBlender trait
After Session 14, we leave the Rust home-mixer and dive into the Python Phoenix model code.