X For You algorithm, line by line · Part 6

X For You algorithm, line by line — Part 6: Concrete candidate pipelines

Part 6 of the deep dive into xai-org/x-algorithm. The two pipeline declarations that wire every stage component: ForYouCandidatePipeline (5 sources + blender + 8 side effects) and PhoenixCandidatePipeline (15 query hydrators, 6 sources, 10 hydrators, 14 filters, 3 scorers, 6 post-selection hydrators, 3 post-selection filters, 6 side effects). Plus orphan-file analysis.

May 15, 2026 · 25 min read

This is the keystone session. Up to now we've read the framework (candidate-pipeline crate, Session 01) and the building blocks (models in Session 04, filters in Session 05). Now we read the actual pipeline configuration: the file that lists every source, hydrator, filter, scorer, and side effect that runs when X processes a real feed request.

After this session you'll see, in one place, the complete list of components that fire per request. The remaining sessions then go into each component's implementation.

Files covered (1,279 LOC across 7 files):

home-mixer/candidate_pipeline/
├── mod.rs                                 (2)     re-exports
├── for_you_candidate_pipeline.rs          (278)   the outer For You pipeline
├── phoenix_candidate_pipeline.rs          (772)   the inner Scored Posts pipeline
├── query.rs                               (69)    [orphan] simpler ScoredPostsQuery variant
├── query_features.rs                      (10)    [orphan] simple UserFeatures
├── candidate.rs                           (70)    [orphan] simpler PostCandidate variant
└── candidate_features.rs                  (78)    [orphan] Gizmoduck/Thrift-shaped types

The four "orphan" files aren't referenced from mod.rs and aren't used in the wired pipelines — they appear to be remnants of an earlier refactor where these types lived next to the pipelines instead of in home-mixer/models/. We'll cover them briefly at the end.


mod.rs (2 lines)

pub mod for_you_candidate_pipeline;
pub mod phoenix_candidate_pipeline;

Just the two pipeline files. The other 4 .rs files in this directory are not declared here, which means they don't get compiled as part of this module tree. Either they're imported from elsewhere via #[path = "..."] (unlikely) or they're dead files. We'll see at the end.


for_you_candidate_pipeline.rs (278 lines) — the outer pipeline

This is the For You pipeline. It wraps the Phoenix pipeline as one of its sources and adds the For-You-specific blending (ads, who-to-follow, prompts, push-to-home).

Imports

use crate::clients::ad_index_client::{AdIndexClient, MockAdIndexClient, ProdAdIndexClient};
use crate::clients::kafka_publisher_client::{KafkaPublisherClient, MockKafkaPublisherClient};
use crate::clients::past_request_timestamps_client::{
    MockPastRequestTimestampsClient, PastRequestTimestampsClient, ProdPastRequestTimestampsClient,
};
use crate::clients::prompts_client::{MockPromptsClient, ProdPromptsClient, PromptsClient};
use crate::clients::served_history_client::{
    MockServedHistoryClient, ProdServedHistoryClient, ServedHistoryClient,
};
use crate::clients::tweet_entity_service_client::{MockTESClient, ProdTESClient, TESClient};
use crate::clients::who_to_follow_client::{
    MockWhoToFollowClient, ProdWhoToFollowClient, WhoToFollowClient,
};

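A long block of client imports. Almost every external service comes as a triple. Kafka is the exception here: only the trait and the mock are imported, presumably because the prod side effects build their own publishers inside their ::prod() constructors.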

  • A trait XxxClient (the abstraction)
  • A production impl ProdXxxClient (real network calls)
  • A mock impl MockXxxClient (for tests)

This trait-with-prod-and-mock pattern is repeated for every dependency. It's verbose but makes it trivial to write a mock() constructor — no DI framework, no wiring magic.
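A minimal sketch of the pattern, assuming a synchronous, single-method trait. The real client traits are async and have richer signatures that this file doesn't show. PromptsClient, ProdPromptsClient, and MockPromptsClient borrow the real names, but their bodies are hypothetical. The PromptsConsumer struct and its prod()/mock()/build() constructors are illustrative too:

```rust
use std::sync::Arc;

/// The abstraction (hypothetical, synchronous signature).
pub trait PromptsClient: Send + Sync {
    fn fetch_prompts(&self, user_id: u64) -> Vec<String>;
}

/// "Prod" impl: would make network calls; here it fabricates a value so
/// the sketch runs offline.
pub struct ProdPromptsClient {
    pub endpoint: String,
}

impl PromptsClient for ProdPromptsClient {
    fn fetch_prompts(&self, user_id: u64) -> Vec<String> {
        vec![format!("prompt for {user_id} from {}", self.endpoint)]
    }
}

/// Mock impl: a unit struct with constant, side-effect-free behavior.
pub struct MockPromptsClient;

impl PromptsClient for MockPromptsClient {
    fn fetch_prompts(&self, _user_id: u64) -> Vec<String> {
        Vec::new()
    }
}

/// A consumer that only ever sees the trait object.
pub struct PromptsConsumer {
    client: Arc<dyn PromptsClient>,
}

impl PromptsConsumer {
    /// Shared wiring: both constructors funnel through here.
    fn build(client: Arc<dyn PromptsClient>) -> Self {
        Self { client }
    }

    pub fn prod(endpoint: &str) -> Self {
        Self::build(Arc::new(ProdPromptsClient { endpoint: endpoint.to_string() }))
    }

    pub fn mock() -> Self {
        Self::build(Arc::new(MockPromptsClient))
    }

    pub fn prompts_for(&self, user_id: u64) -> Vec<String> {
        self.client.fetch_prompts(user_id)
    }
}
```

PromptsConsumer only sees Arc<dyn PromptsClient>, so swapping prod for mock changes one constructor call and nothing else. The pipeline's new/mock/build split, covered later in this session, is the same idea at a larger scale.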

use crate::models::query::ScoredPostsQuery;
use crate::params;
use crate::query_hydrators::past_request_timestamps_query_hydrator::PastRequestTimestampsQueryHydrator;
use crate::query_hydrators::served_history_query_hydrator::ServedHistoryQueryHydrator;
use crate::scored_posts_server::ScoredPostsServer;
use crate::selectors::BlenderSelector;
use crate::side_effects::ads_injection_logging_side_effect::AdsInjectionLoggingSideEffect;
use crate::side_effects::client_events_kafka_side_effect::ClientEventsKafkaSideEffect;
use crate::side_effects::for_you_response_stats_side_effect::ForYouResponseStatsSideEffect;
use crate::side_effects::publish_seen_ids_to_kafka_side_effect::PublishSeenIdsToKafkaSideEffect;
use crate::side_effects::served_candidates_kafka_side_effect::ServedCandidatesKafkaSideEffect;
use crate::side_effects::truncate_served_history_side_effect::TruncateServedHistorySideEffect;
use crate::side_effects::update_past_request_timestamps_side_effect::UpdatePastRequestTimestampsSideEffect;
use crate::side_effects::update_served_history_side_effect::UpdateServedHistorySideEffect;
use crate::sources::ads_source::AdsSource;
use crate::sources::prompts_source::PromptsSource;
use crate::sources::push_to_home_source::PushToHomeSource;
use crate::sources::scored_posts_source::ScoredPostsSource;
use crate::sources::who_to_follow_source::WhoToFollowSource;

The For You pipeline's stage imports. Notice:

  • 2 query hydrators (served history + past timestamps)
  • 5 sources (ScoredPosts is the inner pipeline; the other four are non-post or specialty sources)
  • 1 selector (BlenderSelector)
  • 8 side effects

No hydrators, no filters, no scorers, no post-selection stages. The For You pipeline does almost no per-candidate work; that's the inner Phoenix pipeline's job. For You is mostly about blending the scored posts with other module types and logging the final timeline.

use std::sync::Arc;
use tonic::async_trait;
use xai_candidate_pipeline::candidate_pipeline::CandidatePipeline;
use xai_candidate_pipeline::component_library::clients::{
    MockReplyMixerClient, ProdReplyMixerClient, ReplyMixerClient,
};
use xai_candidate_pipeline::filter::Filter;
use xai_candidate_pipeline::hydrator::Hydrator;
use xai_candidate_pipeline::query_hydrator::QueryHydrator;
use xai_candidate_pipeline::scorer::Scorer;
use xai_candidate_pipeline::selector::Selector;
use xai_candidate_pipeline::side_effect::SideEffect;
use xai_candidate_pipeline::source::Source;
use xai_home_mixer_proto::FeedItem;

All the trait imports + FeedItem. Note FeedItem is the candidate type for the For You pipeline — not PostCandidate. A FeedItem is the wire format that can hold a post, a prompt, a who-to-follow card, an ad, etc. The For You pipeline operates on this union type because it blends heterogeneous content.
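The following sketch makes "union type" concrete with a hypothetical Rust enum that models the kinds of items a FeedItem can carry. The real FeedItem is a generated protobuf type from xai_home_mixer_proto, and this file doesn't show its fields, so every variant and field below is illustrative:

```rust
/// Hypothetical model of a heterogeneous feed item (not the real proto).
#[derive(Debug, Clone, PartialEq)]
pub enum FeedItem {
    Post { post_id: u64, score: f64 },
    Ad { ad_id: u64 },
    WhoToFollow { user_ids: Vec<u64> },
    Prompt { prompt_id: u64 },
}

impl FeedItem {
    /// Short label for the item's kind, handy for stats or blending rules.
    pub fn kind(&self) -> &'static str {
        match self {
            FeedItem::Post { .. } => "post",
            FeedItem::Ad { .. } => "ad",
            FeedItem::WhoToFollow { .. } => "who_to_follow",
            FeedItem::Prompt { .. } => "prompt",
        }
    }
}

/// Counts organic posts in a mixed timeline.
pub fn count_posts(items: &[FeedItem]) -> usize {
    items.iter().filter(|item| matches!(item, FeedItem::Post { .. })).count()
}
```

A single union-like type is what lets one Vec hold posts, ads, and modules side by side. A Vec<PostCandidate> could only ever hold posts.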

Struct

pub struct ForYouCandidatePipeline {
    query_hydrators: Vec<Box<dyn QueryHydrator<ScoredPostsQuery>>>,
    sources: Vec<Box<dyn Source<ScoredPostsQuery, FeedItem>>>,
    selector: BlenderSelector,
    side_effects: Arc<Vec<Box<dyn SideEffect<ScoredPostsQuery, FeedItem>>>>,
}

Four fields, one per stage that's actually used. The unused stages (hydrators, filters, scorers, post-selection hydrators and filters) have no fields at all; the trait impl below simply returns empty slices for them. The selector is a concrete BlenderSelector rather than Box<dyn Selector<…>>: there's only one, so the dynamic-dispatch indirection buys nothing.

new — production constructor

impl ForYouCandidatePipeline {
    pub async fn new(scored_posts_server: Arc<ScoredPostsServer>, datacenter: &str) -> Self {
        let (
            ad_index_client,
            ads_injection_logging,
            publish_seen_ids,
            served_candidates,
            client_events,
            served_history_client,
            who_to_follow_client,
            prompts_client,
            past_request_timestamps_client,
            tes_client,
            reply_mixer_client,
        ) = tokio::join!(
            async {
                Arc::new(
                    ProdAdIndexClient::new(datacenter)
                        .await
                        .expect("Failed to create AdIndex client"),
                ) as Arc<dyn AdIndexClient + Send + Sync>
            },
            AdsInjectionLoggingSideEffect::prod(),
            PublishSeenIdsToKafkaSideEffect::prod(),
            ServedCandidatesKafkaSideEffect::prod(),
            ClientEventsKafkaSideEffect::prod(),
            …

The takeaway: all client construction happens concurrently via tokio::join!. Building these clients typically means resolving service discovery, opening TCP/TLS connections, and loading certs. That work is I/O-bound, so there's no reason to do it one client at a time.

This matters for cold-start time. Serial construction pays the sum of every client's setup latency and could take seconds. Concurrent construction completes roughly when the slowest client finishes.

tokio::join! is a macro that polls all of its futures concurrently on the current task (no extra threads) and returns a tuple of their outputs in argument order. Each argument is a separate future. It can be an async { ... } block, like the AdIndex one, or a call such as AdsInjectionLoggingSideEffect::prod() that returns a future.
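tokio isn't part of the standard library, so this std-only sketch uses std::thread::scope instead. It demonstrates the same timing property: total wall-clock time tracks the slowest constructor, not the sum. build_client and the sleep durations are hypothetical stand-ins for I/O-bound setup. tokio::join! itself gets this effect without spawning threads, by polling the futures concurrently on one task.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Stand-in for an I/O-bound client constructor; the sleep simulates
/// waiting on service discovery, TLS handshakes, and so on.
fn build_client(name: &'static str, delay_ms: u64) -> String {
    thread::sleep(Duration::from_millis(delay_ms));
    format!("{name} client ready")
}

/// Builds three clients concurrently and returns them in argument order
/// (like the tuple from tokio::join!) plus the elapsed wall-clock time.
pub fn build_all_concurrently() -> ((String, String, String), Duration) {
    let start = Instant::now();
    let clients = thread::scope(|s| {
        let ads = s.spawn(|| build_client("ads", 50));
        let prompts = s.spawn(|| build_client("prompts", 100));
        let history = s.spawn(|| build_client("served_history", 150));
        // join() waits for each thread; tuple order follows spawn order,
        // not completion order.
        (ads.join().unwrap(), prompts.join().unwrap(), history.join().unwrap())
    });
    (clients, start.elapsed())
}
```

Run serially, the three sleeps alone would take at least 300ms. Run concurrently, the total lands near 150ms, the duration of the slowest one.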

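The as Arc<dyn AdIndexClient + Send + Sync> cast: ProdAdIndexClient::new() returns a concrete type, and the cast erases it to a trait object so the rest of the code can treat it polymorphically. That erasure is what lets build accept either ProdAdIndexClient or MockAdIndexClient through one signature. A typed binding or a function argument would coerce implicitly as well; the explicit cast just pins the tuple element's type. The same need for one uniform type explains why sources and side effects are stored as Vec<Box<dyn …>>: vec![…] requires every element to have the same type.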
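A small std-only sketch of the erasure at work. The one-method AdIndexClient trait and the unit-struct impls are simplified, hypothetical stand-ins for the real types. describe plays the role of build: it accepts any implementation through one trait-object parameter.

```rust
use std::sync::Arc;

/// Hypothetical, simplified client trait.
pub trait AdIndexClient: Send + Sync {
    fn name(&self) -> &'static str;
}

pub struct ProdAdIndexClient;
pub struct MockAdIndexClient;

impl AdIndexClient for ProdAdIndexClient {
    fn name(&self) -> &'static str {
        "prod"
    }
}

impl AdIndexClient for MockAdIndexClient {
    fn name(&self) -> &'static str {
        "mock"
    }
}

/// Takes any implementation through one erased type.
pub fn describe(client: Arc<dyn AdIndexClient>) -> &'static str {
    client.name()
}

pub fn demo() -> Vec<&'static str> {
    // Explicit cast, as in the join! block: pins the binding's type.
    let prod = Arc::new(ProdAdIndexClient) as Arc<dyn AdIndexClient>;
    // Implicit unsized coercion at a typed binding works too.
    let mock: Arc<dyn AdIndexClient> = Arc::new(MockAdIndexClient);
    // A Vec needs one element type, so mixed impls must be erased first.
    let clients: Vec<Arc<dyn AdIndexClient>> = vec![prod, mock];
    clients.into_iter().map(describe).collect()
}
```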

.expect("Failed to create XYZ client") panics on construction failure. The build orchestrator (XServiceBuilder from Session 04) treats a build panic as "service can't start": the process crashes and the scheduler (presumably Kubernetes) retries.

        Self::build(
            scored_posts_server,
            ad_index_client,
            ads_injection_logging,
            …
        )
    }

After the tokio::join! resolves, hand off to the synchronous build constructor.

build — actual construction

    fn build(
        scored_posts_server: Arc<ScoredPostsServer>,
        ad_index_client: Arc<dyn AdIndexClient + Send + Sync>,
        ads_injection_logging: AdsInjectionLoggingSideEffect,
        publish_seen_ids: PublishSeenIdsToKafkaSideEffect,
        served_candidates: ServedCandidatesKafkaSideEffect,
        client_events: ClientEventsKafkaSideEffect,
        served_history_client: Arc<dyn ServedHistoryClient>,
        who_to_follow_client: Arc<dyn WhoToFollowClient + Send + Sync>,
        prompts_client: Arc<dyn PromptsClient + Send + Sync>,
        past_request_timestamps_client: Arc<dyn PastRequestTimestampsClient>,
        tes_client: Arc<dyn TESClient + Send + Sync>,
        reply_mixer_client: Arc<dyn ReplyMixerClient>,
    ) -> Self {

This takes all the constructed clients and returns the pipeline. The split between new/build and mock is intentional — both new and mock produce the same set of dependencies, then both call build. So the actual pipeline composition lives in one place.

        let query_hydrators: Vec<Box<dyn QueryHydrator<ScoredPostsQuery>>> = vec![
            Box::new(ServedHistoryQueryHydrator::from_client(Arc::clone(
                &served_history_client,
            ))),
            Box::new(PastRequestTimestampsQueryHydrator::new(Arc::clone(
                &past_request_timestamps_client,
            ))),
        ];

Two query hydrators for For You:

  • ServedHistoryQueryHydrator — fetches the user's recently-served posts from Manhattan (X's KV store). Used by the PreviouslyServedPostsFilter we saw in Session 05.
  • PastRequestTimestampsQueryHydrator — fetches when this user last made requests, used for polling/freshness decisions.

These run in parallel (the wave-1 query hydrators from Session 01's framework). They don't depend on each other.
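Independence is what makes this safe, and a sketch can show why. It is loosely modelled on the query-hydrator stage, not copied from the framework. Each hydrator reads the same immutable snapshot of the query and returns a patch that covers only its own field. The patches can therefore be computed in any order, or concurrently, and merged afterwards. The Query struct, the hydrate/update method pair, and the fake lookup values are all hypothetical:

```rust
/// Hypothetical, trimmed-down query with two fields to hydrate.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Query {
    pub user_id: u64,
    pub served_post_ids: Option<Vec<u64>>,
    pub past_request_timestamps_ms: Option<Vec<u64>>,
}

/// Hypothetical two-phase hydrator: `hydrate` only reads the snapshot,
/// `update` copies this hydrator's own field from its patch into the target.
pub trait QueryHydrator {
    fn hydrate(&self, query: &Query) -> Query;
    fn update(&self, target: &mut Query, patch: Query);
}

pub struct ServedHistoryHydrator;
pub struct PastTimestampsHydrator;

impl QueryHydrator for ServedHistoryHydrator {
    fn hydrate(&self, query: &Query) -> Query {
        // Fake lookup derived from the user id, standing in for a KV read.
        let ids = vec![query.user_id * 10, query.user_id * 10 + 1];
        Query { served_post_ids: Some(ids), ..Default::default() }
    }
    fn update(&self, target: &mut Query, patch: Query) {
        target.served_post_ids = patch.served_post_ids;
    }
}

impl QueryHydrator for PastTimestampsHydrator {
    fn hydrate(&self, _query: &Query) -> Query {
        Query { past_request_timestamps_ms: Some(vec![1_000, 2_000]), ..Default::default() }
    }
    fn update(&self, target: &mut Query, patch: Query) {
        target.past_request_timestamps_ms = patch.past_request_timestamps_ms;
    }
}

/// Phase 1 computes every patch from the same snapshot (the part that can
/// run concurrently); phase 2 merges the patches into a copy of the query.
pub fn run_all(query: &Query, hydrators: &[&dyn QueryHydrator]) -> Query {
    let patches: Vec<Query> = hydrators.iter().map(|h| h.hydrate(query)).collect();
    let mut hydrated = query.clone();
    for (h, patch) in hydrators.iter().zip(patches) {
        h.update(&mut hydrated, patch);
    }
    hydrated
}
```

If one hydrator needed another's output, it couldn't share the first phase. That kind of dependency is presumably what a later wave is for.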

        let sources: Vec<Box<dyn Source<ScoredPostsQuery, FeedItem>>> = vec![
            Box::new(ScoredPostsSource {
                scored_posts_server,
            }),
            Box::new(AdsSource { ad_index_client }),
            Box::new(WhoToFollowSource {
                who_to_follow_client,
            }),
            Box::new(PromptsSource { prompts_client }),
            Box::new(PushToHomeSource {
                tes_client,
                reply_mixer_client,
            }),
        ];

Five sources for For You:

  1. ScoredPostsSource — wraps the inner ScoredPostsServer (which runs the Phoenix pipeline). This is the workhorse: returns ~50-100 scored posts.
  2. AdsSource — queries the ad index for promoted posts.
  3. WhoToFollowSource — fetches user recommendations from the WTF service.
  4. PromptsSource — fetches in-feed prompts (e.g. "vote in this poll," "verify your account").
  5. PushToHomeSource — special-case source for the "click notification → pin its post" flow (uses query.push_to_home_post_id from Session 04).

All five run in parallel and their outputs (each a Vec<FeedItem>) get flattened.
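The merge step amounts to ordered concatenation. Here is a minimal generic sketch with one labelled assumption: a source that errors contributes nothing instead of failing the whole request. That is this sketch's policy, not necessarily the framework's.

```rust
/// Concatenate per-source outputs in source order, keeping each source's
/// internal order. Assumption: an Err source contributes no candidates.
pub fn merge_source_outputs<T>(outputs: Vec<Result<Vec<T>, String>>) -> Vec<T> {
    outputs
        .into_iter()
        .filter_map(Result::ok) // drop failed sources
        .flatten() // Vec<Vec<T>> -> stream of T, in order
        .collect()
}
```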

        let selector = BlenderSelector::new();

A single selector. BlenderSelector does the item-type-aware blending — alternates between posts and ads, interleaves who-to-follow modules every N items, etc. We'll see its 164-line implementation in Session 10.
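BlenderSelector's real rules come in Session 10. Until then, here is a hypothetical blending function that shows the general shape of item-type-aware interleaving. The ad cadence, the fixed who-to-follow slot, and every parameter name are assumptions made for illustration. Posts are assumed to arrive already sorted by score.

```rust
/// Hypothetical item kinds for the blending sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum Item {
    Post(u64),
    Ad(u64),
    WhoToFollow,
}

/// Hypothetical blending rule, not BlenderSelector's actual logic:
/// keep posts in their incoming order, insert one ad after every
/// `ad_every` posts while ads remain, place a who-to-follow module at
/// index `wtf_slot` (clamped to the end), then cap the result at `max_len`.
pub fn blend(
    posts: &[u64],
    ads: &[u64],
    has_wtf: bool,
    ad_every: usize,
    wtf_slot: usize,
    max_len: usize,
) -> Vec<Item> {
    let mut out = Vec::with_capacity(posts.len() + ads.len() + 1);
    let mut remaining_ads = ads.iter();
    for (i, &post_id) in posts.iter().enumerate() {
        out.push(Item::Post(post_id));
        // (i + 1) is the number of posts emitted so far.
        if ad_every > 0 && (i + 1) % ad_every == 0 {
            if let Some(&ad_id) = remaining_ads.next() {
                out.push(Item::Ad(ad_id));
            }
        }
    }
    if has_wtf {
        let slot = wtf_slot.min(out.len());
        out.insert(slot, Item::WhoToFollow);
    }
    out.truncate(max_len);
    out
}
```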

        let side_effects: Arc<Vec<Box<dyn SideEffect<ScoredPostsQuery, FeedItem>>>> =
            Arc::new(vec![
                Box::new(ads_injection_logging),
                Box::new(publish_seen_ids),
                Box::new(served_candidates),
                Box::new(client_events),
                Box::new(ForYouResponseStatsSideEffect),
                Box::new(UpdatePastRequestTimestampsSideEffect::new(
                    past_request_timestamps_client,
                )),
                Box::new(UpdateServedHistorySideEffect::new(Arc::clone(
                    &served_history_client,
                ))),
                Box::new(TruncateServedHistorySideEffect::new(served_history_client)),
            ]);

Eight side effects for For You, fired asynchronously after the response is returned (per Session 01's run_side_effects behavior). Categories:

  • Ads logging (ads_injection_logging): records which ads were considered + chosen.
  • Kafka publishers (publish_seen_ids, served_candidates, client_events): publish to Kafka for downstream training-data and analytics pipelines.
  • Stats (ForYouResponseStatsSideEffect): per-response metrics emit.
  • State updates (UpdatePastRequestTimestampsSideEffect, UpdateServedHistorySideEffect): write to Manhattan for the next request to read in its query hydrators.
  • Maintenance (TruncateServedHistorySideEffect): trim the served-history index to bounded size.
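A std-only sketch of "respond first, run side effects in the background". It also shows why it helps to store side effects as Arc<Vec<Box<dyn SideEffect<…>>>>: handing a background worker its own handle costs one Arc::clone, not a copy of the list. The one-method SideEffect trait and both implementations are hypothetical. std::thread::spawn stands in for whatever async task spawning the framework actually uses.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical one-method side effect; Send + Sync so it can cross threads.
pub trait SideEffect: Send + Sync {
    fn run(&self, served: &[u64]) -> String;
}

pub struct StatsSideEffect;
impl SideEffect for StatsSideEffect {
    fn run(&self, served: &[u64]) -> String {
        format!("stats: {} items", served.len())
    }
}

pub struct LogSideEffect;
impl SideEffect for LogSideEffect {
    fn run(&self, served: &[u64]) -> String {
        format!("log: first={:?}", served.first())
    }
}

/// Returns the response right away; the side effects run on a background
/// thread that owns its own Arc handle to the shared list.
pub fn handle_request(
    side_effects: &Arc<Vec<Box<dyn SideEffect>>>,
    served: Vec<u64>,
) -> (Vec<u64>, thread::JoinHandle<Vec<String>>) {
    let effects = Arc::clone(side_effects); // refcount bump, no list copy
    let snapshot = served.clone();
    let handle = thread::spawn(move || {
        effects.iter().map(|e| e.run(&snapshot)).collect::<Vec<String>>()
    });
    (served, handle)
}
```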

The state-update side effects share clients with the query hydrators above — note Arc::clone(&served_history_client) and Arc::clone(&past_request_timestamps_client). Read at request start, write after response. The same client handles both.
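The read-then-write cycle across requests can be sketched with an in-memory store standing in for Manhattan. The synchronous ServedHistoryClient trait, InMemoryServedHistory, and serve_request are hypothetical. To keep the example deterministic, the write happens inline rather than in the background.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};

/// Hypothetical synchronous trait; the real client is async and Manhattan-backed.
pub trait ServedHistoryClient: Send + Sync {
    fn read(&self, user_id: u64) -> Vec<u64>;
    fn append(&self, user_id: u64, post_ids: &[u64]);
}

/// In-memory stand-in for the KV store, keyed by user id.
#[derive(Default)]
pub struct InMemoryServedHistory {
    store: Mutex<HashMap<u64, Vec<u64>>>,
}

impl ServedHistoryClient for InMemoryServedHistory {
    fn read(&self, user_id: u64) -> Vec<u64> {
        let store = self.store.lock().unwrap();
        store.get(&user_id).cloned().unwrap_or_default()
    }

    fn append(&self, user_id: u64, post_ids: &[u64]) {
        let mut store = self.store.lock().unwrap();
        store.entry(user_id).or_default().extend_from_slice(post_ids);
    }
}

/// One request: the "query hydrator" reads history, a filter drops posts
/// already served, and the "side effect" records what was served. Both
/// halves hold clones of the same Arc, as build() does.
pub fn serve_request(
    client: &Arc<dyn ServedHistoryClient>,
    user_id: u64,
    candidates: &[u64],
    k: usize,
) -> Vec<u64> {
    let hydrator_client = Arc::clone(client);
    let side_effect_client = Arc::clone(client);

    let served: HashSet<u64> = hydrator_client.read(user_id).into_iter().collect();
    let page: Vec<u64> = candidates
        .iter()
        .copied()
        .filter(|id| !served.contains(id))
        .take(k)
        .collect();

    side_effect_client.append(user_id, &page);
    page
}
```

The second request for the same user skips what the first one served, because the first request's write is the second request's read.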

        Self {
            query_hydrators,
            sources,
            selector,
            side_effects,
        }
    }

Pack into the struct.

mock — the test constructor

    pub async fn mock(scored_posts_server: Arc<ScoredPostsServer>) -> Self {
        let ad_index_client: Arc<dyn AdIndexClient + Send + Sync> = Arc::new(MockAdIndexClient);
        let mock_kafka = Arc::new(MockKafkaPublisherClient) as Arc<dyn KafkaPublisherClient>;
        let ads_injection = AdsInjectionLoggingSideEffect::new(Arc::clone(&mock_kafka));
        let publish_seen_ids = PublishSeenIdsToKafkaSideEffect::new(Arc::clone(&mock_kafka));
        let served_candidates = ServedCandidatesKafkaSideEffect::new(Arc::clone(&mock_kafka));
        let client_events = ClientEventsKafkaSideEffect::new(mock_kafka);
        let served_history_client: Arc<dyn ServedHistoryClient> = Arc::new(MockServedHistoryClient);
        let who_to_follow_client: Arc<dyn WhoToFollowClient + Send + Sync> =
            Arc::new(MockWhoToFollowClient);
        let prompts_client: Arc<dyn PromptsClient + Send + Sync> = Arc::new(MockPromptsClient);
        let past_request_timestamps_client: Arc<dyn PastRequestTimestampsClient> =
            Arc::new(MockPastRequestTimestampsClient);
        let tes_client: Arc<dyn TESClient + Send + Sync> = Arc::new(MockTESClient::default());
        let reply_mixer_client: Arc<dyn ReplyMixerClient> = Arc::new(MockReplyMixerClient);
        Self::build(
            scored_posts_server,
            ad_index_client,
            ads_injection,
            publish_seen_ids,
            served_candidates,
            client_events,
            served_history_client,
            who_to_follow_client,
            prompts_client,
            past_request_timestamps_client,
            tes_client,
            reply_mixer_client,
        )
    }
}

Constructs all-mock clients and calls build. Most mocks are unit structs that implement the corresponding trait with no-op or constant-returning methods; MockTESClient is the exception, built via ::default(). Note that mock_kafka is shared across four side effects: all of them publish to Kafka, so they all need a publisher.

The first three side effects receive Arc::clone(&mock_kafka). The fourth (ClientEventsKafkaSideEffect) takes the original mock_kafka by move, since nothing uses it afterwards.
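The clone-three-then-move wiring is easy to check with Arc::strong_count. Publisher and KafkaSideEffect are hypothetical placeholders for MockKafkaPublisherClient and the four Kafka side effects:

```rust
use std::sync::Arc;

/// Hypothetical placeholder for MockKafkaPublisherClient.
pub struct Publisher;

/// Hypothetical placeholder for one Kafka-publishing side effect.
pub struct KafkaSideEffect {
    pub publisher: Arc<Publisher>,
}

/// Mirrors the wiring in mock(): three clones, then a move of the original.
pub fn wire_side_effects() -> Vec<KafkaSideEffect> {
    let mock_kafka = Arc::new(Publisher);
    let ads_injection = KafkaSideEffect { publisher: Arc::clone(&mock_kafka) };
    let publish_seen_ids = KafkaSideEffect { publisher: Arc::clone(&mock_kafka) };
    let served_candidates = KafkaSideEffect { publisher: Arc::clone(&mock_kafka) };
    // Last use of mock_kafka: move it instead of cloning.
    let client_events = KafkaSideEffect { publisher: mock_kafka };
    vec![ads_injection, publish_seen_ids, served_candidates, client_events]
}
```

Four handles point at one allocation: three clones plus the moved original.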

The CandidatePipeline trait impl

#[async_trait]
impl CandidatePipeline<ScoredPostsQuery, FeedItem> for ForYouCandidatePipeline {
    fn query_hydrators(&self) -> &[Box<dyn QueryHydrator<ScoredPostsQuery>>] {
        &self.query_hydrators
    }

    fn sources(&self) -> &[Box<dyn Source<ScoredPostsQuery, FeedItem>>] {
        &self.sources
    }

    fn hydrators(&self) -> &[Box<dyn Hydrator<ScoredPostsQuery, FeedItem>>] {
        &[]
    }

    fn filters(&self) -> &[Box<dyn Filter<ScoredPostsQuery, FeedItem>>] {
        &[]
    }

    fn scorers(&self) -> &[Box<dyn Scorer<ScoredPostsQuery, FeedItem>>] {
        &[]
    }

    fn selector(&self) -> &dyn Selector<ScoredPostsQuery, FeedItem> {
        &self.selector
    }

    fn post_selection_hydrators(&self) -> &[Box<dyn Hydrator<ScoredPostsQuery, FeedItem>>] {
        &[]
    }

    fn post_selection_filters(&self) -> &[Box<dyn Filter<ScoredPostsQuery, FeedItem>>] {
        &[]
    }

    fn side_effects(&self) -> Arc<Vec<Box<dyn SideEffect<ScoredPostsQuery, FeedItem>>>> {
        Arc::clone(&self.side_effects)
    }

    fn result_size(&self) -> usize {
        params::FOR_YOU_MAX_RESULT_SIZE
    }
}

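Implement the trait. Three methods hand back references to stored fields (query_hydrators, sources, selector). side_effects returns an Arc::clone of the shared list rather than a borrow, presumably so side effects running after the response can hold their own handle. result_size returns params::FOR_YOU_MAX_RESULT_SIZE. The five empty-slice returns (hydrators, filters, scorers, post_selection_hydrators, post_selection_filters) are explicit no-ops.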
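Why does a bare &[] type-check as &[Box<dyn Filter<…>>]? An empty array literal contains no values that need dropping, so the compiler promotes it to a 'static constant. A 'static borrow then satisfies the shorter lifetime tied to &self. Here is a trimmed-down sketch with hypothetical, non-generic Filter and Pipeline traits:

```rust
/// Hypothetical, non-generic stage trait.
pub trait Filter {
    fn keep(&self, post_id: u64) -> bool;
}

/// Hypothetical pipeline trait with a single stage accessor.
pub trait Pipeline {
    fn filters(&self) -> &[Box<dyn Filter>];
}

/// Like ForYouCandidatePipeline: no field for the stage, just `&[]`.
pub struct NoFilterPipeline;

impl Pipeline for NoFilterPipeline {
    fn filters(&self) -> &[Box<dyn Filter>] {
        // Promoted to a 'static empty array, then coerced to a slice.
        &[]
    }
}

/// Like PhoenixCandidatePipeline: the stage is a stored Vec.
pub struct OneFilterPipeline {
    pub filters: Vec<Box<dyn Filter>>,
}

impl Pipeline for OneFilterPipeline {
    fn filters(&self) -> &[Box<dyn Filter>] {
        &self.filters
    }
}

pub struct DropOdd;

impl Filter for DropOdd {
    fn keep(&self, post_id: u64) -> bool {
        post_id % 2 == 0
    }
}

/// Keeps a candidate only if every filter accepts it; with no filters,
/// everything passes through unchanged.
pub fn run_filters(pipeline: &dyn Pipeline, candidates: Vec<u64>) -> Vec<u64> {
    candidates
        .into_iter()
        .filter(|&id| pipeline.filters().iter().all(|f| f.keep(id)))
        .collect()
}
```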

So the For You pipeline executes:

query_hydrators (||):
  ServedHistoryQueryHydrator
  PastRequestTimestampsQueryHydrator
↓
sources (||):
  ScoredPostsSource (← runs inner Phoenix pipeline)
  AdsSource
  WhoToFollowSource
  PromptsSource
  PushToHomeSource
↓
(no hydrators / no filters / no scorers — candidates pass through)
↓
selector: BlenderSelector (sorts + blends + truncates)
↓
(no post-selection hydration / no post-selection filtering)
↓
truncate to FOR_YOU_MAX_RESULT_SIZE
↓
return PipelineResult, then in background:
  AdsInjectionLoggingSideEffect
  PublishSeenIdsToKafkaSideEffect
  ServedCandidatesKafkaSideEffect
  ClientEventsKafkaSideEffect
  ForYouResponseStatsSideEffect
  UpdatePastRequestTimestampsSideEffect
  UpdateServedHistorySideEffect
  TruncateServedHistorySideEffect

That's the entire For You orchestration. The complexity is in (a) the inner Phoenix pipeline inside ScoredPostsSource, and (b) the BlenderSelector that blends post + non-post sources.


phoenix_candidate_pipeline.rs (772 lines) — the inner pipeline

The big one. It defines every component that retrieves, hydrates, filters, and scores posts, in the order they fire.

Import block (lines 1-142)

The imports list every component this pipeline uses. We can read this as the menu of features in the For You algorithm:

use crate::candidate_hydrators::ads_brand_safety_hydrator::AdsBrandSafetyHydrator;
use crate::candidate_hydrators::ads_brand_safety_vf_hydrator::AdsBrandSafetyVfHydrator;
use crate::candidate_hydrators::blocked_by_hydrator::BlockedByHydrator;
use crate::candidate_hydrators::core_data_candidate_hydrator::CoreDataCandidateHydrator;
use crate::candidate_hydrators::filtered_topics_hydrator::FilteredTopicsHydrator;
use crate::candidate_hydrators::following_replied_users_hydrator::FollowingRepliedUsersHydrator;
use crate::candidate_hydrators::gizmoduck_hydrator::GizmoduckCandidateHydrator;
use crate::candidate_hydrators::has_media_hydrator::HasMediaHydrator;
use crate::candidate_hydrators::in_network_candidate_hydrator::InNetworkCandidateHydrator;
use crate::candidate_hydrators::language_code_hydrator::LanguageCodeHydrator;
use crate::candidate_hydrators::mutual_follow_jaccard_hydrator::MutualFollowJaccardHydrator;
use crate::candidate_hydrators::quote_hydrator::QuoteHydrator;
use crate::candidate_hydrators::subscription_hydrator::SubscriptionHydrator;
use crate::candidate_hydrators::tweet_type_metrics_hydrator::TweetTypeMetricsHydrator;
use crate::candidate_hydrators::vf_candidate_hydrator::VFCandidateHydrator;
use crate::candidate_hydrators::video_duration_candidate_hydrator::VideoDurationCandidateHydrator;

16 candidate hydrators. These are the per-post enrichment stages. We'll read them in Sessions 07-08.

use crate::clients::followed_grok_topics_store_client::{
    FollowedGrokTopicsStoreClient, MockFollowedGrokTopicsStoreClient,
    ProdFollowedGrokTopicsStoreClient,
};
use crate::clients::followed_starter_packs_store_client::{
    FollowedStarterPacksStoreClient, MockFollowedStarterPacksStoreClient,
    ProdFollowedStarterPacksStoreClient,
};
use crate::clients::gender_prediction_client::{
    GenderPredictionGrpcClient, MockGenderPredictionGrpcClient, ProdGenderPredictionGrpcClient,
};
use crate::clients::gizmoduck_client::{GizmoduckClient, MockGizmoduckClient, ProdGizmoduckClient};
use crate::clients::impressed_posts_client::ImpressedPostsClient;
use crate::clients::kafka_publisher_client::{
    KafkaCluster, KafkaPublisherClient, MockKafkaPublisherClient, PHOENIX_SCORES_TOPIC,
    ProdKafkaPublisherClient, RERANKING_TOPIC,
};
use crate::clients::s2s::{S2S_CHAIN_PATH, S2S_CRT_PATH, S2S_KEY_PATH};
use crate::clients::tweet_entity_service_client::{MockTESClient, ProdTESClient, TESClient};
use crate::clients::user_action_aggregation_client::{
    MockUserActionAggregationClient, ProdUserActionAggregationClient, UserActionAggregationClient,
};
use crate::clients::user_demographics_client::{
    MockUserDemographicsClient, ProdUserDemographicsClient, UserDemographicsClient,
};
use crate::clients::user_inferred_gender_store_client::{
    MockUserInferredGenderStoreClient, ProdUserInferredGenderStoreClient,
    UserInferredGenderStoreClient,
};
use crate::clients::vm_ranker_client::{MockVMRankerClient, ProdVMRankerClient, VMRankerClient};

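The excerpt above shows about a dozen client modules, mostly trait + prod + mock triples. ImpressedPostsClient comes in as the trait alone, and s2s contributes only certificate-path constants. Together with the clients imported elsewhere in the block (not shown), they cover the services Phoenix talks to: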

  • UserActionAggregationClient — fetches the user's recent engagement sequence (the input to the Phoenix model)
  • PhoenixPredictionClient — the ML inference endpoint
  • PhoenixRetrievalClient — the OON retrieval endpoint (two-tower model)
  • ThunderClient — what we read in Sessions 02-03
  • StratoClient — generic key-value lookup
  • TweetMixerClient — legacy retrieval service
  • TESClient — Tweet Entity Service (post hydration)
  • GizmoduckClient — user-data service
  • VisibilityFilteringClient — safety/visibility
  • RedisClient — generic Redis
  • VMRankerClient — a separate ranking service (likely "Video Mixer" or similar)
  • SafetyLabelStoreClient — fetches safety labels
  • ImpressionBloomFilterClient — fetches the user's impression bloom filters
  • GeoIpLocationClient — IP → location lookup
  • UserDemographicsClient — age/gender features
  • UserInferredGenderStoreClient + GenderPredictionGrpcClient — gender prediction (cached + live)
  • ImpressedPostsClient — fetches recently-impressed posts (the smaller, recent list)
  • FollowedGrokTopicsStoreClient / FollowedStarterPacksStoreClient — topic / starter pack subscriptions

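Two parameters of build_with_clients below are missing from this list. The first is KafkaPublisherClient, in two instances: phoenix_kafka_client and reranking_kafka_client, presumably publishing to PHOENIX_SCORES_TOPIC and RERANKING_TOPIC. The second is SocialGraphClientOps, which handles the block, mute, follow, and subscription lookups. Add those two and you have the complete dependency surface of the Phoenix pipeline: roughly 20 services.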

use crate::filters::age_filter::AgeFilter;
use crate::filters::ancillary_vf_filter::AncillaryVFFilter;
use crate::filters::author_socialgraph_filter::AuthorSocialgraphFilter;
use crate::filters::core_data_hydration_filter::CoreDataHydrationFilter;
use crate::filters::dedup_conversation_filter::DedupConversationFilter;
use crate::filters::drop_duplicates_filter::DropDuplicatesFilter;
use crate::filters::ineligible_subscription_filter::IneligibleSubscriptionFilter;
use crate::filters::muted_keyword_filter::MutedKeywordFilter;
use crate::filters::new_user_topic_ids_filter::NewUserTopicIdsFilter;
use crate::filters::previously_seen_posts_backup_filter::PreviouslySeenPostsBackupFilter;
use crate::filters::previously_seen_posts_filter::PreviouslySeenPostsFilter;
use crate::filters::previously_served_posts_filter::PreviouslyServedPostsFilter;
use crate::filters::retweet_deduplication_filter::RetweetDeduplicationFilter;
use crate::filters::self_tweet_filter::SelfTweetFilter;
use crate::filters::topic_ids_filter::TopicIdsFilter;
use crate::filters::vf_filter::VFFilter;
use crate::filters::video_filter::VideoFilter;

All 17 filters from Session 05. One wiring detail stands out. ImpressedPostsQueryHydrator, presumably the data source for PreviouslySeenPostsBackupFilter, isn't in the production query-hydrator list below. It's constructed but bound to _impressed_posts_hydrator, an underscore-prefixed variable that's never used. This looks like a dark-launched feature that isn't active yet.
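An underscore-prefixed binding like _impressed_posts_hydrator silences the unused-variable lint, but the value is still constructed and kept alive to the end of the scope. The bare _ pattern behaves differently: it drops the value immediately. The following std-only demonstration uses a hypothetical Hydrator type that logs its construction and drop:

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical component that records when it is built and dropped.
pub struct Hydrator {
    name: &'static str,
    log: Rc<RefCell<Vec<String>>>,
}

impl Hydrator {
    pub fn new(name: &'static str, log: &Rc<RefCell<Vec<String>>>) -> Self {
        log.borrow_mut().push(format!("construct {name}"));
        Hydrator { name, log: Rc::clone(log) }
    }
}

impl Drop for Hydrator {
    fn drop(&mut self) {
        self.log.borrow_mut().push(format!("drop {}", self.name));
    }
}

pub fn binding_demo() -> Vec<String> {
    let log = Rc::new(RefCell::new(Vec::new()));
    {
        // `_name`: no unused-variable warning, yet the value is built
        // and stays alive until the end of this block.
        let _impressed = Hydrator::new("impressed", &log);
        // Bare `_`: the value is built, then dropped at the end of the statement.
        let _ = Hydrator::new("wildcard", &log);
        log.borrow_mut().push("end of scope".to_string());
    }
    // Bind before returning so the RefCell borrow ends before `log` drops.
    let entries = log.borrow().clone();
    entries
}
```

So a constructed-but-unused hydrator still pays its construction cost (and any side effects of construction), even though it never runs in the pipeline.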

use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use crate::params;

The query and candidate types (from Session 04's models/).

use crate::query_hydrators::blocked_user_ids_query_hydrator::BlockedUserIdsQueryHydrator;
use crate::query_hydrators::cached_posts_query_hydrator::CachedPostsQueryHydrator;
use crate::query_hydrators::followed_grok_topics_query_hydrator::FollowedGrokTopicsQueryHydrator;
use crate::query_hydrators::followed_starter_packs_query_hydrator::FollowedStarterPacksQueryHydrator;
use crate::query_hydrators::followed_user_ids_query_hydrator::FollowedUserIdsQueryHydrator;
use crate::query_hydrators::ip_query_hydrator::IpQueryHydrator;
use crate::query_hydrators::impressed_posts_query_hydrator::ImpressedPostsQueryHydrator;
use crate::query_hydrators::impression_bloom_filter_query_hydrator::ImpressionBloomFilterQueryHydrator;
use crate::query_hydrators::inferred_grok_topics_query_hydrator::InferredGrokTopicsQueryHydrator;
use crate::query_hydrators::muted_user_ids_query_hydrator::MutedUserIdsQueryHydrator;
use crate::query_hydrators::mutual_follow_query_hydrator::MutualFollowQueryHydrator;
use crate::query_hydrators::retrieval_sequence_query_hydrator::RetrievalSequenceQueryHydrator;
use crate::query_hydrators::scoring_sequence_query_hydrator::ScoringSequenceQueryHydrator;
use crate::query_hydrators::subscribed_user_ids_query_hydrator::SubscribedUserIdsQueryHydrator;
use crate::query_hydrators::user_demographics_query_hydrator::UserDemographicsQueryHydrator;
use crate::query_hydrators::user_inferred_gender_query_hydrator::UserInferredGenderQueryHydrator;

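16 query-hydrator imports, of which 15 are wired into the pipeline (ImpressedPostsQueryHydrator is the odd one out). We'll read them in Session 09.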

use crate::scorers::phoenix_scorer::PhoenixScorer;
use crate::scorers::ranking_scorer::RankingScorer;
use crate::scorers::vm_ranker::VMRanker;
use crate::selectors::TopKScoreSelector;
use crate::side_effects::mutual_follow_stats_side_effect::MutualFollowStatsSideEffect;
use crate::side_effects::phoenix_experiments_side_effect::PhoenixExperimentsSideEffect;
use crate::side_effects::phoenix_request_cache_side_effect::PhoenixRequestCacheSideEffect;
use crate::side_effects::redis_post_candidate_cache_side_effect::RedisPostCandidateCacheSideEffect;
use crate::side_effects::reranking_kafka_side_effect::RerankingKafkaSideEffect;
use crate::side_effects::scored_stats_side_effect::ScoredStatsSideEffect;
use crate::sources::cached_posts_source::CachedPostsSource;
use crate::sources::phoenix_moe_source::PhoenixMOESource;
use crate::sources::phoenix_source::PhoenixSource;
use crate::sources::phoenix_topics_source::PhoenixTopicsSource;
use crate::sources::thunder_source::ThunderSource;
use crate::sources::tweet_mixer_source::TweetMixerSource;

The remaining components: 3 scorers, 1 selector, 6 side effects, 6 sources. Plus the framework imports.

Struct

pub struct PhoenixCandidatePipeline {
    query_hydrators: Vec<Box<dyn QueryHydrator<ScoredPostsQuery>>>,
    sources: Vec<Box<dyn Source<ScoredPostsQuery, PostCandidate>>>,
    hydrators: Vec<Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>>,
    filters: Vec<Box<dyn Filter<ScoredPostsQuery, PostCandidate>>>,
    scorers: Vec<Box<dyn Scorer<ScoredPostsQuery, PostCandidate>>>,
    selector: TopKScoreSelector,
    post_selection_hydrators: Vec<Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>>,
    post_selection_filters: Vec<Box<dyn Filter<ScoredPostsQuery, PostCandidate>>>,
    side_effects: Arc<Vec<Box<dyn SideEffect<ScoredPostsQuery, PostCandidate>>>>,
}

All nine stage fields are populated (vs. For You's four). This is the real pipeline.

build_with_clients — the wiring (lines 156-351)

The longest function in the file. Builds every stage in order.

    pub(crate) async fn build_with_clients(
        user_action_aggregation_client: Arc<dyn UserActionAggregationClient + Send + Sync>,
        phoenix_client: Arc<dyn PhoenixPredictionClient + Send + Sync>,
        egress_client: Arc<dyn PhoenixPredictionClient + Send + Sync>,
        phoenix_retrieval_client: Arc<dyn PhoenixRetrievalClient + Send + Sync>,
        thunder_client: Arc<ThunderClient>,
        strato_client: Arc<dyn StratoClient + Send + Sync>,
        tweet_mixer_client: Arc<dyn TweetMixerClient>,
        tes_client: Arc<dyn TESClient + Send + Sync>,
        gizmoduck_client: Arc<dyn GizmoduckClient + Send + Sync>,
        vf_client: Arc<dyn VisibilityFilteringClient + Send + Sync>,
        redis_client: Arc<dyn RedisClient + Send + Sync>,
        phoenix_kafka_client: Arc<dyn KafkaPublisherClient>,
        reranking_kafka_client: Arc<dyn KafkaPublisherClient>,
        socialgraph_client: Arc<dyn SocialGraphClientOps>,
        vm_ranker_client: Arc<dyn VMRankerClient>,
        safety_label_client: Arc<dyn xai_safety_label_store::SafetyLabelStoreClient>,
        vf_safety_labels_client: Arc<dyn VfClient>,
        phoenix_request_cache_redis_atla_client: Arc<dyn RedisClient + Send + Sync>,
        phoenix_request_cache_redis_pdxa_client: Arc<dyn RedisClient + Send + Sync>,
        impression_bloom_filter_client: Arc<dyn ImpressionBloomFilterClient>,
        ip_client: Arc<GeoIpLocationClient>,
        user_demographics_client: Arc<dyn UserDemographicsClient>,
        user_inferred_gender_store_client: Arc<dyn UserInferredGenderStoreClient>,
        user_inferred_gender_grpc_client: Arc<dyn GenderPredictionGrpcClient>,
        impressed_posts_client: Arc<dyn ImpressedPostsClient>,
        followed_grok_topics_client: Arc<dyn FollowedGrokTopicsStoreClient>,
        followed_starter_packs_client: Arc<dyn FollowedStarterPacksStoreClient>,
    ) -> PhoenixCandidatePipeline {

27 client parameters. Every dependency must be wired in. pub(crate) because only the in-crate prod and mock constructors should call it — external callers go through those higher-level entry points.

Notable: phoenix_client and egress_client are both PhoenixPredictionClient trait objects. The egress client presumably routes through an egress sidecar, perhaps to a separate datacenter or cell. That would let the pipeline call two prediction services in parallel for experiments. We'll see how they're actually used when we read the PhoenixScorer (Session 10).

Query hydrators (lines 185-232)

        let query_hydrators: Vec<Box<dyn QueryHydrator<ScoredPostsQuery>>> = vec![
            Box::new(ScoringSequenceQueryHydrator::new(
                user_action_aggregation_client.clone(),
            )),
            Box::new(RetrievalSequenceQueryHydrator::new(
                user_action_aggregation_client,
            )),
            Box::new(BlockedUserIdsQueryHydrator {
                socialgraph_client: socialgraph_client.clone(),
            }),
            Box::new(MutedUserIdsQueryHydrator {
                socialgraph_client: socialgraph_client.clone(),
            }),
            Box::new(FollowedUserIdsQueryHydrator {
                socialgraph_client: socialgraph_client.clone(),
            }),
            Box::new(SubscribedUserIdsQueryHydrator {
                socialgraph_client: socialgraph_client.clone(),
            }),
            Box::new(CachedPostsQueryHydrator {
                redis_client: redis_client.clone(),
            }),
            Box::new(MutualFollowQueryHydrator {
                strato_client: strato_client.clone(),
            }),
            Box::new(UserDemographicsQueryHydrator {
                client: user_demographics_client,
            }),
            Box::new(FollowedGrokTopicsQueryHydrator::new(
                followed_grok_topics_client,
            )),
            Box::new(FollowedStarterPacksQueryHydrator::new(
                followed_starter_packs_client,
            )),
            Box::new(InferredGrokTopicsQueryHydrator {
                strato_client: strato_client.clone(),
            }),
            Box::new(ImpressionBloomFilterQueryHydrator {
                client: impression_bloom_filter_client,
            }),
            Box::new(IpQueryHydrator {
                client: ip_client,
            }),
            Box::new(UserInferredGenderQueryHydrator::new(
                user_inferred_gender_store_client,
                user_inferred_gender_grpc_client,
            )),
        ];

15 query hydrators, executed in parallel by the framework. Each populates a slice of ScoredPostsQuery:

Hydrator                              Populates
ScoringSequenceQueryHydrator          scoring_sequence (engagement history for ranking)
RetrievalSequenceQueryHydrator        retrieval_sequence (history for retrieval)
BlockedUserIdsQueryHydrator           user_features.blocked_user_ids
MutedUserIdsQueryHydrator             user_features.muted_user_ids
FollowedUserIdsQueryHydrator          user_features.followed_user_ids
SubscribedUserIdsQueryHydrator        user_features.subscribed_user_ids
CachedPostsQueryHydrator              cached_posts, has_cached_posts
MutualFollowQueryHydrator             viewer_minhash
UserDemographicsQueryHydrator         user_demographics
FollowedGrokTopicsQueryHydrator       followed_grok_topics (32 bools)
FollowedStarterPacksQueryHydrator     followed_starter_packs (20 bools)
InferredGrokTopicsQueryHydrator       inferred_grok_topics (32 bools)
ImpressionBloomFilterQueryHydrator    bloom_filter_entries
IpQueryHydrator                       ip_location
UserInferredGenderQueryHydrator       user_inferred_gender, user_inferred_gender_score
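To make "parallel, each fills its own slice" concrete, here is a minimal sketch. Everything in it is hypothetical: a trimmed Query with three fields, a QueryHydrator trait whose hydrate returns a Patch enum, and std::thread::scope standing in for the framework's async execution. The real trait and merge logic live in the candidate-pipeline crate (Session 01) and differ in detail.

```rust
use std::thread;

// Hypothetical, trimmed-down query; each field is owned by exactly one hydrator.
#[derive(Debug, Default, Clone)]
struct Query {
    user_id: u64,
    blocked_user_ids: Vec<u64>,
    muted_user_ids: Vec<u64>,
    ip_location: Option<String>,
}

// One variant per hydrator: the slice of the query it is responsible for.
enum Patch {
    Blocked(Vec<u64>),
    Muted(Vec<u64>),
    IpLocation(String),
}

// Hypothetical trait: read the query, return a patch.
// The `Sync` supertrait lets hydrators be shared across threads.
trait QueryHydrator: Sync {
    fn hydrate(&self, query: &Query) -> Patch;
}

struct BlockedHydrator;
struct MutedHydrator;
struct IpHydrator;

impl QueryHydrator for BlockedHydrator {
    fn hydrate(&self, _q: &Query) -> Patch {
        Patch::Blocked(vec![101, 102]) // stand-in for a social-graph lookup
    }
}

impl QueryHydrator for MutedHydrator {
    fn hydrate(&self, _q: &Query) -> Patch {
        Patch::Muted(vec![201]) // stand-in for a social-graph lookup
    }
}

impl QueryHydrator for IpHydrator {
    fn hydrate(&self, q: &Query) -> Patch {
        Patch::IpLocation(format!("geo-for-{}", q.user_id)) // stand-in for a geo-IP call
    }
}

/// Run all hydrators concurrently against the same read-only query, then merge.
fn hydrate_all(query: &Query, hydrators: &[Box<dyn QueryHydrator>]) -> Query {
    let patches: Vec<Patch> = thread::scope(|s| {
        // Spawn every hydrator before joining any, so they overlap.
        let handles: Vec<_> = hydrators
            .iter()
            .map(|h| s.spawn(move || h.hydrate(query)))
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().expect("hydrator panicked"))
            .collect()
    });
    // Merge sequentially; no two hydrators write the same field.
    let mut out = query.clone();
    for patch in patches {
        match patch {
            Patch::Blocked(ids) => out.blocked_user_ids = ids,
            Patch::Muted(ids) => out.muted_user_ids = ids,
            Patch::IpLocation(loc) => out.ip_location = Some(loc),
        }
    }
    out
}
```

Because every hydrator reads the same immutable query and owns a disjoint set of fields, the patches can be applied in any order without conflicts.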

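Note the socialgraph_client.clone() repetition: 4 query hydrators share the same client. Each .clone() on an Arc copies the pointer and bumps the reference count (cheap); the client itself is never duplicated. The original socialgraph_client handle stays alive for later stages: QuoteHydrator clones it once more, and BlockedByHydrator finally takes it by move.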
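A small sketch of the same wiring pattern, using a hypothetical unit-struct client so only the reference count matters. With n_cloned = 5 it mirrors socialgraph_client in build_with_clients: four query hydrators plus QuoteHydrator take clones, and BlockedByHydrator takes the original. The single Hydrator type here is an illustrative stand-in for those distinct hydrators.

```rust
use std::sync::Arc;

// Hypothetical stand-in for the shared social-graph client; only its
// reference count matters here.
struct SocialGraphClient;

// Hypothetical hydrator holding its own handle to the shared client.
struct Hydrator {
    client: Arc<SocialGraphClient>,
}

/// Mirrors the wiring pattern: `n_cloned` hydrators get `.clone()`s and the
/// final one takes the original handle by move.
fn wire(client: Arc<SocialGraphClient>, n_cloned: usize) -> Vec<Hydrator> {
    let mut hydrators = Vec::with_capacity(n_cloned + 1);
    for _ in 0..n_cloned {
        // Copies the pointer and bumps the strong count; the client is not duplicated.
        hydrators.push(Hydrator { client: client.clone() });
    }
    // Last use: move the original in, no clone needed.
    hydrators.push(Hydrator { client });
    hydrators
}
```

All six handles point at one client, so the strong count is 6; the client is freed only when the last hydrator holding it is dropped.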

        let _impressed_posts_hydrator = ImpressedPostsQueryHydrator {
            client: impressed_posts_client,
        };

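Constructed but unused. The _impressed_posts_hydrator is built, which consumes impressed_posts_client and keeps the type compiling (the leading underscore silences the unused-variable warning), but it's never added to the query_hydrators vec, so it never runs. This looks like a staged rollout: ship the component first, then wire it into the vec in a later change. Enabling it still takes that code change and a deploy; a runtime flag can't switch on a hydrator that isn't in the list.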

Sources (lines 238-257)

        let phoenix_source = Box::new(PhoenixSource {
            phoenix_retrieval_client: phoenix_retrieval_client.clone(),
        });
        let phoenix_topics_source = Box::new(PhoenixTopicsSource {
            phoenix_retrieval_client: phoenix_retrieval_client.clone(),
        });
        let phoenix_moe_source = Box::new(PhoenixMOESource {
            phoenix_retrieval_client,
        });
        let thunder_source = Box::new(ThunderSource { thunder_client });
        let tweet_mixer_source = Box::new(TweetMixerSource { tweet_mixer_client });
        let cached_posts_source = Box::new(CachedPostsSource);
        let sources: Vec<Box<dyn Source<ScoredPostsQuery, PostCandidate>>> = vec![
            thunder_source,
            tweet_mixer_source,
            phoenix_source,
            phoenix_topics_source,
            phoenix_moe_source,
            cached_posts_source,
        ];

6 sources, all parallel:

  1. ThunderSource — in-network posts via Thunder (covered in Sessions 02-03).
  2. TweetMixerSource — legacy OON retrieval service.
  3. PhoenixSource — primary OON retrieval (two-tower model).
  4. PhoenixTopicsSource — topic-aware OON retrieval.
  5. PhoenixMOESource — Phoenix mixture-of-experts retrieval.
  6. CachedPostsSource — re-serves previously scored posts loaded by CachedPostsQueryHydrator.

Three of the six use the same phoenix_retrieval_client (the last one consumes the Arc — note no .clone()). This is the three-flavor OON retrieval approach: same client, three query strategies, three sets of candidates.
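A minimal sketch of "same client, several query strategies," assuming a hypothetical RetrievalClient whose retrieve method takes a strategy label; how the three Phoenix sources actually differ in their requests isn't shown in this file. Sources run concurrently (std::thread::scope here, async in the real framework) and their candidates are concatenated in source order.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical retrieval client. `strategy` stands in for whatever request
// shape distinguishes the Phoenix, Topics, and MoE sources.
struct RetrievalClient;

impl RetrievalClient {
    fn retrieve(&self, strategy: &str, user_id: u64) -> Vec<String> {
        // Stand-in for an RPC: tag two fake candidates with their origin.
        (0..2).map(|i| format!("{strategy}:{user_id}:{i}")).collect()
    }
}

// Hypothetical source trait; `Send + Sync` lets sources run on worker threads.
trait Source: Send + Sync {
    fn fetch(&self, user_id: u64) -> Vec<String>;
}

struct StrategySource {
    client: Arc<RetrievalClient>,
    strategy: &'static str,
}

impl Source for StrategySource {
    fn fetch(&self, user_id: u64) -> Vec<String> {
        self.client.retrieve(self.strategy, user_id)
    }
}

/// Three sources sharing one client; the last takes the Arc by move.
fn three_flavors(client: Arc<RetrievalClient>) -> Vec<Box<dyn Source>> {
    vec![
        Box::new(StrategySource { client: client.clone(), strategy: "phoenix" }),
        Box::new(StrategySource { client: client.clone(), strategy: "topics" }),
        Box::new(StrategySource { client, strategy: "moe" }),
    ]
}

/// Run all sources concurrently and concatenate candidates in source order.
fn fetch_all(sources: &[Box<dyn Source>], user_id: u64) -> Vec<String> {
    thread::scope(|s| {
        let handles: Vec<_> = sources
            .iter()
            .map(|src| s.spawn(move || src.fetch(user_id)))
            .collect();
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("source panicked"))
            .collect()
    })
}
```

Overlap between sources isn't handled here; in the real pipeline, DropDuplicatesFilter removes duplicates later.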

Hydrators (lines 259-272)

        let hydrators: Vec<Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>> = vec![
            Box::new(InNetworkCandidateHydrator),
            Box::new(CoreDataCandidateHydrator::new(tes_client.clone()).await),
            Box::new(QuoteHydrator::new(tes_client.clone(), socialgraph_client.clone()).await),
            Box::new(VideoDurationCandidateHydrator::new(tes_client.clone()).await),
            Box::new(HasMediaHydrator::new(tes_client.clone()).await),
            Box::new(SubscriptionHydrator::new(tes_client.clone()).await),
            Box::new(GizmoduckCandidateHydrator::new(gizmoduck_client).await),
            Box::new(BlockedByHydrator::new(socialgraph_client).await),
            Box::new(FilteredTopicsHydrator {
                strato_client: strato_client.clone(),
            }),
            Box::new(LanguageCodeHydrator::new(tes_client.clone()).await),
        ];

10 pre-scoring hydrators running in parallel. Each enriches the candidate with one slice of data. We'll read each in Sessions 07-08.

Note that 6 of them use TES (Tweet Entity Service): core data, quoted posts, video duration, has-media, subscription gating, and language code. They stay separate hydrators even though they share a client. A likely reason is that each fetches a different aspect of the post and can then be tested, cached, and failure-handled independently, though the file itself doesn't say.

socialgraph_client is consumed by BlockedByHydrator (final move, no clone).

Filters (lines 274-289)

        let filters: Vec<Box<dyn Filter<ScoredPostsQuery, PostCandidate>>> = vec![
            Box::new(DropDuplicatesFilter),
            Box::new(CoreDataHydrationFilter),
            Box::new(AgeFilter::new(Duration::from_secs(params::MAX_POST_AGE))),
            Box::new(SelfTweetFilter),
            Box::new(RetweetDeduplicationFilter),
            Box::new(IneligibleSubscriptionFilter),
            Box::new(PreviouslySeenPostsFilter),
            Box::new(PreviouslySeenPostsBackupFilter),
            Box::new(PreviouslyServedPostsFilter),
            Box::new(MutedKeywordFilter::new()),
            Box::new(AuthorSocialgraphFilter),
            Box::new(VideoFilter),
            Box::new(TopicIdsFilter),
            Box::new(NewUserTopicIdsFilter),
        ];

14 filters in sequence — the exact list we read in Session 05. Note the order is significant:

  1. DropDuplicatesFilter first — dedup by tweet_id, no other state needed.
  2. CoreDataHydrationFilter — drop hydration failures. After this, every candidate has author_id != 0.
  3. AgeFilter — drop posts older than params::MAX_POST_AGE. Trivial.
  4. SelfTweetFilter — drop the viewer's own posts.
  5. RetweetDeduplicationFilter — content-dedup of retweet vs. original.
  6. IneligibleSubscriptionFilter — drop paywalled posts.
  7-9. PreviouslySeenPostsFilter + PreviouslySeenPostsBackupFilter + PreviouslyServedPostsFilter — three flavors of seen-content dedup.
  10. MutedKeywordFilter — expensive tokenization, runs after cheaper filters have reduced the count.
  11. AuthorSocialgraphFilter — block/mute graph.
  12. VideoFilter — video exclusion if requested.
  13. TopicIdsFilter — topic-based filtering (the giant from Session 05).
  14. NewUserTopicIdsFilter — onboarding topic filter.

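Order is largely performance-driven: dedup runs before substantive filtering, and by the time MutedKeywordFilter (with its tokenizer) runs, the earlier filters have already shrunk the candidate set. It isn't strictly cheapest-first, though: AuthorSocialgraphFilter and VideoFilter look like simple per-candidate checks, yet they run after the tokenizer.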
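A toy illustration of why order matters, under simplified assumptions: filters are hypothetical closures over a Vec of candidates, dedup keys on tweet_id, and lowercase-plus-whitespace splitting stands in for MutedKeywordFilter's real tokenizer. A Cell counts how many candidates reach the "expensive" filter.

```rust
use std::cell::Cell;
use std::collections::HashSet;

#[derive(Clone, Debug)]
struct Candidate {
    tweet_id: u64,
    text: String,
}

// Hypothetical filter shape: take the survivors, return the new survivors.
type Filter<'a> = Box<dyn Fn(Vec<Candidate>) -> Vec<Candidate> + 'a>;

/// Apply filters in order; each filter only sees what the previous ones kept.
fn run_filters(cands: Vec<Candidate>, filters: &[Filter]) -> Vec<Candidate> {
    filters.iter().fold(cands, |acc, f| f(acc))
}

/// Cheap dedup by tweet_id: keeps the first occurrence.
fn dedup(cands: Vec<Candidate>) -> Vec<Candidate> {
    let mut seen = HashSet::new();
    cands.into_iter().filter(|c| seen.insert(c.tweet_id)).collect()
}

/// "Expensive" muted-keyword filter; `calls` counts the candidates it inspects.
fn muted_keyword_filter<'a>(muted: &'a [&'a str], calls: &'a Cell<usize>) -> Filter<'a> {
    Box::new(move |cands: Vec<Candidate>| {
        cands
            .into_iter()
            .filter(|c| {
                calls.set(calls.get() + 1);
                let lower = c.text.to_lowercase(); // stand-in for real tokenization
                !lower
                    .split_whitespace()
                    .any(|tok| muted.iter().any(|m| *m == tok))
            })
            .collect()
    })
}
```

With dedup first, the keyword filter inspects 2 candidates; with it last, 4. Both orders keep the same survivor here, but the expensive work doubles.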

Scorers (lines 291-300)

        let phoenix_scorer = Box::new(PhoenixScorer {
            phoenix_client: phoenix_client.clone(),
            egress_client: Arc::clone(&egress_client),
        });
        let ranking_scorer = Box::new(RankingScorer);
        let vm_ranker = Box::new(VMRanker {
            client: vm_ranker_client,
        });
        let scorers: Vec<Box<dyn Scorer<ScoredPostsQuery, PostCandidate>>> =
            vec![phoenix_scorer, ranking_scorer, vm_ranker];

3 scorers in sequence:

  1. PhoenixScorer — the Grok-based transformer. Predicts per-action probabilities, stored on candidate.phoenix_scores.
  2. RankingScorer — combines phoenix_scores into weighted_score then score. The "weighted sum + diversity attenuation" stage.
  3. VMRanker — a separate ranking service that re-scores via another model. Final adjustment.

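Both prediction clients are cloned here: phoenix_client with .clone() and egress_client with the equivalent Arc::clone(&egress_client). The scorer hits both endpoints, and the originals are still needed later, when they are moved into PhoenixExperimentsSideEffect.

vm_ranker_client, by contrast, is moved (consumed), since VMRanker is its only use in this function. Each Arc clone is cheap (a refcount bump only).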
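A sketch of the sequential dependency between the first two scorers, with illustrative weights and only four actions. The real RankingScorer's weights and its diversity-attenuation step aren't shown in this file, so this covers only the weighted-sum idea: the ranking stage can run only once the Phoenix stage has filled in per-action probabilities.

```rust
// Hypothetical subset of Phoenix's per-action probabilities.
#[derive(Debug, Clone, Copy, Default)]
struct PhoenixScores {
    favorite: f64,
    reply: f64,
    retweet: f64,
    report: f64,
}

// Illustrative weights only; the production values aren't in this file.
const W_FAVORITE: f64 = 1.0;
const W_REPLY: f64 = 2.0;
const W_RETWEET: f64 = 1.5;
const W_REPORT: f64 = -10.0; // negative actions pull the score down

#[derive(Debug, Default)]
struct Candidate {
    phoenix_scores: Option<PhoenixScores>, // filled by the first scorer
    weighted_score: Option<f64>,           // filled by the second scorer
}

fn weighted_sum(s: &PhoenixScores) -> f64 {
    W_FAVORITE * s.favorite + W_REPLY * s.reply + W_RETWEET * s.retweet + W_REPORT * s.report
}

/// Second stage: only meaningful after the first stage ran. Assumption of this
/// sketch: a candidate the model didn't score stays unranked (None) rather
/// than getting a fake zero.
fn ranking_stage(c: &mut Candidate) {
    c.weighted_score = c.phoenix_scores.as_ref().map(weighted_sum);
}
```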

Selector

        let selector = TopKScoreSelector;

A unit struct with no fields. We'll read it in Session 10: a simple "sort by score, take the top K." It's much simpler than the BlenderSelector the For You pipeline uses.
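A minimal sketch of "sort by score, take top K," assuming scores are Option<f64> and that unscored candidates sort last. This is a plausible shape for TopKScoreSelector, not its source.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Candidate {
    tweet_id: u64,
    score: Option<f64>,
}

/// Sort by score, highest first, and keep the first `k`.
/// Assumption of this sketch: unscored candidates sort last.
fn select_top_k(mut cands: Vec<Candidate>, k: usize) -> Vec<Candidate> {
    // sort_by is stable, so equal scores keep their incoming order.
    cands.sort_by(|a, b| {
        let sa = a.score.unwrap_or(f64::NEG_INFINITY);
        let sb = b.score.unwrap_or(f64::NEG_INFINITY);
        sb.total_cmp(&sa) // reversed comparison = descending order
    });
    cands.truncate(k);
    cands
}
```

f64::total_cmp is used because f64 isn't Ord; it defines a total order, so a stray NaN can't make the comparator panic the way partial_cmp(...).unwrap() would.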

Post-selection hydrators (lines 304-315)

        let post_selection_hydrators: Vec<Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>> = vec![
            Box::new(VFCandidateHydrator::new(vf_client.clone()).await),
            Box::new(AdsBrandSafetyHydrator::new(safety_label_client)),
            Box::new(AdsBrandSafetyVfHydrator {
                client: vf_safety_labels_client,
            }),
            Box::new(TweetTypeMetricsHydrator::new()),
            Box::new(FollowingRepliedUsersHydrator),
            Box::new(MutualFollowJaccardHydrator {
                strato_client: strato_client.clone(),
            }),
        ];

6 hydrators that run only on the top-K candidates after selection:

  1. VFCandidateHydrator — visibility-filtering service call. Sets visibility_reason.
  2. AdsBrandSafetyHydrator — fetches brand-safety labels.
  3. AdsBrandSafetyVfHydrator — additional VF-side brand-safety labels.
  4. TweetTypeMetricsHydrator — computes the tweet_type_metrics blob.
  5. FollowingRepliedUsersHydrator — finds out which followed users replied to each post.
  6. MutualFollowJaccardHydrator — computes the mutual-follow Jaccard similarity.

Post-selection because they're either expensive (VF, brand safety) or only relevant after we know which posts get shown (following-replied).

Post-selection filters (lines 317-321)

        let post_selection_filters: Vec<Box<dyn Filter<ScoredPostsQuery, PostCandidate>>> = vec![
            Box::new(VFFilter),
            Box::new(AncillaryVFFilter),
            Box::new(DedupConversationFilter),
        ];

3 post-selection filters:

  • VFFilter — drop posts with visibility_reason indicating drop.
  • AncillaryVFFilter — drop ancillary candidates (from Session 05's drop_ancillary_posts flag).
  • DedupConversationFilter — keep best post per thread (needs score).

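These filters only make sense at this point in the pipeline: VFFilter needs visibility_reason, which the post-selection VFCandidateHydrator populates, and DedupConversationFilter needs the scores computed by the scorers.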
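A sketch of keep-the-best-post-per-thread, assuming a hypothetical conversation_id field on each candidate (the real filter may identify the thread differently, e.g. from ancestors). It compares scores, which is why it can only run after scoring.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
struct Candidate {
    tweet_id: u64,
    conversation_id: u64, // hypothetical field identifying the thread
    score: f64,
}

/// Keep only the highest-scoring candidate per conversation, preserving the
/// relative order of the survivors. Ties go to the first one seen.
fn dedup_conversations(cands: Vec<Candidate>) -> Vec<Candidate> {
    // Pass 1: winning (tweet_id, score) per conversation.
    let mut best: HashMap<u64, (u64, f64)> = HashMap::new();
    for c in &cands {
        let entry = best.entry(c.conversation_id).or_insert((c.tweet_id, c.score));
        if c.score > entry.1 {
            *entry = (c.tweet_id, c.score);
        }
    }
    // Pass 2: keep only the winners, in their incoming order.
    cands
        .into_iter()
        .filter(|c| best.get(&c.conversation_id).map(|&(id, _)| id) == Some(c.tweet_id))
        .collect()
}
```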

Side effects (lines 323-338)

        let side_effects: Arc<Vec<Box<dyn SideEffect<ScoredPostsQuery, PostCandidate>>>> =
            Arc::new(vec![
                Box::new(PhoenixExperimentsSideEffect::new(
                    phoenix_client,
                    egress_client,
                    phoenix_kafka_client,
                )),
                Box::new(RerankingKafkaSideEffect::new(reranking_kafka_client)),
                Box::new(RedisPostCandidateCacheSideEffect::new(redis_client)),
                Box::new(ScoredStatsSideEffect),
                Box::new(MutualFollowStatsSideEffect),
                Box::new(PhoenixRequestCacheSideEffect::new(
                    phoenix_request_cache_redis_atla_client,
                    phoenix_request_cache_redis_pdxa_client,
                )),
            ]);

6 side effects:

  • PhoenixExperimentsSideEffect — runs experimental Phoenix variants in shadow mode (uses both prediction clients).
  • RerankingKafkaSideEffect — publishes ranking decisions to Kafka.
  • RedisPostCandidateCacheSideEffect — writes the scored candidates to Redis so the next request's CachedPostsQueryHydrator can re-use them.
  • ScoredStatsSideEffect — per-request metrics emit.
  • MutualFollowStatsSideEffect — mutual-follow Jaccard distribution metrics.
  • PhoenixRequestCacheSideEffect — writes Phoenix prediction results to Redis through two clients, the atla and pdxa datacenter caches. Presumably heavy: it appears to store raw model outputs, e.g. for cache-replay analysis.

Final pack

        PhoenixCandidatePipeline {
            query_hydrators,
            hydrators,
            filters,
            sources,
            scorers,
            selector,
            post_selection_hydrators,
            post_selection_filters,
            side_effects,
        }
    }

Pack into the struct. The fields are reordered slightly from the struct definition — Rust doesn't care.

prod — production constructor (lines 353-659)

The wrapper that builds all 27 production clients concurrently, then calls build_with_clients. We won't walk every tokio::join! arm; the pattern is identical to For You's new constructor:

    pub async fn prod(
        shard_coordinate: Option<ShardCoordinate>,
        datacenter: &str,
    ) -> PhoenixCandidatePipeline {
        let local_cache_eds = String::new();
        let atla_phoenix_cache_eds = "";
        let pdxa_phoenix_cache_eds = "";

        let (
            flock_socialgraph_client,
            user_action_aggregation_client,
            …
        ) = tokio::join!(
            async {
                Arc::new(
                    SocialGraphClient::new(
                        datacenter,
                        &S2S_CHAIN_PATH,
                        &S2S_CRT_PATH,
                        &S2S_KEY_PATH,
                    )
                    .await
                    .expect("Failed to create flock SocialGraphClient"),
                ) as Arc<dyn SocialGraphClientOps>
            },
            …

Some highlights worth pointing out:

            async {
                Arc::new(
                    ProdPhoenixRetrievalClient::new(Some((
                        PhoenixRetrievalCluster::Experiment1Fou,
                        PhoenixRetrievalCluster::Experiment1Lap7,
                    )))
                    .await
                    .expect("Failed to create Phoenix retrieval client"),
                ) as Arc<dyn PhoenixRetrievalClient + Send + Sync>
            },

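The Phoenix retrieval client takes a Some((cluster1, cluster2)) argument: two clusters, PhoenixRetrievalCluster::Experiment1Fou and PhoenixRetrievalCluster::Experiment1Lap7. The suffixes look like internal codenames. How the client uses the pair (splitting queries between them, failing over, or comparing them) isn't visible from this file.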

            async {
                Arc::new(
                    XdsRedisClient::new(XdsRedisConfig {
                        eds_resource_name: atla_phoenix_cache_eds.into(),
                    })
                    .await
                    .expect("Failed to create xDS Redis client for atla phoenix cache"),
                ) as Arc<dyn RedisClient + Send + Sync>
            },
            async {
                Arc::new(
                    XdsRedisClient::new(XdsRedisConfig {
                        eds_resource_name: pdxa_phoenix_cache_eds.into(),
                    })
                    .await
                    .expect("Failed to create xDS Redis client for pdxa phoenix cache"),
                ) as Arc<dyn RedisClient + Send + Sync>
            },

Three Redis clients are constructed in total: the local cache (elided above), the atla (Atlanta) Phoenix cache, and the pdxa (presumably Portland) Phoenix cache. This suggests cross-datacenter caching: predictions made in one DC could be replayed against another for backup or experiment comparison.

            async {
                Arc::new(
                    ProdVfClient::new(datacenter)
                        .await
                        .expect("Failed to create VF SafetyLabels client")
                        .with_timeout_ms(500)
                        .with_max_batch_size(150),
                ) as Arc<dyn VfClient>
            },

The VF safety-labels client gets explicit client-level limits: a 500 ms timeout and a maximum batch size of 150. Its calls sit on the request's critical path (the post-selection filters can't run until visibility data arrives), so tight bounds keep a slow VF backend from dragging tail latency.
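How ProdVfClient applies these limits internally isn't shown. A minimal sketch, assuming it splits candidate IDs into chunks of at most 150 per request and abandons any call that exceeds the timeout; std::sync::mpsc::Receiver::recv_timeout provides the deadline. Returning None on timeout is this sketch's assumption about how a caller degrades, not documented behavior.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// The two limits configured on the VF client in prod().
const MAX_BATCH_SIZE: usize = 150;
const TIMEOUT: Duration = Duration::from_millis(500);

/// Split candidate IDs into request-sized chunks (the last may be shorter).
fn batches(ids: &[u64], max: usize) -> Vec<&[u64]> {
    assert!(max > 0, "batch size must be positive"); // chunks(0) would panic
    ids.chunks(max).collect()
}

/// Run one batch call on a worker thread and stop waiting after `timeout`.
/// The worker thread is left to finish in the background.
fn call_with_timeout<F>(call: F, timeout: Duration) -> Option<Vec<u64>>
where
    F: FnOnce() -> Vec<u64> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // If the receiver already gave up, the send fails and is ignored.
        let _ = tx.send(call());
    });
    rx.recv_timeout(timeout).ok()
}
```

With 400 candidates, this issues three requests of 150, 150, and 100 IDs.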

The constants atla_phoenix_cache_eds, pdxa_phoenix_cache_eds, local_cache_eds, and Kafka topic names like PHOENIX_SCORES_TOPIC are all empty strings in the open-source release — redacted. Same pattern as Thunder's Kafka topics.

After the tokio::join!, prod passes every client to build_with_clients and returns the pipeline it builds.

mock — test constructor (lines 661-728)

Same pattern but with all mocks. Repeated boilerplate of Arc::new(MockXxxClient) instances. No surprises — covers every dependency.

CandidatePipeline trait impl (lines 731-771)

#[async_trait]
impl CandidatePipeline<ScoredPostsQuery, PostCandidate> for PhoenixCandidatePipeline {
    fn query_hydrators(&self) -> &[Box<dyn QueryHydrator<ScoredPostsQuery>>] {
        &self.query_hydrators
    }

    fn sources(&self) -> &[Box<dyn Source<ScoredPostsQuery, PostCandidate>>] {
        &self.sources
    }
    fn hydrators(&self) -> &[Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>] {
        &self.hydrators
    }

    fn filters(&self) -> &[Box<dyn Filter<ScoredPostsQuery, PostCandidate>>] {
        &self.filters
    }

    fn scorers(&self) -> &[Box<dyn Scorer<ScoredPostsQuery, PostCandidate>>] {
        &self.scorers
    }

    fn selector(&self) -> &dyn Selector<ScoredPostsQuery, PostCandidate> {
        &self.selector
    }

    fn post_selection_hydrators(&self) -> &[Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>] {
        &self.post_selection_hydrators
    }

    fn post_selection_filters(&self) -> &[Box<dyn Filter<ScoredPostsQuery, PostCandidate>>] {
        &self.post_selection_filters
    }

    fn side_effects(&self) -> Arc<Vec<Box<dyn SideEffect<ScoredPostsQuery, PostCandidate>>>> {
        Arc::clone(&self.side_effects)
    }

    fn result_size(&self) -> usize {
        params::RESULT_SIZE
    }
}

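Trait impl. Each stage accessor returns a borrowed slice (or reference) of the stored components; side_effects() instead returns a cheap Arc clone, and result_size() returns params::RESULT_SIZE.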
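Why return an Arc for side effects instead of a slice? A detached background task can't hold a borrow of the pipeline, but it can own an Arc clone. A minimal sketch, with a hypothetical synchronous SideEffect trait and std::thread::spawn in place of the framework's async tasks; execute returns the JoinHandle only so the example can be checked, whereas a fire-and-forget caller would simply drop it.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical synchronous stand-in for the framework's async SideEffect trait.
trait SideEffect: Send + Sync {
    fn run(&self, selected: &[u64]);
}

// Records what it saw so the example can be checked.
struct RecordingSideEffect {
    name: &'static str,
    log: Arc<Mutex<Vec<String>>>,
}

impl SideEffect for RecordingSideEffect {
    fn run(&self, selected: &[u64]) {
        self.log.lock().unwrap().push(format!("{}:{}", self.name, selected.len()));
    }
}

struct Pipeline {
    side_effects: Arc<Vec<Box<dyn SideEffect>>>,
}

impl Pipeline {
    /// Like the trait accessor: hand out an owned Arc clone, not a borrow.
    fn side_effects(&self) -> Arc<Vec<Box<dyn SideEffect>>> {
        Arc::clone(&self.side_effects)
    }

    /// Returns the results right away; side effects run on a detached thread
    /// that owns its own Arc, so it can outlive this call and this borrow of self.
    fn execute(&self, selected: Vec<u64>) -> (Vec<u64>, thread::JoinHandle<()>) {
        let effects = self.side_effects();
        let snapshot = selected.clone();
        let handle = thread::spawn(move || {
            for effect in effects.iter() {
                effect.run(&snapshot);
            }
        });
        (selected, handle)
    }
}
```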

That's the entire Phoenix pipeline configuration.


The orphan files

Four files exist in home-mixer/candidate_pipeline/ but aren't declared in mod.rs:

query.rs (69 lines)

use crate::candidate_pipeline::query_features::UserFeatures;
use crate::util::request_util::generate_request_id;
use xai_candidate_pipeline::candidate_pipeline::HasRequestId;
use xai_home_mixer_proto::ImpressionBloomFilterEntry;
use xai_twittercontext_proto::{GetTwitterContextViewer, TwitterContextViewer};

#[derive(Clone, Default, Debug)]
pub struct ScoredPostsQuery {
    pub user_id: i64,
    pub client_app_id: i32,
    pub country_code: String,
    pub language_code: String,
    pub seen_ids: Vec<i64>,
    pub served_ids: Vec<i64>,
    pub in_network_only: bool,
    pub is_bottom_request: bool,
    pub bloom_filter_entries: Vec<ImpressionBloomFilterEntry>,
    pub user_action_sequence: Option<xai_recsys_proto::UserActionSequence>,
    pub user_features: UserFeatures,
    pub request_id: String,
}

A simpler ScoredPostsQuery with only 12 fields (vs. the 50-field version in models/query.rs from Session 04). Note user_id: i64 (vs. u64), request_id: String (vs. u64), and Vec<i64> for IDs (vs. Vec<u64>).

This looks like an earlier version of the type — before the refactor that moved it to models/. The current production code uses the models/ version (which is what phoenix_candidate_pipeline.rs imports).

impl ScoredPostsQuery {
    pub fn new(
        user_id: i64,
        …
    ) -> Self {
        let request_id = format!("{}-{}", generate_request_id(), user_id);
        …
    }
}

impl GetTwitterContextViewer for ScoredPostsQuery { … }
impl HasRequestId for ScoredPostsQuery { … }

The orphan version has its own constructor and the HasRequestId trait impl. The HasRequestId trait is from xai_candidate_pipeline::candidate_pipeline — but the modern models/query.rs doesn't impl it. So either this trait was deprecated, or the modern code uses a different mechanism.

query_features.rs (10 lines)

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "camelCase")]
pub struct UserFeatures {
    pub muted_keywords: Vec<String>,
    pub blocked_user_ids: Vec<i64>,
    pub muted_user_ids: Vec<i64>,
    pub followed_user_ids: Vec<i64>,
    pub subscribed_user_ids: Vec<i64>,
}

Mini UserFeatures for the orphan query. No follower_count (vs. the modern models/user_features.rs) and no MValCodec impl (vs. the modern version which has full Thrift codec support).

This is consistent with the "earlier version" theory.

candidate.rs (70 lines)

use std::collections::HashMap;
use xai_home_mixer_proto as pb;
use xai_visibility_filtering::models as vf;

#[derive(Clone, Debug, Default)]
pub struct PostCandidate {
    pub tweet_id: i64,
    pub author_id: u64,
    pub tweet_text: String,
    pub in_reply_to_tweet_id: Option<u64>,
    pub retweeted_tweet_id: Option<u64>,
    pub retweeted_user_id: Option<u64>,
    pub phoenix_scores: PhoenixScores,
    pub prediction_request_id: Option<u64>,
    pub last_scored_at_ms: Option<u64>,
    pub weighted_score: Option<f64>,
    pub score: Option<f64>,
    pub served_type: Option<pb::ServedType>,
    pub in_network: Option<bool>,
    pub ancestors: Vec<u64>,
    pub video_duration_ms: Option<i32>,
    pub author_followers_count: Option<i32>,
    pub author_screen_name: Option<String>,
    pub retweeted_screen_name: Option<String>,
    pub visibility_reason: Option<vf::FilteredReason>,
    pub subscription_author_id: Option<u64>,
}

A simpler PostCandidate — 20 fields vs. the modern ~30-field version. Missing: brand safety, safety labels, mutual follow, quote tweet fields, engagement counts, has_media, language_code, filtered_topic_ids.

#[derive(Clone, Debug, Default)]
pub struct PhoenixScores {
    pub favorite_score: Option<f64>,
    pub reply_score: Option<f64>,
    pub retweet_score: Option<f64>,
    pub photo_expand_score: Option<f64>,
    pub click_score: Option<f64>,
    pub profile_click_score: Option<f64>,
    pub vqv_score: Option<f64>,
    pub share_score: Option<f64>,
    pub share_via_dm_score: Option<f64>,
    pub share_via_copy_link_score: Option<f64>,
    pub dwell_score: Option<f64>,
    pub quote_score: Option<f64>,
    pub quoted_click_score: Option<f64>,
    pub follow_author_score: Option<f64>,
    pub not_interested_score: Option<f64>,
    pub block_author_score: Option<f64>,
    pub mute_author_score: Option<f64>,
    pub report_score: Option<f64>,
    pub dwell_time: Option<f64>,
}

Inline PhoenixScores definition (vs. the modern version, which re-exports xai_candidate_pipeline::component_library::models::PhoenixScores). 19 score fields, including vqv_score (video quality view), share_via_dm_score, share_via_copy_link_score, and the negative actions (not_interested, block_author, mute_author, report). One field is continuous: dwell_time, alongside dwell_score, the probability of the binary dwell event.

This list shows the actions Phoenix predicted when this type was written. The modern re-exported PhoenixScores may have added or dropped fields, but it's still a useful reference.

pub trait CandidateHelpers {
    fn get_screen_names(&self) -> HashMap<u64, String>;
}

impl CandidateHelpers for PostCandidate {
    fn get_screen_names(&self) -> HashMap<u64, String> {
        let mut screen_names = HashMap::<u64, String>::new();
        if let Some(author_screen_name) = self.author_screen_name.clone() {
            screen_names.insert(self.author_id, author_screen_name);
        }
        if let (Some(retweeted_screen_name), Some(retweeted_user_id)) =
            (self.retweeted_screen_name.clone(), self.retweeted_user_id)
        {
            screen_names.insert(retweeted_user_id, retweeted_screen_name);
        }
        screen_names
    }
}

Same CandidateHelpers trait as the modern version, but only one method — get_screen_names. The modern version has 4 methods including as_tweet_info which converts to the Phoenix proto input.

candidate_features.rs (78 lines)

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "camelCase")]
pub struct PureCoreData {
    pub author_id: u64,
    pub text: String,
    pub source_tweet_id: Option<u64>,
    pub source_user_id: Option<u64>,
    pub in_reply_to_tweet_id: Option<u64>,
    pub in_reply_to_user_id: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "camelCase")]
pub struct ExclusiveTweetControl {
    pub conversation_author_id: i64,
}

pub type MediaEntities = Vec<MediaEntity>;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "camelCase")]
pub struct MediaEntity {
    pub media_info: Option<MediaInfo>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub enum MediaInfo {
    VideoInfo(VideoInfo),
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "camelCase")]
pub struct VideoInfo {
    pub duration_millis: i32,
}

…

The orphan version of candidate_features.rs includes inline definitions of PureCoreData, ExclusiveTweetControl, MediaEntity, MediaInfo, VideoInfo, Share, Reply, GizmoduckUserCounts, GizmoduckUserProfile, GizmoduckUser, GizmoduckUserResult.

The modern version (models/candidate_features.rs from Session 04) re-exports these with pub use from xai_core_entities::entities. So this file holds the inline form of types that have since moved out.

Net interpretation: these four files are pre-refactor leftovers. They're not compiled (mod.rs doesn't declare them), but they remain in the source tree — probably because they were excluded from the cleanup PR by accident, or kept as a reference for anyone tracing through git history.

They don't affect runtime, but they're worth knowing about: edits to these files do nothing, which can confuse anyone making changes here.


What we've learned

The whole For-You pipeline, in one place:

ForYouCandidatePipeline (FeedItem)
├── QueryHydrators (2): ServedHistory, PastRequestTimestamps
├── Sources (5): ScoredPosts*, Ads, WhoToFollow, Prompts, PushToHome
├── Selector: BlenderSelector
└── SideEffects (8): kafka publishers + state updates + maintenance

*ScoredPosts wraps:

PhoenixCandidatePipeline (PostCandidate)
├── QueryHydrators (15): scoring/retrieval sequences, social-graph, demographics, IP, gender, bloom filters
├── Sources (6): Thunder, TweetMixer, 3 Phoenix variants, Cached
├── Hydrators (10): in-network, core data, quote, video duration, has media,
│                  subscription, Gizmoduck, blocked-by, filtered topics, language
├── Filters (14): dedup, hydration check, age, self, retweet dedup, subscription,
│                3 seen flavors, muted keywords, social graph, video, topic, new-user topic
├── Scorers (3): Phoenix model, Ranking, VM Ranker
├── Selector: TopKScoreSelector
├── PostSelectionHydrators (6): VF, brand safety (×2), tweet type metrics,
│                                following replied, mutual follow Jaccard
├── PostSelectionFilters (3): VF, Ancillary VF, Conversation dedup
└── SideEffects (6): Phoenix experiments, Reranking kafka, Redis cache,
                     scored stats, mutual follow stats, Phoenix request cache

Dependency injection without a framework: every component takes its dependencies as Arc in its constructor. Pipeline construction is just verbose Box::new with the right clients passed in. Both prod and mock constructors produce dependencies, then call build_with_clients which composes them. No magic.

Cold-start optimization: tokio::join! constructs all clients concurrently, so startup waits roughly as long as the slowest client rather than the sum of all of them.
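A std-only sketch of that startup math, with a hypothetical connect function whose sleep stands in for network setup. Note that tokio::join! polls its futures concurrently on a single task rather than spawning threads; scoped threads are used here only to keep the example dependency-free.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical constructor; the sleep stands in for TLS setup, service
// discovery, and other network work a real client does at startup.
fn connect(name: &str, setup: Duration) -> String {
    thread::sleep(setup);
    format!("{name}-client")
}

/// Build three clients at once and report how long it took.
fn build_concurrently(setup: Duration) -> (Vec<String>, Duration) {
    let start = Instant::now();
    let clients = thread::scope(|s| {
        // Start all three before waiting on any of them.
        let a = s.spawn(|| connect("socialgraph", setup));
        let b = s.spawn(|| connect("redis", setup));
        let c = s.spawn(|| connect("phoenix", setup));
        vec![a.join().unwrap(), b.join().unwrap(), c.join().unwrap()]
    });
    (clients, start.elapsed())
}
```

With a 200 ms setup each, the three clients are ready after roughly 200 ms instead of the roughly 600 ms a sequential build would take.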

Sequential vs parallel execution (recall from Session 01):

  • Query hydrators: parallel.
  • Sources: parallel.
  • Hydrators: parallel.
  • Filters: sequential.
  • Scorers: sequential.
  • Side-effects: parallel, fire-and-forget.

The pipeline definition here only lists the components; the framework code decides how each stage executes.

Filter order matters: cheap → expensive, dedup first, expensive filters (muted keyword tokenization, topic taxonomy) after.

Two-pass hydration: 10 hydrators run on all candidates before filtering; 6 more run only on the top-K after selection. The expensive calls are never made for candidates that get filtered out or aren't selected.

Three-flavor OON retrieval: Phoenix, PhoenixTopics, PhoenixMOE — three different retrieval strategies feeding the same downstream pipeline. Different queries to the same retrieval client.

Staged code: ImpressedPostsQueryHydrator is constructed but never wired into the query hydrators; enabling it means adding it to the vec in a later change.

Mock-everywhere: every client has a Mock* variant. The mock() constructor on each pipeline returns a fully-functional (but non-network-touching) pipeline for tests.

Orphan files: candidate.rs, candidate_features.rs, query.rs, query_features.rs are dead code in this directory — not declared in mod.rs. Probably pre-refactor leftovers. Don't touch.


Next session

Session 07 — Candidate hydrators, part 1. We start reading the 16 hydrator implementations the Phoenix pipeline composes. Part 1 covers the simpler / structural ones (~900 LOC):

  • in_network_candidate_hydrator.rs
  • core_data_candidate_hydrator.rs
  • subscription_hydrator.rs
  • gizmoduck_hydrator.rs
  • blocked_by_hydrator.rs
  • has_media_hydrator.rs
  • language_code_hydrator.rs
  • video_duration_candidate_hydrator.rs

Session 08 covers the rest.