X For You algorithm, line by line · Part 12
X For You algorithm, line by line — Part 12: Side effects (part 1)
Part 12 of the deep dive into xai-org/x-algorithm. First half of the side-effect stage: MutualFollow stats, served-history truncation, past-request-timestamps write, Kafka impressions publish, Redis post-candidate cache with zstd, cross-DC Phoenix request cache, ads-injection logging, response-stats counters, 5%-sampled reranking Kafka publish.
Side effects are the fire-and-forget tasks that run after the response goes back to the client. They write to Kafka for training data, update Manhattan/Redis state, emit metrics, log to debug streams. From Session 01 we know they're spawned into Tokio tasks via tokio::spawn — their failures don't affect the response.
The Phoenix and For You pipelines together wire up 14 active side effects, split across Sessions 12 and 13. This session covers 9 of them plus one orphan file (~1,000 LOC in total): the smaller, infrastructure-focused ones. Session 13 covers the remaining 5 heavier ones (analytics and state writes).
Files covered:
home-mixer/side_effects/
├── mod.rs (14)
├── mutual_follow_stats_side_effect.rs (67) stats only
├── truncate_served_history_side_effect.rs (53) Manhattan maintenance
├── update_past_request_timestamps_side_effect.rs (61) Manhattan write
├── publish_seen_ids_to_kafka_side_effect.rs (79) Kafka publish
├── redis_post_candidate_cache_side_effect.rs (92) Redis cache write
├── phoenix_request_cache_side_effect.rs (131) cross-DC Redis write
├── ads_injection_logging_side_effect.rs (146) Kafka ad logging
├── for_you_response_stats_side_effect.rs (157) response metrics
├── reranking_kafka_side_effect.rs (157) Kafka reranking samples
└── cache_request_info_side_effect.rs (42) [orphan]
The 14 in mod.rs are wired into the pipelines. cache_request_info_side_effect.rs is an orphan file (not in mod.rs) — pre-refactor leftover.
mod.rs (14 lines)
pub mod ads_injection_logging_side_effect;
pub mod client_events_kafka_side_effect;
pub mod for_you_response_stats_side_effect;
pub mod mutual_follow_stats_side_effect;
pub mod phoenix_experiments_side_effect;
pub mod phoenix_request_cache_side_effect;
pub mod publish_seen_ids_to_kafka_side_effect;
pub mod redis_post_candidate_cache_side_effect;
pub mod reranking_kafka_side_effect;
pub mod scored_stats_side_effect;
pub mod served_candidates_kafka_side_effect;
pub mod truncate_served_history_side_effect;
pub mod update_past_request_timestamps_side_effect;
pub mod update_served_history_side_effect;
14 modules. cache_request_info_side_effect (in the directory) is not declared — orphan.
mutual_follow_stats_side_effect.rs (67 lines) — pure metrics
The simplest active side effect. Computes the average mutual-follow Jaccard across selected candidates and emits a histogram.
const METRIC_PREFIX: &str = "MutualFollowJaccard";
pub struct MutualFollowStatsSideEffect;
#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for MutualFollowStatsSideEffect {
fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
query.params.get(EnableMutualFollowJaccardHydration)
}
async fn side_effect(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>,
) -> Result<(), String> {
let Some(receiver) = global_stats_receiver() else {
return Ok(());
};
let candidates = &input.selected_candidates;
if candidates.is_empty() {
return Ok(());
}
Three guards: enabled flag, receiver exists, at least one candidate.
let retrieval_cluster: String = input.query.params.get(PhoenixRetrievalInferenceClusterId);
let moe_enabled: bool = input.query.params.get(EnablePhoenixMOESource);
let mut scope: Vec<(&str, &str)> = vec![("retrieval_cluster", &retrieval_cluster)];
let moe_cluster: String;
if moe_enabled {
moe_cluster = input
.query
.params
.get(PhoenixRetrievalMOEInferenceClusterId);
scope.push(("moe_cluster", &moe_cluster));
}
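Build the metric scope (tags). Always include retrieval_cluster; if MoE is enabled, also tag the MoE cluster. The uninitialized declaration let moe_cluster: String; sits outside the if, so the String lives as long as scope does. That is necessary because scope borrows &moe_cluster; declaring it inside the if would drop it at the closing brace while scope still holds the borrow, and the code would not compile.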
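A standalone sketch of the same borrow pattern. build_scope is a hypothetical helper: plain arguments stand in for the params lookups, and it returns owned pairs only so the result can be inspected.

```rust
/// Hypothetical helper: builds metric tags the way the side effect does,
/// then converts them to owned pairs so the caller can inspect them.
fn build_scope(retrieval: &str, moe: Option<&str>) -> Vec<(String, String)> {
    let retrieval_cluster: String = retrieval.to_string();
    let mut scope: Vec<(&str, &str)> = vec![("retrieval_cluster", retrieval_cluster.as_str())];

    // Declared, but not initialized, outside the `if`: it stays alive until the
    // end of the function, past the last use of `scope`. Declaring it inside the
    // `if` block would drop it at the closing brace while `scope` still borrows it.
    let moe_cluster: String;
    if let Some(name) = moe {
        moe_cluster = name.to_string();
        scope.push(("moe_cluster", moe_cluster.as_str()));
    }

    scope
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}
```

Rust tracks whether moe_cluster was initialized with a drop flag, so the conditional initialization costs nothing when MoE is disabled.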
let avg_key = format!("{METRIC_PREFIX}.avgScore");
let mut sum = 0.0f64;
let mut present = 0u64;
for candidate in candidates {
if let Some(j) = candidate.mutual_follow_jaccard {
present += 1;
sum += j;
}
}
if present > 0 {
let avg = sum / present as f64;
receiver.observe(&avg_key, &scope, avg, HistogramBuckets::Bucket0To1);
}
Ok(())
}
}
Average the Jaccard values that are present (None is skipped, not counted as 0) and emit the result as a histogram with [0, 1] buckets. Because the observation carries the cluster tags, dashboards can correlate the average mutual-follow score with experiment arms.
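The averaging loop can also be written as a fold over the present values. A minimal sketch, with a hypothetical mean_of_present helper working on plain Option<f64> values rather than PostCandidate:

```rust
/// Mean of the `Some` values, skipping `None`. Returns `None` when no value is
/// present, which plays the role of the `present > 0` guard above.
fn mean_of_present(values: &[Option<f64>]) -> Option<f64> {
    let (sum, present) = values
        .iter()
        .flatten() // &Option<f64> iterates over zero or one &f64
        .fold((0.0f64, 0u64), |(sum, n), &v| (sum + v, n + 1));
    if present > 0 {
        Some(sum / present as f64)
    } else {
        None
    }
}
```

Returning None for an all-None slice means no observation is emitted, rather than a misleading 0.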
truncate_served_history_side_effect.rs (53 lines) — Manhattan trim
Trims the user's served history to a bounded size. Old entries get deleted.
const MAX_RESPONSES: usize = 50;
pub struct TruncateServedHistorySideEffect {
client: Arc<dyn ServedHistoryClient>,
}
#[async_trait]
impl SideEffect<ScoredPostsQuery, FeedItem> for TruncateServedHistorySideEffect {
fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
is_prod()
&& query.params.get(EnableUrtMigrationComponents)
&& query.served_history.len() > MAX_RESPONSES
}
Only fires in prod, with the flag on, and when the served history has more than MAX_RESPONSES (50) entries. Most requests skip this; only requests whose history has overflowed the limit trigger trimming.
async fn side_effect(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
) -> Result<(), String> {
let query = &input.query;
let to_delete: Vec<i64> = query
.served_history
.iter()
.skip(MAX_RESPONSES)
.filter_map(|sh| sh.served_time_ms)
.collect();
if to_delete.is_empty() {
return Ok(());
}
let platform = client_platform::from_client_app_id(query.client_app_id);
self.client
.delete(query.user_id, TimelineType::Home, platform, &to_delete)
.await
.map_err(|e| e.to_string())
}
}
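Skip the first 50 entries and collect the served_time_ms keys of everything after them (entries without a timestamp are dropped, since there is no key to delete). Assuming served_history is ordered newest-first, these are the oldest entries. Then send a single delete call. Trimming is lazy and happens only once the history exceeds the limit, which bounds Manhattan storage for users who request frequently.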
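A minimal sketch of the key selection, using a hypothetical ServedEntry struct in place of the real served-history type and assuming the history is ordered newest-first:

```rust
/// Hypothetical, simplified stand-in for a served-history record.
struct ServedEntry {
    served_time_ms: Option<i64>,
}

const MAX_RESPONSES: usize = 50;

/// Keys to delete: every entry after the first `max_keep` that carries a
/// timestamp. With newest-first ordering, the skipped prefix is the newest.
fn keys_to_delete(history: &[ServedEntry], max_keep: usize) -> Vec<i64> {
    history
        .iter()
        .skip(max_keep)
        .filter_map(|e| e.served_time_ms) // no timestamp, no key to delete
        .collect()
}
```

With 53 entries where entry 51 has no timestamp, only the keys of entries 50 and 52 come back. The to_delete.is_empty() check in the real code covers the case where every overflow entry lacks a timestamp.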
update_past_request_timestamps_side_effect.rs (61 lines) — write request timestamp
Records "this user made a non-polling request at time X" — so the next request's PastRequestTimestampsQueryHydrator can read it.
const MAX_NON_POLLING_TIMES: usize = 10;
pub struct UpdatePastRequestTimestampsSideEffect {
client: Arc<dyn PastRequestTimestampsClient>,
}
#[async_trait]
impl SideEffect<ScoredPostsQuery, FeedItem> for UpdatePastRequestTimestampsSideEffect {
fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
let is_background_fetch =
RequestContext::parse(&query.request_context) == RequestContext::BackgroundFetch;
is_prod()
&& query.params.get(EnableUrtMigrationComponents)
&& !query.is_polling
&& !is_background_fetch
}
Skip if polling or background-fetch. We only record explicit user-initiated requests (foreground refresh).
async fn side_effect(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
) -> Result<(), String> {
let query = &input.query;
let now_ms = query.request_time_ms;
let prior_timestamps = query
.non_polling_timestamps
.as_ref()
.map(|npt| &npt.non_polling_timestamps_ms[..])
.unwrap_or(&[]);
let most_recent = query
.non_polling_timestamps
.as_ref()
.and_then(|npt| npt.most_recent_home_latest_non_polling_timestamp_ms);
let timestamps: Vec<i64> = std::iter::once(now_ms)
.chain(prior_timestamps.iter().copied())
.take(MAX_NON_POLLING_TIMES)
.collect();
let npt = NonPollingTimestamps::new(timestamps, most_recent);
self.client.put(query.user_id, &npt).await
}
}
Build the new list: [now] + prior[..], capped at 10. Preserve most_recent_home_latest_non_polling_timestamp_ms from the prior state. Write to Manhattan.
The result is a sliding window of the last 10 user-initiated request times. It is likely used downstream for "you've been away for X" UX (e.g., "Catch up on what you missed" markers in URT).
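The window update in isolation, as a hypothetical push_timestamp helper (assuming, as the code implies, that prior timestamps are stored newest-first):

```rust
const MAX_NON_POLLING_TIMES: usize = 10;

/// Prepends `now_ms` to the prior window and caps the result at
/// `MAX_NON_POLLING_TIMES`, so the oldest timestamp falls off the end.
fn push_timestamp(now_ms: i64, prior: &[i64]) -> Vec<i64> {
    std::iter::once(now_ms)
        .chain(prior.iter().copied())
        .take(MAX_NON_POLLING_TIMES)
        .collect()
}
```

Prepending 11 to a full window [10, 9, ..., 1] yields [11, 10, ..., 2]; on a first visit (no prior state) the window is just [now].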
publish_seen_ids_to_kafka_side_effect.rs (79 lines) — Kafka impressions
Publishes the user's seen-IDs to Kafka. Downstream consumers update impression bloom filters, training data, etc.
pub struct PublishSeenIdsToKafkaSideEffect {
kafka_client: Arc<dyn KafkaPublisherClient>,
}
impl PublishSeenIdsToKafkaSideEffect {
pub fn new(kafka_client: Arc<dyn KafkaPublisherClient>) -> Self {
Self { kafka_client }
}
pub async fn prod() -> Self {
Self::new(Arc::new(
ProdKafkaPublisherClient::new(CLIENT_SENT_IMPRESSIONS_TOPIC, KafkaCluster::Bluebird)
.await,
))
}
}
The production constructor builds a Kafka client for CLIENT_SENT_IMPRESSIONS_TOPIC on the Bluebird cluster (Bluebird being one of X's internal Kafka cluster names).
fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
is_prod() && !query.seen_ids.is_empty() && query.params.get(EnablePublishSeenIdsToKafka)
}
Prod-only + non-empty seen_ids + flag. The side_effect body re-checks seen_ids.is_empty() anyway, a harmless defensive duplicate.
async fn side_effect(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
) -> Result<(), String> {
let query = &input.query;
if query.seen_ids.is_empty() {
return Ok(());
}
let current_time = query.request_time_ms;
let surface_areas = Some(BTreeSet::from([SurfaceArea::HOME_TIMELINE]));
let impressions: Vec<Impression> = query
.seen_ids
.iter()
.map(|&tweet_id| Impression {
tweet_id: tweet_id as i64,
impression_time: Some(current_time),
linger_time_ms: None,
surface_areas: surface_areas.clone(),
media_details: None,
})
.collect();
Build one Impression per seen ID, stamped with the request time and the HOME_TIMELINE surface. linger_time_ms is None: there is no dwell information here, only the client's report that these IDs were already seen.
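A simplified sketch of the mapping. Impression here is a hypothetical struct mirroring only the fields used above (the real one is generated from Thrift, and the real surface is a SurfaceArea value rather than a string):

```rust
use std::collections::BTreeSet;

/// Hypothetical mirror of the Impression fields set above.
#[derive(Debug, Clone, PartialEq)]
struct Impression {
    tweet_id: i64,
    impression_time: Option<i64>,
    linger_time_ms: Option<i64>,
    surface_areas: Option<BTreeSet<&'static str>>,
}

fn build_impressions(seen_ids: &[u64], now_ms: i64) -> Vec<Impression> {
    // One surface set built up front and cloned into every impression.
    let surfaces = Some(BTreeSet::from(["HOME_TIMELINE"]));
    seen_ids
        .iter()
        .map(|&id| Impression {
            tweet_id: id as i64,
            impression_time: Some(now_ms),
            linger_time_ms: None, // unknown: the client only reported the ID as seen
            surface_areas: surfaces.clone(),
        })
        .collect()
}
```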
let published = PublishedImpressionList::new(
query.user_id as i64,
ImpressionList::new(impressions),
current_time,
);
let bytes = serialize_binary(&published)
.map_err(|e| format!("Thrift serialization failed: {e}"))?;
self.kafka_client
.send(&bytes)
.await
.map_err(|e| format!("Seen-IDs Kafka publish failed: {e}"))
}
}
Wrap in PublishedImpressionList, serialize to Thrift binary, send to Kafka.
This is the data feed for the impression bloom filter store that ImpressionBloomFilterQueryHydrator (Session 09) reads back next time.
redis_post_candidate_cache_side_effect.rs (92 lines) — fill the read cache
Writes the top scored candidates to Redis so the next request's CachedPostsQueryHydrator (Session 09) can hit the cache and skip the entire scoring pipeline.
const REDIS_TTL_SECONDS: u64 = 180;
const ZSTD_COMPRESSION_LEVEL: i32 = 6;
pub struct RedisPostCandidateCacheSideEffect {
redis_client: Arc<dyn RedisClient>,
}
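180-second TTL: cached posts expire after 3 minutes. That is short because ranked output goes stale quickly, but long enough to cover a typical user's quick second pull.
ZSTD_COMPRESSION_LEVEL: 6 is a moderate setting. zstd's regular levels run from 1 to 19 (up to 22 in ultra mode), and the library default is 3: 9 would compress smaller but slower, while 3 would be faster but produce bigger payloads.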
fn get_candidates_to_cache<'a>(
selected: &'a [PostCandidate],
non_selected: &'a [PostCandidate],
max_posts_to_cache: usize,
) -> Vec<&'a PostCandidate> {
let mut all_candidates: Vec<&PostCandidate> = selected
.iter()
.chain(non_selected.iter())
.filter(|c| c.weighted_score.is_some_and(|s| s > 0.0))
.collect();
all_candidates.sort_by(|a, b| {
b.weighted_score
.unwrap()
.partial_cmp(&a.weighted_score.unwrap())
.unwrap_or(std::cmp::Ordering::Equal)
});
all_candidates.truncate(max_posts_to_cache);
all_candidates
}
Cache more than was served: take both selected and non_selected, sort by weighted_score descending, and truncate to max_posts_to_cache (e.g., 500). The cache therefore holds the top N scored candidates, so a later request that hits the cache can serve a different cut of the top-K.
Filter: only strictly positive weighted scores survive. None, zero, and negative scores (and NaN, since NaN > 0.0 is false) are dropped as not worth caching.
b.weighted_score.unwrap() is safe because the filter already removed None values; it's a common, if debatable, pattern. For the same reason the partial_cmp fallback to Ordering::Equal is unreachable here.
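A sketch of the same selection that avoids both unwrap() calls by destructuring the score in the filter, and swaps in f64::total_cmp for the partial_cmp fallback. Candidate is a hypothetical minimal struct, and the helper returns IDs rather than references for brevity:

```rust
/// Hypothetical, minimal candidate with only the field the selection needs.
struct Candidate {
    id: u64,
    weighted_score: Option<f64>,
}

/// Keeps strictly positive scores, sorts descending, truncates to `max`.
fn top_positive(selected: &[Candidate], non_selected: &[Candidate], max: usize) -> Vec<u64> {
    let mut scored: Vec<(f64, u64)> = selected
        .iter()
        .chain(non_selected)
        .filter_map(|c| match c.weighted_score {
            Some(s) if s > 0.0 => Some((s, c.id)),
            _ => None, // None, zero, negative and NaN scores are dropped
        })
        .collect();
    // total_cmp is a total order on f64, so no fallback ordering is needed.
    scored.sort_by(|a, b| b.0.total_cmp(&a.0));
    scored.truncate(max);
    scored.into_iter().map(|(_, id)| id).collect()
}
```

sort_by is stable, so equal scores keep their selected-before-non_selected order, as in the original.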
#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for RedisPostCandidateCacheSideEffect {
fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
is_prod() && !query.has_cached_posts
}
Prod-only + don't double-cache. If we already used cached posts, no need to overwrite.
async fn side_effect(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>,
) -> Result<(), String> {
let max_posts_to_cache = input.query.params.get(MaxPostsToCache);
let user_id = input.query.user_id;
let candidates_to_cache = Self::get_candidates_to_cache(
&input.selected_candidates,
&input.non_selected_candidates,
max_posts_to_cache,
);
let cache_key = redis_client::cached_posts_key(
user_id,
&input.query.topic_ids,
input.query.in_network_only,
input.query.exclude_videos,
);
Same cache key as the read side (Session 09) — (user, topics, in_network_only, exclude_videos). Read/write key must match.
let json_payload =
serde_json::to_vec(&candidates_to_cache).map_err(|err| err.to_string())?;
let uncompressed_size = json_payload.len();
let compressed_payload = tokio::task::spawn_blocking(move || {
zstd::encode_all(json_payload.as_slice(), ZSTD_COMPRESSION_LEVEL)
})
.await
.map_err(|err| err.to_string())?
.map_err(|err| err.to_string())?;
Two-step serialize: JSON, then zstd. The compression runs under spawn_blocking because it's CPU-bound (at level 6, a payload of hundreds of candidates can take milliseconds), which keeps it off the async runtime's worker threads.
Double ? again: the outer one handles the JoinError from spawn_blocking, the inner one the std::io::Error returned by zstd::encode_all.
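The same nested-Result shape can be shown with the standard library alone. In this sketch, std::thread::spawn(...).join() stands in for tokio::task::spawn_blocking(...).await (both wrap the closure's own Result in an outer one that reports a panicked worker), and fake_compress is a hypothetical stand-in for zstd::encode_all:

```rust
use std::io;
use std::thread;

/// Hypothetical stand-in for zstd::encode_all: a CPU-bound step returning
/// io::Result<Vec<u8>>. It just reverses the bytes and rejects empty input.
fn fake_compress(input: &[u8]) -> io::Result<Vec<u8>> {
    if input.is_empty() {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty payload"));
    }
    Ok(input.iter().rev().copied().collect())
}

fn compress_off_thread(payload: Vec<u8>) -> Result<Vec<u8>, String> {
    let compressed = thread::spawn(move || fake_compress(&payload))
        .join()
        // Outer error: the worker panicked (the analogue of tokio's JoinError).
        .map_err(|_| "worker thread panicked".to_string())?
        // Inner error: the io::Error returned by the compression step itself.
        .map_err(|e| e.to_string())?;
    Ok(compressed)
}
```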
tracing::debug!(
user_id = user_id,
count = candidates_to_cache.len(),
cache_key = cache_key.clone(),
uncompressed_size = uncompressed_size,
compressed_size = compressed_payload.len(),
"RedisPostCandidateCacheSideEffect caching candidates"
);
self.redis_client
.set_ex(cache_key, compressed_payload, REDIS_TTL_SECONDS)
.await
.map_err(|err| err.to_string())
}
}
Debug log with both sizes (useful for compression-ratio diagnostics), then SET EX to Redis.
phoenix_request_cache_side_effect.rs (131 lines) — cross-datacenter cache
Caches the Phoenix model request (input data + per-candidate features) for replay analysis. Writes to two Redis clients — one in Atlanta, one in PDXA — for cross-DC analysis.
const KEY_PREFIX: &str = "phoenix_request_cache";
pub struct PhoenixRequestCacheSideEffect {
phoenix_request_cache_redis_atla_client: Arc<dyn RedisClient>,
phoenix_request_cache_redis_pdxa_client: Arc<dyn RedisClient>,
}
Two Redis clients — different DCs (Atlanta, PDXA — "PDX" is Portland; the "A" suffix is likely a specific cluster within that region).
pub fn request_level_cache_key(prediction_request_id: u64) -> String {
format!("{KEY_PREFIX}_{prediction_request_id}")
}
pub fn candidate_cache_key(prediction_request_id: u64, post_id: u64) -> String {
format!("{KEY_PREFIX}_{prediction_request_id}_{post_id}")
}
Two key schemes:
- Request-level: one key per Phoenix request, holding the request envelope (without sequences/candidates).
- Candidate-level: one key per (request_id, post_id), holding the per-candidate features.
Decomposed so a replay can fetch the envelope once + the candidates it cares about, instead of one giant blob.
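The two key functions from the source, plus a hypothetical replay_keys helper showing how a reader might assemble the keys for one request and a chosen subset of posts:

```rust
const KEY_PREFIX: &str = "phoenix_request_cache";

fn request_level_cache_key(prediction_request_id: u64) -> String {
    format!("{KEY_PREFIX}_{prediction_request_id}")
}

fn candidate_cache_key(prediction_request_id: u64, post_id: u64) -> String {
    format!("{KEY_PREFIX}_{prediction_request_id}_{post_id}")
}

/// Hypothetical replay-side helper: the envelope key first, then one key per
/// post of interest.
fn replay_keys(prediction_request_id: u64, post_ids: &[u64]) -> Vec<String> {
    std::iter::once(request_level_cache_key(prediction_request_id))
        .chain(post_ids.iter().map(|&p| candidate_cache_key(prediction_request_id, p)))
        .collect()
}
```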
async fn side_effect(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>,
) -> Result<(), String> {
let query = &input.query;
let ttl = query.params.get(PhoenixRequestCacheSideEffectTtlSeconds);
if input.selected_candidates.is_empty() && input.non_selected_candidates.is_empty() {
return Ok(());
}
let prediction_request_id = input
.selected_candidates
.iter()
.find_map(|c| c.prediction_request_id);
let Some(prediction_request_id) = prediction_request_id else {
return Ok(());
};
Find the prediction_request_id by scanning the selected candidates, and bail if none of them carries one (presumably meaning Phoenix didn't score this request).
let product_surface = if query.in_network_only {
ProductSurface::HomeTimelineRankedFollowing
} else {
ProductSurface::HomeTimelineRanking
};
let mut entries: Vec<(String, Vec<u8>)> = Vec::new();
if !query.has_cached_posts {
let phoenix_request =
build_request_without_sequence_and_candidates(query, product_surface);
entries.push((
request_level_cache_key(prediction_request_id),
phoenix_request.encode_to_vec(),
));
}
for candidate in build_tweet_infos(query, &input.selected_candidates) {
entries.push((
candidate_cache_key(prediction_request_id, candidate.tweet_id),
candidate.encode_to_vec(),
));
}
Build the entries list:
- A request-level entry, only if the request didn't use cached posts (presumably because in that case the envelope was already cached by the earlier request).
- One entry per selected candidate (the TweetInfo proto).
Note: build_request_without_sequence_and_candidates and build_tweet_infos are helpers in util::phoenix_request — they construct the data we'd send to Phoenix.
let mut futures: Vec<
std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send + '_>>,
> = Vec::new();
for (key, value) in entries {
let pdxa_key = key.clone();
let pdxa_value = value.clone();
futures.push(Box::pin(async move {
self.phoenix_request_cache_redis_atla_client
.set_ex(key, value, ttl)
.await
.map_err(|e| e.to_string())
}));
futures.push(Box::pin(async move {
self.phoenix_request_cache_redis_pdxa_client
.set_ex(pdxa_key, pdxa_value, ttl)
.await
.map_err(|e| format!("xdc: {e}"))
}));
}
let results = join_all(futures).await;
For each entry, schedule two concurrent writes (atla + pdxa). Every async block has its own anonymous type, so to store them in one Vec they are type-erased and heap-allocated via Box::pin(...) as Pin<Box<dyn Future + Send>>. The '_ lifetime is there because the futures borrow self.
PDXA errors get an "xdc:" (cross-datacenter) prefix so the two failure modes are distinguishable.
join_all polls all the futures concurrently and resolves once every one has finished.
let total = results.len();
let mut first_error: Option<String> = None;
let mut error_count = 0usize;
for result in results {
if let Err(e) = result {
error_count += 1;
if first_error.is_none() {
first_error = Some(e);
}
}
}
if total > 0 && error_count * 10 > total {
return Err(first_error.unwrap_or_default());
}
Ok(())
}
}
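Tolerate up to 10% errors. If more than 10% of writes fail (error_count * 10 > total), return the first error; otherwise treat the side effect as successful.
The threshold absorbs sporadic failures such as an occasional timeout. It does not absorb a full outage of one DC: every entry is written to both DCs, so losing one fails half the writes, well above 10%, and the side effect reports an error. Since side effects are fire-and-forget, that error only surfaces in logs and metrics; the response is unaffected.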
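The tolerance rule in isolation, as a hypothetical summarize function over already-collected results, which makes the per-DC arithmetic easy to check:

```rust
/// Mirrors the side effect's rule: fail only when more than 10% of the writes
/// failed, and then surface the first error.
fn summarize(results: Vec<Result<(), String>>) -> Result<(), String> {
    let total = results.len();
    let mut first_error: Option<String> = None;
    let mut error_count = 0usize;
    for result in results {
        if let Err(e) = result {
            error_count += 1;
            if first_error.is_none() {
                first_error = Some(e);
            }
        }
    }
    // Integer form of `error_count / total > 0.10`, avoiding division.
    if total > 0 && error_count * 10 > total {
        return Err(first_error.unwrap_or_default());
    }
    Ok(())
}
```

With 10 entries (20 writes), 2 failures still pass, 3 fail the side effect, and losing one DC entirely (10 failures) fails it as well.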
ads_injection_logging_side_effect.rs (146 lines) — ad-decision Kafka log
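Logs the ads-injected timeline to Kafka: each selected item with its position (ads flagged as promoted), plus counts of posts and ads fetched vs. served. Non-selected ads contribute only to the counts, not as individual entries. This drives ad attribution downstream.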
pub struct AdsInjectionLoggingSideEffect {
kafka_client: Arc<dyn KafkaPublisherClient>,
}
impl AdsInjectionLoggingSideEffect {
pub async fn prod() -> Self {
Self::new(Arc::new(
ProdKafkaPublisherClient::new(ADS_INJECTION_TOPIC, KafkaCluster::Ads).await,
))
}
}
The production constructor wires to the Ads Kafka cluster, separate from Bluebird; ad data presumably gets its own cluster for security and isolation.
#[async_trait]
impl SideEffect<ScoredPostsQuery, FeedItem> for AdsInjectionLoggingSideEffect {
fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
is_prod() && query.params.get(EnableAdsInjectionLogging)
}
async fn side_effect(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
) -> Result<(), String> {
let query = &input.query;
let items = &input.selected_candidates;
let fetched_posts = count_posts(items) + count_posts(&input.non_selected_candidates);
let fetched_ads = count_ads(items) + count_ads(&input.non_selected_candidates);
let request_time_ms = query.request_time_ms;
let entries: Vec<TimelineEntry> = items
.iter()
.enumerate()
.map(|(pos, item)| build_timeline_entry(item, pos))
.collect();
if entries.is_empty() {
return Ok(());
}
Count what was fetched (selected + non_selected) and what was selected. Build per-position entries from the selected list, and skip the publish entirely if that list is empty.
let timeline = AdsInjectedTimeline {
user_id: query.user_id,
entries,
request_time_ms,
display_location: DisplayLocation::TimelineHome.into(),
country_code: query.country_code.clone(),
request_id: query.request_id,
subscription_level: query
.subscription_level
.map(sub_level_to_proto)
.unwrap_or(SubscriptionLevel::LevelUnspecified)
.into(),
client_app_id: query.client_app_id as i64,
counts: Some(Counts {
retrieved_posts: fetched_posts as i32,
retrieved_ads: fetched_ads as i32,
response_posts: count_posts(items) as i32,
response_ads: count_ads(items) as i32,
}),
ip_address: query.ip_address.clone(),
user_agent: query.user_agent.clone(),
};
let bytes = timeline.encode_to_vec();
self.kafka_client
.send(&bytes)
.await
.map_err(|e| format!("Ads Kafka publish failed: {e}"))?;
Ok(())
}
}
Build the AdsInjectedTimeline proto. Embeds:
- User identity.
- Per-position entries.
- Display location.
- Counts (retrieved vs. response, posts vs. ads).
- Subscription level (ad eligibility differs by tier).
- IP and user agent (presumably for compliance / fraud detection).
Note subscription_level defaults to LevelUnspecified when unknown. The proto is encoded and sent to Kafka.
fn build_timeline_entry(item: &FeedItem, position: usize) -> TimelineEntry {
match &item.item {
Some(feed_item::Item::Post(post)) => TimelineEntry {
tweet_id: post.tweet_id,
author_id: post.author_id,
position: position as i32,
promoted: false,
impression_id: 0,
brand_safety_verdict: post.brand_safety_verdict,
ad_info: None,
safety_labels: post.safety_label_types.clone(),
},
Some(feed_item::Item::Ad(ad)) => TimelineEntry {
tweet_id: ad.post_id as u64,
author_id: ad.author_id as u64,
position: position as i32,
promoted: true,
impression_id: ad.impression_id as u64,
brand_safety_verdict: 0,
ad_info: Some(ad.clone()),
safety_labels: vec![],
},
Some(feed_item::Item::WhoToFollow(_))
| Some(feed_item::Item::Prompt(_))
| Some(feed_item::Item::PushToHome(_))
| None => TimelineEntry {
position: position as i32,
..Default::default()
},
}
}
Item-type-specific entry building:
- Post: full info, including the brand-safety verdict and safety labels.
- Ad: promoted = true, plus ad_info for billing/attribution.
- WhoToFollow / Prompt / PushToHome / None: a default entry carrying only the position. These items carry no ad or post data, so only the slot they occupy is recorded.
fn sub_level_to_proto(level: CoreSubscriptionLevel) -> SubscriptionLevel {
match level {
CoreSubscriptionLevel::Basic => SubscriptionLevel::Basic,
CoreSubscriptionLevel::Premium => SubscriptionLevel::Premium,
CoreSubscriptionLevel::PremiumPlus => SubscriptionLevel::PremiumPlus,
}
}
fn count_posts(items: &[FeedItem]) -> usize {
items
.iter()
.filter(|i| matches!(i.item, Some(feed_item::Item::Post(_))))
.count()
}
fn count_ads(items: &[FeedItem]) -> usize {
items
.iter()
.filter(|i| matches!(i.item, Some(feed_item::Item::Ad(_))))
.count()
}
Helpers. The subscription-level mapping is from internal enum → proto enum (different crates).
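count_posts and count_ads each scan the slice, and side_effect calls them on the selected list more than once. A hypothetical single-pass alternative, with a flattened Item enum standing in for feed_item::Item:

```rust
/// Hypothetical, flattened version of feed_item::Item (payloads omitted).
enum Item {
    Post,
    Ad,
    WhoToFollow,
    Prompt,
    PushToHome,
}

struct FeedItem {
    item: Option<Item>,
}

/// Counts (posts, ads) in one pass; every other item type is ignored.
fn count_posts_and_ads(items: &[FeedItem]) -> (usize, usize) {
    items.iter().fold((0, 0), |(posts, ads), i| match i.item {
        Some(Item::Post) => (posts + 1, ads),
        Some(Item::Ad) => (posts, ads + 1),
        _ => (posts, ads),
    })
}
```

For response-sized lists this is a micro-optimization at best; the two-helper version arguably reads more clearly.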
for_you_response_stats_side_effect.rs (157 lines) — response counters
Emits a set of counters for every For You response: post count, ad count, and "empty" / "sufficient" counts from which rates can be derived on granular dashboards.
const RESPONSE_METRIC: &str = "ForYouFeed.response";
const TYPE_TOTAL: (&str, &str) = ("type", "total");
const TYPE_POSTS: (&str, &str) = ("type", "posts");
const TYPE_ADS: (&str, &str) = ("type", "ads");
const TYPE_EMPTY_ADS: (&str, &str) = ("type", "empty_ads");
const TYPE_EMPTY_POSTS: (&str, &str) = ("type", "empty_posts");
const TYPE_SUFFICIENT_ADS: (&str, &str) = ("type", "sufficient_ads");
const TYPE_SUFFICIENT_POSTS: (&str, &str) = ("type", "sufficient_posts");
const STAGE_RESPONSE: (&str, &str) = ("stage", "response");
const STAGE_FETCHED: (&str, &str) = ("stage", "fetched");
Const tag tuples along two axes:
- type: what's being counted (total / posts / ads / empty_ads / empty_posts / sufficient_ads / sufficient_posts).
- stage: response (final) vs. fetched (before filtering / blending).
pub struct ForYouResponseStatsSideEffect;
#[async_trait]
impl SideEffect<ScoredPostsQuery, FeedItem> for ForYouResponseStatsSideEffect {
async fn side_effect(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, FeedItem>>,
) -> Result<(), String> {
let fetched_posts =
count_posts(&input.selected_candidates) + count_posts(&input.non_selected_candidates);
let fetched_ads =
count_ads(&input.selected_candidates) + count_ads(&input.non_selected_candidates);
let query = &input.query;
let blender_type = query.params.get(AdsBlenderType);
stat_response(
&input.selected_candidates,
fetched_posts,
fetched_ads,
&query.country_code,
query.subscription_level,
&blender_type,
);
Ok(())
}
}
Unit struct. Compute fetched counts (selected + non_selected). Delegate to stat_response.
No enable override — runs on every For You response.
fn stat_response(
items: &[FeedItem],
fetched_posts: usize,
fetched_ads: usize,
country_code: &str,
subscription_level: Option<SubscriptionLevel>,
blender_type: &str,
) {
let post_count = items
.iter()
.filter(|i| matches!(i.item, Some(feed_item::Item::Post(_))))
.count();
let ad_count = items
.iter()
.filter(|i| matches!(i.item, Some(feed_item::Item::Ad(_))))
.count();
let sub_status = subscription_level.map(|s| s.as_str()).unwrap_or("none");
info!(
"ForYouFeed response - {post_count} posts, {ad_count} ads \
(fetched {fetched_posts} posts, {fetched_ads} ads), \
country {country_code}, subscription {sub_status}, blender {blender_type}.",
);
Compute response post/ad counts. Log a human-readable line for debugging.
if let Some(receiver) = global_stats_receiver() {
let sub = ("subscription", sub_status);
let blender = ("blender", blender_type);
receiver.incr(RESPONSE_METRIC, &[TYPE_TOTAL, sub], 1);
receiver.incr(
RESPONSE_METRIC,
&[TYPE_TOTAL, sub, ("country", bucket_country(country_code))],
1,
);
receiver.incr(
RESPONSE_METRIC,
&[TYPE_POSTS, sub, blender],
post_count as u64,
);
receiver.incr(RESPONSE_METRIC, &[TYPE_ADS, sub, blender], ad_count as u64);
Counters tagged with subscription. The total counter counts responses; the posts and ads counters add up the number of items. Two extra dimensions:
- country: the bucketed country code, attached only to the second total counter. bucket_country probably groups small countries into "OTHER" to avoid a label-cardinality explosion in Prometheus.
- blender: which ads-blending strategy was used.
if ad_count == 0 {
receiver.incr(
RESPONSE_METRIC,
&[TYPE_EMPTY_ADS, STAGE_RESPONSE, sub, blender],
1,
);
}
if post_count == 0 {
receiver.incr(
RESPONSE_METRIC,
&[TYPE_EMPTY_POSTS, STAGE_RESPONSE, sub, blender],
1,
);
}
if ad_count >= 5 {
receiver.incr(
RESPONSE_METRIC,
&[TYPE_SUFFICIENT_ADS, STAGE_RESPONSE, sub, blender],
1,
);
}
if post_count >= 20 {
receiver.incr(
RESPONSE_METRIC,
&[TYPE_SUFFICIENT_POSTS, STAGE_RESPONSE, sub, blender],
1,
);
}
Threshold-based counters for the response stage:
- Empty ads / posts.
- Sufficient ads (≥5) / sufficient posts (≥20).
The combination of total + empty + sufficient lets us compute rates like "% of responses with insufficient posts."
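A sketch of the threshold logic and of deriving a rate from the counter totals. Both functions are hypothetical: threshold_tags returns the type tags a response-stage sample would increment, and insufficient_rate computes the share of responses that missed the "sufficient" bar:

```rust
/// Type tags a response would increment, using the same cutoffs as above
/// (0 for "empty"; >= 5 ads / >= 20 posts for "sufficient").
fn threshold_tags(post_count: usize, ad_count: usize) -> Vec<&'static str> {
    let mut tags = Vec::new();
    if ad_count == 0 {
        tags.push("empty_ads");
    }
    if post_count == 0 {
        tags.push("empty_posts");
    }
    if ad_count >= 5 {
        tags.push("sufficient_ads");
    }
    if post_count >= 20 {
        tags.push("sufficient_posts");
    }
    tags
}

/// Share of responses that were NOT "sufficient", from two counter totals.
/// Returns None when nothing was counted.
fn insufficient_rate(sufficient: u64, total: u64) -> Option<f64> {
    if total == 0 {
        None
    } else {
        Some(1.0 - sufficient as f64 / total as f64)
    }
}
```

On a real dashboard the sufficient counters carry stage and blender tags that the total counter lacks, so they need summing over blender (for one stage) first. Only the total series without the country tag should serve as the denominator, since the country-tagged series counts the same responses again.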
if fetched_ads == 0 {
receiver.incr(RESPONSE_METRIC, &[TYPE_EMPTY_ADS, STAGE_FETCHED, sub], 1);
}
if fetched_posts == 0 {
receiver.incr(RESPONSE_METRIC, &[TYPE_EMPTY_POSTS, STAGE_FETCHED, sub], 1);
}
if fetched_ads >= 5 {
receiver.incr(
RESPONSE_METRIC,
&[TYPE_SUFFICIENT_ADS, STAGE_FETCHED, sub],
1,
);
}
if fetched_posts >= 20 {
receiver.incr(
RESPONSE_METRIC,
&[TYPE_SUFFICIENT_POSTS, STAGE_FETCHED, sub],
1,
);
}
}
}
The same threshold counters for the fetched stage (pre-blender), without the blender tag. This lets us distinguish:
- "Empty response because blender dropped them" (fetched_posts > 0 but post_count == 0).
- "Empty response because nothing was fetched" (both zero).
Two failure modes; both monitorable.
reranking_kafka_side_effect.rs (157 lines) — 5% sampled Kafka log
Publishes a 5% random sample of Phoenix-scored requests to Kafka for offline analysis (model A/B comparison, training-data augmentation).
const TOP_K: usize = 50;
pub struct RerankingKafkaSideEffect {
kafka_client: Arc<dyn KafkaPublisherClient>,
}
impl RerankingKafkaSideEffect {
pub fn new(kafka_client: Arc<dyn KafkaPublisherClient>) -> Self {
Self { kafka_client }
}
}
#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for RerankingKafkaSideEffect {
fn enable(&self, _query: Arc<ScoredPostsQuery>) -> bool {
is_prod() && random::<f64>() < 0.05
}
5% random sampling: random::<f64>() draws uniformly from [0, 1), so the check passes for about 5% of requests, independently of B3 sampling and shadow-traffic flags.
The parameter is _query because the enable check never looks at the query, only at the dice roll.
async fn side_effect(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>,
) -> Result<(), String> {
let mut candidates: Vec<&PostCandidate> = Vec::new();
for c in &input.selected_candidates {
candidates.push(c);
}
for c in &input.non_selected_candidates {
candidates.push(c);
}
let total_count = candidates.len() as i32;
candidates.sort_by(|a, b| {
let sa = a.score.unwrap_or(f64::MIN);
let sb = b.score.unwrap_or(f64::MIN);
sb.partial_cmp(&sa).unwrap_or(std::cmp::Ordering::Equal)
});
candidates.truncate(TOP_K);
if candidates.is_empty() {
return Ok(());
}
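Concatenate selected + non_selected and record the total count before truncating. Sort by score descending (candidates without a score sort last via f64::MIN), keep the top 50, and bail if nothing is left.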
let scored_candidates: Vec<pb::ScoredCandidate> = candidates
.iter()
.enumerate()
.map(|(position, candidate)| build_scored_candidate(candidate, position as i32))
.collect();
let request_time_ms = input.query.request_time_ms;
let prediction_request_id = candidates
.iter()
.find_map(|c| c.prediction_request_id.map(|id| id as i64));
let product_surface = if input.query.in_network_only {
xai_recsys_proto::ProductSurface::HomeTimelineRankedFollowing
} else {
xai_recsys_proto::ProductSurface::HomeTimelineRanking
};
let batch = pb::ScoredCandidateBatch {
candidates: scored_candidates,
viewer_id: Some(input.query.user_id),
request_time_ms: Some(request_time_ms),
prediction_request_id,
served_request_id: prediction_request_id,
served_id: prediction_request_id,
total_candidates_count: Some(total_count),
request_join_id: Some(input.query.request_id),
product_surface: product_surface.into(),
};
let bytes = batch.encode_to_vec();
self.kafka_client
.send(&bytes)
.await
.map_err(|e| format!("Kafka publish failed: {e}"))
}
}
Build a ScoredCandidateBatch proto. Three ID fields (prediction_request_id, served_request_id, served_id) all get the same value, taken from the first top-K candidate that has one. The proto schema appears more general than this use; here all three point to the same thing.
Plus request_join_id = request_id — for joining against the request-level log.
fn build_scored_candidate(candidate: &PostCandidate, position: i32) -> pb::ScoredCandidate {
let s = &candidate.phoenix_scores;
let mut prediction_scores: HashMap<String, f64> = HashMap::new();
insert_score(&mut prediction_scores, "favorite", s.favorite_score);
insert_score(&mut prediction_scores, "reply", s.reply_score);
insert_score(&mut prediction_scores, "retweet", s.retweet_score);
insert_score(&mut prediction_scores, "photo_expand", s.photo_expand_score);
insert_score(&mut prediction_scores, "click", s.click_score);
insert_score(&mut prediction_scores, "profile_click", s.profile_click_score);
insert_score(&mut prediction_scores, "vqv", s.vqv_score);
insert_score(&mut prediction_scores, "share", s.share_score);
insert_score(&mut prediction_scores, "share_via_dm", s.share_via_dm_score);
insert_score(&mut prediction_scores, "share_via_copy_link", s.share_via_copy_link_score);
insert_score(&mut prediction_scores, "dwell", s.dwell_score);
insert_score(&mut prediction_scores, "quote", s.quote_score);
insert_score(&mut prediction_scores, "quoted_click", s.quoted_click_score);
insert_score(&mut prediction_scores, "quoted_vqv", s.quoted_vqv_score);
insert_score(&mut prediction_scores, "follow_author", s.follow_author_score);
insert_score(&mut prediction_scores, "not_interested", s.not_interested_score);
insert_score(&mut prediction_scores, "block_author", s.block_author_score);
insert_score(&mut prediction_scores, "mute_author", s.mute_author_score);
insert_score(&mut prediction_scores, "report", s.report_score);
insert_score(&mut prediction_scores, "dwell_time", s.dwell_time);
Build a string→float map of all 20 prediction scores. The proto uses a flexible map so new score types can be added without schema changes.
let source_tweet_id = candidate.retweeted_tweet_id.unwrap_or(candidate.tweet_id);
let served_type = candidate.served_type.map(|st| format!("{:?}", st));
pb::ScoredCandidate {
tweet_id: candidate.tweet_id,
score: candidate.score,
prediction_scores,
weighted_model_score: candidate.weighted_score,
author_id: Some(candidate.author_id),
source_tweet_id: Some(source_tweet_id),
served_type,
is_cached: candidate.last_scored_at_ms.is_some(),
in_network: candidate.in_network.unwrap_or(false),
position,
}
}
fn insert_score(map: &mut HashMap<String, f64>, name: &str, value: Option<f64>) {
map.insert(name.to_string(), value.unwrap_or(0.0));
}
Pack the candidate proto. Three kinds of score are preserved: score, weighted_model_score, and the per-action prediction_scores map.
served_type is debug-formatted to a string ("ForYouInNetwork" etc.) — the proto field is a string for flexibility.
is_cached: candidate.last_scored_at_ms.is_some() is a heuristic for "did this candidate come from the cache, or was it freshly scored?" last_scored_at_ms is set by PhoenixScorer, so its presence indicates fresh scoring. Under that reading the naming looks inverted: setting is_cached = true exactly when last_scored_at_ms.is_some() amounts to "recently scored, therefore cached," which is backwards. It could be a bug, or "cached" in this context could mean "carries cached scoring metadata."
insert_score defaults None → 0.0 — consistent with the weighted score computation that treats None as 0.
Orphan: cache_request_info_side_effect.rs (42 lines)
pub struct CacheRequestInfoSideEffect {
pub strato_client: Arc<dyn StratoClient + Send + Sync>,
}
#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for CacheRequestInfoSideEffect {
fn enable(&self, query: Arc<ScoredPostsQuery>) -> bool {
env::var("APP_ENV").unwrap_or_default() == "prod" && !query.in_network_only
}
async fn run(
&self,
input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>,
) -> Result<(), String> {
let user_id: i64 = input.query.user_id;
let post_ids: Vec<i64> = input
.selected_candidates
.iter()
.map(|c| c.tweet_id)
.collect();
let client = &self.strato_client;
let res = client
.store_request_info(user_id, post_ids)
.await
.map_err(|e| e.to_string())?;
let decoded: StratoResult<StratoValue<()>> = decode(&res);
match decoded {
StratoResult::Ok(_) => Ok(()),
StratoResult::Err(_) => Err("error received from strato".to_string()),
}
}
}
Not in mod.rs — orphan. References the orphan crate::candidate_pipeline::query::ScoredPostsQuery from Session 06.
The intent was: store per-request (user, post_ids) info in Strato, presumably for downstream lookup. Almost certainly a pre-refactor version of RedisPostCandidateCacheSideEffect — single-call Strato store vs. compressed JSON in Redis with TTL.
Notable: uses env::var("APP_ENV") for the prod check instead of is_prod() — older pattern.
Also overrides run (the outer trait method) instead of side_effect (the inner one). Slightly unusual — most active side-effects override side_effect. This bypasses the stats macro instrumentation that the default run provides.
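A minimal synchronous sketch of why overriding run skips the instrumentation, assuming the real trait's default run wraps side_effect with stats recording. The trait below is a simplified, hypothetical stand-in (the real one is async via #[async_trait] and generic over query and candidate types), and two atomic counters stand in for the stats macro.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical stand-ins for the stats macro instrumentation.
static RUNS: AtomicU64 = AtomicU64::new(0);
static FAILURES: AtomicU64 = AtomicU64::new(0);

// Simplified, synchronous version of the SideEffect trait.
trait SideEffect {
    // Inner hook that most side effects implement.
    fn side_effect(&self) -> Result<(), String> {
        Ok(())
    }

    // Outer entry point: the default wraps side_effect with instrumentation.
    fn run(&self) -> Result<(), String> {
        RUNS.fetch_add(1, Ordering::Relaxed);
        let result = self.side_effect();
        if result.is_err() {
            FAILURES.fetch_add(1, Ordering::Relaxed);
        }
        result
    }
}

// Typical side effect: implements only side_effect, so run() records stats.
struct Instrumented;
impl SideEffect for Instrumented {
    fn side_effect(&self) -> Result<(), String> {
        Err("downstream error".to_string())
    }
}

// Orphan-style side effect: overrides run() directly, so the counters never move.
struct OverridesRun;
impl SideEffect for OverridesRun {
    fn run(&self) -> Result<(), String> {
        Err("error received from strato".to_string())
    }
}

fn main() {
    let _ = Instrumented.run();
    let _ = OverridesRun.run();
    // prints "runs=1 failures=1": the OverridesRun failure is invisible to the stats.
    println!(
        "runs={} failures={}",
        RUNS.load(Ordering::Relaxed),
        FAILURES.load(Ordering::Relaxed)
    );
}
```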
What we've learned
Side-effect anatomy: every side effect implements SideEffect<Q, C> with enable + side_effect (or run). Fires after the response. Failures don't break the response.
The Arc<Q> enable signature: side-effect enable takes Arc<Q> (not &Q like other stages) because side-effects are moved into Tokio tasks that may outlive the calling stack frame.
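A sketch of the ownership constraint, using std::thread::spawn because it carries the same 'static bound as tokio::spawn. Query, enable and spawn_side_effect here are hypothetical; the point is that the spawned closure owns an Arc<Query> clone, so the handler can return (and drop its own reference) while the task still reads the query.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical, heavily trimmed query type.
struct Query {
    user_id: u64,
    in_network_only: bool,
}

// Mirrors the shape described above: enable() receives an owned Arc<Query>,
// not a &Query borrowed from the request handler's stack frame.
fn enable(query: Arc<Query>) -> bool {
    !query.in_network_only
}

// thread::spawn (like tokio::spawn) requires a 'static closure, so the task
// must own its data; cloning the Arc bumps a refcount instead of copying.
fn spawn_side_effect(query: &Arc<Query>) -> thread::JoinHandle<Option<u64>> {
    let owned = Arc::clone(query);
    thread::spawn(move || {
        if enable(Arc::clone(&owned)) {
            Some(owned.user_id) // stand-in for the real write
        } else {
            None
        }
    })
}

fn main() {
    let query = Arc::new(Query { user_id: 42, in_network_only: false });
    let handle = spawn_side_effect(&query);
    // The handler drops its reference right away; the task keeps the query alive.
    drop(query);
    println!("{:?}", handle.join().unwrap()); // Some(42)
}
```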
Common gates:
- is_prod(): most side-effects skip in non-prod.
- Feature flag (EnableXxx).
- Per-request data check (e.g., !query.has_cached_posts, query.seen_ids.is_empty()).
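A hypothetical enable() that stacks all three kinds of gate, cheapest first. PublishSeenIdsSketch, its is_prod field (standing in for is_prod()) and the enable_publish_seen_ids flag are illustrative names, not the repo's.

```rust
use std::sync::Arc;

// Hypothetical query shape with one feature flag and one data field.
struct Query {
    enable_publish_seen_ids: bool, // stand-in for an EnableXxx flag
    seen_ids: Vec<u64>,
}

struct PublishSeenIdsSketch {
    is_prod: bool, // stand-in for is_prod()
}

impl PublishSeenIdsSketch {
    // Each gate can veto the side effect on its own.
    fn enable(&self, query: Arc<Query>) -> bool {
        self.is_prod                          // 1. environment gate
            && query.enable_publish_seen_ids  // 2. feature flag
            && !query.seen_ids.is_empty()     // 3. nothing to publish -> skip
    }
}

fn main() {
    let se = PublishSeenIdsSketch { is_prod: true };
    let q = Arc::new(Query { enable_publish_seen_ids: true, seen_ids: vec![1, 2, 3] });
    println!("{}", se.enable(Arc::clone(&q))); // true
    let empty = Arc::new(Query { enable_publish_seen_ids: true, seen_ids: vec![] });
    println!("{}", se.enable(empty)); // false
}
```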
Random sampling: RerankingKafkaSideEffect does random::<f64>() < 0.05 for 5% sampling. Independent from B3 traffic sampling.
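A dependency-free sketch of the per-request 5% coin flip. The real code calls rand's random::<f64>(); here a small xorshift64 generator (a swapped-in PRNG, not what the repo uses) produces the uniform float so the block compiles with std alone. Because the draw happens per request inside the side effect, it is independent of any upstream trace-sampling decision.

```rust
// Minimal xorshift64 PRNG (Marsaglia's 13/7/17 shifts); state must be non-zero.
struct XorShift64(u64);

impl XorShift64 {
    fn next_f64(&mut self) -> f64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        // Top 53 bits -> uniform float in [0, 1).
        (x >> 11) as f64 / (1u64 << 53) as f64
    }
}

const SAMPLE_RATE: f64 = 0.05;

// Same shape as `random::<f64>() < 0.05`.
fn should_publish(rng: &mut XorShift64) -> bool {
    rng.next_f64() < SAMPLE_RATE
}

fn main() {
    let mut rng = XorShift64(0x9E37_79B9_7F4A_7C15);
    let n = 100_000;
    let hits = (0..n).filter(|_| should_publish(&mut rng)).count();
    println!("sampled {} of {} requests ({:.2}%)", hits, n, 100.0 * hits as f64 / n as f64);
}
```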
Cross-DC writes: PhoenixRequestCacheSideEffect writes to two Redis clusters (atla + pdxa). Tolerates up to 10% errors before failing. Cross-DC analysis enabled.
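One way to express "tolerate up to 10% errors" across two clusters. It assumes the rate is computed over every individual write to both clusters and that exactly 10% still passes; both the denominator and the strict > comparison are assumptions, and check_cross_dc_writes is a hypothetical name.

```rust
const MAX_ERROR_RATE: f64 = 0.10;

// Each entry is (cluster name, outcome of one write to that cluster).
// Fails only when the share of failed writes exceeds MAX_ERROR_RATE.
fn check_cross_dc_writes(outcomes: &[(&str, Result<(), String>)]) -> Result<(), String> {
    if outcomes.is_empty() {
        return Ok(());
    }
    let failed: Vec<&str> = outcomes
        .iter()
        .filter(|(_, r)| r.is_err())
        .map(|(dc, _)| *dc)
        .collect();
    let rate = failed.len() as f64 / outcomes.len() as f64;
    if rate > MAX_ERROR_RATE {
        Err(format!("{:.1}% of cross-DC writes failed (clusters: {:?})", rate * 100.0, failed))
    } else {
        Ok(())
    }
}

fn main() {
    // 10 writes to each of atla and pdxa; two pdxa writes fail (10%): tolerated.
    let mut outcomes: Vec<(&str, Result<(), String>)> = Vec::new();
    for i in 0..10 {
        outcomes.push(("atla", Ok(())));
        let r = if i < 2 { Err("timeout".to_string()) } else { Ok(()) };
        outcomes.push(("pdxa", r));
    }
    println!("{:?}", check_cross_dc_writes(&outcomes)); // Ok(())
}
```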
Manhattan + Redis split:
- Manhattan: update_past_request_timestamps, truncate_served_history. Durable per-user state.
- Redis: redis_post_candidate_cache, phoenix_request_cache. Short-TTL cache.
Kafka cluster routing:
- Bluebird: general (impressions).
- Ads: ad-data only.
- (Plus more we'll see in Session 13: Phoenix, Aiml.)
Two-stage metric tagging: for_you_response_stats_side_effect emits the same metric set with stage=response and stage=fetched tags. Lets us track blender attrition.
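A sketch of stage tagging with a hypothetical in-memory counter map: emit_stats runs once with stage "fetched" and once with stage "response", and attrition falls out as 1 - response/fetched. The metric names and candidate counts are illustrative only.

```rust
use std::collections::HashMap;

// Hypothetical metrics sink keyed by (metric name, stage tag).
#[derive(Default)]
struct Metrics {
    counters: HashMap<(String, &'static str), u64>,
}

impl Metrics {
    fn incr(&mut self, name: &str, stage: &'static str, by: u64) {
        *self.counters.entry((name.to_string(), stage)).or_insert(0) += by;
    }
    fn get(&self, name: &str, stage: &'static str) -> u64 {
        self.counters.get(&(name.to_string(), stage)).copied().unwrap_or(0)
    }
}

struct Candidate {
    in_network: bool,
}

// The same metric set for every stage; only the stage tag differs.
fn emit_stats(m: &mut Metrics, stage: &'static str, cands: &[Candidate]) {
    m.incr("candidates", stage, cands.len() as u64);
    let in_net = cands.iter().filter(|c| c.in_network).count() as u64;
    m.incr("in_network_candidates", stage, in_net);
}

fn main() {
    let fetched: Vec<Candidate> = (0..40).map(|i| Candidate { in_network: i % 2 == 0 }).collect();
    let response: Vec<Candidate> = (0..30).map(|i| Candidate { in_network: i % 3 == 0 }).collect();
    let mut m = Metrics::default();
    emit_stats(&mut m, "fetched", &fetched);
    emit_stats(&mut m, "response", &response);
    let attrition =
        1.0 - m.get("candidates", "response") as f64 / m.get("candidates", "fetched") as f64;
    println!("attrition = {:.2}", attrition); // attrition = 0.25
}
```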
Threshold counters: instead of histograms, count "≥ N" and "= 0" cases as separate counters tagged with thresholds. Cheaper than histograms; rates computable via division.
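A sketch of threshold counters with hypothetical thresholds (10, 20, 50) and label names (eq_0, ge_N). Each request bumps a request counter, an eq_0 counter when nothing came back, and one ge_N counter per threshold reached; any rate is then one counter divided by the request counter.

```rust
use std::collections::HashMap;

// Hypothetical thresholds; the real side effect picks its own.
const THRESHOLDS: [usize; 3] = [10, 20, 50];

fn record_count(counters: &mut HashMap<String, u64>, count: usize) {
    *counters.entry("requests".to_string()).or_insert(0) += 1;
    if count == 0 {
        *counters.entry("eq_0".to_string()).or_insert(0) += 1;
    }
    // Every threshold reached gets its own counter bump.
    for &t in THRESHOLDS.iter() {
        if count >= t {
            *counters.entry(format!("ge_{}", t)).or_insert(0) += 1;
        }
    }
}

fn main() {
    let mut counters = HashMap::new();
    for count in [0, 5, 12, 25, 60] {
        record_count(&mut counters, count);
    }
    // A rate is just a counter divided by the request counter.
    let ge_20 = counters["ge_20"] as f64 / counters["requests"] as f64;
    println!("share of requests with >= 20 candidates: {:.0}%", ge_20 * 100.0); // 40%
}
```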
Country bucketing: bucket_country(...) collapses long-tail countries into "OTHER" before tagging — avoids Prometheus label cardinality explosion.
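A sketch of the bucketing idea with a hypothetical five-country allowlist; the real bucket_country(...) list and signature may differ. Whatever comes in, the country label takes at most six distinct values.

```rust
// Hypothetical allowlist of countries that keep their own label.
static TRACKED_COUNTRIES: [&str; 5] = ["US", "JP", "GB", "IN", "BR"];

// Tracked codes pass through; everything else (including missing) becomes "OTHER".
fn bucket_country(country: Option<&str>) -> &'static str {
    match country {
        Some(c) => TRACKED_COUNTRIES
            .iter()
            .find(|t| **t == c)
            .copied()
            .unwrap_or("OTHER"),
        None => "OTHER",
    }
}

fn main() {
    for c in [Some("US"), Some("FJ"), None] {
        println!("{:?} -> {}", c, bucket_country(c));
    }
}
```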
Compression in cache writes: RedisPostCandidateCacheSideEffect uses zstd level 6 on JSON-encoded candidates. Run on spawn_blocking (CPU-bound). Debug log includes both sizes for compression-ratio visibility.
Caching pattern: write a superset of selected candidates (get_candidates_to_cache takes both selected + non_selected, sorts by weighted_score). The next request can pull a different top-K from the cached pool without re-running the model.
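A sketch of the pool-then-rank step. It assumes None weighted scores count as 0.0 (matching insert_score) and adds a hypothetical MAX_CACHED cap that the text does not mention; candidates_to_cache is an illustrative name, not the repo's get_candidates_to_cache.

```rust
#[derive(Clone)]
struct Candidate {
    tweet_id: u64,
    weighted_score: Option<f64>,
}

// Hypothetical cap on how many candidates get written to the cache.
const MAX_CACHED: usize = 4;

// Pool selected + non-selected, rank by weighted_score (None as 0.0), keep the top.
fn candidates_to_cache(selected: &[Candidate], non_selected: &[Candidate]) -> Vec<Candidate> {
    let mut pool: Vec<Candidate> = selected.iter().chain(non_selected).cloned().collect();
    pool.sort_by(|a, b| {
        let (sa, sb) = (a.weighted_score.unwrap_or(0.0), b.weighted_score.unwrap_or(0.0));
        sb.total_cmp(&sa) // descending
    });
    pool.truncate(MAX_CACHED);
    pool
}

fn main() {
    let selected = vec![
        Candidate { tweet_id: 1, weighted_score: Some(0.9) },
        Candidate { tweet_id: 2, weighted_score: Some(0.4) },
    ];
    let non_selected = vec![
        Candidate { tweet_id: 3, weighted_score: Some(0.7) },
        Candidate { tweet_id: 4, weighted_score: None },
        Candidate { tweet_id: 5, weighted_score: Some(0.5) },
    ];
    let ids: Vec<u64> = candidates_to_cache(&selected, &non_selected)
        .iter()
        .map(|c| c.tweet_id)
        .collect();
    println!("{:?}", ids); // [1, 3, 5, 2]
}
```

In the example, non-selected tweets 3 and 5 outrank selected tweet 2: that headroom is what lets a later request assemble a different top-K from the cached pool.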
One pre-refactor orphan: cache_request_info_side_effect.rs — older Strato-based version of post-candidate caching.
Next session
Session 13 — Side effects, part 2. The 5 heavier ones (~1,200 LOC):
- served_candidates_kafka_side_effect.rs (207): published served set
- update_served_history_side_effect.rs (268): Manhattan write of served history
- scored_stats_side_effect.rs (281): per-candidate scoring metrics
- client_events_kafka_side_effect.rs (302): client-event Kafka publish
- phoenix_experiments_side_effect.rs (174): shadow-mode Phoenix variant runs