X For You algorithm, line by line — Part 11: Sources

Part 11 of the deep dive into xai-org/x-algorithm. All 11 source implementations: Thunder for in-network, TweetMixer for legacy OON, three Phoenix retrieval variants (default, topics, MoE), CachedPostsSource bypass, and the For You-specific sources (ScoredPostsSource, AdsSource, WhoToFollowSource, PromptsSource, PushToHomeSource). Cluster resolution, dedup strategy, graceful degradation.

May 15, 2026 · 18 min read

These are the 11 source implementations that produce candidates. From Session 06 we know the Phoenix (Scored Posts) pipeline uses 6 of them and the For You pipeline uses the other 5. Within each pipeline, the enabled sources run in parallel during the source stage.

Files covered (~916 LOC):

home-mixer/sources/
├── mod.rs                          (11)
├── thunder_source.rs               (102)  in-network from Thunder
├── tweet_mixer_source.rs           (103)  legacy OON service
├── phoenix_source.rs               (116)  main OON Phoenix retrieval
├── phoenix_topics_source.rs        (106)  topic-aware Phoenix retrieval
├── phoenix_moe_source.rs           (73)   Phoenix Mixture-of-Experts
├── cached_posts_source.rs          (17)   reuses cached scored posts
├── scored_posts_source.rs          (32)   wraps the inner Phoenix pipeline (For You)
├── ads_source.rs                   (60)   ad index lookup (For You)
├── who_to_follow_source.rs         (101)  WTF service call (For You)
├── prompts_source.rs               (98)   prompts/injection service (For You)
└── push_to_home_source.rs          (97)   notification-clicked post (For You)

Sources split into two groups by candidate type:

Pipeline                 Candidate type   Sources
Phoenix (Scored Posts)   PostCandidate    thunder, tweet_mixer, phoenix, phoenix_topics, phoenix_moe, cached_posts
For You                  FeedItem         scored_posts, ads, who_to_follow, prompts, push_to_home

The For You sources wrap items in FeedItem (the heterogeneous union). The Phoenix sources return raw PostCandidates.
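Every file below implements the same two-method contract: an enable gate and a fallible source call. As a rough mental model of the source stage, here is a synchronous sketch that fans the enabled sources out on scoped threads and merges whatever succeeds. The SourceLike trait and run_sources are hypothetical (the real Source trait is async via #[async_trait]), and logging and skipping a failed source is an assumed reading of "graceful degradation", not the framework's confirmed policy.

```rust
use std::thread;

/// Hypothetical, synchronous stand-in for the async `Source<Q, C>` trait.
pub trait SourceLike<Q, C>: Sync {
    fn name(&self) -> &'static str;
    /// Gate: enabled by default unless a source overrides it.
    fn enable(&self, _query: &Q) -> bool {
        true
    }
    fn source(&self, query: &Q) -> Result<Vec<C>, String>;
}

/// Runs every enabled source on its own scoped thread and concatenates the
/// results in source order. A failing or panicking source is logged and
/// skipped (assumed policy), so one bad dependency does not empty the feed.
pub fn run_sources<Q: Sync, C: Send>(sources: &[&dyn SourceLike<Q, C>], query: &Q) -> Vec<C> {
    thread::scope(|scope| {
        let handles: Vec<_> = sources
            .iter()
            .filter(|s| s.enable(query))
            .map(|s| (s.name(), scope.spawn(move || s.source(query))))
            .collect();

        let mut merged = Vec::new();
        for (name, handle) in handles {
            match handle.join() {
                Ok(Ok(mut items)) => merged.append(&mut items),
                Ok(Err(e)) => eprintln!("source {name} failed: {e}"),
                Err(_) => eprintln!("source {name} panicked"),
            }
        }
        merged
    })
}
```

Because each source only sees &Q, any output a source wants to share with later stages has to go through interior mutability, which is what the OnceLock written by ThunderSource provides.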


mod.rs (11 lines)

pub mod ads_source;
pub mod cached_posts_source;
pub mod phoenix_moe_source;
pub mod phoenix_source;
pub mod phoenix_topics_source;
pub mod prompts_source;
pub mod push_to_home_source;
pub mod scored_posts_source;
pub mod thunder_source;
pub mod tweet_mixer_source;
pub mod who_to_follow_source;

11 public modules.


thunder_source.rs (102 lines) — in-network posts

Calls Thunder (Sessions 02-03) for posts authored by the viewer's follows.

pub struct ThunderSource {
    pub thunder_client: Arc<ThunderClient>,
}

#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for ThunderSource {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        !query.has_cached_posts
    }

    async fn source(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        let configured = ThunderCluster::parse(&query.params.get(ThunderClusterId));
        let cluster = ThunderCluster::resolve(configured, query.decider.as_ref());
        let channel = self
            .thunder_client
            .get_random_channel(cluster)
            .ok_or_else(|| "ThunderSource: no available channel".to_string())?;

        let mut client = InNetworkPostsServiceClient::new(channel.clone());

Pick the Thunder cluster (config + decider override), grab a random gRPC channel from the pool, build a per-request client.

thunder_client.get_random_channel(cluster) load-balances across Thunder instances by picking a random channel from that cluster's pool. It returns None when no channel is available (for example, all Thunder instances are down), and ok_or_else turns that into a source error.
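A minimal sketch of the channel pick, assuming the client keeps a list of available channels per cluster. ChannelPool, the Cluster variants, and the caller-supplied rand value are all hypothetical; the real get_random_channel takes only the cluster and presumably draws its own random number. Passing the random value in keeps the sketch deterministic.

```rust
use std::collections::HashMap;

/// Hypothetical cluster identifiers.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Cluster {
    Primary,
    Secondary,
}

/// Hypothetical pool: the channels (here just endpoint strings) currently
/// available for each cluster.
pub struct ChannelPool {
    pub by_cluster: HashMap<Cluster, Vec<String>>,
}

impl ChannelPool {
    /// Picks one channel for `cluster` using a caller-supplied random value.
    /// Returns None when the cluster is unknown or has no channels left.
    pub fn get_random_channel(&self, cluster: Cluster, rand: u64) -> Option<&String> {
        let channels = self.by_cluster.get(&cluster)?;
        if channels.is_empty() {
            return None;
        }
        channels.get((rand % channels.len() as u64) as usize)
    }
}
```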

        let following_list = &query.user_features.followed_user_ids;
        let excluded_ids = query.seen_ids.to_vec();

        let request = GetInNetworkPostsRequest {
            user_id: query.user_id,
            following_user_ids: following_list.iter().map(|&id| id as u64).collect(),
            max_results: query.params.get(ThunderMaxResults),
            exclude_tweet_ids: excluded_ids,
            algorithm: query.params.get(ThunderAlgorithm),
            debug: false,
            is_video_request: false,
        };

Build the request. Pulls from query state:

  • followed_user_ids (already hydrated by FollowedUserIdsQueryHydrator in Session 09).
  • seen_ids (came in on the request).
  • max_results, algorithm from feature switches.

        let response = client
            .get_in_network_posts(request)
            .await
            .map_err(|e| format!("ThunderSource: {}", e))?;

        let posts = response.into_inner().posts;

        let replies: Vec<InNetworkReply> = posts
            .iter()
            .filter_map(|post| {
                post.in_reply_to_post_id.map(|reply_to_id| InNetworkReply {
                    author_id: post.author_id as u64,
                    in_reply_to_tweet_id: reply_to_id as u64,
                })
            })
            .collect();

        let _ = query.in_network_replies.set(replies);

Build the in-network replies list as a side effect of sourcing. ThunderSource is the producer for the OnceLock<Vec<InNetworkReply>> field we read about in Session 04: every reply in the Thunder response becomes an InNetworkReply { author_id, in_reply_to_tweet_id } entry.

query.in_network_replies.set(replies) is the one-shot write to the OnceLock. Discarded via let _ = ... — if it was already set, we don't care. This is the data feed for FollowingRepliedUsersHydrator (Session 08) — the facepile feature.
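Why a OnceLock: sources receive only &ScoredPostsQuery, so writing into the query requires interior mutability, and a write-once cell fits a value that exactly one source produces. A minimal sketch with a hypothetical, trimmed-down Query type shows both sides: the writer ignores a second set, and the reader treats an unset cell as "no replies".

```rust
use std::sync::OnceLock;

#[derive(Debug, Clone, PartialEq)]
pub struct InNetworkReply {
    pub author_id: u64,
    pub in_reply_to_tweet_id: u64,
}

/// Hypothetical query holding only the one-shot side channel.
#[derive(Default)]
pub struct Query {
    pub in_network_replies: OnceLock<Vec<InNetworkReply>>,
}

/// Writer side (the source): first write wins; a later set returns Err,
/// which is deliberately discarded.
pub fn publish(query: &Query, replies: Vec<InNetworkReply>) {
    let _ = query.in_network_replies.set(replies);
}

/// Reader side (the hydrator): an unset cell reads as an empty list.
pub fn read(query: &Query) -> &[InNetworkReply] {
    query.in_network_replies.get().map(Vec::as_slice).unwrap_or(&[])
}
```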

        let candidates: Vec<PostCandidate> = posts
            .into_iter()
            .map(|post| {
                let in_reply_to_tweet_id = post
                    .in_reply_to_post_id
                    .and_then(|id| u64::try_from(id).ok());
                let conversation_id = post.conversation_id.and_then(|id| u64::try_from(id).ok());

                let mut ancestors = Vec::new();
                if let Some(reply_to) = in_reply_to_tweet_id {
                    ancestors.push(reply_to);
                    if let Some(root) = conversation_id.filter(|&root| root != reply_to) {
                        ancestors.push(root);
                    }
                }

Build the ancestors list (used by VFCandidateHydrator and DedupConversationFilter from Sessions 05/08). For a reply:

  1. Push the immediate parent (in_reply_to_tweet_id).
  2. Push the conversation root (conversation_id), but only if different from the immediate parent (i.e., this is a reply-to-reply, not a direct reply to the root).

For non-replies, ancestors stays empty.
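The ancestor rule can be pulled out into a standalone function to check its edge cases. This sketch assumes the Thunder ID fields are i64 (the u64::try_from calls suggest a signed type, but the exact type isn't shown here); build_ancestors is a hypothetical name.

```rust
/// Parent first, then the conversation root if it is a different post.
/// IDs that don't fit in u64 (negative values) are treated as absent.
pub fn build_ancestors(in_reply_to: Option<i64>, conversation_id: Option<i64>) -> Vec<u64> {
    let parent = in_reply_to.and_then(|id| u64::try_from(id).ok());
    let root = conversation_id.and_then(|id| u64::try_from(id).ok());

    let mut ancestors = Vec::new();
    if let Some(parent) = parent {
        ancestors.push(parent);
        // Direct reply to the root: the root is already the parent, skip it.
        if let Some(root) = root.filter(|&r| r != parent) {
            ancestors.push(root);
        }
    }
    ancestors
}
```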

                let served_type = if !query.in_network_only {
                    pb::ServedType::ForYouInNetwork
                } else {
                    pb::ServedType::RankedFollowing
                };

                let retweeted_tweet_id = post.source_post_id.and_then(|id| u64::try_from(id).ok());

                PostCandidate {
                    tweet_id: post.post_id as u64,
                    author_id: post.author_id as u64,
                    in_reply_to_tweet_id,
                    retweeted_tweet_id,
                    ancestors,
                    served_type: Some(served_type),
                    ..Default::default()
                }
            })
            .collect();

        Ok(candidates)
    }
}

served_type is the surface tag. From this source, candidates are tagged either ForYouInNetwork (default For You) or RankedFollowing (when the Following tab is active). Used downstream for analytics and UI.

The candidate is partially populated — just the structural fields. Everything else (engagement counts, brand safety, etc.) is filled by hydrators.


tweet_mixer_source.rs (103 lines) — legacy OON

Calls the legacy Tweet Mixer service for out-of-network candidates. This is the pre-Phoenix retrieval path.

pub struct TweetMixerSource {
    pub tweet_mixer_client: Arc<dyn TweetMixerClient>,
}

#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for TweetMixerSource {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        !query.in_network_only && !query.has_cached_posts
    }

Two gates: skip if in-network-only (no OON wanted) or if cached posts cover it.

    async fn source(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        let excluded_tweet_ids: Option<BTreeSet<i64>> = if query.seen_ids.is_empty() {
            None
        } else {
            Some(query.seen_ids.iter().map(|&id| id as i64).collect())
        };

        let opt = |s: &str| {
            if s.is_empty() {
                None
            } else {
                Some(s.to_string())
            }
        };

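Two helpers: the excluded-IDs set (None when seen_ids is empty) and opt, a closure that maps an empty string to None and anything else to Some(String).

Using a BTreeSet rather than a HashSet gives a sorted, deterministic iteration order, so the same seen IDs always serialize the same way. That plausibly matters if Tweet Mixer hashes requests or uses them as cache keys, though it may also simply be how the Thrift bindings represent a set.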
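A small sketch of the two helpers that shows the ordering property: whatever order seen_ids arrive in, the BTreeSet iterates them sorted and de-duplicated. The free-function form is a hypothetical refactor of the inline expression and closure above. Note that id as i64 wraps any u64 above i64::MAX into a negative number, just as the original cast does.

```rust
use std::collections::BTreeSet;

/// Empty input becomes None (field omitted); otherwise a sorted, de-duplicated set.
pub fn excluded_ids(seen_ids: &[u64]) -> Option<BTreeSet<i64>> {
    if seen_ids.is_empty() {
        None
    } else {
        Some(seen_ids.iter().map(|&id| id as i64).collect())
    }
}

/// Empty string -> None, anything else -> Some(owned copy).
pub fn opt(s: &str) -> Option<String> {
    if s.is_empty() {
        None
    } else {
        Some(s.to_string())
    }
}
```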

        let request = TweetMixerRequest {
            client_context: Box::new(ClientContext {
                user_id: Some(query.user_id as i64),
                app_id: Some(query.client_app_id as i64),
                user_agent: opt(&query.user_agent),
                country_code: opt(&query.country_code),
                language_code: opt(&query.language_code),
                ..Default::default()
            }),
            product: Box::new(Product::HOME_RECOMMENDED_TWEETS),
            product_context: Some(Box::new(
                ProductContext::HomeRecommendedTweetsProductContext(
                    HomeRecommendedTweetsProductContext {
                        excluded_tweet_ids,
                        get_random_tweets: None,
                        prediction_request_id: None,
                    },
                ),
            )),
            cursor: None,
            max_results: Some(query.params.get(TweetMixerMaxResults)),
        };

Thrift-style request building. Note Box::new(...) — the Thrift bindings use Box for nested struct fields.

        let candidates = self
            .tweet_mixer_client
            .get_recommendations(request)
            .await
            .map_err(|e| format!("TweetMixerSource: {}", e))?;

        let result = candidates
            .into_iter()
            .filter_map(|candidate| {
                let tweet_id = candidate.tweet_id as u64;

                let within_age = duration_since_creation_opt(tweet_id)
                    .map(|age| age <= Duration::from_secs(MAX_POST_AGE))
                    .unwrap_or(false);
                if !within_age {
                    return None;
                }
                …

Source-level age filtering: drop candidates older than MAX_POST_AGE immediately. The downstream AgeFilter (Session 05) does this too, but doing it here avoids wasting hydration on candidates that are already too old. Note the unwrap_or(false): a candidate whose age can't be computed is dropped as well.

This pattern shows up in multiple sources — pre-filter on cheap predicates before sending to downstream stages.
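duration_since_creation_opt isn't shown in this file. X post IDs are snowflake IDs: the bits above the low 22 hold milliseconds since Twitter's custom epoch (1288834974657 in Unix ms), so a post's age can be read off its ID with no lookup, which is what makes this pre-filter cheap. A plausible sketch, assuming the helper decodes that timestamp; the function names are hypothetical, and now_ms is passed in explicitly so the example is deterministic.

```rust
use std::time::Duration;

/// Twitter's snowflake epoch (2010-11-04T01:42:54.657Z) in Unix milliseconds.
const TWEPOCH_MS: u64 = 1_288_834_974_657;

/// Creation time in Unix ms: drop the 22 low bits (machine + sequence).
pub fn snowflake_created_ms(tweet_id: u64) -> u64 {
    (tweet_id >> 22) + TWEPOCH_MS
}

/// Age at `now_ms`, or None if the ID encodes a time in the future.
pub fn age_at(tweet_id: u64, now_ms: u64) -> Option<Duration> {
    now_ms
        .checked_sub(snowflake_created_ms(tweet_id))
        .map(Duration::from_millis)
}

/// Source-level prefilter: an unknown age counts as too old, like unwrap_or(false).
pub fn within_age(tweet_id: u64, now_ms: u64, max_age: Duration) -> bool {
    age_at(tweet_id, now_ms).map(|age| age <= max_age).unwrap_or(false)
}
```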

                let author_id = candidate
                    .author_id
                    .and_then(|id| u64::try_from(id).ok())
                    .unwrap_or_default();
                let in_reply_to_tweet_id = candidate
                    .in_reply_to_tweet_id
                    .and_then(|id| u64::try_from(id).ok());

                Some(PostCandidate {
                    tweet_id,
                    author_id,
                    in_reply_to_tweet_id,
                    retweeted_tweet_id: None,
                    served_type: Some(pb::ServedType::ForYouTweetMixer),
                    ..Default::default()
                })
            })
            .collect();

        Ok(result)
    }
}

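Build the candidate. retweeted_tweet_id is hard-coded to None, which suggests Tweet Mixer returns only original posts, not retweets. A missing or negative author_id falls back to 0 via unwrap_or_default. served_type = ForYouTweetMixer for analytics.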


phoenix_source.rs (116 lines) — main Phoenix retrieval

The primary OON retrieval. Uses the Phoenix retrieval model (two-tower).

pub struct PhoenixSource {
    pub phoenix_retrieval_client: Arc<dyn PhoenixRetrievalClient + Send + Sync>,
}

impl PhoenixSource {
    fn resolve_cluster(query: &ScoredPostsQuery) -> PhoenixRetrievalCluster {
        let configured_cluster =
            PhoenixRetrievalCluster::parse(&query.params.get(PhoenixRetrievalInferenceClusterId));

        let threshold: u64 = query.params.get(PhoenixRetrievalNewUserHistoryThreshold);
        if threshold > 0 {
            let action_count = query
                .retrieval_sequence
                .as_ref()
                .and_then(|s| s.metadata.as_ref())
                .map(|m| m.length)
                .unwrap_or(0);

            if action_count < threshold {
                return PhoenixRetrievalCluster::parse(
                    &query.params.get(PhoenixRetrievalNewUserInferenceClusterId),
                );
            }
        }

        if let Some(decider) = &query.decider {
            match configured_cluster {
                PhoenixRetrievalCluster::Experiment1Lap7
                    if decider.enabled("enable_phoenix_retrieval_lap7_to_fou") =>
                {
                    return PhoenixRetrievalCluster::Experiment1Fou;
                }
                PhoenixRetrievalCluster::Experiment1Fou
                    if decider.enabled("enable_phoenix_retrieval_fou_to_lap7") =>
                {
                    return PhoenixRetrievalCluster::Experiment1Lap7;
                }
                _ => {}
            }
        }

        configured_cluster
    }
}

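Same cluster-resolution pattern as PhoenixScorer from Session 10, in three tiers:

  1. Default from feature switches (PhoenixRetrievalInferenceClusterId).
  2. New-user override: when PhoenixRetrievalNewUserHistoryThreshold is non-zero and the retrieval sequence has fewer actions than that threshold (a missing sequence counts as zero), return the new-user cluster immediately.
  3. Decider-based swaps between the Fou and Lap7 experiment clusters, applied only when the new-user override didn't fire.

Note that this resolver is for retrieval; scoring uses its own. The cluster names can differ because retrieval and scoring are separate models.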
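The resolution order can be sketched with simplified types. Cluster, the HashSet<&str> standing in for the decider, and the parameter list are hypothetical; the two flag names are copied from the code above. The test cases pin down the precedence: the new-user override returns before the decider is consulted.

```rust
use std::collections::HashSet;

/// Hypothetical, trimmed-down cluster enum.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Cluster {
    Standard,
    NewUser,
    Experiment1Lap7,
    Experiment1Fou,
}

pub fn resolve_cluster(
    configured: Cluster,
    new_user_cluster: Cluster,
    threshold: u64,
    action_count: Option<u64>,
    decider: Option<&HashSet<&str>>,
) -> Cluster {
    // Tier 2: new-user override (disabled when threshold == 0).
    if threshold > 0 && action_count.unwrap_or(0) < threshold {
        return new_user_cluster;
    }
    // Tier 3: decider swaps between the two experiment clusters.
    if let Some(flags) = decider {
        match configured {
            Cluster::Experiment1Lap7 if flags.contains("enable_phoenix_retrieval_lap7_to_fou") => {
                return Cluster::Experiment1Fou;
            }
            Cluster::Experiment1Fou if flags.contains("enable_phoenix_retrieval_fou_to_lap7") => {
                return Cluster::Experiment1Lap7;
            }
            _ => {}
        }
    }
    // Tier 1: the configured default.
    configured
}
```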

    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        (!query.is_topic_request() || query.is_bulk_topic_request())
            && (!query.params.get(EnableNewUserTopicRetrieval) || !query.has_new_user_topic_ids())
            && !query.in_network_only
            && !query.has_cached_posts
    }

Four-condition gate:

  1. Not a small topic request (topic requests use PhoenixTopicsSource instead; bulk topic is fine).
  2. Not a new-user topic retrieval (EnableNewUserTopicRetrieval on and the user has new-user topic IDs); that case also goes to PhoenixTopicsSource.
  3. Not in-network-only.
  4. Not using cached posts.

This is the default OON path when no special mode applies.

    async fn source(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        let user_id = query.user_id;

        let sequence = query
            .retrieval_sequence
            .as_ref()
            .ok_or_else(|| "PhoenixSource: missing retrieval_sequence".to_string())?;

        let cluster = Self::resolve_cluster(query);
        let client_context = build_client_context(query);
        let user_context = build_user_context(query);

        let response = self
            .phoenix_retrieval_client
            .retrieve(
                cluster,
                user_id,
                sequence.clone(),
                query.columnar_retrieval_sequence.clone(),
                query.params.get(PhoenixMaxResults),
                vec![],
                None,
                client_context,
                user_context,
            )
            .await
            .map_err(|e| format!("PhoenixSource: {}", e))?;

Build the retrieval request:

  • Hydrated retrieval_sequence from Session 09's RetrievalSequenceQueryHydrator.
  • columnar_retrieval_sequence — pre-encoded for fast model consumption.
  • max_results from params.
  • vec![] for topic IDs (no topic filter on default OON).
  • None for topic-filter mode.
  • Client + user contexts (built via helpers that pull from query state).

        let candidates: Vec<PostCandidate> = response
            .top_k_candidates
            .into_iter()
            .flat_map(|scored_candidates| scored_candidates.candidates)
            .filter_map(|scored_candidate| scored_candidate.candidate)
            .map(|tweet_info| PostCandidate {
                tweet_id: tweet_info.tweet_id,
                author_id: tweet_info.author_id,
                in_reply_to_tweet_id: Some(tweet_info.in_reply_to_tweet_id),
                retweeted_tweet_id: (tweet_info.retweeted_tweet_id != 0)
                    .then_some(tweet_info.retweeted_tweet_id),
                served_type: Some(pb::ServedType::ForYouPhoenixRetrieval),
                ..Default::default()
            })
            .collect();

        Ok(candidates)
    }
}

Flatten the nested response. The structure is:

  • top_k_candidates: Vec<ScoredCandidates> — Phoenix returns groups (e.g., from different retrieval heads).
  • scored_candidates.candidates: Vec<ScoredCandidate> — per-candidate within each group.
  • scored_candidate.candidate: Option<TweetInfo> — may be absent.

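So it's flat_map → filter_map → map. The final map unpacks TweetInfo into our PostCandidate. The 0-sentinel pattern: retweeted_tweet_id != 0 → Some(id), otherwise None. This is the usual protobuf-flavored "no value" handling, since proto3 scalars default to 0. Note the asymmetry: in_reply_to_tweet_id is wrapped in Some(...) unconditionally, so if TweetInfo also uses 0 for non-replies, those surface as Some(0) unless something downstream treats 0 as absent.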
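A self-contained version of the flattening, with hypothetical stand-in structs trimmed to the fields that matter here (the real TweetInfo carries more, including in_reply_to_tweet_id):

```rust
/// Hypothetical stand-ins for the retrieval response types.
pub struct TweetInfo {
    pub tweet_id: u64,
    pub author_id: u64,
    pub retweeted_tweet_id: u64,
}
pub struct ScoredCandidate {
    pub candidate: Option<TweetInfo>,
}
pub struct ScoredCandidates {
    pub candidates: Vec<ScoredCandidate>,
}

#[derive(Debug, PartialEq)]
pub struct Candidate {
    pub tweet_id: u64,
    pub author_id: u64,
    pub retweeted_tweet_id: Option<u64>,
}

pub fn flatten(top_k: Vec<ScoredCandidates>) -> Vec<Candidate> {
    top_k
        .into_iter()
        .flat_map(|group| group.candidates) // groups -> candidates
        .filter_map(|scored| scored.candidate) // drop entries without TweetInfo
        .map(|info| Candidate {
            tweet_id: info.tweet_id,
            author_id: info.author_id,
            // 0 is the "not a retweet" sentinel.
            retweeted_tweet_id: (info.retweeted_tweet_id != 0).then_some(info.retweeted_tweet_id),
        })
        .collect()
}
```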


phoenix_topics_source.rs (106 lines) — topic-aware retrieval

Variant of PhoenixSource that handles topic requests + new-user topic retrieval.

const INCLUDED_TOPIC_METRIC: &str = "PhoenixTopicsSource.included_topic_id";

pub struct PhoenixTopicsSource {
    pub phoenix_retrieval_client: Arc<dyn PhoenixRetrievalClient + Send + Sync>,
}

#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for PhoenixTopicsSource {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        let has_topics = (query.is_topic_request() && !query.is_bulk_topic_request())
            || (query.params.get(EnableNewUserTopicRetrieval) && query.has_new_user_topic_ids());
        has_topics && !query.in_network_only && !query.has_cached_posts
    }

Inverse gate of PhoenixSource: this runs when there are topics to filter by. Two paths:

  1. Small topic request (1-6 topics).
  2. New-user retrieval with declared interests.

    async fn source(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        let user_id = query.user_id;

        let sequence = query
            .retrieval_sequence
            .as_ref()
            .ok_or_else(|| "PhoenixTopicsSource: missing retrieval_sequence".to_string())?;

        let cluster = PhoenixRetrievalCluster::parse(
            &query.params.get(PhoenixRetrievalTopicInferenceClusterId),
        );

        let effective_topic_ids = if query.is_topic_request() {
            &query.topic_ids
        } else {
            &query.new_user_topic_ids
        };

Pick the topics: explicit request topics OR new-user declared topics.

        let default_experiment =
            TopicFilteringExperiment::parse(&query.params.get(TopicFilteringId));
        let override_map =
            TopicFilteringOverrideMap::parse(&query.params.get(TopicFilteringOverrides));
        let topic_filter_mode = override_map
            .resolve(effective_topic_ids, default_experiment)
            .as_proto_mode();

        let max_results = query.params.get(PhoenixMaxResults);

        let topic_entity_ids: Vec<u64> = effective_topic_ids
            .iter()
            .map(|&tid| TopicIdExpansion::resolve_first(tid) as u64)
            .collect();

Apply topic-filtering experiment resolution — same pattern as TopicIdsFilter (Session 05) and FilteredTopicsHydrator (Session 08).

TopicIdExpansion::resolve_first is the canonicalization function from Session 05 (currently a no-op stub).

        if let Some(receiver) = global_stats_receiver() {
            for &tid in effective_topic_ids {
                let tid_str = tid.to_string();
                receiver.incr(INCLUDED_TOPIC_METRIC, &[("topic_id", &tid_str)], 1);
            }
        }

Emit per-topic metric. Lets us see "which topics generated retrieval requests" in dashboards.

        let response = self
            .phoenix_retrieval_client
            .retrieve(
                cluster,
                user_id,
                sequence.clone(),
                query.columnar_retrieval_sequence.clone(),
                max_results,
                topic_entity_ids,
                Some(topic_filter_mode),
                None,
                None,
            )
            .await
            .map_err(|e| format!("PhoenixTopicsSource: {}", e))?;

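Same retrieval call, but with its own cluster (PhoenixRetrievalTopicInferenceClusterId), the topic entity IDs and filter mode populated, and no client or user context (both None).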

The response decoding is identical to PhoenixSource. Same served_type = ForYouPhoenixRetrieval — downstream doesn't distinguish topic-retrieval from default-retrieval (both are "Phoenix").
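Taken together, the PhoenixSource and PhoenixTopicsSource gates partition the OON case: when the request is neither in-network-only nor cached, exactly one of them is enabled, because the topics condition is the logical negation of PhoenixSource's first two conditions. A sketch with a hypothetical Flags struct standing in for the query accessors, checked exhaustively over all 64 flag combinations:

```rust
/// Hypothetical flattening of the query state the gates read.
#[derive(Clone, Copy)]
pub struct Flags {
    pub topic_request: bool,
    pub bulk_topic_request: bool,
    pub new_user_topic_retrieval_enabled: bool,
    pub has_new_user_topic_ids: bool,
    pub in_network_only: bool,
    pub has_cached_posts: bool,
}

fn common(f: Flags) -> bool {
    !f.in_network_only && !f.has_cached_posts
}

pub fn phoenix_enabled(f: Flags) -> bool {
    (!f.topic_request || f.bulk_topic_request)
        && (!f.new_user_topic_retrieval_enabled || !f.has_new_user_topic_ids)
        && common(f)
}

pub fn phoenix_topics_enabled(f: Flags) -> bool {
    let has_topics = (f.topic_request && !f.bulk_topic_request)
        || (f.new_user_topic_retrieval_enabled && f.has_new_user_topic_ids);
    has_topics && common(f)
}
```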


phoenix_moe_source.rs (73 lines) — Mixture-of-Experts retrieval

Third Phoenix retrieval variant. MoE = Mixture of Experts — a model that routes inputs to different specialized sub-models.

pub struct PhoenixMOESource {
    pub phoenix_retrieval_client: Arc<dyn PhoenixRetrievalClient + Send + Sync>,
}

#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for PhoenixMOESource {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.params.get(EnablePhoenixMOESource)
            && (!query.is_topic_request() || query.is_bulk_topic_request())
            && !query.in_network_only
            && !query.has_cached_posts
    }

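Feature-flag-gated experiment source. The gate matches PhoenixSource (non-topic or bulk-topic, not in-network-only, not cached) plus the MoE flag, with one difference: it has no new-user-topic check, so for a new user with topic IDs, MoE can run alongside PhoenixTopicsSource.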

    async fn source(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        let user_id = query.user_id;

        let sequence = query
            .retrieval_sequence
            .as_ref()
            .ok_or_else(|| "PhoenixMOESource: missing retrieval_sequence".to_string())?;

        let cluster = PhoenixRetrievalCluster::parse(
            &query.params.get(PhoenixRetrievalMOEInferenceClusterId),
        );

        let response = self
            .phoenix_retrieval_client
            .retrieve(
                cluster,
                user_id,
                sequence.clone(),
                query.columnar_retrieval_sequence.clone(),
                query.params.get(PhoenixMOEMaxResults),
                vec![],
                None,
                None,
                None,
            )
            .await
            .map_err(|e| format!("PhoenixMOESource: {}", e))?;

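Same retrieval shape, but with its own cluster (PhoenixRetrievalMOEInferenceClusterId) and result cap (PhoenixMOEMaxResults), no topic filter, and no client or user context. The served_type also differs: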

                served_type: Some(pb::ServedType::ForYouPhoenixRetrievalMoe),

ForYouPhoenixRetrievalMoe — distinct from ForYouPhoenixRetrieval. Lets downstream analytics distinguish base vs. MoE retrieval performance.

The rest of the response decoding is identical.

So the three Phoenix sources (default, topics, MoE) all hit the same PhoenixRetrievalClient but with different cluster IDs and parameters. The pipeline composition wires all three in parallel; whichever ones are enabled produce candidates, which then merge via DropDuplicatesFilter.
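Because several sources can be enabled at once (Thunder plus the OON sources, or PhoenixSource plus PhoenixMOESource), the same post can arrive more than once. DropDuplicatesFilter isn't covered in this part; here is a minimal sketch of ID-based dedup, assuming it keys on tweet_id and keeps the first copy in merge order. Which copy survives (and so which served_type the post keeps) depends on the real merge order, which this sketch doesn't model.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, PartialEq)]
pub struct Candidate {
    pub tweet_id: u64,
    pub source: &'static str,
}

/// Keeps the first occurrence of each tweet_id (assumed policy) and
/// preserves the relative order of the survivors.
pub fn drop_duplicates(merged: Vec<Candidate>) -> Vec<Candidate> {
    let mut seen = HashSet::with_capacity(merged.len());
    merged
        .into_iter()
        .filter(|c| seen.insert(c.tweet_id)) // insert returns false for repeats
        .collect()
}
```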


cached_posts_source.rs (17 lines) — the bypass

The simplest source — just returns pre-cached candidates from the query.

pub struct CachedPostsSource;

#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for CachedPostsSource {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.has_cached_posts
    }

    async fn source(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        Ok(query.cached_posts.clone())
    }
}

Inverse gate of every other Phoenix-pipeline source: it runs only when has_cached_posts IS true, while the others require it to be false. So either this source runs alone (cache hit) or the others run, subject to their own gates (cache miss).

query.cached_posts.clone() — clones the cached Vec<PostCandidate> that CachedPostsQueryHydrator (Session 09) populated from Redis.

The cached posts already have scores and most fields populated. So when this source runs, every hydrator's enable returns false (because has_cached_posts is true) and the scorers are similarly gated. The candidates flow directly through filters → selector → response. Fast path.


scored_posts_source.rs (32 lines) — the wrapper for For You

The For You pipeline's wrapper around the inner Phoenix pipeline. Bridges PostCandidate (Phoenix output) → FeedItem (For You candidate type).

pub struct ScoredPostsSource {
    pub scored_posts_server: Arc<ScoredPostsServer>,
}

#[async_trait]
impl Source<ScoredPostsQuery, FeedItem> for ScoredPostsSource {
    async fn source(&self, query: &ScoredPostsQuery) -> Result<Vec<FeedItem>, String> {
        let output = self
            .scored_posts_server
            .run_pipeline(query.clone())
            .await
            .map_err(|e| format!("ScoredPostsSource: {e}"))?;

        let feed_items = output
            .scored_posts
            .into_iter()
            .map(|post| FeedItem {
                position: 0,
                item: Some(feed_item::Item::Post(post)),
            })
            .collect();

        Ok(feed_items)
    }
}

scored_posts_server.run_pipeline(query.clone()) runs the entire inner Phoenix pipeline as a sub-call and returns its scored posts.

Wraps each ScoredPost in a FeedItem with item: feed_item::Item::Post(post). So the For You blender (Session 10) can mix these with ads / prompts / who-to-follow.

query.clone() is a real clone of the 50-field ScoredPostsQuery. It's heavy, but hard to avoid here: the source only holds a &ScoredPostsQuery, run_pipeline takes an owned query, and the For You pipeline still needs its own copy for the remaining stages.
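A hypothetical, simplified mirror of the FeedItem union, to show why the wrapping is needed: the blender works on one item type, and each variant carries its own payload. The real type is protobuf-generated (feed_item::Item) with richer payloads than these placeholder fields.

```rust
/// One variant per kind of thing the For You feed can show (simplified).
#[derive(Debug, Clone, PartialEq)]
pub enum Item {
    Post { tweet_id: u64, score: f64 },
    Ad { ad_id: u64 },
    WhoToFollow { user_ids: Vec<u64> },
    Prompt { injection: Vec<u8> },
}

#[derive(Debug, Clone, PartialEq)]
pub struct FeedItem {
    /// Always 0 at source time; presumably set later when items are blended.
    pub position: u32,
    pub item: Option<Item>,
}

/// What ScoredPostsSource does after the inner pipeline returns: wrap each post.
pub fn wrap_posts(posts: Vec<(u64, f64)>) -> Vec<FeedItem> {
    posts
        .into_iter()
        .map(|(tweet_id, score)| FeedItem { position: 0, item: Some(Item::Post { tweet_id, score }) })
        .collect()
}

/// Counts post items in a mixed feed.
pub fn count_posts(items: &[FeedItem]) -> usize {
    items.iter().filter(|fi| matches!(fi.item, Some(Item::Post { .. }))).count()
}
```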


ads_source.rs (60 lines)

Calls the ad index for eligible ads.

pub struct AdsSource {
    pub ad_index_client: Arc<dyn AdIndexClient + Send + Sync>,
}

#[async_trait]
impl Source<ScoredPostsQuery, FeedItem> for AdsSource {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.params.get(EnableAdsSource) && !query.is_preview
    }

Two gates: the feature flag, and not a preview request. Preview requests (presumably pre-fetches) skip ads, likely so an ad impression isn't spent on content the user may never see.

    async fn source(&self, query: &ScoredPostsQuery) -> Result<Vec<FeedItem>, String> {
        let request = build_ad_index_request(query);

        let response = self
            .ad_index_client
            .get_eligible_ads(request)
            .await
            .map_err(|e| format!("AdsSource: {e}"))?;

        let feed_items = response
            .ad_info
            .into_iter()
            .map(|ad| FeedItem {
                position: 0,
                item: Some(feed_item::Item::Ad(ad)),
            })
            .collect();
        Ok(feed_items)
    }
}

fn build_ad_index_request(query: &ScoredPostsQuery) -> AdIndexRequest {
    AdIndexRequest {
        user_id: query.user_id as i64,
        product_surface: ProductSurface::HomeTimelineRanking as i32,
        client_context: Some(ClientContext {
            user_id: query.user_id as i64,
            app_id: query.client_app_id as i64,
            country_code: query.country_code.clone(),
            language_code: query.language_code.clone(),
            ip_address: query.ip_address.clone(),
            user_agent: query.user_agent.clone(),
            user_roles: query.user_roles.clone(),
            device_id: query.device_id.clone(),
            mobile_device_id: query.mobile_device_id.clone(),
            mobile_device_ad_id: query.mobile_device_ad_id.clone(),
            ..Default::default()
        }),
        ..Default::default()
    }
}

Build the request with a lot of context fields (the ad index uses many of these for targeting), call the service, wrap each ad in a FeedItem.

Note mobile_device_ad_id — the IDFA / GAID for ad attribution. Passed through.


who_to_follow_source.rs (101 lines)

Fetches user recommendations for the WTF module.

const EXCLUDED_USER_IDS_LIMIT: usize = 200;
const MAX_WHO_TO_FOLLOW_USERS: usize = 3;

pub struct WhoToFollowSource {
    pub who_to_follow_client: Arc<dyn WhoToFollowClient + Send + Sync>,
}

#[async_trait]
impl Source<ScoredPostsQuery, FeedItem> for WhoToFollowSource {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.params.get(EnableWhoToFollowModule) && query.who_to_follow_eligible
    }

Two gates: flag + who_to_follow_eligible (set by ServedHistoryQueryHydrator based on fatigue — Session 09).

    async fn source(&self, query: &ScoredPostsQuery) -> Result<Vec<FeedItem>, String> {
        let request = build_wtf_request(query);

        let response = self
            .who_to_follow_client
            .get_wtf_recommendations(request)
            .await
            .map_err(|e| format!("WhoToFollowSource: {e}"))?;

        if response.user_recommendations.is_empty() {
            return Ok(vec![]);
        }

        let mut response = response;
        response
            .user_recommendations
            .truncate(MAX_WHO_TO_FOLLOW_USERS);

        let module = WhoToFollowModule {
            who_to_follow_response: Some(response),
        };

        Ok(vec![FeedItem {
            position: 0,
            item: Some(feed_item::Item::WhoToFollow(module)),
        }])
    }
}

Three behaviors:

  • Empty response → empty source result (no WTF module).
  • Otherwise: truncate to 3 users (the module shows max 3).
  • Return a single WhoToFollowModule wrapped in a FeedItem.

MAX_WHO_TO_FOLLOW_USERS = 3 is a UX decision hard-coded in the source.

fn get_excluded_user_ids(query: &ScoredPostsQuery) -> Vec<i64> {
    query
        .served_history
        .iter()
        .flat_map(|sh| &sh.entries)
        .filter(|entry| entry.entity_type == EntityIdType::WHO_TO_FOLLOW)
        .flat_map(|entry| {
            entry
                .item_ids
                .iter()
                .flatten()
                .filter_map(|item| item.user_id)
        })
        .take(EXCLUDED_USER_IDS_LIMIT)
        .collect()
}

Build the "don't recommend these users" list from served history — every user previously recommended in a WTF module. Capped at 200.

This is per-user fatigue: even if the WTF module is eligible by the time interval (fatigue_hours), don't re-recommend the same users.
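The exclusion list can be tested in isolation with stand-in types. ServedHistory, Entry, ItemId, and the EntityType variants are hypothetical simplifications of the served-history Thrift types; the Option-wrapped fields mirror the .iter().flatten() and filter_map calls above. Note that take(200) keeps the first 200 IDs in served-history order, so whether those are the oldest or newest depends on how entries are ordered.

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum EntityType {
    WhoToFollow,
    Post,
}

pub struct ItemId {
    pub user_id: Option<i64>,
}
pub struct Entry {
    pub entity_type: EntityType,
    pub item_ids: Option<Vec<ItemId>>,
}
pub struct ServedHistory {
    pub entries: Vec<Entry>,
}

pub const EXCLUDED_USER_IDS_LIMIT: usize = 200;

/// Every user previously shown in a WTF module, capped at the limit.
pub fn excluded_user_ids(history: Option<&ServedHistory>) -> Vec<i64> {
    history
        .into_iter()
        .flat_map(|sh| &sh.entries)
        .filter(|e| e.entity_type == EntityType::WhoToFollow)
        .flat_map(|e| e.item_ids.iter().flatten().filter_map(|item| item.user_id))
        .take(EXCLUDED_USER_IDS_LIMIT)
        .collect()
}
```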


prompts_source.rs (98 lines)

Fetches in-feed prompts (compose suggestions, verifications, polls, etc.) from the injection service.

pub struct PromptsSource {
    pub prompts_client: Arc<dyn PromptsClient + Send + Sync>,
}

#[async_trait]
impl Source<ScoredPostsQuery, FeedItem> for PromptsSource {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.params.get(EnablePrompts)
    }

    async fn source(&self, query: &ScoredPostsQuery) -> Result<Vec<FeedItem>, String> {
        let request = build_get_injections_request(query);

        let injections = self
            .prompts_client
            .get_injections(request)
            .await
            .map_err(|e| format!("PromptsSource: {e}"))?;

        injections
            .into_iter()
            .map(|injection| {
                let bytes = xai_prompts_thrift::serialize_binary(&injection)
                    .map_err(|e| format!("PromptsSource: serialization failed: {e}"))?;
                Ok(FeedItem {
                    position: 0,
                    item: Some(feed_item::Item::Prompt(Prompt {
                        injection: bytes.into(),
                    })),
                })
            })
            .collect()
    }
}

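Each injection is re-serialized to binary Thrift bytes before going on the wire. Home Mixer treats prompts as opaque blobs: it forwards the bytes inside a Prompt item without interpreting the injection schema. Because the closure returns a Result and the iterator is collected into Result<Vec<FeedItem>, String>, a single serialization failure fails the whole source call.

bytes.into() converts Vec<u8> → Bytes (zero-copy in many cases).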
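The collect() at the end of PromptsSource::source relies on Result implementing FromIterator: collecting an iterator of Result<T, E> yields Ok(Vec<T>) only if every element is Ok, and otherwise returns the first Err. A sketch with a hypothetical serializer (serialize stands in for xai_prompts_thrift::serialize_binary and rejects empty input just to force a failure), plus a lenient variant for comparison:

```rust
/// Hypothetical fallible serializer standing in for the Thrift encoder.
fn serialize(injection: &str) -> Result<Vec<u8>, String> {
    if injection.is_empty() {
        Err("empty injection".to_string())
    } else {
        Ok(injection.as_bytes().to_vec())
    }
}

/// Strict, like the source above: the first failure aborts the whole batch.
pub fn to_prompt_blobs(injections: &[&str]) -> Result<Vec<Vec<u8>>, String> {
    injections
        .iter()
        .map(|inj| serialize(inj).map_err(|e| format!("PromptsSource: serialization failed: {e}")))
        .collect()
}

/// Lenient alternative: skip injections that fail, keep the rest.
pub fn to_prompt_blobs_lenient(injections: &[&str]) -> Vec<Vec<u8>> {
    injections.iter().filter_map(|inj| serialize(inj).ok()).collect()
}
```

The strict version is what the source does; the lenient one only illustrates the other side of the trade-off, returning partial results at the cost of silently dropping failures.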

fn build_get_injections_request(query: &ScoredPostsQuery) -> GetInjectionsRequest {
    let client_context = ClientContext {
        user_id: Some(query.user_id as i64),
        client_application_id: Some(query.client_app_id as i64),
        device_id: if query.device_id.is_empty() {
            None
        } else {
            Some(query.device_id.clone())
        },
        country_code: Some(query.country_code.clone()),
        language_code: Some(query.language_code.clone()),
        user_agent: Some(query.user_agent.clone()),
        ip_address: Some(query.ip_address.clone()),
    };

    let display_context = DisplayContext {
        display_location: DisplayLocation::HOME_TIMELINE,
        timeline_id: None,
    };

    let request_targeting_context = RequestTargetingContext {
        ranking_disabler_with_latest_controls_avaliable: None,
        is_empty_state: None,
        is_first_request_after_signup: None,
        is_end_of_timeline: None,
    };

    let supported_prompt_types: BTreeSet<PromptType> = [
        PromptType::INLINE_PROMPT,
        PromptType::FULL_COVER,
        PromptType::HALF_COVER,
        PromptType::RELEVANCE_PROMPT,
    ]
    .into_iter()
    .collect();

    let user_roles = if query.user_roles.is_empty() {
        None
    } else {
        Some(query.user_roles.iter().cloned().collect())
    };

    GetInjectionsRequest {
        client_context,
        display_context,
        request_targeting_context: Some(request_targeting_context),
        user_roles,
        supported_prompt_types: Some(supported_prompt_types),
    }
}

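The request builder is verbose, but mostly plumbing. Notable details:

  • supported_prompt_types: a hard-coded BTreeSet of the prompt types the client can render (inline, full-cover, half-cover, relevance). The server returns only these, so new prompt types can be added on the server without breaking older clients.
  • request_targeting_context: all four fields are currently None. The struct is sent as a placeholder, presumably for future use.
  • Empty values: device_id and user_roles become None when empty, while country_code, language_code, user_agent, and ip_address are always sent as Some, even when the string is empty.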
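In build_get_injections_request, an empty device_id or user_roles becomes None rather than Some of an empty value, and the supported prompt types are collected from a fixed array into a BTreeSet. The sketch below isolates both patterns. Everything in it is hypothetical: PromptType is a local stand-in for the Thrift-generated enum, and none_if_empty / none_if_empty_set are illustrative helpers, not functions from the repo. The BTreeSet is likely there because Apache Thrift's Rust generator maps set<T> to BTreeSet, which also deduplicates and iterates in sorted order.

```rust
use std::collections::BTreeSet;

// Hypothetical stand-in for the Thrift-generated PromptType.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum PromptType {
    InlinePrompt,
    FullCover,
    HalfCover,
    RelevancePrompt,
}

// An empty string on the query means "not provided": send None, not Some("").
fn none_if_empty(s: &str) -> Option<String> {
    if s.is_empty() {
        None
    } else {
        Some(s.to_owned())
    }
}

// Same rule for collections such as user_roles.
fn none_if_empty_set(items: &[String]) -> Option<BTreeSet<String>> {
    if items.is_empty() {
        None
    } else {
        Some(items.iter().cloned().collect())
    }
}

// Fixed set of prompt types the caller declares it can render.
fn supported_prompt_types() -> BTreeSet<PromptType> {
    [
        PromptType::InlinePrompt,
        PromptType::FullCover,
        PromptType::HalfCover,
        PromptType::RelevancePrompt,
    ]
    .into_iter()
    .collect()
}
```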

Spelling note: ranking_disabler_with_latest_controls_avaliable has a typo (avaliable → available) in the field name. Once a Thrift schema typo ships, fixing it means touching every generated binding and call site, so it tends to stick.


push_to_home_source.rs (97 lines) — notification-clicked post

When a user clicks a notification, the related post should be pinned to position 0 in the feed. This source fetches that post's core data plus a facepile of its top repliers.

const MAX_REPLIERS: usize = 3;

pub struct PushToHomeSource {
    pub tes_client: Arc<dyn TESClient + Send + Sync>,
    pub reply_mixer_client: Arc<dyn ReplyMixerClient>,
}

#[async_trait]
impl Source<ScoredPostsQuery, FeedItem> for PushToHomeSource {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.push_to_home_post_id.is_some()
    }

Only enabled when the request carries a push_to_home_post_id (set when the user came from a notification — see Session 04's QueryBuilder).

    async fn source(&self, query: &ScoredPostsQuery) -> Result<Vec<FeedItem>, String> {
        let focal_tweet_id = match query.push_to_home_post_id {
            Some(id) => id,
            None => return Ok(vec![]),
        };

        let core_data = self
            .tes_client
            .get_tweet_core_datas(vec![focal_tweet_id])
            .await;

        let core_data = match core_data.get(&focal_tweet_id) {
            Some(Ok(Some(cd))) => cd,
            Some(Ok(None)) => return Ok(vec![]),
            Some(Err(e)) => {
                return Err(format!(
                    "PushToHomeSource: TES error for tweet {focal_tweet_id}: {e}"
                ));
            }
            None => return Ok(vec![]),
        };

Fetch the tweet's core data from TES. The match has four arms, the same pattern as Session 07:

  • Tweet exists with data → use it.
  • Tweet not found (deleted/missing) → return empty (no PTH).
  • Error → propagate.
  • Missing key entirely → return empty.

        let is_root = core_data.in_reply_to_tweet_id.is_none();

        let facepile_ids = if is_root {
            self.fetch_top_repliers(query.user_id, focal_tweet_id, core_data.author_id)
                .await
        } else {
            vec![]
        };

If the tweet is a root tweet (not a reply), fetch the top repliers for a facepile. If the tweet is a reply itself, skip the facepile.

        let pth_post = PushToHomePost {
            tweet_id: focal_tweet_id,
            author_id: core_data.author_id,
            in_reply_to_tweet_id: core_data.in_reply_to_tweet_id.unwrap_or(0),
            conversation_id: core_data.conversation_id.unwrap_or(0),
            facepile_user_ids: facepile_ids,
            served_type: ServedType::ForYouPushToHome as i32,
        };

        Ok(vec![FeedItem {
            position: 0,
            item: Some(feed_item::Item::PushToHome(pth_post)),
        }])
    }
}

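Build the PushToHomePost from the structural fields, the facepile, and the ForYouPushToHome served_type, then return it as a single FeedItem (the feed_item::Item::PushToHome variant). A missing in_reply_to_tweet_id or conversation_id is flattened to 0 via unwrap_or(0), so 0 acts as the "absent" value in the message.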

impl PushToHomeSource {
    async fn fetch_top_repliers(
        &self,
        viewer_id: u64,
        focal_tweet_id: u64,
        author_id: u64,
    ) -> Vec<u64> {
        match self
            .reply_mixer_client
            .get_scored_replies(viewer_id, focal_tweet_id, focal_tweet_id, author_id)
            .await
        {
            Ok(replies) => replies
                .into_iter()
                .filter_map(|r| {
                    r.author_id
                        .map(|id| id as u64)
                        .filter(|&id| id != author_id)
                })
                .take(MAX_REPLIERS)
                .collect(),
            Err(e) => {
                warn!(error = %e, "PushToHomeSource: reply-mixer failed, skipping repliers");
                vec![]
            }
        }
    }
}

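Call the reply mixer service for ranked replies and take the authors of the first 3 (MAX_REPLIERS), skipping replies without an author_id and replies written by the focal tweet's own author. The code does not deduplicate, so a user with two top-ranked replies would occupy two facepile slots.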
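fetch_top_repliers as written does not deduplicate author IDs. If one face per user were wanted, an order-preserving dedup is a small change. This is a hypothetical variant, not the repo's code: ScoredReply is a stand-in carrying only the optional author_id, and a HashSet skips authors already taken, relying on HashSet::insert returning false for values already present.

```rust
use std::collections::HashSet;

const MAX_REPLIERS: usize = 3;

// Hypothetical reply shape: only the optional author id matters here.
struct ScoredReply {
    author_id: Option<i64>,
}

// Like fetch_top_repliers' success branch, plus an order-preserving dedup.
fn top_unique_repliers(replies: Vec<ScoredReply>, focal_author_id: u64) -> Vec<u64> {
    let mut seen = HashSet::new();
    replies
        .into_iter()
        .filter_map(|r| r.author_id.map(|id| id as u64))
        .filter(|&id| id != focal_author_id)
        // HashSet::insert returns false when the id was already present.
        .filter(|&id| seen.insert(id))
        .take(MAX_REPLIERS)
        .collect()
}
```

With reply authors [7, 1, 7, none, 2, 3] and focal author 1, the original logic would yield [7, 7, 2]; this variant yields [7, 2, 3].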

Graceful degradation: if reply-mixer fails, log a warning and return an empty facepile. The PTH module still renders, just without faces.

get_scored_replies(viewer_id, focal_tweet_id, focal_tweet_id, author_id): note the duplicated focal_tweet_id. The parameters are probably (viewer_id, root_id, reply_to_id, author_id). When the focal tweet is a root, root and reply_to are the same. (If the focal tweet were itself a reply, root would be the conversation root, but that case is skipped above.)


What we've learned

Source structure: every source returns Result<Vec<C>, String>, where C is the pipeline's candidate type. Phoenix sources → PostCandidate, For You sources → FeedItem.

The 6 Phoenix sources, fanning out:

  • ThunderSource — in-network from Thunder.
  • TweetMixerSource — legacy OON.
  • PhoenixSource — modern OON via two-tower retrieval.
  • PhoenixTopicsSource — topic-targeted OON.
  • PhoenixMOESource — Mixture-of-Experts OON.
  • CachedPostsSource — full bypass when cache hits.

All run in parallel via the framework; their results are merged and deduplicated by DropDuplicatesFilter.
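A minimal sketch of the merge step, assuming (not confirmed by the source) that per-source results are concatenated in a fixed order and that DropDuplicatesFilter keeps the first occurrence of each tweet_id. PostCandidate here is a hypothetical two-field stand-in.

```rust
use std::collections::HashSet;

// Hypothetical minimal candidate; the real PostCandidate carries far more.
#[derive(Debug, Clone, PartialEq)]
struct PostCandidate {
    tweet_id: u64,
    source: &'static str,
}

// Flatten per-source results in a fixed order, keeping the first
// occurrence of each tweet_id (an assumed DropDuplicatesFilter policy).
fn merge_and_dedup(per_source: Vec<Vec<PostCandidate>>) -> Vec<PostCandidate> {
    let mut seen = HashSet::new();
    per_source
        .into_iter()
        .flatten()
        .filter(|c| seen.insert(c.tweet_id))
        .collect()
}
```

Under this policy, the source listed first wins ties, so the order of the per-source vectors decides which served_type a duplicated post keeps.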

The 5 For You sources, fanning out:

  • ScoredPostsSource — recurses into the Phoenix pipeline (the inner one).
  • AdsSource — eligible ads.
  • WhoToFollowSource — WTF module.
  • PromptsSource — in-feed injections.
  • PushToHomeSource — notification-pinned post.

Mutually exclusive enable gates: CachedPostsSource runs only when has_cached_posts=true; all other Phoenix sources run only when has_cached_posts=false. So a request takes either the cache bypass or the retrieval path, never both.
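A sketch of just the cache gate, using hypothetical string names for the six Phoenix-pipeline sources. Only has_cached_posts is modeled; the real enable() methods also consult feature switches and, for the Phoenix variants, topic dispatch, so on the retrieval path some of the five may still be off.

```rust
// Hypothetical names for the six Phoenix-pipeline sources.
const PHOENIX_SOURCES: [&str; 6] = [
    "thunder",
    "tweet_mixer",
    "phoenix",
    "phoenix_topics",
    "phoenix_moe",
    "cached_posts",
];

// Only the cache gate: cached_posts is on exactly when the others are off.
fn cache_gate(source: &str, has_cached_posts: bool) -> bool {
    if source == "cached_posts" {
        has_cached_posts
    } else {
        !has_cached_posts
    }
}

fn passing_cache_gate(has_cached_posts: bool) -> Vec<&'static str> {
    PHOENIX_SOURCES
        .iter()
        .copied()
        .filter(|s| cache_gate(s, has_cached_posts))
        .collect()
}
```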

Topic source dispatch:

  • 1-6 topics → PhoenixTopicsSource (with topic filter mode applied).
  • 7+ topics → PhoenixSource (bulk request, no topic filter).
  • No topics → PhoenixSource (default).
  • New-user with declared topics → PhoenixTopicsSource (using new_user_topic_ids).
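The dispatch rules above can be written as a pure function. Route, route, and MAX_TOPICS_FOR_TOPIC_SOURCE are hypothetical names; the threshold of 6 comes from the list, while the is_new_user flag and the precedence (explicit topics before new-user declared topics) are assumptions.

```rust
// Hypothetical routing decision for the topic dispatch rules.
#[derive(Debug, PartialEq)]
enum Route<'a> {
    // PhoenixTopicsSource with these topic ids.
    Topics(&'a [u64]),
    // Plain PhoenixSource, no topic filter.
    Default,
}

const MAX_TOPICS_FOR_TOPIC_SOURCE: usize = 6;

fn route<'a>(topic_ids: &'a [u64], new_user_topic_ids: &'a [u64], is_new_user: bool) -> Route<'a> {
    match topic_ids.len() {
        1..=MAX_TOPICS_FOR_TOPIC_SOURCE => Route::Topics(topic_ids),
        // Assumed precedence: declared new-user topics apply only when
        // no explicit topics were sent.
        0 if is_new_user && !new_user_topic_ids.is_empty() => Route::Topics(new_user_topic_ids),
        // No topics at all, or 7+ topics (bulk request).
        _ => Route::Default,
    }
}
```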

Three Phoenix retrieval variants: same PhoenixRetrievalClient API, three different cluster IDs, three different served_type tags. The distinct tags let dashboards measure each variant's contribution.

Cluster resolution pattern (repeated across PhoenixSource and PhoenixScorer):

  1. Read configured cluster from feature switches.
  2. Override to new-user cluster if history < threshold.
  3. Apply decider-based experiment swaps (Fou ↔ Lap7).
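The three resolution steps as one function. Cluster, ClusterInputs, and the field names are hypothetical; history_len stands in for whatever history measure the real code compares against its threshold, and the step order follows the numbered list.

```rust
// Hypothetical cluster identifiers; Fou and Lap7 are the names used above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Cluster {
    Fou,
    Lap7,
    NewUser,
}

#[derive(Debug, Clone, Copy)]
struct ClusterInputs {
    configured: Cluster,       // step 1: value from feature switches
    history_len: usize,        // viewer's history size (hypothetical measure)
    new_user_threshold: usize, // step 2: below this, use the new-user cluster
    swap_fou_lap7: bool,       // step 3: decider-controlled experiment swap
}

fn resolve_cluster(inp: &ClusterInputs) -> Cluster {
    let mut cluster = inp.configured;
    if inp.history_len < inp.new_user_threshold {
        cluster = Cluster::NewUser;
    }
    if inp.swap_fou_lap7 {
        cluster = match cluster {
            Cluster::Fou => Cluster::Lap7,
            Cluster::Lap7 => Cluster::Fou,
            other => other, // the swap leaves other clusters alone
        };
    }
    cluster
}
```

Because the override runs before the swap, a new user stays on the new-user cluster even when the experiment swap is on.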

Side-effect-as-source-effect: ThunderSource populates query.in_network_replies (via OnceLock) as a side effect of fetching candidates. This data is later consumed by FollowingRepliedUsersHydrator. Cross-stage data flow via shared OnceLock.
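A std-only sketch of the OnceLock hand-off between stages. Query, thunder_source, and following_replied_users are hypothetical simplifications. The point is that the source writes the slot at most once and the later hydrator reads it without taking a lock, treating an unset slot as empty.

```rust
use std::sync::OnceLock;

// Hypothetical slice of the query: only the shared slot is modeled.
struct Query {
    in_network_replies: OnceLock<Vec<u64>>,
}

// Source stage: return candidates and, as a side effect, fill the slot.
fn thunder_source(query: &Query, candidates: Vec<u64>, in_network_replies: Vec<u64>) -> Vec<u64> {
    // set() returns Err if already initialized; later writes are ignored.
    let _ = query.in_network_replies.set(in_network_replies);
    candidates
}

// Hydration stage: read the slot, treating "never set" as empty.
fn following_replied_users(query: &Query) -> &[u64] {
    query
        .in_network_replies
        .get()
        .map(Vec::as_slice)
        .unwrap_or(&[])
}
```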

Source-level age filtering: TweetMixerSource drops too-old candidates immediately, before they reach AgeFilter. Cheap pre-filter — saves hydration work on candidates that would be filtered anyway.
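A sketch of a source-level age cut. It assumes the age is derived from the post ID using the Snowflake layout of X post IDs (milliseconds since the Twitter epoch 1288834974657, stored above the low 22 bits); whether TweetMixerSource reads the ID or a separate timestamp field is not shown here. drop_too_old and its inclusive boundary are hypothetical.

```rust
use std::time::Duration;

// Snowflake epoch for X/Twitter IDs, in milliseconds since the Unix epoch.
const TWEPOCH_MS: u64 = 1_288_834_974_657;

// Bits above the low 22 hold milliseconds since TWEPOCH_MS.
fn snowflake_created_ms(id: u64) -> u64 {
    (id >> 22) + TWEPOCH_MS
}

// Keep only IDs whose age (relative to now_ms) is at most max_age.
fn drop_too_old(ids: Vec<u64>, now_ms: u64, max_age: Duration) -> Vec<u64> {
    let max_age_ms = max_age.as_millis() as u64;
    ids.into_iter()
        .filter(|&id| now_ms.saturating_sub(snowflake_created_ms(id)) <= max_age_ms)
        .collect()
}
```

Reading the timestamp straight from the ID is what makes the pre-filter cheap: no hydration call is needed to decide.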

ForYouPushToHome is a separate served_type so that analytics can report notification-driven traffic separately.

Graceful degradation patterns:

  • Reply-mixer fail in PTH → empty facepile (UX-acceptable).
  • WTF empty response → no module (UX-acceptable).
  • TES errors in PTH → propagate Err (no degraded mode — important post couldn't be hydrated).

Heterogeneous union: For You candidates are FeedItem (post / ad / wtf / prompt / pth). Each source returns one variant. The blender selector knows how to interleave.
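A minimal mirror of the union's shape, assuming FeedItem is a prost-style message with a position and an optional oneof payload, as the item: Some(feed_item::Item::...) constructions in the sources suggest. The variant payloads and the position type are hypothetical simplifications.

```rust
// Hypothetical payloads; the real variants carry full messages.
#[derive(Debug, PartialEq)]
enum Item {
    Post(u64),
    Ad(u64),
    WhoToFollow(Vec<u64>),
    Prompt(Vec<u8>),
    PushToHome(u64),
}

// prost represents a oneof as Option<Enum>, so "no payload" is possible.
#[derive(Debug, PartialEq)]
struct FeedItem {
    position: i32,
    item: Option<Item>,
}

// A consumer such as the blender must handle every variant and the empty case.
fn kind(fi: &FeedItem) -> &'static str {
    match &fi.item {
        Some(Item::Post(_)) => "post",
        Some(Item::Ad(_)) => "ad",
        Some(Item::WhoToFollow(_)) => "wtf",
        Some(Item::Prompt(_)) => "prompt",
        Some(Item::PushToHome(_)) => "pth",
        None => "empty",
    }
}
```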

Service discovery via get_random_channel(cluster): Thunder uses cluster-based load balancing. The cluster has multiple Thunder instances; we pick one at random per request.
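A std-only sketch of per-request random selection. pick_random_channel is a hypothetical helper, not the repo's get_random_channel. Randomness comes from std's RandomState, which gets fresh keys per instance; that is adequate for spreading load but not for anything security-sensitive.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};

// Cheap pseudo-random u64 using only std: each RandomState has its own keys.
fn random_u64() -> u64 {
    RandomState::new().build_hasher().finish()
}

// Hypothetical: pick one channel per request (modulo bias is negligible here).
fn pick_random_channel<T>(channels: &[T]) -> Option<&T> {
    if channels.is_empty() {
        return None;
    }
    let idx = (random_u64() % channels.len() as u64) as usize;
    channels.get(idx)
}
```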

Three-tier OON exploration: a single For You request typically fans out to Thunder + TweetMixer + up to 3 Phoenix flavors simultaneously, subject to the enable and topic-dispatch gates above. If each source returns at most N candidates, the merged pool holds at most N×5 candidates from independent retrieval strategies, and fewer after dedup. The downstream Phoenix scorer (Session 10) sees the merged set and picks the best per its own criteria.


Next session

Session 12 — Side effects, part 1. Covers the first 8 of the 14 side effects, about 1,000 LOC:

  • cache_request_info_side_effect.rs
  • mutual_follow_stats_side_effect.rs
  • for_you_response_stats_side_effect.rs
  • truncate_served_history_side_effect.rs
  • update_past_request_timestamps_side_effect.rs
  • update_served_history_side_effect.rs
  • redis_post_candidate_cache_side_effect.rs
  • publish_seen_ids_to_kafka_side_effect.rs