X For You algorithm, line by line — Part 9: Query hydrators

Part 9 of the deep dive into xai-org/x-algorithm. The 16 query hydrators that populate ScoredPostsQuery: 4 social-graph fetchers, cached-posts Redis lookup, MinHash signature, two parallel UAS-aggregation calls, served history with fatigue logic, IP geolocation, demographics, tiered gender prediction, Grok-topics bitmaps. Plus the orphan pre-refactor files.

May 15, 2026·18 min read

We've spent the last two sessions on candidate hydrators — per-post enrichment. Now: query hydrators — per-request enrichment. These run on the ScoredPostsQuery itself, fetching the viewer's social graph, engagement history, demographics, etc.

From Session 06's pipeline configuration, we know 15 of these run in wave 1 (in parallel). Each fills in a slice of the ScoredPostsQuery mega-struct from Session 04. Files covered (1,235 LOC across 21 files, including 2 orphans):

home-mixer/query_hydrators/
├── mod.rs                                       (18)
├── blocked_user_ids_query_hydrator.rs           (33)
├── muted_user_ids_query_hydrator.rs             (33)
├── followed_user_ids_query_hydrator.rs          (33)
├── subscribed_user_ids_query_hydrator.rs        (33)
├── cached_posts_query_hydrator.rs               (82)
├── mutual_follow_query_hydrator.rs              (37)
├── scoring_sequence_query_hydrator.rs           (68)
├── retrieval_sequence_query_hydrator.rs         (71)
├── served_history_query_hydrator.rs             (109)
├── impression_bloom_filter_query_hydrator.rs    (50)
├── impressed_posts_query_hydrator.rs            (25)
├── past_request_timestamps_query_hydrator.rs    (36)
├── user_demographics_query_hydrator.rs          (30)
├── user_inferred_gender_query_hydrator.rs       (58)
├── followed_grok_topics_query_hydrator.rs       (75)
├── followed_starter_packs_query_hydrator.rs    (44)
├── inferred_grok_topics_query_hydrator.rs       (39)
└── ip_query_hydrator.rs                         (30)

# Orphan files (not in mod.rs, not compiled):
├── user_action_seq_query_hydrator.rs            (188)
└── user_features_query_hydrator.rs              (41)

The active hydrators follow a near-identical pattern: take one client, call one method, populate one field. We'll establish the pattern and then move quickly through the rest.


mod.rs (18 lines)

pub mod blocked_user_ids_query_hydrator;
pub mod cached_posts_query_hydrator;
pub mod followed_grok_topics_query_hydrator;
pub mod followed_starter_packs_query_hydrator;
pub mod followed_user_ids_query_hydrator;
pub mod ip_query_hydrator;
pub mod impressed_posts_query_hydrator;
pub mod impression_bloom_filter_query_hydrator;
pub mod inferred_grok_topics_query_hydrator;
pub mod muted_user_ids_query_hydrator;
pub mod mutual_follow_query_hydrator;
pub mod past_request_timestamps_query_hydrator;
pub mod retrieval_sequence_query_hydrator;
pub mod scoring_sequence_query_hydrator;
pub mod served_history_query_hydrator;
pub mod subscribed_user_ids_query_hydrator;
pub mod user_demographics_query_hydrator;
pub mod user_inferred_gender_query_hydrator;

18 declared modules. Two files in the directory aren't here (user_action_seq_query_hydrator.rs, user_features_query_hydrator.rs) — those are orphans from a pre-refactor era, same as in Session 06's candidate_pipeline directory.


The four "social graph" hydrators (132 lines total)

BlockedUserIdsQueryHydrator, MutedUserIdsQueryHydrator, FollowedUserIdsQueryHydrator, SubscribedUserIdsQueryHydrator are nearly identical 33-line files. They all:

  1. Hold an Arc<dyn SocialGraphClientOps>.
  2. Call client.get_<role>_user_ids(query.user_id).
  3. Stash the result in user_features.<role>_user_ids.

Let's read BlockedUserIdsQueryHydrator in full as the template:

use crate::models::query::ScoredPostsQuery;
use crate::models::user_features::UserFeatures;
use std::sync::Arc;
use tonic::async_trait;
use xai_candidate_pipeline::component_library::clients::SocialGraphClientOps;
use xai_candidate_pipeline::query_hydrator::QueryHydrator;

pub struct BlockedUserIdsQueryHydrator {
    pub socialgraph_client: Arc<dyn SocialGraphClientOps>,
}

#[async_trait]
impl QueryHydrator<ScoredPostsQuery> for BlockedUserIdsQueryHydrator {
    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let blocked_user_ids = self
            .socialgraph_client
            .get_blocked_user_ids(query.user_id)
            .await
            .map_err(|e| e.to_string())?;

        Ok(ScoredPostsQuery {
            user_features: UserFeatures {
                blocked_user_ids,
                ..Default::default()
            },
            ..Default::default()
        })
    }

    fn update(&self, query: &mut ScoredPostsQuery, hydrated: ScoredPostsQuery) {
        query.user_features.blocked_user_ids = hydrated.user_features.blocked_user_ids;
    }
}

The pattern: the hydrator returns a ScoredPostsQuery::default() with just one nested field populated. Then update is responsible for copying that one field into the canonical query.

Why is this safe across 4 parallel hydrators that all write to user_features? Because each hydrator only copies its own specific field in update:

  • BlockedUserIdsQueryHydrator::update copies blocked_user_ids only.
  • MutedUserIdsQueryHydrator::update copies muted_user_ids only.
  • FollowedUserIdsQueryHydrator::update copies followed_user_ids only.
  • SubscribedUserIdsQueryHydrator::update copies subscribed_user_ids only.

Even though all four run in parallel and produce competing default-UserFeatures structs (where 3 of 4 fields are wrong/empty), the sequential update step correctly merges them.

This is the cleanest illustration of the hydrate-then-update pattern from Session 01: hydrators produce full result structs (with default values in every field they don't own), and update merges by selectively copying only what each hydrator is responsible for.
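
To see the merge in isolation, here is a minimal synchronous sketch of hydrate-then-update. The Query and UserFeatures structs, the Hydrator trait, the Blocked and Muted types, and run_wave are simplified stand-ins invented for illustration; the real trait is async and the real framework runs the hydrate calls concurrently.

```rust
// Hypothetical, trimmed-down stand-ins for ScoredPostsQuery and UserFeatures.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct UserFeatures {
    pub blocked_user_ids: Vec<u64>,
    pub muted_user_ids: Vec<u64>,
}

#[derive(Debug, Default, Clone, PartialEq)]
pub struct Query {
    pub user_id: u64,
    pub user_features: UserFeatures,
}

// Synchronous stand-in for the QueryHydrator trait.
pub trait Hydrator {
    fn hydrate(&self, query: &Query) -> Query;
    fn update(&self, query: &mut Query, hydrated: Query);
}

// Each fake hydrator carries the IDs its "client" would return.
pub struct Blocked(pub Vec<u64>);
pub struct Muted(pub Vec<u64>);

impl Hydrator for Blocked {
    fn hydrate(&self, _query: &Query) -> Query {
        // Only blocked_user_ids is meaningful; everything else is Default.
        Query {
            user_features: UserFeatures {
                blocked_user_ids: self.0.clone(),
                ..Default::default()
            },
            ..Default::default()
        }
    }
    fn update(&self, query: &mut Query, hydrated: Query) {
        query.user_features.blocked_user_ids = hydrated.user_features.blocked_user_ids;
    }
}

impl Hydrator for Muted {
    fn hydrate(&self, _query: &Query) -> Query {
        Query {
            user_features: UserFeatures {
                muted_user_ids: self.0.clone(),
                ..Default::default()
            },
            ..Default::default()
        }
    }
    fn update(&self, query: &mut Query, hydrated: Query) {
        query.user_features.muted_user_ids = hydrated.user_features.muted_user_ids;
    }
}

// Phase 1: every hydrator reads the same immutable snapshot.
// Phase 2: results are merged one at a time, each touching only its own field.
pub fn run_wave(query: &mut Query, hydrators: &[Box<dyn Hydrator>]) {
    let results: Vec<Query> = {
        let snapshot: &Query = query;
        hydrators.iter().map(|h| h.hydrate(snapshot)).collect()
    };
    for (h, hydrated) in hydrators.iter().zip(results) {
        h.update(query, hydrated);
    }
}
```

With a Blocked and a Muted hydrator in the same wave, the merged query ends up with both lists and its original user_id, even though each hydrate result left the other list empty.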

The other three (muted_user_ids, followed_user_ids, subscribed_user_ids) are structurally identical, just s/blocked/muted/, etc. Won't repeat them.


cached_posts_query_hydrator.rs (82 lines)

Fetches a previously-scored set of posts from Redis. If found and large enough, lets the pipeline skip the entire scoring path.

const MIN_CACHED_POSTS_THRESHOLD: usize = 500;
const REDIS_GET_TIMEOUT: Duration = Duration::from_millis(300);

pub struct CachedPostsQueryHydrator {
    pub redis_client: Arc<dyn RedisClient>,
}

Two constants:

  • MIN_CACHED_POSTS_THRESHOLD = 500: the cache result must have at least 500 posts to be considered "useful." Below that, treat as a miss (the scoring would have to fetch more anyway).
  • REDIS_GET_TIMEOUT = 300ms: don't let Redis drag the request.

    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.params.get(EnableCachedPosts)
    }

    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let cache_key = redis_client::cached_posts_key(
            query.user_id,
            &query.topic_ids,
            query.in_network_only,
            query.exclude_videos,
        );

The cache key is computed from (user_id, topic_ids, in_network_only, exclude_videos), the inputs that define the surface. Two requests for the same surface from the same user map to the same key; switching topics or surfaces produces a different key, so the lookup for the new surface starts from a miss rather than reusing the old entry.

        let start = Instant::now();
        let payload =
            match tokio::time::timeout(REDIS_GET_TIMEOUT, self.redis_client.get(cache_key.clone()))
                .await
            {
                Ok(Ok(bytes)) => bytes,
                Ok(Err(err)) => {
                    warn!(
                        cache_key = %cache_key,
                        latency_ms = %start.elapsed().as_millis(),
                        error = %err,
                        "CachedPostsQueryHydrator redis GET error"
                    );
                    return Err(err.to_string());
                }
                Err(_) => {
                    warn!(
                        cache_key = %cache_key,
                        latency_ms = %start.elapsed().as_millis(),
                        timeout_ms = %REDIS_GET_TIMEOUT.as_millis(),
                        "CachedPostsQueryHydrator redis GET timed out"
                    );
                    return Err(format!(
                        "Redis GET timed out after {}ms",
                        REDIS_GET_TIMEOUT.as_millis()
                    ));
                }
            };

Three-arm match: success, Redis error, timeout. Both failure arms log a structured warning with the key and latency for debugging, then return Err.

Ok(Ok(...)) — the outer Ok is "timeout did not fire," the inner Ok is "Redis returned successfully." Double-result is common when wrapping with tokio::time::timeout.

        if payload.is_empty() {
            return Ok(ScoredPostsQuery::default());
        }

        let decompressed = zstd::decode_all(payload.as_slice()).map_err(|err| err.to_string())?;
        let cached_posts: Vec<PostCandidate> =
            serde_json::from_slice(&decompressed).map_err(|err| err.to_string())?;

        let has_cached_posts = cached_posts.len() >= MIN_CACHED_POSTS_THRESHOLD;

        Ok(ScoredPostsQuery {
            cached_posts,
            has_cached_posts,
            ..Default::default()
        })
    }

Three-stage decode: empty payload (cache miss) → return default. Otherwise, zstd decompress + JSON deserialize. Set has_cached_posts flag based on the count threshold.

This is the source of truth for the has_cached_posts gate we've seen on every candidate hydrator's enable. When this flag is true, downstream hydrators short-circuit and the cached scores are re-used.


mutual_follow_query_hydrator.rs (37 lines)

pub struct MutualFollowQueryHydrator {
    pub strato_client: Arc<dyn StratoClient + Send + Sync>,
}

#[async_trait]
impl QueryHydrator<ScoredPostsQuery> for MutualFollowQueryHydrator {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.params.get(EnableMutualFollowJaccardHydration)
    }

    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let user_id = query.user_id as i64;
        let result = self
            .strato_client
            .get_minhash_with_count(user_id)
            .await
            .map_err(|e| e.to_string())?;

        let viewer_minhash = result.map(|(minhash, _count)| minhash);

        Ok(ScoredPostsQuery {
            viewer_minhash,
            ..Default::default()
        })
    }

    fn update(&self, query: &mut ScoredPostsQuery, hydrated: ScoredPostsQuery) {
        query.viewer_minhash = hydrated.viewer_minhash;
    }
}

Fetches the viewer's MinHash signature — the input side of the Jaccard similarity we saw in Session 08's MutualFollowJaccardHydrator (which fetches author MinHashes per-candidate). Same flag gates both.

result.map(|(minhash, _count)| minhash) discards the count (get_minhash_with_count returns both, but here we only need the hash).


scoring_sequence_query_hydrator.rs (68 lines) + retrieval_sequence_query_hydrator.rs (71 lines)

These are the engagement history fetchers — they pull the user's recent action sequence from the User Action Aggregation service. The output is the Phoenix model's primary input feature.

pub struct ScoringSequenceQueryHydrator {
    pub user_action_aggregation_client: Arc<dyn UserActionAggregationClient + Send + Sync>,
}

Single client. The interesting part is the long parameterized fetch call:

    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let cluster = UserActionsCluster::parse(&query.params.get(UserActionsClusterId));
        let include_realtime: bool = query.params.get(IncludeRealtimeActions);
        let source_data_type =
            UserActionSequenceSourceDataType::from_str_name(&query.params.get(UasSourceDataType))
                .unwrap_or(UserActionSequenceSourceDataType::Arrow);
        let use_xds: bool = query.params.get(UseXdsForUas);
        let result = self
            .user_action_aggregation_client
            .fetch_aggregated_sequence(
                cluster,
                query.user_id,
                p::UAS_WINDOW_TIME_MS as u32,
                query.params.get(MaxSeqLengthScoring),
                UserActionAggregationType::from_str_name(&query.params.get(PhoenixAggregationType))
                    .unwrap_or(UserActionAggregationType::DenseWithNotInterestedIn),
                source_data_type,
                ResponseFormat::Arrow,
                if include_realtime { Some(true) } else { None },
                Some(query.prediction_id as i64),
                use_xds,
            )
            .await
            .map_err(|e| format!("Aggregation service call failed: {}", e))?;

Most of the arguments are feature-switch-driven: cluster ID, realtime opt-in, source data type, aggregation type, max sequence length, and xDS usage all come from the query's params snapshot, meaning the same Phoenix model can be tested against different sequence configurations without code changes. The lookback window (UAS_WINDOW_TIME_MS) and the response format (ResponseFormat::Arrow) are fixed in code.

The parameters:

  • cluster — which Aggregation cluster to call (different clusters might have different staleness/quality trade-offs).
  • query.user_id — whose history.
  • UAS_WINDOW_TIME_MS — how far back in time to look. Constant.
  • MaxSeqLengthScoring — cap on returned actions.
  • UserActionAggregationType::DenseWithNotInterestedIn — default aggregation: keep "not interested" signals (an explicit negative signal user gave).
  • source_data_type — Arrow (columnar binary) by default.
  • ResponseFormat::Arrow — return Arrow blob.
  • include_realtime — augment with last-N-seconds actions (not yet in the aggregated index).
  • Some(query.prediction_id as i64) — pass the prediction ID for join keys later (engagement events fired against this scoring will reference this prediction_id).
  • use_xds — service discovery via xDS (Envoy's discovery protocol).

        Ok(ScoredPostsQuery {
            scoring_sequence: Some(result.sequence),
            columnar_scoring_sequence: result.columnar_bytes,
            ..Default::default()
        })
    }

Stuff both the structured sequence AND the columnar bytes blob into the query. The Phoenix scorer prefers the columnar version (faster to feed into model inference); the structured one is for debugging.

retrieval_sequence_query_hydrator.rs is the sibling: same shape, but with MaxSeqLengthRetrieval (a different cap) and PhoenixRetrievalAggregationType, whose fallback is Dense rather than DenseWithNotInterestedIn, so retrieval and scoring can aggregate different action sets. Also, prediction_id is None here: retrieval doesn't get a join key.

    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        // ...
        let result = self.user_action_aggregation_client.fetch_aggregated_sequence(
            cluster,
            query.user_id,
            p::UAS_WINDOW_TIME_MS as u32,
            query.params.get(MaxSeqLengthRetrieval),
            UserActionAggregationType::from_str_name(
                &query.params.get(PhoenixRetrievalAggregationType),
            )
            .unwrap_or(UserActionAggregationType::Dense),
            source_data_type,
            ResponseFormat::Arrow,
            if include_realtime { Some(true) } else { None },
            None,                                       // <- no prediction_id
            use_xds,
        ).await.map_err(...)?;

        Ok(ScoredPostsQuery {
            retrieval_sequence: Some(result.sequence),
            columnar_retrieval_sequence: result.columnar_bytes,
            ..Default::default()
        })
    }

So we make two parallel calls to the same Aggregation service — once for scoring history, once for retrieval history. Different sequence lengths, different aggregation rules.


served_history_query_hydrator.rs (109 lines)

Fetches what the user was recently served. Powers three downstream uses:

  1. served_ids — used by PreviouslyServedPostsFilter (Session 05).
  2. who_to_follow_eligible — gates the WTF module to enforce fatigue limits.
  3. served_history — the raw entries, kept for the side-effect that updates them.

pub struct ServedHistoryQueryHydrator {
    client: Arc<dyn ServedHistoryClient>,
}

#[async_trait]
impl QueryHydrator<ScoredPostsQuery> for ServedHistoryQueryHydrator {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.params.get(EnableUrtMigrationComponents)
    }

    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let platform = client_platform::from_client_app_id(query.client_app_id);
        let entries = self
            .client
            .get_recent(query.user_id, TimelineType::Home, platform)
            .await
            .map_err(|e| e.to_string())?;

Pull recent home-timeline served entries scoped to the client's platform (iOS / Android / web). The platform scoping matters because served-history is tracked per-platform — what we showed on the web doesn't necessarily count for the iOS app's "don't repeat" filter.

        let who_to_follow_eligible = is_module_eligible(
            &entries,
            EntityIdType::WHO_TO_FOLLOW,
            query.params.get(WhoToFollowFatigueHours),
            query.request_time_ms,
        );

        let served_ids = recently_served_ids(
            &entries,
            query.request_time_ms,
            query.params.get(ExcludeServedTweetIdsDuration),
            query.params.get(ExcludeServedTweetIdsNumber),
        );

Two derived computations from the entries.

        Ok(ScoredPostsQuery {
            served_history: entries,
            served_ids,
            who_to_follow_eligible,
            ..Default::default()
        })
    }

    fn update(&self, query: &mut ScoredPostsQuery, hydrated: ScoredPostsQuery) {
        query.served_history = hydrated.served_history;
        query.served_ids = hydrated.served_ids;
        query.who_to_follow_eligible = hydrated.who_to_follow_eligible;
    }
}

Three-field update. Multi-output hydrator.

fn recently_served_ids(
    history: &[ServedHistory],
    now_ms: i64,
    duration_minutes: u32,
    max_items: usize,
) -> Vec<u64> {
    let min_time_ms = now_ms - (duration_minutes as i64 * 60_000);

    history
        .iter()
        .filter(|sh| sh.served_time_ms.is_some_and(|t| t >= min_time_ms))
        .flat_map(|sh| {
            sh.entries.iter().flat_map(|entry| {
                entry.item_ids.iter().flatten().flat_map(|ids| {
                    [ids.tweet_id, ids.source_tweet_id]
                        .into_iter()
                        .flatten()
                        .map(|id| id as u64)
                })
            })
        })
        .take(max_items)
        .collect()
}

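Keep the served_history records whose served_time_ms falls within the last duration_minutes (e.g., the last 60 minutes; records with no timestamp are dropped), then collect both tweet_id and source_tweet_id so retweets are caught from both angles. The result is capped at max_items, and the cap counts IDs, so a retweet that carries both IDs consumes two slots.

The nested flat_map chain: history → entries → item_ids → [tweet_id, source_tweet_id]. .flatten() is used in two places: once on item_ids.iter().flatten() (item_ids is Option<...>) and once on [tweet_id, source_tweet_id].into_iter().flatten() (each ID is an Option, and the trailing id as u64 cast suggests a signed integer inside rather than a u64).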
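
The chain is easier to follow with concrete types. Below is a self-contained sketch that flattens the nesting into sequential flat_map calls, which yields the same IDs in the same order. The ItemIds, Entry, and ServedHistory structs are hypothetical reductions of the real types, and the i64 ID type is an assumption inferred from the cast. The two optional IDs are joined with chain instead of a two-element array, which behaves the same on every Rust edition.

```rust
// Hypothetical, trimmed-down versions of the served-history types.
pub struct ItemIds {
    pub tweet_id: Option<i64>,
    pub source_tweet_id: Option<i64>,
}

pub struct Entry {
    pub item_ids: Option<Vec<ItemIds>>,
}

pub struct ServedHistory {
    pub served_time_ms: Option<i64>,
    pub entries: Vec<Entry>,
}

pub fn recently_served_ids(
    history: &[ServedHistory],
    now_ms: i64,
    duration_minutes: u32,
    max_items: usize,
) -> Vec<u64> {
    let min_time_ms = now_ms - (duration_minutes as i64 * 60_000);

    history
        .iter()
        // Drop records that are too old or have no timestamp at all.
        .filter(|sh| sh.served_time_ms.is_some_and(|t| t >= min_time_ms))
        // history -> entries
        .flat_map(|sh| sh.entries.iter())
        // entries -> item_ids (an Option<Vec<_>> flattens to zero or more items)
        .flat_map(|entry| entry.item_ids.iter().flatten())
        // item_ids -> up to two IDs per item, skipping the ones that are None
        .flat_map(|ids| ids.tweet_id.into_iter().chain(ids.source_tweet_id))
        .map(|id| id as u64)
        // The cap counts individual IDs, not served items.
        .take(max_items)
        .collect()
}
```

Note how a single item with both IDs set contributes two values, so a max_items of 2 can be exhausted by one retweet.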

fn is_module_eligible(
    history: &[ServedHistory],
    entity_type: EntityIdType,
    fatigue_hours: u32,
    now_ms: i64,
) -> bool {
    let min_interval_ms = fatigue_hours as i64 * 3_600_000;

    let last_served_ms = history
        .iter()
        .filter(|sh| sh.entries.iter().any(|e| e.entity_type == entity_type))
        .filter_map(|sh| sh.served_time_ms)
        .max();

    match last_served_ms {
        Some(ts) => (now_ms - ts) >= min_interval_ms,
        None => true,
    }
}

Fatigue: don't show a Who-to-Follow module again if we've shown one in the last fatigue_hours. Find the most recent serve of this entity type, compute the interval. If never served, eligible.
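
A worked example makes the boundary behavior concrete. The sketch below reuses the function body with hypothetical stand-in types (EntityKind replaces the real EntityIdType, and the structs are reduced to the fields the function reads).

```rust
// Hypothetical stand-ins: EntityKind replaces the real EntityIdType.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum EntityKind {
    Tweet,
    WhoToFollow,
}

pub struct Entry {
    pub entity_type: EntityKind,
}

pub struct ServedHistory {
    pub served_time_ms: Option<i64>,
    pub entries: Vec<Entry>,
}

pub fn is_module_eligible(
    history: &[ServedHistory],
    entity_type: EntityKind,
    fatigue_hours: u32,
    now_ms: i64,
) -> bool {
    let min_interval_ms = fatigue_hours as i64 * 3_600_000;

    // Most recent response that contained at least one entry of this type.
    let last_served_ms = history
        .iter()
        .filter(|sh| sh.entries.iter().any(|e| e.entity_type == entity_type))
        .filter_map(|sh| sh.served_time_ms)
        .max();

    match last_served_ms {
        // Eligible again once the full fatigue interval has elapsed (inclusive).
        Some(ts) => (now_ms - ts) >= min_interval_ms,
        // Never served: eligible.
        None => true,
    }
}
```

With a 24-hour fatigue window and a module served at t = 0, a request at 23 hours is rejected and a request at exactly 24 hours is accepted, because the comparison is >=. Responses that contained only other entity types do not affect the result.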


impression_bloom_filter_query_hydrator.rs (50 lines)

Fetches the user's pre-computed impression Bloom filters. Used by PreviouslySeenPostsFilter (Session 05).

pub struct ImpressionBloomFilterQueryHydrator {
    pub client: Arc<dyn ImpressionBloomFilterClient>,
}

#[async_trait]
impl QueryHydrator<ScoredPostsQuery> for ImpressionBloomFilterQueryHydrator {
    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let user_id = query.user_id as i64;

        let bloom_filter_thrift = self
            .client
            .get(user_id, SurfaceArea::HOME_TIMELINE)
            .await
            .map_err(|e| e.to_string())?;

        let entries: Vec<ImpressionBloomFilterEntry> = bloom_filter_thrift
            .and_then(|s| s.entries)
            .unwrap_or_default()
            .iter()
            .map(|e| thrift_entry_to_proto(e))
            .collect();

Fetch the Thrift-encoded bloom filters for HOME_TIMELINE surface. Convert each entry from Thrift → internal struct.

        Ok(ScoredPostsQuery {
            bloom_filter_entries: entries,
            ..Default::default()
        })
    }

Stash the parsed entries on the query.

fn thrift_entry_to_proto(
    entry: &xai_x_thrift::impression_bloom_filter::ImpressionBloomFilterEntry,
) -> ImpressionBloomFilterEntry {
    ImpressionBloomFilterEntry {
        bloom_filter: entry.bloom_filter.iter().map(|&v| v as u64).collect(),
        size_cap: entry.size_cap,
        false_positive_rate: entry.false_positive_rate.into(),
    }
}

Field-by-field copy. bloom_filter is Vec<i64> in Thrift → Vec<u64> internally. Same i64↔u64 cast we've seen everywhere.
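
The cast matters for a Bloom filter because the words are bit sets, not numbers. In Rust, as between i64 and u64 reinterprets the same 64 bits, so no bit of the filter is lost. A small sketch (thrift_words_to_u64 is a hypothetical helper name, not from the repo):

```rust
// Reinterpret signed Thrift words as unsigned words; the bit pattern is unchanged.
pub fn thrift_words_to_u64(words: &[i64]) -> Vec<u64> {
    words.iter().map(|&w| w as u64).collect()
}

// The reverse direction, to show the conversion round-trips.
pub fn u64_words_to_thrift(words: &[u64]) -> Vec<i64> {
    words.iter().map(|&w| w as i64).collect()
}
```

A word of -1 (all bits set) becomes u64::MAX, and i64::MIN (only the top bit set) becomes 1 << 63.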


impressed_posts_query_hydrator.rs (25 lines) — orphan-in-pipeline

pub struct ImpressedPostsQueryHydrator {
    pub client: Arc<dyn ImpressedPostsClient>,
}

#[async_trait]
impl QueryHydrator<ScoredPostsQuery> for ImpressedPostsQueryHydrator {
    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let impressed_post_ids = self.client.get(query.user_id).await?;

        Ok(ScoredPostsQuery {
            impressed_post_ids,
            ..Default::default()
        })
    }

    fn update(&self, query: &mut ScoredPostsQuery, hydrated: ScoredPostsQuery) {
        query.impressed_post_ids = hydrated.impressed_post_ids;
    }
}

Trivial single-call hydrator. From Session 06: this is dark-launched — constructed but never added to the pipeline's query_hydrators vec. The code is ready; flipping the wiring activates the feature.

Note await?: the ? propagates the client's error directly. There is no .map_err(|e| e.to_string()) step because the client's error type is already String (or converts into it via From), which is all that ? requires.
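
The difference between the two styles comes down to whether the error type already matches the function's return type. A minimal sketch with hypothetical functions (first_id and parse_id are not from the repo):

```rust
// The incoming error is already a String, so `?` needs no conversion.
pub fn first_id(client_result: Result<Vec<u64>, String>) -> Result<Option<u64>, String> {
    let ids = client_result?;
    Ok(ids.first().copied())
}

// ParseIntError has no automatic conversion into String,
// so it must be mapped before `?` can propagate it.
pub fn parse_id(raw: &str) -> Result<u64, String> {
    let id = raw.parse::<u64>().map_err(|e| e.to_string())?;
    Ok(id)
}
```

Dropping the map_err in parse_id would be a compile error, which is why the social-graph hydrators earlier in this session carry it and this one does not.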


past_request_timestamps_query_hydrator.rs (36 lines)

pub struct PastRequestTimestampsQueryHydrator {
    client: Arc<dyn PastRequestTimestampsClient>,
}

impl PastRequestTimestampsQueryHydrator {
    pub fn new(client: Arc<dyn PastRequestTimestampsClient>) -> Self {
        Self { client }
    }
}

#[async_trait]
impl QueryHydrator<ScoredPostsQuery> for PastRequestTimestampsQueryHydrator {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.params.get(EnableUrtMigrationComponents)
    }

    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let non_polling_timestamps = self.client.get(query.user_id).await?;

        Ok(ScoredPostsQuery {
            non_polling_timestamps,
            ..Default::default()
        })
    }

    fn update(&self, query: &mut ScoredPostsQuery, hydrated: ScoredPostsQuery) {
        query.non_polling_timestamps = hydrated.non_polling_timestamps;
    }
}

Same shape. Fetches the timestamps of the user's recent non-polling (foreground) requests. Used downstream by URT decoration to decide whether to show a "since you last visited" marker.

Both this hydrator and served_history_query_hydrator are gated by the same EnableUrtMigrationComponents flag, which suggests a coordinated migration.


user_demographics_query_hydrator.rs (30 lines)

pub struct UserDemographicsQueryHydrator {
    pub client: Arc<dyn UserDemographicsClient>,
}

#[async_trait]
impl QueryHydrator<ScoredPostsQuery> for UserDemographicsQueryHydrator {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.params.get(EnableContextFeatures) || query.is_shadow_traffic
    }

    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let demographics = self.client.fetch(query.user_id).await?;

        Ok(ScoredPostsQuery {
            user_demographics: demographics,
            ..Default::default()
        })
    }
    …
}

Pulls the viewer's demographic profile (age, location, etc.). Context features are gated together — multiple hydrators flip on/off via EnableContextFeatures for an "enrich the model input with more demographic signals" experiment.


user_inferred_gender_query_hydrator.rs (58 lines) — two-tier fetch

pub struct UserInferredGenderQueryHydrator {
    mh_client: Arc<dyn UserInferredGenderStoreClient>,
    grpc_client: Arc<dyn GenderPredictionGrpcClient>,
}

Two clients: a Manhattan store (cached predictions) and a gRPC service (live prediction).

    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let result = match self.mh_client.fetch(query.user_id).await? {
            Some(r) => Some(r),
            None if is_new_user(query.user_id) => self.grpc_client.predict(query.user_id).await?,
            None => None,
        };

        Ok(ScoredPostsQuery {
            user_inferred_gender: result
                .as_ref()
                .and_then(|r| InferredGenderLabel::try_from(r.gender_label).ok()),
            user_inferred_gender_score: result.and_then(|r| r.prediction_score),
            ..Default::default()
        })
    }
    …
}

fn is_new_user(user_id: u64) -> bool {
    days_since_creation(user_id) == 0
}

Tiered fetch:

  1. Try the Manhattan-cached prediction.
  2. If empty AND user is new (account < 1 day old), call the live gRPC prediction.
  3. Otherwise (cache miss + established user) — give up, return None.
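
The three tiers can be sketched without any async machinery by passing the two lookups as closures. Everything here is a hypothetical simplification: Prediction mirrors only the two fields the hydrator reads, and tiered_lookup is an illustrative name.

```rust
// Hypothetical reduction of the prediction record.
#[derive(Debug, Clone, PartialEq)]
pub struct Prediction {
    pub gender_label: i32,
    pub prediction_score: Option<f64>,
}

// `store` stands in for the Manhattan fetch, `live` for the gRPC predict call.
pub fn tiered_lookup<S, L>(
    is_new_user: bool,
    store: S,
    live: L,
) -> Result<Option<Prediction>, String>
where
    S: FnOnce() -> Result<Option<Prediction>, String>,
    L: FnOnce() -> Result<Option<Prediction>, String>,
{
    match store()? {
        // Tier 1: the cached prediction wins.
        Some(p) => Ok(Some(p)),
        // Tier 2: only new users pay for a live prediction.
        None if is_new_user => live(),
        // Tier 3: established user with no cached value.
        None => Ok(None),
    }
}
```

Because live is only invoked inside the guarded arm, an established user with a cache miss never triggers the second call, and a store error short-circuits before either fallback is considered.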

Why this asymmetry? Because the Manhattan store is populated by a batch job that runs daily. New users haven't been processed yet, so they need on-the-fly prediction. Established users with no cached prediction probably have no prediction by design (maybe they're under 18 or in an opt-out region).

is_new_user(user_id) uses days_since_creation(user_id) == 0 — the snowflake-ID-derived age computation we've seen before.
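
For readers who skipped the earlier session, here is a sketch of how an age can be derived from a Snowflake ID. The bit layout (timestamp in the bits above the low 22) and the epoch constant 1288834974657 are the standard Snowflake scheme; the function names and the explicit now_ms parameter are assumptions made so the example is deterministic, and the repo's own days_since_creation may differ in detail.

```rust
// Standard Snowflake epoch (2010-11-04T01:42:54.657Z) in Unix milliseconds.
const TWITTER_EPOCH_MS: i64 = 1_288_834_974_657;
const MS_PER_DAY: i64 = 86_400_000;

// The bits above the low 22 hold milliseconds since the Snowflake epoch.
pub fn snowflake_created_at_ms(id: u64) -> i64 {
    ((id >> 22) as i64) + TWITTER_EPOCH_MS
}

// Whole days elapsed since creation, clamped at zero for clock skew.
pub fn days_since_creation_at(id: u64, now_ms: i64) -> i64 {
    (now_ms - snowflake_created_at_ms(id)).max(0) / MS_PER_DAY
}

// "New" means less than one full day old.
pub fn is_new_user_at(id: u64, now_ms: i64) -> bool {
    days_since_creation_at(id, now_ms) == 0
}
```

Under this scheme an account flips from new to established exactly 86,400,000 ms after the timestamp embedded in its ID.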


followed_grok_topics_query_hydrator.rs (75 lines) — most-flag-gated

const MH_GET_TIMEOUT: Duration = Duration::from_millis(300);
pub struct FollowedGrokTopicsQueryHydrator {
    client: Arc<dyn FollowedGrokTopicsStoreClient>,
}

300ms timeout on the Manhattan get.

    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.params.get(EnableContextFeatures)
            || query.is_shadow_traffic
            || query.params.get(EnableNewUserTopicRetrieval)
            || query.params.get(EnableNewUserTopicFiltering)
    }

Four-way OR enable. This hydrator's output feeds multiple downstream paths, each with its own flag. As long as any downstream is using it, hydrate.

    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let ids = tokio::time::timeout(MH_GET_TIMEOUT, self.client.fetch(query.user_id))
            .await
            .map_err(|_| "FollowedGrokTopics MH get timed out".to_string())??;

        let topics = ids
            .as_ref()
            .map(|ids| ids_to_bool_array(ids, &PARENT_TOPIC_IDS));

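Wrapped timeout. The ?? is worth unpacking: timeout(...).await yields a Result wrapped around the client's own Result. map_err turns the timeout error into a String, the first ? returns early on a timeout, and the second ? returns early on the client's error.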
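
The same unwrapping can be reproduced with plain std types. In this sketch TimedOut is a hypothetical stand-in for tokio's elapsed-timeout error, and unwrap_fetch takes the already-awaited nested Result as an argument.

```rust
// Stand-in for the error that a timeout wrapper produces.
#[derive(Debug)]
pub struct TimedOut;

pub fn unwrap_fetch(
    outcome: Result<Result<Option<Vec<i64>>, String>, TimedOut>,
) -> Result<Option<Vec<i64>>, String> {
    // map_err aligns the outer error type with the inner one (String);
    // the first `?` peels the timeout layer, the second peels the client layer.
    let ids = outcome.map_err(|_| "MH get timed out".to_string())??;
    Ok(ids)
}
```

Without the map_err, the first ? would not compile here, because TimedOut has no conversion into String.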

ids_to_bool_array(ids, &PARENT_TOPIC_IDS) — converts a list of topic IDs into a fixed-size [bool; 32] bitmap indicating which of the 32 parent topics the user follows. This is the dense feature representation the Phoenix model expects.
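
The helper itself isn't quoted in this session, so the following is only a guess at its shape: one slot per entry of the reference table, set when the user's list contains that ID. The i64 ID type and the const-generic signature are assumptions.

```rust
// Hypothetical implementation: out[i] is true when table[i] appears in ids.
pub fn ids_to_bool_array<const N: usize>(ids: &[i64], table: &[i64; N]) -> [bool; N] {
    let mut out = [false; N];
    for (slot, topic_id) in out.iter_mut().zip(table.iter()) {
        *slot = ids.contains(topic_id);
    }
    out
}
```

Two properties follow from this shape: the output order is fixed by the table rather than by the user's list, and IDs that aren't in the table are silently ignored.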

        let new_user_topics_enabled = query.params.get(EnableNewUserTopicRetrieval)
            || query.params.get(EnableNewUserTopicFiltering);
        let new_user_topic_ids =
            if new_user_topics_enabled && !query.is_topic_request() && !query.in_network_only {
                let threshold = Duration::from_secs(query.params.get(NewUserTopicAgeThresholdSecs));
                let is_new_user = duration_since_creation_opt(query.user_id)
                    .map(|age| age < threshold)
                    .unwrap_or(false);

                if is_new_user {
                    ids.clone().unwrap_or_default()
                } else {
                    vec![]
                }
            } else {
                vec![]
            };

Secondary output: new_user_topic_ids. The same topic IDs are also propagated as a non-bitmap list if the user is new (account age < NewUserTopicAgeThresholdSecs) AND the request isn't already topic/in-network. These IDs feed NewUserTopicIdsFilter (Session 05) which restricts OON candidates to declared topics for new users.

        Ok(ScoredPostsQuery {
            followed_grok_topics: topics,
            new_user_topic_ids,
            ..Default::default()
        })
    }

    fn update(&self, query: &mut ScoredPostsQuery, hydrated: ScoredPostsQuery) {
        query.followed_grok_topics = hydrated.followed_grok_topics;
        if !hydrated.new_user_topic_ids.is_empty() {
            query.new_user_topic_ids = hydrated.new_user_topic_ids;
        }
    }

Subtle: if !hydrated.new_user_topic_ids.is_empty() — only overwrite new_user_topic_ids if non-empty. Otherwise leave whatever was there (in case a different source populated it). Defensive merging.


followed_starter_packs_query_hydrator.rs (44 lines)

pub struct FollowedStarterPacksQueryHydrator {
    client: Arc<dyn FollowedStarterPacksStoreClient>,
}

#[async_trait]
impl QueryHydrator<ScoredPostsQuery> for FollowedStarterPacksQueryHydrator {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.params.get(EnableContextFeatures) || query.is_shadow_traffic
    }

    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let ids = tokio::time::timeout(MH_GET_TIMEOUT, self.client.fetch(query.user_id))
            .await
            .map_err(|_| "FollowedStarterPacks MH get timed out".to_string())??;

        let packs = ids.map(|ids| ids_to_bool_array(&ids, &PACK_IDS));

        Ok(ScoredPostsQuery {
            followed_starter_packs: packs,
            ..Default::default()
        })
    }
    …
}

Smaller cousin of FollowedGrokTopics. Starter packs (curated topic bundles for onboarding) → [bool; 20] bitmap.


inferred_grok_topics_query_hydrator.rs (39 lines)

pub struct InferredGrokTopicsQueryHydrator {
    pub strato_client: Arc<dyn StratoClient + Send + Sync>,
}

#[async_trait]
impl QueryHydrator<ScoredPostsQuery> for InferredGrokTopicsQueryHydrator {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.params.get(EnableGrokTopicsHydration)
    }

    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let topics = self
            .strato_client
            .get_inferred_grok_topics(query.user_id)
            .await
            .map_err(|e| e.to_string())?
            .map(|ids| {
                let max_categories = query.params.get(MaxInferredGrokTopicCategories) as usize;
                ids_to_multihot(&ids, max_categories)
            });

        Ok(ScoredPostsQuery {
            inferred_grok_topics: topics,
            ..Default::default()
        })
    }
    …
}

Fetches inferred (vs. followed) topics — Grok-classified interest topics based on the user's engagement, not their explicit follows. Capped at MaxInferredGrokTopicCategories to bound the feature size.

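ids_to_multihot vs the previous ids_to_bool_array: different output shape. Here the vector length comes from a runtime parameter (MaxInferredGrokTopicCategories) rather than a compile-time array size such as [bool; 32], so the category count can change through a feature switch.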
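
As with the bool-array helper, the real ids_to_multihot isn't shown, so this is a sketch under assumptions: IDs are treated as zero-based category indices, the output is a Vec<bool> of length max_categories, and out-of-range IDs are dropped. The actual element type and ID mapping may differ.

```rust
// Hypothetical implementation: a runtime-sized multi-hot vector.
pub fn ids_to_multihot(ids: &[i64], max_categories: usize) -> Vec<bool> {
    let mut out = vec![false; max_categories];
    for &id in ids {
        // Ignore negative IDs and IDs beyond the configured category count.
        if id >= 0 && (id as usize) < max_categories {
            out[id as usize] = true;
        }
    }
    out
}
```

The contrast with the fixed-size version is that the length is an argument, so changing the category cap requires no recompilation.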


ip_query_hydrator.rs (30 lines)

pub struct IpQueryHydrator {
    pub client: Arc<GeoIpLocationClient>,
}

#[async_trait]
impl QueryHydrator<ScoredPostsQuery> for IpQueryHydrator {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        query.params.get(EnableIpFeature) && !query.ip_address.is_empty()
    }

    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let result = self.client.fetch(&query.ip_address).await;

        Ok(ScoredPostsQuery {
            ip_location: result.location,
            ..Default::default()
        })
    }
    …
}

GeoIP lookup. Note the double gate: flag + non-empty IP. The IP comes from the original request's device_status.ip_address (set in Session 04's QueryBuilder); it might be empty for server-internal requests or anonymized traffic.

result.location is already an Option, so None propagates straight through. Note there is no ?: fetch returns a plain value rather than a Result, with the location held as an Option<Location>. If the IP lookup fails, location is simply None.
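The "fail into None" shape can be sketched with an in-memory table. This is not GeoIpLocationClient's code; GeoTableSketch, LookupResultSketch, and the String location type are hypothetical stand-ins that only illustrate a fetch with no error path.

```rust
use std::collections::HashMap;
use std::net::IpAddr;

/// Hypothetical in-memory stand-in for a GeoIP client.
pub struct GeoTableSketch {
    by_ip: HashMap<IpAddr, String>,
}

/// Hypothetical result type: the lookup outcome lives in an Option field.
pub struct LookupResultSketch {
    pub location: Option<String>,
}

impl GeoTableSketch {
    /// Builds the table from (ip, location) pairs, skipping unparseable IPs.
    pub fn from_pairs(pairs: &[(&str, &str)]) -> Self {
        let by_ip = pairs
            .iter()
            .filter_map(|(ip, loc)| {
                ip.parse::<IpAddr>().ok().map(|ip| (ip, loc.to_string()))
            })
            .collect();
        Self { by_ip }
    }

    /// Never returns an error: a malformed or unknown address yields None.
    pub fn fetch(&self, ip_address: &str) -> LookupResultSketch {
        let location = ip_address
            .parse::<IpAddr>()
            .ok()
            .and_then(|ip| self.by_ip.get(&ip).cloned());
        LookupResultSketch { location }
    }
}
```

Because every failure collapses to None inside the client, the hydrator can assign result.location directly and never fails the wave over a missing geolocation.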


Orphan files

user_action_seq_query_hydrator.rs (188 lines) — earlier sequence hydrator

References the orphan crate::candidate_pipeline::query::ScoredPostsQuery (the simpler version from Session 06's orphan analysis). This is a pre-refactor version of what's now done by the two Aggregation-service hydrators (ScoringSequenceQueryHydrator + RetrievalSequenceQueryHydrator).

pub struct UserActionSeqQueryHydrator {
    pub uas_fetcher: Arc<UserActionSequenceFetcher>,
    global_filter: Arc<dyn UserActionFilter>,
    aggregator: Arc<dyn UserActionAggregator>,
    post_filters: Vec<Arc<dyn AggregatedActionFilter>>,
}

Notable: this version runs the aggregation locally (with filters and an aggregator) rather than calling out to a separate Aggregation service. The current production version delegates aggregation to a remote service for consistency across different consuming services.

The body has interesting domain logic worth quoting:

    fn aggregate_user_action_sequence(
        &self,
        user_id: i64,
        uas_thrift: ThriftUserActionSequence,
    ) -> Result<UserActionSequence, String> {
        let thrift_user_actions = uas_thrift.user_actions.clone().unwrap_or_default();
        if thrift_user_actions.is_empty() {
            return Err(format!("No user actions found for user {}", user_id));
        }

        let filtered_actions = self.global_filter.run(thrift_user_actions);
        if filtered_actions.is_empty() {
            return Err(format!(
                "No user actions remaining after filtering for user {}",
                user_id
            ));
        }

        let mut aggregated_actions =
            self.aggregator
                .run(&filtered_actions, p::UAS_WINDOW_TIME_MS, 0);

        for filter in &self.post_filters {
            aggregated_actions = filter.run(aggregated_actions);
        }

        if aggregated_actions.len() > p::UAS_MAX_SEQUENCE_LENGTH {
            let drain_count = aggregated_actions.len() - p::UAS_MAX_SEQUENCE_LENGTH;
            aggregated_actions.drain(0..drain_count);
        }
        …
    }

The pipeline: filter raw actions → aggregate → post-filter → truncate. The current design pushes all of this to the Aggregation service instead. The truncation, drain(0..drain_count), removes entries from the front of the vector, so it keeps the most recent ones (assuming the sequence is time-ordered ascending).
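The four stages can be sketched end to end on a toy action type. The concrete rules below (a time-window filter, collapsing consecutive actions on the same post, dropping non-positive post IDs) are illustrative assumptions standing in for the real UserActionFilter / UserActionAggregator / AggregatedActionFilter implementations, which the excerpt doesn't show. Only the stage order, the early errors, and the front-drain truncation mirror the quoted code.

```rust
/// Hypothetical action record for illustration.
#[derive(Debug, Clone, PartialEq)]
pub struct ActionSketch {
    pub ts_ms: i64,
    pub post_id: i64,
}

pub fn build_sequence_sketch(
    raw: Vec<ActionSketch>,
    now_ms: i64,
    window_ms: i64,
    max_len: usize,
) -> Result<Vec<ActionSketch>, String> {
    if raw.is_empty() {
        return Err("no user actions".to_string());
    }
    // 1. Global filter (illustrative rule): keep actions inside the window.
    let mut actions: Vec<ActionSketch> = raw
        .into_iter()
        .filter(|a| now_ms - a.ts_ms <= window_ms)
        .collect();
    if actions.is_empty() {
        return Err("no user actions remaining after filtering".to_string());
    }
    // 2. Aggregate (illustrative rule): order by time ascending, then collapse
    //    runs of consecutive actions on the same post, keeping the first.
    actions.sort_by_key(|a| a.ts_ms);
    actions.dedup_by_key(|a| a.post_id);
    // 3. Post-filter (illustrative rule): drop placeholder post ids.
    actions.retain(|a| a.post_id > 0);
    // 4. Truncate from the front so the newest `max_len` entries survive.
    if actions.len() > max_len {
        let drain_count = actions.len() - max_len;
        actions.drain(0..drain_count);
    }
    Ok(actions)
}
```

With ascending timestamps, draining the range 0..drain_count discards the oldest entries. If the sequence were ordered newest-first, the same call would throw away the most recent actions, which is why the ordering assumption matters.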

The rest of the file is the Thrift-to-proto conversion of aggregated actions. Same pattern as we've seen elsewhere — boilerplate Thrift unwrapping + proto building.

This file probably exists in the source tree as a reference / fallback, but isn't wired into the pipeline. Same situation as the query.rs / candidate.rs orphans from Session 06.

user_features_query_hydrator.rs (41 lines) — fetch user features from Strato

pub struct UserFeaturesQueryHydrator {
    pub strato_client: Arc<dyn StratoClient + Send + Sync>,
}

#[async_trait]
impl QueryHydrator<ScoredPostsQuery> for UserFeaturesQueryHydrator {
    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<ScoredPostsQuery, String> {
        let user_id = query.user_id;
        let client = &self.strato_client;
        let result = client.get_user_features(user_id);
        let result = result.await.map_err(|e| e.to_string())?;
        let decoded: StratoResult<StratoValue<UserFeatures>> = decode(&result);
        match decoded {
            StratoResult::Ok(v) => {
                let user_features = v.v.unwrap_or_default();
                Ok(ScoredPostsQuery {
                    user_features,
                    ..Default::default()
                })
            }
            StratoResult::Err(_) => Err("Error received from strato".to_string()),
        }
    }
    …
}

Another orphan. References the orphan crate::candidate_pipeline::query and crate::candidate_pipeline::query_features::UserFeatures (the simpler UserFeatures from Session 06). This was the single-shot user features fetcher — one call returns blocked/muted/followed/subscribed/muted_keywords all at once.

The new approach (active in production) splits this into four separate hydrators (BlockedUserIdsQueryHydrator, etc.) that run in parallel. Probably for better cache hit rates per dimension, or for finer control over per-dimension timeouts/failures.

Both orphan files use Strato decode patterns (StratoResult<StratoValue<UserFeatures>>) that hint at this earlier era's data fetching style — directly hitting Strato rather than purpose-built services.


What we've learned

The query-hydrator pattern: 16 active hydrators, most ~25-80 lines (served_history_query_hydrator.rs is the outlier at 109), each filling in one slice of ScoredPostsQuery. The structural shape is identical across all of them:

  1. Hold an Arc<dyn Client>.
  2. enable gates on feature flag(s).
  3. hydrate calls the client, returns a defaults-everything ScoredPostsQuery with just the target fields populated.
  4. update copies only the target fields back into the canonical query.

Parallel writes work via per-field update: four socialgraph_client hydrators run in parallel and each modifies user_features. Each returns a partial UserFeatures::default() where only its field is meaningful. The framework calls update sequentially after all parallel fetches, and each update only copies its specific field. No data clobbering because each owns disjoint slices.
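The hydrate/update split described above can be sketched synchronously. This is a simplified model, not the framework's code: the trait, struct names, and hard-coded IDs are hypothetical, and the fetch phase is a plain map where production would run the calls concurrently.

```rust
#[derive(Debug, Default, Clone, PartialEq)]
pub struct UserFeaturesSketch {
    pub blocked: Vec<i64>,
    pub muted: Vec<i64>,
}

#[derive(Debug, Default, Clone, PartialEq)]
pub struct QuerySketch {
    pub user_id: i64,
    pub user_features: UserFeaturesSketch,
}

pub trait HydratorSketch {
    /// Returns a mostly-default query with only this hydrator's field set.
    fn hydrate(&self, query: &QuerySketch) -> QuerySketch;
    /// Copies only this hydrator's field into the canonical query.
    fn update(&self, query: &mut QuerySketch, hydrated: QuerySketch);
}

pub struct BlockedSketch;
pub struct MutedSketch;

impl HydratorSketch for BlockedSketch {
    fn hydrate(&self, _query: &QuerySketch) -> QuerySketch {
        QuerySketch {
            user_features: UserFeaturesSketch { blocked: vec![7, 8], ..Default::default() },
            ..Default::default()
        }
    }
    fn update(&self, query: &mut QuerySketch, hydrated: QuerySketch) {
        query.user_features.blocked = hydrated.user_features.blocked;
    }
}

impl HydratorSketch for MutedSketch {
    fn hydrate(&self, _query: &QuerySketch) -> QuerySketch {
        QuerySketch {
            user_features: UserFeaturesSketch { muted: vec![9], ..Default::default() },
            ..Default::default()
        }
    }
    fn update(&self, query: &mut QuerySketch, hydrated: QuerySketch) {
        query.user_features.muted = hydrated.user_features.muted;
    }
}

pub fn run_all_sketch(query: &mut QuerySketch, hydrators: &[&dyn HydratorSketch]) {
    // Fetch phase: every hydrator reads the same immutable snapshot.
    let snapshot: &QuerySketch = query;
    let partials: Vec<QuerySketch> = hydrators.iter().map(|h| h.hydrate(snapshot)).collect();
    // Merge phase: sequential, each update touching only its own field.
    for (h, partial) in hydrators.iter().zip(partials) {
        h.update(query, partial);
    }
}
```

Note that each partial carries a default user_id of 0 and an empty Vec for the other hydrator's field. Neither leaks into the canonical query, because update copies a single field rather than merging the whole struct.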

Two parallel UAS fetches: scoring sequence (max length N, aggregator A, with prediction_id) + retrieval sequence (max length M, aggregator B, no prediction_id). Same client, different parameters — two separate hydrators for clarity.

Three-field hydrators: ServedHistoryQueryHydrator populates served_history, served_ids, who_to_follow_eligible in one call. Multi-field hydrators are fine as long as the fields are computed from one shared upstream call.

Tiered fetches: UserInferredGenderQueryHydrator tries Manhattan first, falls back to gRPC for new users. Cached vs. live trade-off implemented as an explicit fallback.
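The cached-then-live fallback reduces to a small control-flow shape. A hedged sketch: tiered_fetch_sketch is a hypothetical helper, the cached argument stands in for the Manhattan read, and the closure stands in for the gRPC call. Restricting the fallback with a boolean is an assumption based on the "for new users" description.

```rust
/// Hypothetical helper: prefer the cached tier, and consult the live tier
/// only when the cache missed and the fallback is allowed.
pub fn tiered_fetch_sketch<T>(
    cached: Option<T>,
    fallback_allowed: bool,
    live: impl FnOnce() -> Option<T>,
) -> Option<T> {
    match cached {
        // Cache hit: the live tier is never called.
        Some(value) => Some(value),
        // Cache miss and the gate is open: pay for the live call.
        None if fallback_allowed => live(),
        // Cache miss with the gate closed: give up with None.
        None => None,
    }
}
```

Taking the live tier as a closure keeps it lazy, so the expensive call is skipped on a cache hit instead of being issued and discarded.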

Common gates:

  • EnableContextFeatures — group flag for demographic/context hydrators.
  • EnableUrtMigrationComponents — group flag for URT-related hydrators (served_history, past_request_timestamps).
  • query.is_shadow_traffic — universal "enable for shadow logging" escape hatch.
  • query.params.get(EnableNewUserXxx) + is_new_user(query.user_id) — combined feature + new-user gate.

Bitmap features: followed_grok_topics: [bool; 32], followed_starter_packs: [bool; 20]. Fixed-size arrays packed into the query for fast model input. ids_to_bool_array does the lookup.

Timeouts everywhere:

  • 300ms on Redis get (cached_posts).
  • 300ms on Manhattan (FollowedGrokTopics, FollowedStarterPacks).
  • Service-internal timeouts on the others (set by the client implementations).

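The ?? double question mark: tokio::time::timeout(...).await yields a nested Result, with the timeout's Result wrapping the client's. After map_err rewrites the outer error, the first ? unwraps the outer layer (returning early on a timeout) and the second ? unwraps the inner layer (returning early on a client error). Common pattern when wrapping a fallible call with a timeout.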
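The two layers are easier to see without the async machinery. In this sketch the nested Result is passed in directly; ElapsedSketch is a hypothetical stand-in for tokio::time::error::Elapsed, and the Vec<i64> payload and the error strings are made up for illustration.

```rust
/// Stand-in for tokio::time::error::Elapsed (the outer, timeout-layer error).
#[derive(Debug)]
pub struct ElapsedSketch;

pub fn unwrap_both_layers_sketch(
    timed: Result<Result<Vec<i64>, String>, ElapsedSketch>,
) -> Result<Vec<i64>, String> {
    // map_err rewrites only the outer (timeout) error into a String.
    // First `?`: returns early if the timeout fired.
    // Second `?`: returns early if the client call itself failed.
    let ids = timed.map_err(|_| "MH get timed out".to_string())??;
    Ok(ids)
}
```

The three possible inputs map to three outcomes: Ok(Ok(ids)) passes through, Ok(Err(e)) surfaces the client's own error, and Err(ElapsedSketch) surfaces the timeout message supplied to map_err.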

Orphan analysis: user_action_seq_query_hydrator.rs and user_features_query_hydrator.rs are pre-refactor leftovers. Both reference the orphan candidate_pipeline::query::ScoredPostsQuery and query_features::UserFeatures. They show the earlier design: local action-aggregation, single Strato call for all user features. The current design uses remote aggregation and parallel per-dimension fetches.


Next session

Session 10 — Scorers + Selectors. About 933 LOC. The actual ranking logic:

  • home-mixer/scorers/phoenix_scorer.rs (124) — the Phoenix model inference call
  • home-mixer/scorers/ranking_scorer.rs (290) — combine Phoenix scores into final score
  • home-mixer/scorers/author_diversity_scorer.rs (72) — diversity attenuation
  • home-mixer/scorers/oon_scorer.rs (38) — OON adjustment
  • home-mixer/scorers/weighted_scorer.rs (92) — linear combiner
  • home-mixer/scorers/vm_ranker.rs (132) — Video Mixer ranker
  • home-mixer/selectors/top_k_score_selector.rs (15)
  • home-mixer/selectors/blender_selector.rs (164) — the For You blender