X For You algorithm, line by line — Part 6: Concrete candidate pipelines
Part 6 of the deep dive into xai-org/x-algorithm. The two pipeline declarations that wire every stage component: ForYouCandidatePipeline (5 sources + blender + 8 side effects) and PhoenixCandidatePipeline (15 query hydrators, 6 sources, 10 hydrators, 14 filters, 3 scorers, 6 post-selection hydrators, 3 post-selection filters, 6 side effects). Plus orphan-file analysis.
This is the keystone session. Up to now we've read the framework (candidate-pipeline crate, Session 01) and the building blocks (filters in Session 05, models in Session 04). Now we read the actual pipeline configuration — the file that lists every source, hydrator, filter, scorer, and side-effect that runs when X processes a real feed request.
After this session you'll see, in one place, the complete list of components that fire per request. The remaining sessions then go into each component's implementation.
Files covered (1,279 LOC across 7 files):
home-mixer/candidate_pipeline/
├── mod.rs (2) re-exports
├── for_you_candidate_pipeline.rs (278) the outer For You pipeline
├── phoenix_candidate_pipeline.rs (772) the inner Scored Posts pipeline
├── query.rs (69) [orphan] simpler ScoredPostsQuery variant
├── query_features.rs (10) [orphan] simple UserFeatures
├── candidate.rs (70) [orphan] simpler PostCandidate variant
└── candidate_features.rs (78) [orphan] Gizmoduck/Thrift-shaped types
The four "orphan" files aren't referenced from mod.rs and aren't used in the wired pipelines — they appear to be remnants of an earlier refactor where these types lived next to the pipelines instead of in home-mixer/models/. We'll cover them briefly at the end.
mod.rs (2 lines)
pub mod for_you_candidate_pipeline;
pub mod phoenix_candidate_pipeline;
Just the two pipeline files. The other 4 .rs files in this directory are not declared here, which means they don't get compiled as part of this module tree. Either they're imported from elsewhere via #[path = "..."] (unlikely) or they're dead files. We'll see at the end.
for_you_candidate_pipeline.rs (278 lines) — the outer pipeline
This is the For You pipeline. It wraps the Phoenix pipeline as one of its sources and adds the For-You-specific blending (ads, who-to-follow, prompts, push-to-home).
Imports
use crate::clients::ad_index_client::{AdIndexClient, MockAdIndexClient, ProdAdIndexClient};
use crate::clients::kafka_publisher_client::{KafkaPublisherClient, MockKafkaPublisherClient};
use crate::clients::past_request_timestamps_client::{
MockPastRequestTimestampsClient, PastRequestTimestampsClient, ProdPastRequestTimestampsClient,
};
use crate::clients::prompts_client::{MockPromptsClient, ProdPromptsClient, PromptsClient};
use crate::clients::served_history_client::{
MockServedHistoryClient, ProdServedHistoryClient, ServedHistoryClient,
};
use crate::clients::tweet_entity_service_client::{MockTESClient, ProdTESClient, TESClient};
use crate::clients::who_to_follow_client::{
MockWhoToFollowClient, ProdWhoToFollowClient, WhoToFollowClient,
};
A long block of client imports. Every external service has a triple:
- A trait XxxClient (the abstraction)
- A production impl ProdXxxClient (real network calls)
- A mock impl MockXxxClient (for tests)
This trait-with-prod-and-mock pattern is repeated for every dependency. It's verbose but makes it trivial to write a mock() constructor — no DI framework, no wiring magic.
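The pattern can be sketched in a few lines. Everything below (ClockClient, ProdClockClient, MockClockClient, describe) is hypothetical and only illustrates the shape; the real clients are async and talk to network services.

```rust
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical trait standing in for an `XxxClient` abstraction.
trait ClockClient: Send + Sync {
    fn now_secs(&self) -> u64;
}

// "Prod" impl: does the real work (here, reading the system clock).
struct ProdClockClient;
impl ClockClient for ProdClockClient {
    fn now_secs(&self) -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0)
    }
}

// "Mock" impl: a unit struct returning a constant, for tests.
struct MockClockClient;
impl ClockClient for MockClockClient {
    fn now_secs(&self) -> u64 {
        1_700_000_000
    }
}

// Consumers only see the trait object, so either impl can be passed in.
fn describe(client: &Arc<dyn ClockClient>) -> String {
    format!("now={}", client.now_secs())
}

fn prod_client() -> Arc<dyn ClockClient> {
    Arc::new(ProdClockClient)
}

fn mock_client() -> Arc<dyn ClockClient> {
    Arc::new(MockClockClient)
}
```

Swapping prod_client() for mock_client() is the only change a test has to make, which is why no DI framework is needed.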
use crate::models::query::ScoredPostsQuery;
use crate::params;
use crate::query_hydrators::past_request_timestamps_query_hydrator::PastRequestTimestampsQueryHydrator;
use crate::query_hydrators::served_history_query_hydrator::ServedHistoryQueryHydrator;
use crate::scored_posts_server::ScoredPostsServer;
use crate::selectors::BlenderSelector;
use crate::side_effects::ads_injection_logging_side_effect::AdsInjectionLoggingSideEffect;
use crate::side_effects::client_events_kafka_side_effect::ClientEventsKafkaSideEffect;
use crate::side_effects::for_you_response_stats_side_effect::ForYouResponseStatsSideEffect;
use crate::side_effects::publish_seen_ids_to_kafka_side_effect::PublishSeenIdsToKafkaSideEffect;
use crate::side_effects::served_candidates_kafka_side_effect::ServedCandidatesKafkaSideEffect;
use crate::side_effects::truncate_served_history_side_effect::TruncateServedHistorySideEffect;
use crate::side_effects::update_past_request_timestamps_side_effect::UpdatePastRequestTimestampsSideEffect;
use crate::side_effects::update_served_history_side_effect::UpdateServedHistorySideEffect;
use crate::sources::ads_source::AdsSource;
use crate::sources::prompts_source::PromptsSource;
use crate::sources::push_to_home_source::PushToHomeSource;
use crate::sources::scored_posts_source::ScoredPostsSource;
use crate::sources::who_to_follow_source::WhoToFollowSource;
The For You pipeline's stage imports. Notice:
- 2 query hydrators (served history + past timestamps)
- 5 sources (ScoredPosts is the inner pipeline; the other four are non-post or specialty sources)
- 1 selector (BlenderSelector)
- 8 side-effects
No hydrators, no filters, no scorers, no post-selection. The For You pipeline does almost no per-candidate work — that's the inner Phoenix pipeline's job. For You is mostly about blending the scored posts with other module types and logging the final timeline.
use std::sync::Arc;
use tonic::async_trait;
use xai_candidate_pipeline::candidate_pipeline::CandidatePipeline;
use xai_candidate_pipeline::component_library::clients::{
MockReplyMixerClient, ProdReplyMixerClient, ReplyMixerClient,
};
use xai_candidate_pipeline::filter::Filter;
use xai_candidate_pipeline::hydrator::Hydrator;
use xai_candidate_pipeline::query_hydrator::QueryHydrator;
use xai_candidate_pipeline::scorer::Scorer;
use xai_candidate_pipeline::selector::Selector;
use xai_candidate_pipeline::side_effect::SideEffect;
use xai_candidate_pipeline::source::Source;
use xai_home_mixer_proto::FeedItem;
All the trait imports + FeedItem. Note FeedItem is the candidate type for the For You pipeline — not PostCandidate. A FeedItem is the wire format that can hold a post, a prompt, a who-to-follow card, an ad, etc. The For You pipeline operates on this union type because it blends heterogeneous content.
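To make the "union type" idea concrete, here is a minimal sketch. The Item enum and its variants are hypothetical; the real FeedItem is a generated proto type whose fields are not shown in this session.

```rust
// Hypothetical stand-in for a union-style feed item.
#[derive(Debug, Clone, PartialEq)]
enum Item {
    Post { post_id: u64, score: f64 },
    Ad { ad_id: u64 },
    WhoToFollow { user_ids: Vec<u64> },
    Prompt { text: String },
}

// One candidate vector can carry every kind of content, and code that
// cares about a single kind can pattern-match on the variant.
fn count_posts(items: &[Item]) -> usize {
    items
        .iter()
        .filter(|item| matches!(item, Item::Post { .. }))
        .count()
}

fn sample_feed() -> Vec<Item> {
    vec![
        Item::Post { post_id: 1, score: 0.9 },
        Item::Ad { ad_id: 77 },
        Item::Post { post_id: 2, score: 0.4 },
        Item::WhoToFollow { user_ids: vec![10, 11] },
        Item::Prompt { text: "example prompt".to_string() },
    ]
}
```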
Struct
pub struct ForYouCandidatePipeline {
query_hydrators: Vec<Box<dyn QueryHydrator<ScoredPostsQuery>>>,
sources: Vec<Box<dyn Source<ScoredPostsQuery, FeedItem>>>,
selector: BlenderSelector,
side_effects: Arc<Vec<Box<dyn SideEffect<ScoredPostsQuery, FeedItem>>>>,
}
Four fields, only the stages that are actually used. The omitted stages (hydrators, filters, scorers, etc.) are returned as empty slices by the trait impl below. Concrete BlenderSelector rather than Box<dyn Selector<…>> because there's only one selector, so there's no need for the indirection.
new — production constructor
impl ForYouCandidatePipeline {
pub async fn new(scored_posts_server: Arc<ScoredPostsServer>, datacenter: &str) -> Self {
let (
ad_index_client,
ads_injection_logging,
publish_seen_ids,
served_candidates,
client_events,
served_history_client,
who_to_follow_client,
prompts_client,
past_request_timestamps_client,
tes_client,
reply_mixer_client,
) = tokio::join!(
async {
Arc::new(
ProdAdIndexClient::new(datacenter)
.await
.expect("Failed to create AdIndex client"),
) as Arc<dyn AdIndexClient + Send + Sync>
},
AdsInjectionLoggingSideEffect::prod(),
PublishSeenIdsToKafkaSideEffect::prod(),
ServedCandidatesKafkaSideEffect::prod(),
ClientEventsKafkaSideEffect::prod(),
…
The takeaway: all client construction happens in parallel via tokio::join!. Building these clients (which involves resolving service discovery, opening TCP/TLS connections, fetching certs) is I/O-bound. tokio::join! lets all of them happen concurrently.
This matters for cold-start time: serial construction would take seconds (each client adds latency). Parallel construction completes when the slowest one finishes — typically <500ms.
tokio::join! is the macro form. It returns a tuple of all the results, in the same order. Each async { ... } block is a separate future.
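A rough std-only analogy of the "start everything, wait for the slowest, get a positional tuple back" behavior. This is not tokio: tokio::join! polls futures concurrently within one task, whereas the sketch below uses scoped threads. The three constructor functions and their sleep durations are hypothetical.

```rust
use std::thread;
use std::time::Duration;

// Hypothetical slow constructors standing in for client setup.
fn build_ads_client() -> &'static str {
    thread::sleep(Duration::from_millis(30));
    "ads"
}

fn build_prompts_client() -> &'static str {
    thread::sleep(Duration::from_millis(10));
    "prompts"
}

fn build_wtf_client() -> &'static str {
    thread::sleep(Duration::from_millis(20));
    "who_to_follow"
}

// Start all three, then collect results in declaration order,
// regardless of which one finished first.
fn build_all() -> (&'static str, &'static str, &'static str) {
    thread::scope(|s| {
        let ads = s.spawn(build_ads_client);
        let prompts = s.spawn(build_prompts_client);
        let wtf = s.spawn(build_wtf_client);
        (
            ads.join().expect("ads client panicked"),
            prompts.join().expect("prompts client panicked"),
            wtf.join().expect("who_to_follow client panicked"),
        )
    })
}
```

Total wall time is roughly that of the slowest constructor rather than the sum of all three, which is the property the pipeline relies on at startup.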
The as Arc<dyn AdIndexClient + Send + Sync> cast: each ProdXxxClient::new() returns a concrete type; the cast erases it to a trait object so the rest of the code can treat it polymorphically. It also means new (with Prod clients) and mock (with Mock clients) hand build exactly the same Arc<dyn Trait> parameter types.
.expect("Failed to create XYZ client") — panic on construction failure. The build orchestrator (XServiceBuilder from Session 04) treats a build panic as "service can't start," which causes the process to crash and Kubernetes to retry.
Self::build(
scored_posts_server,
ad_index_client,
ads_injection_logging,
…
)
}
After the tokio::join! resolves, hand off to the synchronous build constructor.
build — actual construction
fn build(
scored_posts_server: Arc<ScoredPostsServer>,
ad_index_client: Arc<dyn AdIndexClient + Send + Sync>,
ads_injection_logging: AdsInjectionLoggingSideEffect,
publish_seen_ids: PublishSeenIdsToKafkaSideEffect,
served_candidates: ServedCandidatesKafkaSideEffect,
client_events: ClientEventsKafkaSideEffect,
served_history_client: Arc<dyn ServedHistoryClient>,
who_to_follow_client: Arc<dyn WhoToFollowClient + Send + Sync>,
prompts_client: Arc<dyn PromptsClient + Send + Sync>,
past_request_timestamps_client: Arc<dyn PastRequestTimestampsClient>,
tes_client: Arc<dyn TESClient + Send + Sync>,
reply_mixer_client: Arc<dyn ReplyMixerClient>,
) -> Self {
This takes all the constructed clients and returns the pipeline. The split between new/build and mock is intentional — both new and mock produce the same set of dependencies, then both call build. So the actual pipeline composition lives in one place.
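A minimal sketch of that constructor split, with hypothetical names (Store, MiniPipeline) and synchronous code for brevity:

```rust
use std::sync::Arc;

// Hypothetical dependency with a prod and a mock implementation.
trait Store: Send + Sync {
    fn name(&self) -> &'static str;
}

struct ProdStore;
struct MockStore;

impl Store for ProdStore {
    fn name(&self) -> &'static str {
        "prod"
    }
}

impl Store for MockStore {
    fn name(&self) -> &'static str {
        "mock"
    }
}

struct MiniPipeline {
    stages: Vec<String>,
}

impl MiniPipeline {
    // Production entry point: picks real dependencies.
    fn new() -> Self {
        Self::build(Arc::new(ProdStore))
    }

    // Test entry point: picks mock dependencies.
    fn mock() -> Self {
        Self::build(Arc::new(MockStore))
    }

    // The single place where the stage list is composed.
    fn build(store: Arc<dyn Store>) -> Self {
        let stages = vec![
            format!("hydrate:{}", store.name()),
            format!("side_effect:{}", store.name()),
        ];
        Self { stages }
    }
}
```

Because both entry points funnel into build, a stage added there shows up in production and in tests at the same time.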
let query_hydrators: Vec<Box<dyn QueryHydrator<ScoredPostsQuery>>> = vec![
Box::new(ServedHistoryQueryHydrator::from_client(Arc::clone(
&served_history_client,
))),
Box::new(PastRequestTimestampsQueryHydrator::new(Arc::clone(
&past_request_timestamps_client,
))),
];
Two query hydrators for For You:
- ServedHistoryQueryHydrator: fetches the user's recently-served posts from Manhattan (X's KV store). Used by the PreviouslyServedPostsFilter we saw in Session 05.
- PastRequestTimestampsQueryHydrator: fetches when this user last made requests, used for polling/freshness decisions.
These run in parallel (the wave-1 query hydrators from Session 01's framework). They don't depend on each other.
let sources: Vec<Box<dyn Source<ScoredPostsQuery, FeedItem>>> = vec![
Box::new(ScoredPostsSource {
scored_posts_server,
}),
Box::new(AdsSource { ad_index_client }),
Box::new(WhoToFollowSource {
who_to_follow_client,
}),
Box::new(PromptsSource { prompts_client }),
Box::new(PushToHomeSource {
tes_client,
reply_mixer_client,
}),
];
Five sources for For You:
- ScoredPostsSource: wraps the inner ScoredPostsServer (which runs the Phoenix pipeline). This is the workhorse: returns ~50-100 scored posts.
- AdsSource: queries the ad index for promoted posts.
- WhoToFollowSource: fetches user recommendations from the WTF service.
- PromptsSource: fetches in-feed prompts (e.g. "vote in this poll," "verify your account").
- PushToHomeSource: special-case source for the "click notification → pin its post" flow (uses query.push_to_home_post_id from Session 04).
All five run in parallel and their outputs (each a Vec<FeedItem>) get flattened.
let selector = BlenderSelector::new();
A single selector. BlenderSelector does the item-type-aware blending — alternates between posts and ads, interleaves who-to-follow modules every N items, etc. We'll see its 164-line implementation in Session 10.
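As a toy illustration of "interleave a module every N items" (this is not BlenderSelector's actual logic, which is covered later; the Entry type, the interleave function, and the spacing rule are all assumptions made for the sketch):

```rust
// Hypothetical feed entries; the real selector operates on `FeedItem`.
#[derive(Debug, Clone, PartialEq)]
enum Entry {
    Post(u64),
    Module(&'static str),
}

// Place one module after every `every` posts until modules run out.
// Leftover modules are dropped; leftover posts are kept in order.
fn interleave(posts: Vec<u64>, modules: Vec<&'static str>, every: usize) -> Vec<Entry> {
    assert!(every > 0, "spacing must be at least 1");
    let mut out = Vec::with_capacity(posts.len() + modules.len());
    let mut modules = modules.into_iter();
    for (i, post) in posts.into_iter().enumerate() {
        out.push(Entry::Post(post));
        if (i + 1) % every == 0 {
            if let Some(module) = modules.next() {
                out.push(Entry::Module(module));
            }
        }
    }
    out
}
```

With five posts, three modules, and a spacing of 2, the output is two posts, a module, two posts, a module, and the final post; the third module is never placed.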
let side_effects: Arc<Vec<Box<dyn SideEffect<ScoredPostsQuery, FeedItem>>>> =
Arc::new(vec![
Box::new(ads_injection_logging),
Box::new(publish_seen_ids),
Box::new(served_candidates),
Box::new(client_events),
Box::new(ForYouResponseStatsSideEffect),
Box::new(UpdatePastRequestTimestampsSideEffect::new(
past_request_timestamps_client,
)),
Box::new(UpdateServedHistorySideEffect::new(Arc::clone(
&served_history_client,
))),
Box::new(TruncateServedHistorySideEffect::new(served_history_client)),
]);
Eight side effects for For You, fired asynchronously after the response is returned (per Session 01's run_side_effects behavior). Categories:
- Ads logging (ads_injection_logging): records which ads were considered + chosen.
- Kafka publishers (publish_seen_ids, served_candidates, client_events): publish to Kafka for downstream training-data and analytics pipelines.
- Stats (ForYouResponseStatsSideEffect): per-response metrics emit.
- State updates (UpdatePastRequestTimestampsSideEffect, UpdateServedHistorySideEffect): write to Manhattan for the next request to read in its query hydrators.
- Maintenance (TruncateServedHistorySideEffect): trim the served-history index to bounded size.
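The state-update side effects share clients with the query hydrators above: the hydrators were built from Arc::clone(&served_history_client) and Arc::clone(&past_request_timestamps_client), and the side effects here take the remaining handles (the last use of each moves the original Arc instead of cloning it). Read at request start, write after response. The same client handles both.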
Self {
query_hydrators,
sources,
selector,
side_effects,
}
}
Pack into the struct.
mock — the test constructor
pub async fn mock(scored_posts_server: Arc<ScoredPostsServer>) -> Self {
let ad_index_client: Arc<dyn AdIndexClient + Send + Sync> = Arc::new(MockAdIndexClient);
let mock_kafka = Arc::new(MockKafkaPublisherClient) as Arc<dyn KafkaPublisherClient>;
let ads_injection = AdsInjectionLoggingSideEffect::new(Arc::clone(&mock_kafka));
let publish_seen_ids = PublishSeenIdsToKafkaSideEffect::new(Arc::clone(&mock_kafka));
let served_candidates = ServedCandidatesKafkaSideEffect::new(Arc::clone(&mock_kafka));
let client_events = ClientEventsKafkaSideEffect::new(mock_kafka);
let served_history_client: Arc<dyn ServedHistoryClient> = Arc::new(MockServedHistoryClient);
let who_to_follow_client: Arc<dyn WhoToFollowClient + Send + Sync> =
Arc::new(MockWhoToFollowClient);
let prompts_client: Arc<dyn PromptsClient + Send + Sync> = Arc::new(MockPromptsClient);
let past_request_timestamps_client: Arc<dyn PastRequestTimestampsClient> =
Arc::new(MockPastRequestTimestampsClient);
let tes_client: Arc<dyn TESClient + Send + Sync> = Arc::new(MockTESClient::default());
let reply_mixer_client: Arc<dyn ReplyMixerClient> = Arc::new(MockReplyMixerClient);
Self::build(
scored_posts_server,
ad_index_client,
ads_injection,
publish_seen_ids,
served_candidates,
client_events,
served_history_client,
who_to_follow_client,
prompts_client,
past_request_timestamps_client,
tes_client,
reply_mixer_client,
)
}
}
Constructs all-mock clients and calls build. The mocks are mostly unit structs that implement the corresponding trait with no-op or constant-returning methods (MockTESClient is the exception here: it is built with ::default(), so it presumably carries some fields). Note mock_kafka is shared across four side effects (all of them publish to Kafka, so they all need a publisher).
The first three take Arc::clone(&mock_kafka); the fourth consumes the original mock_kafka (no clone needed since nothing else uses it after).
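The clone-three-times-then-move idiom in isolation, using hypothetical Publisher and Effect types:

```rust
use std::sync::Arc;

// Hypothetical publisher and side effect, mirroring the shape of `mock`.
struct Publisher;

struct Effect {
    publisher: Arc<Publisher>,
}

fn build_effects() -> Vec<Effect> {
    let publisher = Arc::new(Publisher);
    vec![
        // The first three borrow the handle and bump the refcount.
        Effect { publisher: Arc::clone(&publisher) },
        Effect { publisher: Arc::clone(&publisher) },
        Effect { publisher: Arc::clone(&publisher) },
        // The last use moves the original handle; no extra clone needed.
        Effect { publisher },
    ]
}
```

All four effects end up pointing at the same allocation, and the strong count is exactly four: three clones plus the moved original.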
The CandidatePipeline trait impl
#[async_trait]
impl CandidatePipeline<ScoredPostsQuery, FeedItem> for ForYouCandidatePipeline {
fn query_hydrators(&self) -> &[Box<dyn QueryHydrator<ScoredPostsQuery>>] {
&self.query_hydrators
}
fn sources(&self) -> &[Box<dyn Source<ScoredPostsQuery, FeedItem>>] {
&self.sources
}
fn hydrators(&self) -> &[Box<dyn Hydrator<ScoredPostsQuery, FeedItem>>] {
&[]
}
fn filters(&self) -> &[Box<dyn Filter<ScoredPostsQuery, FeedItem>>] {
&[]
}
fn scorers(&self) -> &[Box<dyn Scorer<ScoredPostsQuery, FeedItem>>] {
&[]
}
fn selector(&self) -> &dyn Selector<ScoredPostsQuery, FeedItem> {
&self.selector
}
fn post_selection_hydrators(&self) -> &[Box<dyn Hydrator<ScoredPostsQuery, FeedItem>>] {
&[]
}
fn post_selection_filters(&self) -> &[Box<dyn Filter<ScoredPostsQuery, FeedItem>>] {
&[]
}
fn side_effects(&self) -> Arc<Vec<Box<dyn SideEffect<ScoredPostsQuery, FeedItem>>>> {
Arc::clone(&self.side_effects)
}
fn result_size(&self) -> usize {
params::FOR_YOU_MAX_RESULT_SIZE
}
}
Implement the trait. Most methods return references to the four stored fields. The five empty-slice returns (hydrators, filters, scorers, post_selection_hydrators, post_selection_filters) are explicit no-ops.
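Why empty slices work as no-ops can be seen in a stripped-down sketch. The Stage and Pipeline traits below are hypothetical simplifications; the real framework traits are async and generic over query and candidate types.

```rust
// Hypothetical, simplified stage trait.
trait Stage {
    fn name(&self) -> &'static str;
}

// Hypothetical, simplified pipeline trait with two stage lists.
trait Pipeline {
    fn sources(&self) -> &[Box<dyn Stage>];
    fn filters(&self) -> &[Box<dyn Stage>];

    // Shared driver code can iterate every stage list without special cases.
    fn stage_names(&self) -> Vec<&'static str> {
        self.sources()
            .iter()
            .chain(self.filters().iter())
            .map(|stage| stage.name())
            .collect()
    }
}

struct PostsSource;

impl Stage for PostsSource {
    fn name(&self) -> &'static str {
        "posts_source"
    }
}

struct OuterPipeline {
    sources: Vec<Box<dyn Stage>>,
}

impl Pipeline for OuterPipeline {
    fn sources(&self) -> &[Box<dyn Stage>] {
        &self.sources
    }

    // Unused stage: an empty slice, so the driver's loop runs zero times.
    fn filters(&self) -> &[Box<dyn Stage>] {
        &[]
    }
}

fn outer() -> OuterPipeline {
    let sources: Vec<Box<dyn Stage>> = vec![Box::new(PostsSource)];
    OuterPipeline { sources }
}
```

The driver never needs to know which stages a given pipeline skips; it just iterates whatever slice it is handed.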
So the For You pipeline executes:
query_hydrators (||):
ServedHistoryQueryHydrator
PastRequestTimestampsQueryHydrator
↓
sources (||):
ScoredPostsSource (← runs inner Phoenix pipeline)
AdsSource
WhoToFollowSource
PromptsSource
PushToHomeSource
↓
(no hydrators / no filters / no scorers — candidates pass through)
↓
selector: BlenderSelector (sorts + blends + truncates)
↓
(no post-selection hydration / no post-selection filtering)
↓
truncate to FOR_YOU_MAX_RESULT_SIZE
↓
return PipelineResult, then in background:
AdsInjectionLoggingSideEffect
PublishSeenIdsToKafkaSideEffect
ServedCandidatesKafkaSideEffect
ClientEventsKafkaSideEffect
ForYouResponseStatsSideEffect
UpdatePastRequestTimestampsSideEffect
UpdateServedHistorySideEffect
TruncateServedHistorySideEffect
That's the entire For You orchestration. The complexity is in (a) the inner Phoenix pipeline inside ScoredPostsSource, and (b) the BlenderSelector that blends post + non-post sources.
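The gather, select, truncate skeleton of that flow can be written as a synchronous toy. The Candidate tuple, the three source functions, and the sort-by-score selector are stand-ins chosen for the sketch; the real sources run concurrently and the real selector blends by item type.

```rust
// Hypothetical candidate: (source label, score).
type Candidate = (&'static str, f64);

// Each source returns its own batch; batches are flattened into one list.
fn gather(sources: &[fn() -> Vec<Candidate>]) -> Vec<Candidate> {
    sources.iter().flat_map(|source| source()).collect()
}

// Stand-in selector: order by score, highest first, then cap the length.
fn select(mut candidates: Vec<Candidate>, result_size: usize) -> Vec<Candidate> {
    candidates.sort_by(|a, b| b.1.total_cmp(&a.1));
    candidates.truncate(result_size);
    candidates
}

fn posts() -> Vec<Candidate> {
    vec![("post", 0.9), ("post", 0.2)]
}

fn ads() -> Vec<Candidate> {
    vec![("ad", 0.5)]
}

fn prompts() -> Vec<Candidate> {
    Vec::new()
}

fn run(result_size: usize) -> Vec<Candidate> {
    let sources: [fn() -> Vec<Candidate>; 3] = [posts, ads, prompts];
    select(gather(&sources), result_size)
}
```

A source that returns nothing (prompts here) simply contributes zero candidates to the flattened list.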
phoenix_candidate_pipeline.rs (772 lines) — the inner pipeline
The big one. Defines all the components that score posts, in the order they fire.
Import block (lines 1-142)
The imports list every component this pipeline uses. We can read this as the menu of features in the For You algorithm:
use crate::candidate_hydrators::ads_brand_safety_hydrator::AdsBrandSafetyHydrator;
use crate::candidate_hydrators::ads_brand_safety_vf_hydrator::AdsBrandSafetyVfHydrator;
use crate::candidate_hydrators::blocked_by_hydrator::BlockedByHydrator;
use crate::candidate_hydrators::core_data_candidate_hydrator::CoreDataCandidateHydrator;
use crate::candidate_hydrators::filtered_topics_hydrator::FilteredTopicsHydrator;
use crate::candidate_hydrators::following_replied_users_hydrator::FollowingRepliedUsersHydrator;
use crate::candidate_hydrators::gizmoduck_hydrator::GizmoduckCandidateHydrator;
use crate::candidate_hydrators::has_media_hydrator::HasMediaHydrator;
use crate::candidate_hydrators::in_network_candidate_hydrator::InNetworkCandidateHydrator;
use crate::candidate_hydrators::language_code_hydrator::LanguageCodeHydrator;
use crate::candidate_hydrators::mutual_follow_jaccard_hydrator::MutualFollowJaccardHydrator;
use crate::candidate_hydrators::quote_hydrator::QuoteHydrator;
use crate::candidate_hydrators::subscription_hydrator::SubscriptionHydrator;
use crate::candidate_hydrators::tweet_type_metrics_hydrator::TweetTypeMetricsHydrator;
use crate::candidate_hydrators::vf_candidate_hydrator::VFCandidateHydrator;
use crate::candidate_hydrators::video_duration_candidate_hydrator::VideoDurationCandidateHydrator;
16 candidate hydrators. These are the per-post enrichment stages. We'll read them in Sessions 07-08.
use crate::clients::followed_grok_topics_store_client::{
FollowedGrokTopicsStoreClient, MockFollowedGrokTopicsStoreClient,
ProdFollowedGrokTopicsStoreClient,
};
use crate::clients::followed_starter_packs_store_client::{
FollowedStarterPacksStoreClient, MockFollowedStarterPacksStoreClient,
ProdFollowedStarterPacksStoreClient,
};
use crate::clients::gender_prediction_client::{
GenderPredictionGrpcClient, MockGenderPredictionGrpcClient, ProdGenderPredictionGrpcClient,
};
use crate::clients::gizmoduck_client::{GizmoduckClient, MockGizmoduckClient, ProdGizmoduckClient};
use crate::clients::impressed_posts_client::ImpressedPostsClient;
use crate::clients::kafka_publisher_client::{
KafkaCluster, KafkaPublisherClient, MockKafkaPublisherClient, PHOENIX_SCORES_TOPIC,
ProdKafkaPublisherClient, RERANKING_TOPIC,
};
use crate::clients::s2s::{S2S_CHAIN_PATH, S2S_CRT_PATH, S2S_KEY_PATH};
use crate::clients::tweet_entity_service_client::{MockTESClient, ProdTESClient, TESClient};
use crate::clients::user_action_aggregation_client::{
MockUserActionAggregationClient, ProdUserActionAggregationClient, UserActionAggregationClient,
};
use crate::clients::user_demographics_client::{
MockUserDemographicsClient, ProdUserDemographicsClient, UserDemographicsClient,
};
use crate::clients::user_inferred_gender_store_client::{
MockUserInferredGenderStoreClient, ProdUserInferredGenderStoreClient,
UserInferredGenderStoreClient,
};
use crate::clients::vm_ranker_client::{MockVMRankerClient, ProdVMRankerClient, VMRankerClient};
About a dozen client imports, most of them trait+prod+mock triples (ImpressedPostsClient is imported as a trait only, and s2s contributes path constants rather than a client). Together with the clients that appear in the build_with_clients signature further down, these are the services Phoenix talks to:
- UserActionAggregationClient: fetches the user's recent engagement sequence (the input to the Phoenix model)
- PhoenixPredictionClient: the ML inference endpoint
- PhoenixRetrievalClient: the OON retrieval endpoint (two-tower model)
- ThunderClient: what we read in Sessions 02-03
- StratoClient: generic key-value lookup
- TweetMixerClient: legacy retrieval service
- TESClient: Tweet Entity Service (post hydration)
- GizmoduckClient: user-data service
- VisibilityFilteringClient: safety/visibility
- RedisClient: generic Redis
- VMRankerClient: a separate ranking service (likely "Video Mixer" or similar)
- SafetyLabelStoreClient: fetches safety labels
- ImpressionBloomFilterClient: fetches the user's impression bloom filters
- GeoIpLocationClient: IP → location lookup
- UserDemographicsClient: age/gender features
- UserInferredGenderStoreClient + GenderPredictionGrpcClient: gender prediction (cached + live)
- ImpressedPostsClient: fetches recently-impressed posts (the smaller, recent list)
- FollowedGrokTopicsStoreClient / FollowedStarterPacksStoreClient: topic / starter pack subscriptions
That's the complete dependency surface of the Phoenix pipeline. ~20 services.
use crate::filters::age_filter::AgeFilter;
use crate::filters::ancillary_vf_filter::AncillaryVFFilter;
use crate::filters::author_socialgraph_filter::AuthorSocialgraphFilter;
use crate::filters::core_data_hydration_filter::CoreDataHydrationFilter;
use crate::filters::dedup_conversation_filter::DedupConversationFilter;
use crate::filters::drop_duplicates_filter::DropDuplicatesFilter;
use crate::filters::ineligible_subscription_filter::IneligibleSubscriptionFilter;
use crate::filters::muted_keyword_filter::MutedKeywordFilter;
use crate::filters::new_user_topic_ids_filter::NewUserTopicIdsFilter;
use crate::filters::previously_seen_posts_backup_filter::PreviouslySeenPostsBackupFilter;
use crate::filters::previously_seen_posts_filter::PreviouslySeenPostsFilter;
use crate::filters::previously_served_posts_filter::PreviouslyServedPostsFilter;
use crate::filters::retweet_deduplication_filter::RetweetDeduplicationFilter;
use crate::filters::self_tweet_filter::SelfTweetFilter;
use crate::filters::topic_ids_filter::TopicIdsFilter;
use crate::filters::vf_filter::VFFilter;
use crate::filters::video_filter::VideoFilter;
All 17 filters from Session 05. One related detail: ImpressedPostsQueryHydrator, which appears to be the data source for PreviouslySeenPostsBackupFilter, isn't wired into the query hydrator list below. It's constructed but assigned to _impressed_posts_hydrator (an underscore-prefixed variable that is never used). The feature's code is shipped but not active.
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use crate::params;
The query and candidate types (from Session 04's models/).
use crate::query_hydrators::blocked_user_ids_query_hydrator::BlockedUserIdsQueryHydrator;
use crate::query_hydrators::cached_posts_query_hydrator::CachedPostsQueryHydrator;
use crate::query_hydrators::followed_grok_topics_query_hydrator::FollowedGrokTopicsQueryHydrator;
use crate::query_hydrators::followed_starter_packs_query_hydrator::FollowedStarterPacksQueryHydrator;
use crate::query_hydrators::followed_user_ids_query_hydrator::FollowedUserIdsQueryHydrator;
use crate::query_hydrators::ip_query_hydrator::IpQueryHydrator;
use crate::query_hydrators::impressed_posts_query_hydrator::ImpressedPostsQueryHydrator;
use crate::query_hydrators::impression_bloom_filter_query_hydrator::ImpressionBloomFilterQueryHydrator;
use crate::query_hydrators::inferred_grok_topics_query_hydrator::InferredGrokTopicsQueryHydrator;
use crate::query_hydrators::muted_user_ids_query_hydrator::MutedUserIdsQueryHydrator;
use crate::query_hydrators::mutual_follow_query_hydrator::MutualFollowQueryHydrator;
use crate::query_hydrators::retrieval_sequence_query_hydrator::RetrievalSequenceQueryHydrator;
use crate::query_hydrators::scoring_sequence_query_hydrator::ScoringSequenceQueryHydrator;
use crate::query_hydrators::subscribed_user_ids_query_hydrator::SubscribedUserIdsQueryHydrator;
use crate::query_hydrators::user_demographics_query_hydrator::UserDemographicsQueryHydrator;
use crate::query_hydrators::user_inferred_gender_query_hydrator::UserInferredGenderQueryHydrator;
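16 query hydrator imports. 15 of them are wired into the pipeline below; ImpressedPostsQueryHydrator is constructed but left out of the list. We'll see them in Session 09.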
use crate::scorers::phoenix_scorer::PhoenixScorer;
use crate::scorers::ranking_scorer::RankingScorer;
use crate::scorers::vm_ranker::VMRanker;
use crate::selectors::TopKScoreSelector;
use crate::side_effects::mutual_follow_stats_side_effect::MutualFollowStatsSideEffect;
use crate::side_effects::phoenix_experiments_side_effect::PhoenixExperimentsSideEffect;
use crate::side_effects::phoenix_request_cache_side_effect::PhoenixRequestCacheSideEffect;
use crate::side_effects::redis_post_candidate_cache_side_effect::RedisPostCandidateCacheSideEffect;
use crate::side_effects::reranking_kafka_side_effect::RerankingKafkaSideEffect;
use crate::side_effects::scored_stats_side_effect::ScoredStatsSideEffect;
use crate::sources::cached_posts_source::CachedPostsSource;
use crate::sources::phoenix_moe_source::PhoenixMOESource;
use crate::sources::phoenix_source::PhoenixSource;
use crate::sources::phoenix_topics_source::PhoenixTopicsSource;
use crate::sources::thunder_source::ThunderSource;
use crate::sources::tweet_mixer_source::TweetMixerSource;
The remaining components: 3 scorers, 1 selector, 6 side effects, 6 sources. Plus the framework imports.
Struct
pub struct PhoenixCandidatePipeline {
query_hydrators: Vec<Box<dyn QueryHydrator<ScoredPostsQuery>>>,
sources: Vec<Box<dyn Source<ScoredPostsQuery, PostCandidate>>>,
hydrators: Vec<Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>>,
filters: Vec<Box<dyn Filter<ScoredPostsQuery, PostCandidate>>>,
scorers: Vec<Box<dyn Scorer<ScoredPostsQuery, PostCandidate>>>,
selector: TopKScoreSelector,
post_selection_hydrators: Vec<Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>>,
post_selection_filters: Vec<Box<dyn Filter<ScoredPostsQuery, PostCandidate>>>,
side_effects: Arc<Vec<Box<dyn SideEffect<ScoredPostsQuery, PostCandidate>>>>,
}
All nine stage fields are populated (vs. For You's four). This is the real pipeline.
build_with_clients — the wiring (lines 156-351)
The longest function in the file. Builds every stage in order.
pub(crate) async fn build_with_clients(
user_action_aggregation_client: Arc<dyn UserActionAggregationClient + Send + Sync>,
phoenix_client: Arc<dyn PhoenixPredictionClient + Send + Sync>,
egress_client: Arc<dyn PhoenixPredictionClient + Send + Sync>,
phoenix_retrieval_client: Arc<dyn PhoenixRetrievalClient + Send + Sync>,
thunder_client: Arc<ThunderClient>,
strato_client: Arc<dyn StratoClient + Send + Sync>,
tweet_mixer_client: Arc<dyn TweetMixerClient>,
tes_client: Arc<dyn TESClient + Send + Sync>,
gizmoduck_client: Arc<dyn GizmoduckClient + Send + Sync>,
vf_client: Arc<dyn VisibilityFilteringClient + Send + Sync>,
redis_client: Arc<dyn RedisClient + Send + Sync>,
phoenix_kafka_client: Arc<dyn KafkaPublisherClient>,
reranking_kafka_client: Arc<dyn KafkaPublisherClient>,
socialgraph_client: Arc<dyn SocialGraphClientOps>,
vm_ranker_client: Arc<dyn VMRankerClient>,
safety_label_client: Arc<dyn xai_safety_label_store::SafetyLabelStoreClient>,
vf_safety_labels_client: Arc<dyn VfClient>,
phoenix_request_cache_redis_atla_client: Arc<dyn RedisClient + Send + Sync>,
phoenix_request_cache_redis_pdxa_client: Arc<dyn RedisClient + Send + Sync>,
impression_bloom_filter_client: Arc<dyn ImpressionBloomFilterClient>,
ip_client: Arc<GeoIpLocationClient>,
user_demographics_client: Arc<dyn UserDemographicsClient>,
user_inferred_gender_store_client: Arc<dyn UserInferredGenderStoreClient>,
user_inferred_gender_grpc_client: Arc<dyn GenderPredictionGrpcClient>,
impressed_posts_client: Arc<dyn ImpressedPostsClient>,
followed_grok_topics_client: Arc<dyn FollowedGrokTopicsStoreClient>,
followed_starter_packs_client: Arc<dyn FollowedStarterPacksStoreClient>,
) -> PhoenixCandidatePipeline {
27 client parameters. Every dependency must be wired in. pub(crate) because only the in-crate prod and mock constructors should call it — external callers go through those higher-level entry points.
Notable: phoenix_client and egress_client are both PhoenixPredictionClient types. The egress client routes through an egress sidecar (probably a separate datacenter / cell), so the pipeline can call two prediction services in parallel for experiments. We'll see how when we read the PhoenixScorer (Session 10).
Query hydrators (lines 185-232)
let query_hydrators: Vec<Box<dyn QueryHydrator<ScoredPostsQuery>>> = vec![
Box::new(ScoringSequenceQueryHydrator::new(
user_action_aggregation_client.clone(),
)),
Box::new(RetrievalSequenceQueryHydrator::new(
user_action_aggregation_client,
)),
Box::new(BlockedUserIdsQueryHydrator {
socialgraph_client: socialgraph_client.clone(),
}),
Box::new(MutedUserIdsQueryHydrator {
socialgraph_client: socialgraph_client.clone(),
}),
Box::new(FollowedUserIdsQueryHydrator {
socialgraph_client: socialgraph_client.clone(),
}),
Box::new(SubscribedUserIdsQueryHydrator {
socialgraph_client: socialgraph_client.clone(),
}),
Box::new(CachedPostsQueryHydrator {
redis_client: redis_client.clone(),
}),
Box::new(MutualFollowQueryHydrator {
strato_client: strato_client.clone(),
}),
Box::new(UserDemographicsQueryHydrator {
client: user_demographics_client,
}),
Box::new(FollowedGrokTopicsQueryHydrator::new(
followed_grok_topics_client,
)),
Box::new(FollowedStarterPacksQueryHydrator::new(
followed_starter_packs_client,
)),
Box::new(InferredGrokTopicsQueryHydrator {
strato_client: strato_client.clone(),
}),
Box::new(ImpressionBloomFilterQueryHydrator {
client: impression_bloom_filter_client,
}),
Box::new(IpQueryHydrator {
client: ip_client,
}),
Box::new(UserInferredGenderQueryHydrator::new(
user_inferred_gender_store_client,
user_inferred_gender_grpc_client,
)),
];
15 query hydrators, executed in parallel by the framework. Each populates a slice of ScoredPostsQuery:
| Hydrator | Populates |
|---|---|
| ScoringSequenceQueryHydrator | scoring_sequence (engagement history for ranking) |
| RetrievalSequenceQueryHydrator | retrieval_sequence (history for retrieval) |
| BlockedUserIdsQueryHydrator | user_features.blocked_user_ids |
| MutedUserIdsQueryHydrator | user_features.muted_user_ids |
| FollowedUserIdsQueryHydrator | user_features.followed_user_ids |
| SubscribedUserIdsQueryHydrator | user_features.subscribed_user_ids |
| CachedPostsQueryHydrator | cached_posts, has_cached_posts |
| MutualFollowQueryHydrator | viewer_minhash |
| UserDemographicsQueryHydrator | user_demographics |
| FollowedGrokTopicsQueryHydrator | followed_grok_topics (32 bools) |
| FollowedStarterPacksQueryHydrator | followed_starter_packs (20 bools) |
| InferredGrokTopicsQueryHydrator | inferred_grok_topics (32 bools) |
| ImpressionBloomFilterQueryHydrator | bloom_filter_entries |
| IpQueryHydrator | ip_location |
| UserInferredGenderQueryHydrator | user_inferred_gender, user_inferred_gender_score |
Look at the socialgraph_client.clone() repetition: 4 hydrators share the same client. Each clone bumps the Arc's refcount (cheap) and moves a new handle into the boxed hydrator; the original socialgraph_client binding stays usable, and the candidate hydrators further down take it again.
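The "each hydrator populates its own slice of the query" idea from the table, reduced to two fields. The Query, GraphClient, and QueryHydrator definitions below are hypothetical simplifications (the framework's real trait is async and its exact signature is not shown in this session), and the hydrators run sequentially here instead of in parallel.

```rust
use std::sync::Arc;

// Hypothetical, trimmed-down query with two of the fields from the table.
#[derive(Debug, Default, PartialEq)]
struct Query {
    blocked_user_ids: Vec<u64>,
    muted_user_ids: Vec<u64>,
}

// Hypothetical shared client returning canned data.
struct GraphClient;

impl GraphClient {
    fn blocked(&self) -> Vec<u64> {
        vec![7, 8]
    }
    fn muted(&self) -> Vec<u64> {
        vec![9]
    }
}

// Each hydrator writes only its own slice of the query.
trait QueryHydrator {
    fn hydrate(&self, query: &mut Query);
}

struct BlockedHydrator {
    client: Arc<GraphClient>,
}

struct MutedHydrator {
    client: Arc<GraphClient>,
}

impl QueryHydrator for BlockedHydrator {
    fn hydrate(&self, query: &mut Query) {
        query.blocked_user_ids = self.client.blocked();
    }
}

impl QueryHydrator for MutedHydrator {
    fn hydrate(&self, query: &mut Query) {
        query.muted_user_ids = self.client.muted();
    }
}

fn hydrate_all() -> Query {
    let client = Arc::new(GraphClient);
    let hydrators: Vec<Box<dyn QueryHydrator>> = vec![
        Box::new(BlockedHydrator { client: client.clone() }),
        Box::new(MutedHydrator { client }),
    ];
    let mut query = Query::default();
    // Sequential for simplicity; disjoint fields are what make
    // concurrent execution safe in the real framework.
    for hydrator in &hydrators {
        hydrator.hydrate(&mut query);
    }
    query
}
```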
let _impressed_posts_hydrator = ImpressedPostsQueryHydrator {
client: impressed_posts_client,
};
Dark-launched but unused. The _impressed_posts_hydrator is constructed (so the type and its client stay compiled and type-checked) but never added to the query_hydrators vec. This looks like a staged rollout: ship the code first, then wire it in later. Note that as written, wiring it in means adding it to the vec and redeploying; nothing in this function gates it behind a feature flag.
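The underscore-binding trick in miniature, with a hypothetical Hydrator struct:

```rust
// Hypothetical stage type used only for this illustration.
struct Hydrator {
    name: &'static str,
}

fn wired_stage_names() -> Vec<&'static str> {
    // Constructed and type-checked, but never added to the list below.
    // The leading underscore silences the unused-variable lint.
    let _parked = Hydrator { name: "impressed_posts" };

    let wired = vec![
        Hydrator { name: "scoring_sequence" },
        Hydrator { name: "retrieval_sequence" },
    ];
    wired.iter().map(|hydrator| hydrator.name).collect()
}
```

The parked value never reaches the list, so it has no runtime effect beyond being built and dropped at the end of the function.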
Sources (lines 238-257)
let phoenix_source = Box::new(PhoenixSource {
phoenix_retrieval_client: phoenix_retrieval_client.clone(),
});
let phoenix_topics_source = Box::new(PhoenixTopicsSource {
phoenix_retrieval_client: phoenix_retrieval_client.clone(),
});
let phoenix_moe_source = Box::new(PhoenixMOESource {
phoenix_retrieval_client,
});
let thunder_source = Box::new(ThunderSource { thunder_client });
let tweet_mixer_source = Box::new(TweetMixerSource { tweet_mixer_client });
let cached_posts_source = Box::new(CachedPostsSource);
let sources: Vec<Box<dyn Source<ScoredPostsQuery, PostCandidate>>> = vec![
thunder_source,
tweet_mixer_source,
phoenix_source,
phoenix_topics_source,
phoenix_moe_source,
cached_posts_source,
];
6 sources, all parallel:
- ThunderSource — in-network posts via Thunder (we read in Sessions 02-03).
- TweetMixerSource — legacy OON retrieval service.
- PhoenixSource — primary OON retrieval (two-tower model).
- PhoenixTopicsSource — topic-aware OON retrieval.
- PhoenixMOESource — Phoenix mixture-of-experts retrieval.
- CachedPostsSource — re-uses scored posts from the cache hydrator.
Three of the six use the same phoenix_retrieval_client (the last one consumes the Arc — note no .clone()). This is the three-flavor OON retrieval approach: same client, three query strategies, three sets of candidates.
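The "same client, three strategies" wiring can be sketched as below. Everything here is a hypothetical, synchronous simplification: the real Source trait is async and generic over query and candidate types, the framework runs sources in parallel, and RetrievalClient, StrategySource, and the strategy strings are invented for illustration.

```rust
use std::sync::Arc;

// Hypothetical retrieval client; the strategy names are made up.
trait RetrievalClient {
    fn retrieve(&self, strategy: &str) -> Vec<u64>;
}

struct MockRetrievalClient;

impl RetrievalClient for MockRetrievalClient {
    fn retrieve(&self, strategy: &str) -> Vec<u64> {
        match strategy {
            "two_tower" => vec![1, 2],
            "topics" => vec![3],
            "moe" => vec![4, 5],
            _ => vec![],
        }
    }
}

// Simplified, synchronous stand-in for the framework's Source trait.
trait Source {
    fn get_candidates(&self) -> Vec<u64>;
}

struct StrategySource {
    client: Arc<dyn RetrievalClient>,
    strategy: &'static str,
}

impl Source for StrategySource {
    fn get_candidates(&self) -> Vec<u64> {
        self.client.retrieve(self.strategy)
    }
}

fn build_sources(client: Arc<dyn RetrievalClient>) -> Vec<Box<dyn Source>> {
    let sources: Vec<Box<dyn Source>> = vec![
        Box::new(StrategySource { client: Arc::clone(&client), strategy: "two_tower" }),
        Box::new(StrategySource { client: Arc::clone(&client), strategy: "topics" }),
        // Last user: the Arc is moved in, so no clone is needed.
        Box::new(StrategySource { client, strategy: "moe" }),
    ];
    sources
}

// Sequential here for simplicity; the real framework fans out in parallel.
fn collect_candidates(sources: &[Box<dyn Source>]) -> Vec<u64> {
    sources.iter().flat_map(|s| s.get_candidates()).collect()
}
```

With the mock client, collect_candidates(&build_sources(Arc::new(MockRetrievalClient))) merges the three result sets into one candidate list, [1, 2, 3, 4, 5].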
Hydrators (lines 259-272)
let hydrators: Vec<Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>> = vec![
Box::new(InNetworkCandidateHydrator),
Box::new(CoreDataCandidateHydrator::new(tes_client.clone()).await),
Box::new(QuoteHydrator::new(tes_client.clone(), socialgraph_client.clone()).await),
Box::new(VideoDurationCandidateHydrator::new(tes_client.clone()).await),
Box::new(HasMediaHydrator::new(tes_client.clone()).await),
Box::new(SubscriptionHydrator::new(tes_client.clone()).await),
Box::new(GizmoduckCandidateHydrator::new(gizmoduck_client).await),
Box::new(BlockedByHydrator::new(socialgraph_client).await),
Box::new(FilteredTopicsHydrator {
strato_client: strato_client.clone(),
}),
Box::new(LanguageCodeHydrator::new(tes_client.clone()).await),
];
10 pre-scoring hydrators running in parallel. Each enriches the candidate with one slice of data. We'll read each in Sessions 07-08.
Note 6 of them use TES (Tweet Entity Service): core data, quote, video duration, has-media, subscription gating, and language code. Each is a separate hydrator because each fetches a different aspect of the post. A plausible reason for keeping them separate despite the shared client is independent cacheability (different cache TTLs, different per-candidate-key partitioning), though the wiring code alone doesn't confirm that.
socialgraph_client is consumed by BlockedByHydrator (final move, no clone).
Filters (lines 274-289)
let filters: Vec<Box<dyn Filter<ScoredPostsQuery, PostCandidate>>> = vec![
Box::new(DropDuplicatesFilter),
Box::new(CoreDataHydrationFilter),
Box::new(AgeFilter::new(Duration::from_secs(params::MAX_POST_AGE))),
Box::new(SelfTweetFilter),
Box::new(RetweetDeduplicationFilter),
Box::new(IneligibleSubscriptionFilter),
Box::new(PreviouslySeenPostsFilter),
Box::new(PreviouslySeenPostsBackupFilter),
Box::new(PreviouslyServedPostsFilter),
Box::new(MutedKeywordFilter::new()),
Box::new(AuthorSocialgraphFilter),
Box::new(VideoFilter),
Box::new(TopicIdsFilter),
Box::new(NewUserTopicIdsFilter),
];
14 filters in sequence — the exact list we read in Session 05. Note the order is significant:
1. DropDuplicatesFilter first: dedup by tweet_id, no other state needed.
2. CoreDataHydrationFilter: drop hydration failures. After this, every candidate has author_id != 0.
3. AgeFilter: drop old posts. Trivial.
4. SelfTweetFilter: viewer's own posts.
5. RetweetDeduplicationFilter: content-dedup of retweet vs. original.
6. IneligibleSubscriptionFilter: paywalled.
7-9. PreviouslySeenPostsFilter + PreviouslySeenPostsBackupFilter + PreviouslyServedPostsFilter: three flavors of seen-content dedup.
10. MutedKeywordFilter: expensive tokenization, runs after cheaper filters have reduced the count.
11. AuthorSocialgraphFilter: block/mute graph.
12. VideoFilter: video exclusion if requested.
13. TopicIdsFilter: topic-based filtering (the giant from Session 05).
14. NewUserTopicIdsFilter: onboarding topic filter.
Order is performance-driven: cheap filters before expensive ones, dedup before substantive filtering. By the time MutedKeywordFilter (with its tokenizer) runs, the candidate count is already reduced.
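To see why order matters, here is a small sequential filter chain. It is a sketch under loud assumptions: the Filter trait below is a simplified synchronous stand-in, and Post, MaxAge, and MutedKeyword are hypothetical. The keyword filter counts how many posts it tokenizes, so we can observe how much work the cheaper filters saved it.

```rust
use std::cell::Cell;
use std::collections::HashSet;
use std::rc::Rc;

#[derive(Clone, Debug, PartialEq)]
struct Post {
    id: u64,
    age_secs: u64,
    text: &'static str,
}

// Simplified stand-in for the framework's Filter trait.
trait Filter {
    fn apply(&self, posts: Vec<Post>) -> Vec<Post>;
}

struct DropDuplicates;
impl Filter for DropDuplicates {
    fn apply(&self, posts: Vec<Post>) -> Vec<Post> {
        let mut seen = HashSet::new();
        // insert() returns false for an id we have already kept.
        posts.into_iter().filter(|p| seen.insert(p.id)).collect()
    }
}

struct MaxAge {
    max_age_secs: u64,
}
impl Filter for MaxAge {
    fn apply(&self, posts: Vec<Post>) -> Vec<Post> {
        posts.into_iter().filter(|p| p.age_secs <= self.max_age_secs).collect()
    }
}

struct MutedKeyword {
    word: &'static str,
    tokenized: Rc<Cell<usize>>, // counts how many posts reached the tokenizer
}
impl Filter for MutedKeyword {
    fn apply(&self, posts: Vec<Post>) -> Vec<Post> {
        posts
            .into_iter()
            .filter(|p| {
                self.tokenized.set(self.tokenized.get() + 1);
                !p.text.split_whitespace().any(|t| t.eq_ignore_ascii_case(self.word))
            })
            .collect()
    }
}

// Each filter sees only what the previous one kept.
fn run_filters(filters: &[Box<dyn Filter>], posts: Vec<Post>) -> Vec<Post> {
    filters.iter().fold(posts, |acc, f| f.apply(acc))
}

fn filter_chain_demo() -> (Vec<u64>, usize) {
    let tokenized = Rc::new(Cell::new(0));
    let posts = vec![
        Post { id: 1, age_secs: 10, text: "hello world" },
        Post { id: 1, age_secs: 10, text: "hello world" },     // duplicate
        Post { id: 2, age_secs: 999_999, text: "old news" },   // too old
        Post { id: 3, age_secs: 20, text: "crypto giveaway" }, // muted keyword
        Post { id: 4, age_secs: 30, text: "rust tips" },
    ];
    let filters: Vec<Box<dyn Filter>> = vec![
        Box::new(DropDuplicates),
        Box::new(MaxAge { max_age_secs: 3600 }),
        Box::new(MutedKeyword { word: "crypto", tokenized: Rc::clone(&tokenized) }),
    ];
    let kept = run_filters(&filters, posts);
    (kept.iter().map(|p| p.id).collect(), tokenized.get())
}
```

Five posts go in, posts 1 and 4 come out, and the tokenizer only ever sees three posts because dedup and the age check ran first.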
Scorers (lines 291-300)
let phoenix_scorer = Box::new(PhoenixScorer {
phoenix_client: phoenix_client.clone(),
egress_client: Arc::clone(&egress_client),
});
let ranking_scorer = Box::new(RankingScorer);
let vm_ranker = Box::new(VMRanker {
client: vm_ranker_client,
});
let scorers: Vec<Box<dyn Scorer<ScoredPostsQuery, PostCandidate>>> =
vec![phoenix_scorer, ranking_scorer, vm_ranker];
3 scorers in sequence:
1. PhoenixScorer: the Grok-based transformer. Predicts per-action probabilities, stored on candidate.phoenix_scores.
2. RankingScorer: combines phoenix_scores into weighted_score then score. The "weighted sum + diversity attenuation" stage.
3. VMRanker: a separate ranking service that re-scores via another model. Final adjustment.
Note that both phoenix_client and egress_client are cloned here rather than moved. The scorer hits both endpoints, and PhoenixExperimentsSideEffect takes ownership of the originals further down.
vm_ranker_client is moved (consumed). Each Arc-clone here is cheap (refcount bump only).
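Scorers have to run in sequence because each one reads what the previous one wrote. The sketch below illustrates that dependency with two hypothetical scorers: a fake "model" that fills per-action probabilities, and a weighted sum over them. The trait, the candidate fields, and above all the weights are invented for illustration; the real weights live elsewhere in the repo.

```rust
#[derive(Debug, Default, Clone, PartialEq)]
struct Candidate {
    id: u64,
    favorite_score: Option<f64>,
    reply_score: Option<f64>,
    report_score: Option<f64>,
    weighted_score: Option<f64>,
}

// Simplified, synchronous stand-in for the framework's Scorer trait.
trait Scorer {
    fn score(&self, candidates: &mut [Candidate]);
}

// Stage 1 stand-in: pretends to be the model and fills per-action probabilities.
struct FakeModelScorer;
impl Scorer for FakeModelScorer {
    fn score(&self, candidates: &mut [Candidate]) {
        for c in candidates.iter_mut() {
            let engaging = c.id % 2 == 0;
            c.favorite_score = Some(if engaging { 0.5 } else { 0.25 });
            c.reply_score = Some(if engaging { 0.25 } else { 0.0 });
            c.report_score = Some(if engaging { 0.0 } else { 0.5 });
        }
    }
}

// Stage 2 stand-in: weighted sum over stage 1's output. Weights are made up.
struct WeightedSumScorer {
    favorite_w: f64,
    reply_w: f64,
    report_w: f64,
}
impl Scorer for WeightedSumScorer {
    fn score(&self, candidates: &mut [Candidate]) {
        for c in candidates.iter_mut() {
            let sum = c.favorite_score.unwrap_or(0.0) * self.favorite_w
                + c.reply_score.unwrap_or(0.0) * self.reply_w
                + c.report_score.unwrap_or(0.0) * self.report_w;
            c.weighted_score = Some(sum);
        }
    }
}

fn run_scorers(scorers: &[Box<dyn Scorer>], candidates: &mut [Candidate]) {
    for s in scorers {
        s.score(candidates);
    }
}

// Runs both scorers, optionally in the wrong order, and returns (id, weighted_score).
fn score_demo(swapped: bool) -> Vec<(u64, Option<f64>)> {
    let mut scorers: Vec<Box<dyn Scorer>> = vec![
        Box::new(FakeModelScorer),
        Box::new(WeightedSumScorer { favorite_w: 1.0, reply_w: 2.0, report_w: -4.0 }),
    ];
    if swapped {
        scorers.reverse();
    }
    let mut candidates = vec![
        Candidate { id: 1, ..Default::default() },
        Candidate { id: 2, ..Default::default() },
    ];
    run_scorers(&scorers, &mut candidates);
    candidates.iter().map(|c| (c.id, c.weighted_score)).collect()
}
```

In the intended order, candidate 1 ends at -1.75 (the report probability dominates) and candidate 2 at 1.0. With the order swapped, the weighted sum runs before any probabilities exist and both candidates end at 0.0.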
Selector
let selector = TopKScoreSelector;
A unit struct — no fields. We'll see it in Session 10: simple "sort by score, take top K." For Phoenix, simpler than the For You blender.
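Until we read the real implementation in Session 10, "sort by score, take top K" can be sketched like this. TopKSketch and Scored are hypothetical; in particular, how the real selector treats missing scores and ties is an assumption here (unscored candidates sort last).

```rust
#[derive(Debug, Clone, PartialEq)]
struct Scored {
    id: u64,
    score: Option<f64>,
}

// Hypothetical unit-struct selector, mirroring the shape of TopKScoreSelector.
struct TopKSketch;

impl TopKSketch {
    fn select(&self, mut candidates: Vec<Scored>, k: usize) -> Vec<Scored> {
        // Descending by score; candidates without a score sort last (assumption).
        candidates.sort_by(|a, b| {
            let a = a.score.unwrap_or(f64::NEG_INFINITY);
            let b = b.score.unwrap_or(f64::NEG_INFINITY);
            b.total_cmp(&a)
        });
        candidates.truncate(k);
        candidates
    }
}
```

Given scores 0.2, none, 0.9, and 0.5 for ids 1 through 4 and k = 2, this keeps ids 3 and 4. Using total_cmp avoids the partial_cmp unwrap that a NaN score would otherwise trip.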
Post-selection hydrators (lines 304-315)
let post_selection_hydrators: Vec<Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>> = vec![
Box::new(VFCandidateHydrator::new(vf_client.clone()).await),
Box::new(AdsBrandSafetyHydrator::new(safety_label_client)),
Box::new(AdsBrandSafetyVfHydrator {
client: vf_safety_labels_client,
}),
Box::new(TweetTypeMetricsHydrator::new()),
Box::new(FollowingRepliedUsersHydrator),
Box::new(MutualFollowJaccardHydrator {
strato_client: strato_client.clone(),
}),
];
6 expensive hydrators that run only on the top-K candidates after selection:
- VFCandidateHydrator: visibility-filtering service call. Sets visibility_reason.
- AdsBrandSafetyHydrator: fetches brand-safety labels.
- AdsBrandSafetyVfHydrator: additional VF-side brand-safety labels.
- TweetTypeMetricsHydrator: computes the tweet_type_metrics blob.
- FollowingRepliedUsersHydrator: finds out which followed users replied to each post.
- MutualFollowJaccardHydrator: computes the mutual-follow Jaccard similarity.
Post-selection because they're either expensive (VF, brand safety) or only relevant after we know which posts get shown (following-replied).
Post-selection filters (lines 317-321)
let post_selection_filters: Vec<Box<dyn Filter<ScoredPostsQuery, PostCandidate>>> = vec![
Box::new(VFFilter),
Box::new(AncillaryVFFilter),
Box::new(DedupConversationFilter),
];
3 post-selection filters:
1. VFFilter: drop posts with visibility_reason indicating drop.
2. AncillaryVFFilter: drop ancillary candidates (from Session 05's drop_ancillary_posts flag).
3. DedupConversationFilter: keep best post per thread (needs score).
These run last because of their inputs: VFFilter needs visibility_reason, which VFCandidateHydrator populates during post-selection hydration, and DedupConversationFilter needs the final score, which the scorers set.
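"Keep the best post per thread" can be sketched as a single pass with a map from conversation to the current winner. This is an illustration only: Ranked and its conversation_id field are hypothetical, and the real DedupConversationFilter may derive the thread key differently (for example from ancestors).

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
struct Ranked {
    id: u64,
    conversation_id: u64, // hypothetical thread key
    score: f64,
}

// Keeps the highest-scoring candidate of each conversation,
// preserving the position where that conversation first appeared.
fn keep_best_per_conversation(candidates: Vec<Ranked>) -> Vec<Ranked> {
    let mut slot_of: HashMap<u64, usize> = HashMap::new(); // conversation -> index in `kept`
    let mut kept: Vec<Ranked> = Vec::new();

    for c in candidates {
        match slot_of.get(&c.conversation_id).copied() {
            Some(i) => {
                if c.score > kept[i].score {
                    kept[i] = c; // better post from the same thread replaces the old one
                }
            }
            None => {
                slot_of.insert(c.conversation_id, kept.len());
                kept.push(c);
            }
        }
    }
    kept
}
```

With two posts in conversation 10 (scores 0.9 and 0.95) and two in conversation 20 (0.5 and 0.1), the survivors are the 0.95 and 0.5 posts. This only works once scores exist, which is why the filter cannot move ahead of the scorers.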
Side effects (lines 323-338)
let side_effects: Arc<Vec<Box<dyn SideEffect<ScoredPostsQuery, PostCandidate>>>> =
Arc::new(vec![
Box::new(PhoenixExperimentsSideEffect::new(
phoenix_client,
egress_client,
phoenix_kafka_client,
)),
Box::new(RerankingKafkaSideEffect::new(reranking_kafka_client)),
Box::new(RedisPostCandidateCacheSideEffect::new(redis_client)),
Box::new(ScoredStatsSideEffect),
Box::new(MutualFollowStatsSideEffect),
Box::new(PhoenixRequestCacheSideEffect::new(
phoenix_request_cache_redis_atla_client,
phoenix_request_cache_redis_pdxa_client,
)),
]);
6 side effects:
- PhoenixExperimentsSideEffect: runs experimental Phoenix variants in shadow mode (uses both prediction clients).
- RerankingKafkaSideEffect: publishes ranking decisions to Kafka.
- RedisPostCandidateCacheSideEffect: writes the scored candidates to Redis so the next request's CachedPostsQueryHydrator can re-use them.
- ScoredStatsSideEffect: per-request metrics emit.
- MutualFollowStatsSideEffect: mutual-follow Jaccard distribution metrics.
- PhoenixRequestCacheSideEffect: writes Phoenix prediction results to Redis (two clients: atla + pdxa datacenter caches). Heavy: writes raw model outputs for cache-replay analysis.
Final pack
PhoenixCandidatePipeline {
query_hydrators,
hydrators,
filters,
sources,
scorers,
selector,
post_selection_hydrators,
post_selection_filters,
side_effects,
}
}
Pack into the struct. The fields are reordered slightly from the struct definition — Rust doesn't care.
prod — production constructor (lines 353-659)
The wrapper that builds all 27 production clients in parallel, then calls build_with_clients. We won't walk every tokio::join! arm — the pattern is identical to For You's new:
pub async fn prod(
shard_coordinate: Option<ShardCoordinate>,
datacenter: &str,
) -> PhoenixCandidatePipeline {
let local_cache_eds = String::new();
let atla_phoenix_cache_eds = "";
let pdxa_phoenix_cache_eds = "";
let (
flock_socialgraph_client,
user_action_aggregation_client,
…
) = tokio::join!(
async {
Arc::new(
SocialGraphClient::new(
datacenter,
&S2S_CHAIN_PATH,
&S2S_CRT_PATH,
&S2S_KEY_PATH,
)
.await
.expect("Failed to create flock SocialGraphClient"),
) as Arc<dyn SocialGraphClientOps>
},
…
Some highlights worth pointing out:
async {
Arc::new(
ProdPhoenixRetrievalClient::new(Some((
PhoenixRetrievalCluster::Experiment1Fou,
PhoenixRetrievalCluster::Experiment1Lap7,
)))
.await
.expect("Failed to create Phoenix retrieval client"),
) as Arc<dyn PhoenixRetrievalClient + Send + Sync>
},
The Phoenix retrieval client takes a Some((cluster1, cluster2)) parameter — two clusters with experiment names "Fou" and "Lap7". Likely codenamed experiments. The retrieval client itself probably splits queries between them.
async {
Arc::new(
XdsRedisClient::new(XdsRedisConfig {
eds_resource_name: atla_phoenix_cache_eds.into(),
})
.await
.expect("Failed to create xDS Redis client for atla phoenix cache"),
) as Arc<dyn RedisClient + Send + Sync>
},
async {
Arc::new(
XdsRedisClient::new(XdsRedisConfig {
eds_resource_name: pdxa_phoenix_cache_eds.into(),
})
.await
.expect("Failed to create xDS Redis client for pdxa phoenix cache"),
) as Arc<dyn RedisClient + Send + Sync>
},
Three Redis clients are constructed: the local cache (not shown above), the atla (Atlanta) Phoenix cache, and the pdxa Phoenix cache (presumably Portland, going by the PDX airport code). This is cross-datacenter caching: predictions made in one DC can be replayed against another for backup or experiment comparison.
async {
Arc::new(
ProdVfClient::new(datacenter)
.await
.expect("Failed to create VF SafetyLabels client")
.with_timeout_ms(500)
.with_max_batch_size(150),
) as Arc<dyn VfClient>
},
The VF safety-labels client gets explicit limits at construction: a 500 ms timeout and a maximum batch size of 150. It appears to feed post-selection hydration, which sits directly on the response path, so tight bounds keep a slow safety lookup from dragging tail latency.
The values atla_phoenix_cache_eds, pdxa_phoenix_cache_eds, and local_cache_eds, along with Kafka topic names like PHOENIX_SCORES_TOPIC, are all empty strings in the open-source release, i.e. redacted. Same pattern as Thunder's Kafka topics.
After the tokio::join!, hand off to build_with_clients. The function returns its result.
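The shape of "build every client at once, fail fast on error, then hand them over" can be sketched without an async runtime. To keep the example self-contained this uses scoped OS threads from std as a stand-in; the real code uses tokio::join!, which instead polls the construction futures concurrently on one task. Client, SlowClient, the client names, and the sleep durations are all hypothetical.

```rust
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical client trait; Send + Sync so handles can cross threads.
trait Client: Send + Sync {
    fn name(&self) -> &str;
}

struct SlowClient {
    name: &'static str,
}

impl SlowClient {
    // Pretends to perform a network handshake before returning.
    fn connect(name: &'static str, handshake: Duration) -> Result<Self, String> {
        thread::sleep(handshake);
        Ok(SlowClient { name })
    }
}

impl Client for SlowClient {
    fn name(&self) -> &str {
        self.name
    }
}

// Both handshakes overlap, so the wait is roughly the slower one, not the sum.
fn build_clients() -> (Arc<dyn Client>, Arc<dyn Client>) {
    thread::scope(|s| {
        let socialgraph = s.spawn(|| {
            Arc::new(
                SlowClient::connect("socialgraph", Duration::from_millis(20))
                    .expect("Failed to create socialgraph client"),
            ) as Arc<dyn Client>
        });
        let retrieval = s.spawn(|| {
            Arc::new(
                SlowClient::connect("retrieval", Duration::from_millis(30))
                    .expect("Failed to create retrieval client"),
            ) as Arc<dyn Client>
        });
        (
            socialgraph.join().expect("socialgraph builder panicked"),
            retrieval.join().expect("retrieval builder panicked"),
        )
    })
}
```

As in prod, each arm ends in an expect and casts to a trait-object Arc, so a client that cannot be built stops startup immediately instead of surfacing later on a request path.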
mock — test constructor (lines 661-728)
Same pattern but with all mocks. Repeated boilerplate of Arc::new(MockXxxClient) instances. No surprises — covers every dependency.
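The prod / mock / build_with_clients split boils down to the shape below. This is a sketch with hypothetical names (TweetStore, MockTweetStore, DemoPipeline, and both hydrators); a prod() constructor would differ from mock() only in which concrete client it builds before calling build_with_clients.

```rust
use std::sync::Arc;

// Hypothetical dependency, expressed as a trait so it can be swapped.
trait TweetStore: Send + Sync {
    fn text(&self, id: u64) -> Option<String>;
}

struct MockTweetStore;

impl TweetStore for MockTweetStore {
    fn text(&self, id: u64) -> Option<String> {
        Some(format!("mock post {}", id))
    }
}

struct TextHydrator {
    store: Arc<dyn TweetStore>,
}

struct LengthHydrator {
    store: Arc<dyn TweetStore>,
}

struct DemoPipeline {
    text: TextHydrator,
    length: LengthHydrator,
}

impl DemoPipeline {
    // The only place that knows how components are wired together.
    fn build_with_clients(store: Arc<dyn TweetStore>) -> Self {
        DemoPipeline {
            text: TextHydrator { store: Arc::clone(&store) },
            length: LengthHydrator { store }, // last user takes the handle by move
        }
    }

    // Test constructor: same wiring, no network.
    fn mock() -> Self {
        Self::build_with_clients(Arc::new(MockTweetStore))
    }

    fn describe(&self, id: u64) -> Option<(String, usize)> {
        let text = self.text.store.text(id)?;
        let len = self.length.store.text(id)?.len();
        Some((text, len))
    }
}
```

DemoPipeline::mock().describe(7) returns the mock text and its length without touching the network, and the wiring under test is exactly the wiring production would get.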
CandidatePipeline trait impl (lines 731-771)
#[async_trait]
impl CandidatePipeline<ScoredPostsQuery, PostCandidate> for PhoenixCandidatePipeline {
fn query_hydrators(&self) -> &[Box<dyn QueryHydrator<ScoredPostsQuery>>] {
&self.query_hydrators
}
fn sources(&self) -> &[Box<dyn Source<ScoredPostsQuery, PostCandidate>>] {
&self.sources
}
fn hydrators(&self) -> &[Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>] {
&self.hydrators
}
fn filters(&self) -> &[Box<dyn Filter<ScoredPostsQuery, PostCandidate>>] {
&self.filters
}
fn scorers(&self) -> &[Box<dyn Scorer<ScoredPostsQuery, PostCandidate>>] {
&self.scorers
}
fn selector(&self) -> &dyn Selector<ScoredPostsQuery, PostCandidate> {
&self.selector
}
fn post_selection_hydrators(&self) -> &[Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>] {
&self.post_selection_hydrators
}
fn post_selection_filters(&self) -> &[Box<dyn Filter<ScoredPostsQuery, PostCandidate>>] {
&self.post_selection_filters
}
fn side_effects(&self) -> Arc<Vec<Box<dyn SideEffect<ScoredPostsQuery, PostCandidate>>>> {
Arc::clone(&self.side_effects)
}
fn result_size(&self) -> usize {
params::RESULT_SIZE
}
}
Trait impl. Returns references to all stage fields; clones the side-effects Arc.
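A likely reason side_effects is the one stage stored behind an Arc: fire-and-forget work has to outlive the borrow of &self, so the caller needs an owned handle it can move into a background task. The sketch below shows that idea with a plain std thread and hypothetical types; the real framework spawns async tasks and may run side effects in parallel rather than one after another.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Simplified stand-in for the framework's SideEffect trait.
trait SideEffect: Send + Sync {
    fn run(&self, served_ids: &[u64]);
}

struct RecordingSideEffect {
    label: &'static str,
    log: Arc<Mutex<Vec<String>>>,
}

impl SideEffect for RecordingSideEffect {
    fn run(&self, served_ids: &[u64]) {
        let line = format!("{} saw {} posts", self.label, served_ids.len());
        self.log.lock().unwrap().push(line);
    }
}

struct DemoPipeline {
    side_effects: Arc<Vec<Box<dyn SideEffect>>>,
}

impl DemoPipeline {
    fn side_effects(&self) -> Arc<Vec<Box<dyn SideEffect>>> {
        Arc::clone(&self.side_effects)
    }

    // Returns the response right away. The JoinHandle is returned only so
    // the demo can wait for the background work; real code would not block on it.
    fn serve(&self, ids: Vec<u64>) -> (Vec<u64>, JoinHandle<()>) {
        let effects = self.side_effects(); // owned handle, free of the &self borrow
        let for_effects = ids.clone();
        let handle = thread::spawn(move || {
            for effect in effects.iter() {
                effect.run(&for_effects);
            }
        });
        (ids, handle)
    }
}

fn side_effect_demo() -> Vec<String> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let effects: Vec<Box<dyn SideEffect>> = vec![
        Box::new(RecordingSideEffect { label: "cache", log: Arc::clone(&log) }),
        Box::new(RecordingSideEffect { label: "stats", log: Arc::clone(&log) }),
    ];
    let pipeline = DemoPipeline { side_effects: Arc::new(effects) };

    let (response, handle) = pipeline.serve(vec![1, 2, 3]);
    assert_eq!(response, vec![1, 2, 3]);
    handle.join().unwrap();

    let out = log.lock().unwrap().clone();
    out
}
```

A plain &[Box<dyn SideEffect>] could not be moved into the spawned closure, because the closure must be 'static; cloning the Arc costs one refcount bump per request.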
That's the entire Phoenix pipeline configuration.
The orphan files
Four files exist in home-mixer/candidate_pipeline/ but aren't declared in mod.rs:
query.rs (69 lines)
use crate::candidate_pipeline::query_features::UserFeatures;
use crate::util::request_util::generate_request_id;
use xai_candidate_pipeline::candidate_pipeline::HasRequestId;
use xai_home_mixer_proto::ImpressionBloomFilterEntry;
use xai_twittercontext_proto::{GetTwitterContextViewer, TwitterContextViewer};
#[derive(Clone, Default, Debug)]
pub struct ScoredPostsQuery {
pub user_id: i64,
pub client_app_id: i32,
pub country_code: String,
pub language_code: String,
pub seen_ids: Vec<i64>,
pub served_ids: Vec<i64>,
pub in_network_only: bool,
pub is_bottom_request: bool,
pub bloom_filter_entries: Vec<ImpressionBloomFilterEntry>,
pub user_action_sequence: Option<xai_recsys_proto::UserActionSequence>,
pub user_features: UserFeatures,
pub request_id: String,
}
A simpler ScoredPostsQuery with only 12 fields (vs. the 50-field version in models/query.rs from Session 04). Note user_id: i64 (vs. u64), request_id: String (vs. u64), and Vec<i64> for IDs (vs. Vec<u64>).
This looks like an earlier version of the type — before the refactor that moved it to models/. The current production code uses the models/ version (which is what phoenix_candidate_pipeline.rs imports).
impl ScoredPostsQuery {
pub fn new(
user_id: i64,
…
) -> Self {
let request_id = format!("{}-{}", generate_request_id(), user_id);
…
}
}
impl GetTwitterContextViewer for ScoredPostsQuery { … }
impl HasRequestId for ScoredPostsQuery { … }
The orphan version has its own constructor and the HasRequestId trait impl. The HasRequestId trait is from xai_candidate_pipeline::candidate_pipeline — but the modern models/query.rs doesn't impl it. So either this trait was deprecated, or the modern code uses a different mechanism.
query_features.rs (10 lines)
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "camelCase")]
pub struct UserFeatures {
pub muted_keywords: Vec<String>,
pub blocked_user_ids: Vec<i64>,
pub muted_user_ids: Vec<i64>,
pub followed_user_ids: Vec<i64>,
pub subscribed_user_ids: Vec<i64>,
}
Mini UserFeatures for the orphan query. No follower_count (vs. the modern models/user_features.rs) and no MValCodec impl (vs. the modern version which has full Thrift codec support).
This is consistent with the "earlier version" theory.
candidate.rs (70 lines)
use std::collections::HashMap;
use xai_home_mixer_proto as pb;
use xai_visibility_filtering::models as vf;
#[derive(Clone, Debug, Default)]
pub struct PostCandidate {
pub tweet_id: i64,
pub author_id: u64,
pub tweet_text: String,
pub in_reply_to_tweet_id: Option<u64>,
pub retweeted_tweet_id: Option<u64>,
pub retweeted_user_id: Option<u64>,
pub phoenix_scores: PhoenixScores,
pub prediction_request_id: Option<u64>,
pub last_scored_at_ms: Option<u64>,
pub weighted_score: Option<f64>,
pub score: Option<f64>,
pub served_type: Option<pb::ServedType>,
pub in_network: Option<bool>,
pub ancestors: Vec<u64>,
pub video_duration_ms: Option<i32>,
pub author_followers_count: Option<i32>,
pub author_screen_name: Option<String>,
pub retweeted_screen_name: Option<String>,
pub visibility_reason: Option<vf::FilteredReason>,
pub subscription_author_id: Option<u64>,
}
A simpler PostCandidate — 20 fields vs. the modern ~30-field version. Missing: brand safety, safety labels, mutual follow, quote tweet fields, engagement counts, has_media, language_code, filtered_topic_ids.
#[derive(Clone, Debug, Default)]
pub struct PhoenixScores {
pub favorite_score: Option<f64>,
pub reply_score: Option<f64>,
pub retweet_score: Option<f64>,
pub photo_expand_score: Option<f64>,
pub click_score: Option<f64>,
pub profile_click_score: Option<f64>,
pub vqv_score: Option<f64>,
pub share_score: Option<f64>,
pub share_via_dm_score: Option<f64>,
pub share_via_copy_link_score: Option<f64>,
pub dwell_score: Option<f64>,
pub quote_score: Option<f64>,
pub quoted_click_score: Option<f64>,
pub follow_author_score: Option<f64>,
pub not_interested_score: Option<f64>,
pub block_author_score: Option<f64>,
pub mute_author_score: Option<f64>,
pub report_score: Option<f64>,
pub dwell_time: Option<f64>,
}
Inline PhoenixScores definition (vs. the modern version which re-exports from xai_candidate_pipeline::component_library::models::PhoenixScores). 19 score fields, including vqv_score (video quality view), share_via_dm_score, share_via_copy_link_score, and the negative actions (not_interested, block_author, mute_author, report). One continuous action: dwell_time (vs. the discrete dwell_score).
Assuming the modern re-exported PhoenixScores has kept the same shape, this list is the set of actions Phoenix predicts. Worth keeping for reference.
pub trait CandidateHelpers {
fn get_screen_names(&self) -> HashMap<u64, String>;
}
impl CandidateHelpers for PostCandidate {
fn get_screen_names(&self) -> HashMap<u64, String> {
let mut screen_names = HashMap::<u64, String>::new();
if let Some(author_screen_name) = self.author_screen_name.clone() {
screen_names.insert(self.author_id, author_screen_name);
}
if let (Some(retweeted_screen_name), Some(retweeted_user_id)) =
(self.retweeted_screen_name.clone(), self.retweeted_user_id)
{
screen_names.insert(retweeted_user_id, retweeted_screen_name);
}
screen_names
}
}
Same CandidateHelpers trait as the modern version, but only one method — get_screen_names. The modern version has 4 methods including as_tweet_info which converts to the Phoenix proto input.
candidate_features.rs (78 lines)
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "camelCase")]
pub struct PureCoreData {
pub author_id: u64,
pub text: String,
pub source_tweet_id: Option<u64>,
pub source_user_id: Option<u64>,
pub in_reply_to_tweet_id: Option<u64>,
pub in_reply_to_user_id: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "camelCase")]
pub struct ExclusiveTweetControl {
pub conversation_author_id: i64,
}
pub type MediaEntities = Vec<MediaEntity>;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "camelCase")]
pub struct MediaEntity {
pub media_info: Option<MediaInfo>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub enum MediaInfo {
VideoInfo(VideoInfo),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "camelCase")]
pub struct VideoInfo {
pub duration_millis: i32,
}
…
The orphan version of candidate_features.rs includes inline definitions of PureCoreData, ExclusiveTweetControl, MediaEntity, MediaInfo, VideoInfo, Share, Reply, GizmoduckUserCounts, GizmoduckUserProfile, GizmoduckUser, GizmoduckUserResult.
The modern version (models/candidate_features.rs from Session 04) pub use-imports these from xai_core_entities::entities. So this is the inline form of types that have since been moved out.
Net interpretation: these four files are pre-refactor leftovers. They're not compiled (mod.rs doesn't declare them), but they remain in the source tree — probably because they were excluded from the cleanup PR by accident, or kept as a reference for anyone tracing through git history.
Doesn't affect runtime; worth knowing if anyone tries to make changes here and gets confused.
What we've learned
The whole For-You pipeline, in one place:
ForYouCandidatePipeline (FeedItem)
├── QueryHydrators (2): ServedHistory, PastRequestTimestamps
├── Sources (5): ScoredPosts*, Ads, WhoToFollow, Prompts, PushToHome
├── Selector: BlenderSelector
└── SideEffects (8): kafka publishers + state updates + maintenance
*ScoredPosts wraps:
PhoenixCandidatePipeline (PostCandidate)
├── QueryHydrators (15): scoring/retrieval sequences, social-graph, demographics, IP, gender, bloom filters
├── Sources (6): Thunder, TweetMixer, 3 Phoenix variants, Cached
├── Hydrators (10): in-network, core data, quote, video duration, has media,
│ subscription, Gizmoduck, blocked-by, filtered topics, language
├── Filters (14): dedup, hydration check, age, self, retweet dedup, subscription,
│ 3 seen flavors, muted keywords, social graph, video, topic, new-user topic
├── Scorers (3): Phoenix model, Ranking, VM Ranker
├── Selector: TopKScoreSelector
├── PostSelectionHydrators (6): VF, brand safety (×2), tweet type metrics,
│ following replied, mutual follow Jaccard
├── PostSelectionFilters (3): VF, Ancillary VF, Conversation dedup
└── SideEffects (6): Phoenix experiments, Reranking kafka, Redis cache,
scored stats, mutual follow stats, Phoenix request cache
Dependency injection without a framework: every component takes its dependencies as Arc<dyn Trait> handles. The prod and mock constructors produce the dependencies, then call build_with_clients, which composes them. No magic.
Cold-start optimization: tokio::join! drives all client construction concurrently, so startup waits roughly for the slowest client rather than for the sum of all of them.
Sequential vs parallel execution (recall from Session 01):
- Query hydrators: parallel.
- Sources: parallel.
- Hydrators: parallel.
- Filters: sequential.
- Scorers: sequential.
- Side-effects: parallel, fire-and-forget.
The pipeline declaration here just lists the components; the framework code decides parallelism.
Filter order matters: cheap → expensive, dedup first, expensive filters (muted keyword tokenization, topic taxonomy) after.
Two-pass hydration: 10 hydrators run on all candidates pre-filter; 6 more expensive ones run only on the top-K post-selection. Saves work on candidates that get filtered out.
Three-flavor OON retrieval: Phoenix, PhoenixTopics, PhoenixMOE — three different retrieval strategies feeding the same downstream pipeline. Different queries to the same retrieval client.
Dark-launched code: ImpressedPostsQueryHydrator is constructed but never wired into query_hydrators. The code ships first; activation comes later, once it is added to the vec.
Mock-everywhere: every client has a Mock* variant. The mock() constructor on each pipeline returns a fully-functional (but non-network-touching) pipeline for tests.
Orphan files: candidate.rs, candidate_features.rs, query.rs, query_features.rs are dead code in this directory — not declared in mod.rs. Probably pre-refactor leftovers. Don't touch.
Next session
Session 07 — Candidate hydrators, part 1. We start reading the 16 hydrator implementations the Phoenix pipeline composes. Part 1 covers the simpler / structural ones (~900 LOC):
- in_network_candidate_hydrator.rs
- core_data_candidate_hydrator.rs
- subscription_hydrator.rs
- gizmoduck_hydrator.rs
- blocked_by_hydrator.rs
- has_media_hydrator.rs
- language_code_hydrator.rs
- video_duration_candidate_hydrator.rs
Session 08 covers the rest.