X For You algorithm, line by line — Part 7: Candidate hydrators (part 1 of 2)
Part 7 of the deep dive into xai-org/x-algorithm. The structural candidate hydrators: in_network, core_data, subscription, gizmoduck (composite cache key), blocked_by (non-cached), has_media (shadow-only), language_code, video_duration. The CachedHydrator pattern via Moka cache, three-way result matching, and the has_cached_posts gate.
Now we read the actual hydrator implementations. Hydrators are the per-candidate enrichment stage — given a list of PostCandidates (mostly empty after sources return them), each hydrator fills in one specific slice of data by calling out to an external service.
From Session 06 we know the Phoenix pipeline runs 10 hydrators pre-filter and 6 more post-selection; Sessions 07-08 cover those 16. This session takes the structural / per-tweet ones (8 hydrator files plus mod.rs, 734 LOC by the per-file counts below):
home-mixer/candidate_hydrators/
├── mod.rs (16)
├── in_network_candidate_hydrator.rs (45) compute-only: viewer follows author?
├── core_data_candidate_hydrator.rs (133) TES: author/text/reply/retweet fields
├── subscription_hydrator.rs (82) TES: paywall gating
├── gizmoduck_hydrator.rs (147) Gizmoduck: author profile + follower count
├── blocked_by_hydrator.rs (57) social graph: author blocks viewer?
├── has_media_hydrator.rs (85) TES: does the post have media?
├── language_code_hydrator.rs (84) TES: tweet language
└── video_duration_candidate_hydrator.rs (85) TES: min video duration
The remaining 8 modules in mod.rs (ads brand safety x2, tweet type metrics, quote, vf, mutual follow jaccard, filtered topics, following replied) come in Session 08, together with the engagement counts hydrator.
Important pattern repetition: 6 of these 8 hydrators implement CachedHydrator (the cache-backed variant from Session 01). They all follow the same shape:
- Define CacheKey and CacheValue types.
- enable: gate on query.has_cached_posts.
- cache_store / cache_key / cache_value / hydrate_from_cache: boilerplate.
- hydrate_from_client: the actual external call.
- update: merge specific fields back.
We'll establish the pattern fully in the first cached hydrator (core_data_candidate_hydrator) and then point out only deviations in the rest.
mod.rs (16 lines)
pub mod ads_brand_safety_hydrator;
pub mod ads_brand_safety_vf_hydrator;
pub mod blocked_by_hydrator;
pub mod core_data_candidate_hydrator;
pub mod filtered_topics_hydrator;
pub mod following_replied_users_hydrator;
pub mod gizmoduck_hydrator;
pub mod has_media_hydrator;
pub mod in_network_candidate_hydrator;
pub mod language_code_hydrator;
pub mod mutual_follow_jaccard_hydrator;
pub mod quote_hydrator;
pub mod subscription_hydrator;
pub mod tweet_type_metrics_hydrator;
pub mod vf_candidate_hydrator;
pub mod video_duration_candidate_hydrator;
All 16 hydrator modules. Sessions 07-08 split this list.
in_network_candidate_hydrator.rs (45 lines)
The only non-cached, non-network hydrator. Pure compute: given the viewer's followed-user set (from query) and the candidate's author_id, compute in_network = is_self || is_followed.
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use std::collections::HashSet;
use tonic::async_trait;
use xai_candidate_pipeline::hydrator::Hydrator;
pub struct InNetworkCandidateHydrator;
#[async_trait]
impl Hydrator<ScoredPostsQuery, PostCandidate> for InNetworkCandidateHydrator {
fn enable(&self, query: &ScoredPostsQuery) -> bool {
!query.has_cached_posts
}
async fn hydrate(
&self,
query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let viewer_id = query.user_id;
let followed_ids: HashSet<u64> = query
.user_features
.followed_user_ids
.iter()
.copied()
.map(|id| id as u64)
.collect();
candidates
.iter()
.map(|candidate| {
let is_self = candidate.author_id == viewer_id;
let is_in_network = is_self || followed_ids.contains(&candidate.author_id);
Ok(PostCandidate {
in_network: Some(is_in_network),
..Default::default()
})
})
.collect()
}
fn update(&self, candidate: &mut PostCandidate, hydrated: PostCandidate) {
candidate.in_network = hydrated.in_network;
}
}
The structure:
- Unit struct (pub struct InNetworkCandidateHydrator;): no state.
- enable: skip if the query has cached posts (the cached path already has this field populated). This gate appears on essentially every hydrator and is a recurring theme: when re-using cached posts, skip the work.
- hydrate: pull followed_user_ids (cast to u64 since it's stored as i64), build a HashSet, then map each candidate to a partial PostCandidate with just in_network set. The ..Default::default() is the recurring pattern: "this hydrator owns only one field; default the rest."
- update: merge just the in_network field.
The Result<PostCandidate, String> wrapping is mandatory per the framework, but this hydrator never returns Err: pure compute can't fail. There is no Err path even for the id as u64 cast. A same-width i64 → u64 cast reinterprets the bits, so a negative ID would silently become a huge u64 rather than fail; the implicit precondition is that followed_user_ids are never negative.
is_self = candidate.author_id == viewer_id is interesting: a viewer is "in network" with themselves. If a self-tweet ever got past SelfTweetFilter, it would be marked in_network = true. Since SelfTweetFilter drops self-tweets, the is_self branch shouldn't affect what the viewer sees in practice.
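To see the compute and the cast in isolation, here is a minimal standalone sketch. Viewer and in_network_flags are hypothetical names standing in for the query fields the real hydrator reads; the real code returns partial PostCandidates rather than bools, and the i64 storage of followed IDs is assumed from the cast.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the viewer state read from ScoredPostsQuery.
/// Followed IDs are assumed to be stored as i64, as the cast suggests.
pub struct Viewer {
    pub user_id: u64,
    pub followed_user_ids: Vec<i64>,
}

/// Returns one in_network flag per author ID, in input order.
pub fn in_network_flags(viewer: &Viewer, author_ids: &[u64]) -> Vec<bool> {
    // Same conversion as the hydrator: `as u64` reinterprets the bits,
    // so a negative i64 would become a very large u64 instead of an error.
    let followed: HashSet<u64> = viewer
        .followed_user_ids
        .iter()
        .map(|&id| id as u64)
        .collect();

    author_ids
        .iter()
        // The viewer counts as in-network with themselves.
        .map(|&author| author == viewer.user_id || followed.contains(&author))
        .collect()
}
```

Building the HashSet once per request keeps each per-candidate check O(1), which matters when a request carries hundreds of candidates.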
core_data_candidate_hydrator.rs (133 lines) — the canonical cached hydrator
The most-used hydrator. Calls TES (Tweet Entity Service) to get a tweet's core data: author_id, source_user_id, source_tweet_id, in_reply_to_tweet_id, text. We'll walk this one carefully because it establishes the pattern for the other 5 cached hydrators.
use crate::clients::tweet_entity_service_client::TESClient;
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use std::sync::Arc;
use tonic::async_trait;
use xai_candidate_pipeline::component_library::utils::{MokaCache, default_moka_cache};
use xai_candidate_pipeline::hydrator::{CacheStore, CachedHydrator};
use xai_stats_receiver::global_stats_receiver;
const FOUND_SCOPE: [(&str, &str); 1] = [("hydration", "found")];
const MISSING_SCOPE: [(&str, &str); 1] = [("hydration", "missing")];
Imports:
- MokaCache and default_moka_cache: Moka is a TinyLFU-based in-memory cache, and default_moka_cache() builds one with project defaults (size + TTL).
- CacheStore and CachedHydrator from the framework (Session 01's blanket impl).
- Custom stats scope constants for "found in TES" vs. "not found."
pub struct CoreDataCandidateHydrator {
pub tes_client: Arc<dyn TESClient + Send + Sync>,
pub cache: MokaCache<u64, CoreDataCacheValue>,
}
impl CoreDataCandidateHydrator {
pub async fn new(tes_client: Arc<dyn TESClient + Send + Sync>) -> Self {
let cache = default_moka_cache();
Self { tes_client, cache }
}
}
The struct holds the TES client + an in-process cache. Each hydrator has its own cache: independent TTLs, independent size limits, independent eviction.
async fn new (despite no awaits) — for symmetry with other hydrators that do await.
#[async_trait]
impl CachedHydrator<ScoredPostsQuery, PostCandidate> for CoreDataCandidateHydrator {
type CacheKey = u64;
type CacheValue = CoreDataCacheValue;
fn enable(&self, query: &ScoredPostsQuery) -> bool {
!query.has_cached_posts
}
Implement CachedHydrator and get Hydrator for free via the blanket impl from Session 01. The two associated types:
- CacheKey = u64: the tweet ID.
- CacheValue = CoreDataCacheValue: defined below.
fn cache_store(&self) -> &dyn CacheStore<Self::CacheKey, Self::CacheValue> {
&self.cache
}
fn cache_key(&self, candidate: &PostCandidate) -> Self::CacheKey {
candidate.tweet_id
}
Boilerplate: return the cache and key extractor.
fn cache_value(&self, hydrated: &PostCandidate) -> Self::CacheValue {
CoreDataCacheValue {
author_id: hydrated.author_id,
retweeted_user_id: hydrated.retweeted_user_id,
retweeted_tweet_id: hydrated.retweeted_tweet_id,
in_reply_to_tweet_id: hydrated.in_reply_to_tweet_id,
tweet_text: hydrated.tweet_text.clone(),
}
}
fn hydrate_from_cache(&self, value: Self::CacheValue) -> PostCandidate {
PostCandidate {
author_id: value.author_id,
retweeted_user_id: value.retweeted_user_id,
retweeted_tweet_id: value.retweeted_tweet_id,
in_reply_to_tweet_id: value.in_reply_to_tweet_id,
tweet_text: value.tweet_text,
..Default::default()
}
}
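cache_value + hydrate_from_cache are paired: one extracts the fields owned by this hydrator into a cache entry; the other reconstructs a partial candidate from the cache entry. The two have to stay in sync: if you add a new field, you have to add it in both places (and to update, if it should reach the candidate).
Note that cache_value clones tweet_text (the only non-Copy field) because it only borrows the hydrated candidate, while hydrate_from_cache takes the value by ownership and moves it into the new candidate without another copy. Reads are not entirely free, though: Moka's get returns an owned clone of the stored value, so the cache lookup itself presumably clones the entry before hydrate_from_cache sees it.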
async fn hydrate_from_client(
&self,
_query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let client = &self.tes_client;
let tweet_ids: Vec<u64> = candidates.iter().map(|c| c.tweet_id).collect();
let post_features = client.get_tweet_core_datas(tweet_ids.clone()).await;
The actual hydration logic — called only for cache misses. The cache framework (Session 01's blanket impl) routes hits vs misses; this method only sees misses.
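get_tweet_core_datas takes a Vec<u64> and returns a HashMap<u64, Result<Option<CoreData>, E>> for some error type E (inferred from the matching below). This is the batch RPC pattern: send all missing IDs in one call, get back a per-ID result map. The tweet_ids.clone() is needed because the call takes the Vec by value and tweet_ids is iterated again afterwards.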
let mut hydrated_candidates = Vec::with_capacity(candidates.len());
let mut hydrated_count = 0usize;
let mut missing_count = 0usize;
for tweet_id in tweet_ids {
let post_features = post_features.get(&tweet_id);
match post_features {
Some(Ok(Some(core_data))) => {
hydrated_count += 1;
let text = core_data.text.clone();
let hydrated = PostCandidate {
author_id: core_data.author_id,
retweeted_user_id: core_data.source_user_id,
retweeted_tweet_id: core_data.source_tweet_id,
in_reply_to_tweet_id: core_data.in_reply_to_tweet_id,
tweet_text: text,
..Default::default()
};
hydrated_candidates.push(Ok(hydrated));
}
Some(Ok(None)) | None => {
missing_count += 1;
hydrated_candidates.push(Ok(PostCandidate::default()));
}
Some(Err(err)) => {
hydrated_candidates.push(Err(err.to_string()));
}
}
}
self.record_hydration_stats(hydrated_count, missing_count);
hydrated_candidates
}
Three-way match on the result for each tweet:
- Some(Ok(Some(core_data))): hit. Build the partial candidate, push Ok(hydrated).
- Some(Ok(None)) | None: TES returned "not found" (deleted? never existed?) OR the response didn't include this ID. Both are treated as missing: push Ok(PostCandidate::default()), an empty hydration. update still copies it, so the candidate's retweet/reply fields become None and tweet_text becomes empty.
- Some(Err(err)): TES errored on this ID. Push Err.
The "found / missing" counters get emitted via record_hydration_stats at the bottom. Missing is not an error (returned as Ok(default)) — it's a normal case for deleted/private tweets. Only RPC errors are errors.
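Note: because Some(Ok(None)) | None returns Ok(default), a later filter (CoreDataHydrationFilter, from Session 05) is what has to drop these candidates. That filter drops candidates where author_id == 0. default() does have author_id = 0, but update (below) never copies author_id, so the filter actually sees whatever author_id the source set: by that rule, a missing tweet is dropped only if its source also left author_id at 0.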
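The three-way classification, including the found/missing counters, can be sketched on its own. Lookup and classify are hypothetical names, and the response shape HashMap<u64, Result<Option<T>, E>> is the one inferred above rather than a confirmed TES signature.

```rust
use std::collections::HashMap;

/// Outcome of looking up one tweet in a batch response (hypothetical).
#[derive(Debug, PartialEq)]
pub enum Lookup<T> {
    Found(T),
    Missing,
    Error(String),
}

/// Classifies each requested ID against a per-ID response map and returns
/// (outcomes in request order, found count, missing count).
pub fn classify<T: Clone, E: ToString>(
    ids: &[u64],
    response: &HashMap<u64, Result<Option<T>, E>>,
) -> (Vec<Lookup<T>>, usize, usize) {
    let (mut found, mut missing) = (0usize, 0usize);
    let outcomes = ids
        .iter()
        .map(|id| match response.get(id) {
            Some(Ok(Some(value))) => {
                found += 1;
                Lookup::Found(value.clone())
            }
            // "Not found" and "absent from the response" are treated alike.
            Some(Ok(None)) | None => {
                missing += 1;
                Lookup::Missing
            }
            // Upstream errors are counted in neither stat.
            Some(Err(err)) => Lookup::Error(err.to_string()),
        })
        .collect();
    (outcomes, found, missing)
}
```

Iterating over the requested IDs (not over the response map) is what keeps the output aligned one-to-one with the input candidates.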
fn update(&self, candidate: &mut PostCandidate, hydrated: PostCandidate) {
candidate.retweeted_user_id = hydrated.retweeted_user_id;
candidate.retweeted_tweet_id = hydrated.retweeted_tweet_id;
candidate.in_reply_to_tweet_id = hydrated.in_reply_to_tweet_id;
candidate.tweet_text = hydrated.tweet_text;
}
}
Merge specific fields. Notable omission: author_id is NOT copied here, even though hydrate_from_client and hydrate_from_cache both fill it in. If a source returned only tweet_id, this hydrator would be the natural place to supply author_id, but update leaves the candidate's original author_id alone: the input source appears to be treated as authoritative for author identity.
This may be a bug or a deliberate quirk. Either way, if a source leaves author_id = 0 (because it didn't know the author), the candidate gets dropped by CoreDataHydrationFilter even if TES knows the author. The implied contract is that sources must populate author_id.
#[derive(Clone, Debug)]
pub struct CoreDataCacheValue {
pub author_id: u64,
pub retweeted_user_id: Option<u64>,
pub retweeted_tweet_id: Option<u64>,
pub in_reply_to_tweet_id: Option<u64>,
pub tweet_text: String,
}
The cache value. Clone + Debug derives. No Copy because String isn't Copy.
impl CoreDataCandidateHydrator {
fn record_hydration_stats(&self, hydrated_count: usize, missing_count: usize) {
if let Some(receiver) = global_stats_receiver() {
let metric_name = format!("{}.hydrate", self.name());
receiver.incr(metric_name.as_str(), &FOUND_SCOPE, hydrated_count as u64);
receiver.incr(metric_name.as_str(), &MISSING_SCOPE, missing_count as u64);
}
}
}
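The stats helper. Emits two counters: how many we found and how many were missing (RPC errors are counted in neither). A useful dashboard is a "TES freshness" ratio: a spike in MISSING / (FOUND + MISSING) may mean TES is dropping data, although a source emitting many deleted tweet IDs would look the same.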
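As a worked example of that ratio, here is a small helper; missing_ratio is a hypothetical name, not part of home-mixer. It returns None for an empty batch instead of dividing by zero.

```rust
/// Fraction of looked-up tweets reported missing, computed from the two
/// counters emitted by record_hydration_stats. None when nothing was looked up.
pub fn missing_ratio(found: u64, missing: u64) -> Option<f64> {
    let total = found + missing;
    if total == 0 {
        None
    } else {
        Some(missing as f64 / total as f64)
    }
}
```

For example, 90 found and 10 missing gives 10 / 100 = 0.1.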
That's the canonical cached hydrator. The other 5 cached hydrators follow the same shape.
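Before moving on, the hit/miss routing that the framework performs around hydrate_from_client can be sketched synchronously. This is a hypothetical model, not Session 01's blanket impl: a HashMap plays the role of the Moka cache, the fetch closure plays hydrate_from_client and only ever receives the misses, and writing back only Ok results is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical model of a cached hydration pass over one batch of keys.
pub fn hydrate_with_cache<V: Clone>(
    keys: &[u64],
    cache: &mut HashMap<u64, V>,
    fetch: impl Fn(&[u64]) -> Vec<Result<V, String>>,
) -> Vec<Result<V, String>> {
    // One slot per input key, filled from the cache or from the fetch.
    let mut results: Vec<Option<Result<V, String>>> = vec![None; keys.len()];
    let mut miss_positions = Vec::new();
    let mut miss_keys = Vec::new();

    for (i, key) in keys.iter().enumerate() {
        match cache.get(key) {
            Some(v) => results[i] = Some(Ok(v.clone())),
            None => {
                miss_positions.push(i);
                miss_keys.push(*key);
            }
        }
    }

    if !miss_keys.is_empty() {
        // hydrate_from_client equivalent: one batch call for the misses only.
        let fetched = fetch(miss_keys.as_slice());
        for ((pos, key), result) in miss_positions.into_iter().zip(miss_keys).zip(fetched) {
            // Assumption: only successful hydrations are written back.
            if let Ok(v) = &result {
                cache.insert(key, v.clone());
            }
            results[pos] = Some(result);
        }
    }

    // A fetch that returned fewer results than misses leaves slots empty.
    results
        .into_iter()
        .map(|slot| slot.unwrap_or_else(|| Err("no result returned".to_string())))
        .collect()
}
```

Tracking miss positions is what lets the merged output keep the caller's candidate order even though hits and misses come from different places.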
subscription_hydrator.rs (82 lines)
Calls TES to find which posts are paywalled (subscription-gated) and by whom. Output: subscription_author_id: Option<u64> (None if not paywalled).
pub struct SubscriptionHydrator {
pub tes_client: Arc<dyn TESClient + Send + Sync>,
pub cache: MokaCache<u64, Option<u64>>,
}
Same shape as CoreDataCandidateHydrator — tes_client + cache. But the cache value type is just Option<u64> (not a struct) since it's a single field.
#[async_trait]
impl CachedHydrator<ScoredPostsQuery, PostCandidate> for SubscriptionHydrator {
type CacheKey = u64;
type CacheValue = Option<u64>;
fn enable(&self, query: &ScoredPostsQuery) -> bool {
!query.has_cached_posts
}
fn cache_store(&self) -> &dyn CacheStore<Self::CacheKey, Self::CacheValue> {
&self.cache
}
fn cache_key(&self, candidate: &PostCandidate) -> Self::CacheKey {
candidate.tweet_id
}
fn cache_value(&self, hydrated: &PostCandidate) -> Self::CacheValue {
hydrated.subscription_author_id
}
fn hydrate_from_cache(&self, value: Self::CacheValue) -> PostCandidate {
PostCandidate {
subscription_author_id: value,
..Default::default()
}
}
The four boilerplate methods. Note cache_value returns subscription_author_id directly (no clone — Option<u64> is Copy).
async fn hydrate_from_client(
&self,
_query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let client = &self.tes_client;
let tweet_ids: Vec<u64> = candidates.iter().map(|c| c.tweet_id).collect();
let post_features = client.get_subscription_author_ids(tweet_ids.clone()).await;
let mut hydrated_candidates = Vec::with_capacity(candidates.len());
for tweet_id in tweet_ids {
let post_features = post_features.get(&tweet_id);
let hydrated = match post_features {
Some(Ok(value)) => Ok(PostCandidate {
subscription_author_id: *value,
..Default::default()
}),
None => Err(format!(
"Missing subscription author id for tweet_id={}",
tweet_id
)),
Some(Err(err)) => Err(err.to_string()),
};
hydrated_candidates.push(hydrated);
}
hydrated_candidates
}
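The match still has three arms, but the Ok arm is simpler than in CoreDataCandidateHydrator because the inner value is already the Option<u64> we want to store: Some(Ok(value)) covers both "paywalled by this author" and "not paywalled". Note the other difference: None is Err here, not Ok(default). So if TES doesn't return a result for a tweet, this hydrator reports a hydration error.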
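Why the inconsistency? Because for subscription_author_id, None is the valid "not paywalled" value. If a missing response entry were mapped to Ok(default), it would be indistinguishable from a genuine "not paywalled" answer (and, assuming the framework caches Ok results, would be cached as one). Returning Err keeps "TES didn't respond" separate from "TES said: not paywalled" (Ok with subscription_author_id = None).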
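The difference matters most once caching is involved. In this sketch, map_response mirrors the match in hydrate_from_client, while cache_ok_results encodes the assumed (not source-confirmed) rule that only Ok results are stored; both names are hypothetical.

```rust
use std::collections::HashMap;

/// Ok(Some(author)) = paywalled by `author`, Ok(None) = not paywalled.
pub type SubscriptionResult = Result<Option<u64>, String>;

/// Mirrors SubscriptionHydrator's match: an ID absent from the response
/// becomes an Err instead of a default value.
pub fn map_response(
    tweet_ids: &[u64],
    response: &HashMap<u64, Result<Option<u64>, String>>,
) -> Vec<SubscriptionResult> {
    tweet_ids
        .iter()
        .map(|id| match response.get(id) {
            Some(Ok(value)) => Ok(*value),
            None => Err(format!("Missing subscription author id for tweet_id={}", id)),
            Some(Err(err)) => Err(err.clone()),
        })
        .collect()
}

/// Assumed caching rule: only Ok results are written to the cache.
pub fn cache_ok_results(
    tweet_ids: &[u64],
    results: &[SubscriptionResult],
    cache: &mut HashMap<u64, Option<u64>>,
) {
    for (id, result) in tweet_ids.iter().zip(results) {
        if let Ok(value) = result {
            cache.insert(*id, *value);
        }
    }
}
```

A genuine "not paywalled" answer is cached as None, while an ID TES never answered stays out of the cache and will be retried on the next request.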
fn update(&self, candidate: &mut PostCandidate, hydrated: PostCandidate) {
candidate.subscription_author_id = hydrated.subscription_author_id;
}
}
Single-field update.
gizmoduck_hydrator.rs (147 lines) — composite cache key
Calls Gizmoduck (user service) to get author profile + counts. The interesting bit: composite cache key.
pub struct GizmoduckCandidateHydrator {
pub gizmoduck_client: Arc<dyn GizmoduckClient + Send + Sync>,
pub cache: MokaCache<GizmoduckCacheKey, GizmoduckCacheValue>,
}
Cache key is a struct, not a simple u64:
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct GizmoduckCacheKey {
pub author_id: u64,
pub retweeted_user_id: Option<u64>,
}
#[derive(Clone, Debug)]
pub struct GizmoduckCacheValue {
pub author_followers_count: Option<i32>,
pub author_screen_name: Option<String>,
pub retweeted_screen_name: Option<String>,
}
GizmoduckCacheKey derives Eq + Hash so it can be used as a HashMap key (the cache requires this — from the trait bound type CacheKey: Eq + Hash + Send + Sync + 'static).
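Why a composite key? Because the hydrator looks up two users per candidate (the author AND the retweeted user), and the cached output depends only on those two user IDs, not on the tweet itself. Keying by (author_id, retweeted_user_id) lets every candidate with the same author/retweeter pair share one entry. Keying by tweet_id would still be correct (a given tweet ID fixes both users) but would miss the cache for every new tweet from an already-seen author.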
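To make the hit-rate argument concrete, this sketch groups a batch by a key with the same shape as GizmoduckCacheKey. UserPairKey and distinct_entries are hypothetical names; the derives are what any HashMap key needs.

```rust
use std::collections::HashMap;

/// Same shape as GizmoduckCacheKey.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct UserPairKey {
    pub author_id: u64,
    pub retweeted_user_id: Option<u64>,
}

/// Counts the distinct cache entries needed for a batch of
/// (tweet_id, author_id, retweeted_user_id) candidates.
pub fn distinct_entries(candidates: &[(u64, u64, Option<u64>)]) -> usize {
    let mut entries: HashMap<UserPairKey, Vec<u64>> = HashMap::new();
    for &(tweet_id, author_id, retweeted_user_id) in candidates {
        entries
            .entry(UserPairKey { author_id, retweeted_user_id })
            .or_default()
            .push(tweet_id);
    }
    entries.len()
}
```

Three tweets where author 7 posts twice and retweets user 8 once need only two entries; keyed by tweet_id they would need three.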
fn cache_key(&self, candidate: &PostCandidate) -> Self::CacheKey {
GizmoduckCacheKey {
author_id: candidate.author_id,
retweeted_user_id: candidate.retweeted_user_id,
}
}
Construct the key. Note this relies on CoreDataCandidateHydrator having already populated retweeted_user_id (author_id itself comes from the source, since core data's update doesn't copy it). So the hydrator order in the pipeline (Session 06) puts core_data before gizmoduck.
async fn hydrate_from_client(
&self,
_query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let client = &self.gizmoduck_client;
let author_ids: Vec<_> = candidates.iter().map(|c| c.author_id).collect();
let author_ids: Vec<_> = author_ids.iter().map(|&x| x as i64).collect();
let retweet_user_ids: Vec<_> = candidates.iter().map(|c| c.retweeted_user_id).collect();
let retweet_user_ids: Vec<_> = retweet_user_ids.iter().flatten().collect();
let retweet_user_ids: Vec<_> = retweet_user_ids.iter().map(|&&x| x as i64).collect();
let mut user_ids_to_fetch = Vec::with_capacity(author_ids.len() + retweet_user_ids.len());
user_ids_to_fetch.extend(author_ids);
user_ids_to_fetch.extend(retweet_user_ids);
user_ids_to_fetch.dedup();
Build the list of user IDs to fetch:
- Collect all author IDs (always present).
- Collect all retweeted_user_ids that are Some (.flatten()).
- Cast u64 → i64 for the Gizmoduck API.
- Concatenate.
- dedup(): if the same user appears multiple times in this batch (e.g., a prolific author with multiple candidates), the intent is to fetch it only once.
Vec::dedup() removes consecutive duplicates only. On a sorted list that equals a full dedup, but this list is unsorted (author IDs in candidate order, then retweeted user IDs), so non-adjacent duplicates leak through. The dedup here is best-effort rather than guaranteed. The cost is minor: a user ID may appear in the request once per non-adjacent repeat, and lookups into the response map work the same either way.
let users = client.get_users(user_ids_to_fetch).await;
let mut hydrated_candidates = Vec::with_capacity(candidates.len());
for candidate in candidates {
let user = users.get(&(candidate.author_id as i64));
let user = match user {
Some(Ok(Some(user))) => Ok(Some(user)),
Some(Ok(None)) | None => Ok(None),
Some(Err(err)) => Err(err.to_string()),
};
let retweet_user = candidate
.retweeted_user_id
.and_then(|retweeted_user_id| users.get(&(retweeted_user_id as i64)));
let retweet_user = match retweet_user {
Some(Ok(Some(user))) => Ok(Some(user)),
Some(Ok(None)) | None => Ok(None),
Some(Err(err)) => Err(err.to_string()),
};
For each candidate, look up both users from the response map (the retweeted user only when retweeted_user_id is Some). Each lookup yields an Option<&Result<Option<User>, E>>. Normalize both to Result<Option<&User>, String>: missing entries flatten to Ok(None); per-user errors propagate.
let hydrated = match (user, retweet_user) {
(Ok(user), Ok(retweet_user)) => {
let user_counts = user.and_then(|user| user.user.as_ref().map(|u| &u.counts));
let user_profile = user.and_then(|user| user.user.as_ref().map(|u| &u.profile));
let author_followers_count: Option<i32> =
user_counts.map(|x| x.followers_count).map(|x| x as i32);
let author_screen_name: Option<String> =
user_profile.map(|x| x.screen_name.clone());
let retweet_profile =
retweet_user.and_then(|user| user.user.as_ref().map(|u| &u.profile));
let retweeted_screen_name: Option<String> =
retweet_profile.map(|x| x.screen_name.clone());
Ok(PostCandidate {
author_followers_count,
author_screen_name,
retweeted_screen_name,
..Default::default()
})
}
(Err(err), _) | (_, Err(err)) => Err(err),
};
hydrated_candidates.push(hydrated);
}
Combine the two lookups: if either errored, return that error. Otherwise pull the author's followers_count and screen_name, plus the retweeted user's screen_name (no followers count for the retweeted user: only the immediate author's reach is used).
user.user.as_ref().map(|u| &u.counts): even after the Ok(Some(..)) match, the Gizmoduck result carries its own optional user field, so there are effectively two layers of Option. The outer one says the lookup returned an entry; the inner one presumably says whether the user record actually exists in the DB.
followers_count is converted i64 → i32 with an as cast. A narrowing as cast keeps only the low 32 bits, so a count of 2^31 (about 2.15 billion) or more would wrap around, typically to a negative number. No real account is anywhere near that, so the narrowing is harmless in practice.
update follows the usual pattern (three-field copy).
blocked_by_hydrator.rs (57 lines) — non-cached, network call
Calls the social graph service to check which authors block the viewer. Not cached: the result depends on the viewer, and a per-viewer cache would be impractical.
pub struct BlockedByHydrator {
pub socialgraph_client: Arc<dyn SocialGraphClientOps>,
}
impl BlockedByHydrator {
pub async fn new(socialgraph_client: Arc<dyn SocialGraphClientOps>) -> Self {
Self { socialgraph_client }
}
}
Simpler struct: just the client.
#[async_trait]
impl Hydrator<ScoredPostsQuery, PostCandidate> for BlockedByHydrator {
fn enable(&self, query: &ScoredPostsQuery) -> bool {
!query.has_cached_posts
}
async fn hydrate(
&self,
query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let author_ids: Vec<u64> = candidates.iter().map(|x| x.author_id).collect();
let blocked_by_user_ids = match self
.socialgraph_client
.check_blocked_by(query.user_id, &author_ids)
.await
{
Ok(ids) => ids,
Err(e) => {
let err_msg = e.to_string();
return candidates.iter().map(|_| Err(err_msg.clone())).collect();
}
};
candidates
.iter()
.map(|candidate| {
let author_blocks_viewer = blocked_by_user_ids.contains(&candidate.author_id);
Ok(PostCandidate {
author_blocks_viewer: Some(author_blocks_viewer),
..Default::default()
})
})
.collect()
}
fn update(&self, candidate: &mut PostCandidate, hydrated: PostCandidate) {
candidate.author_blocks_viewer = hydrated.author_blocks_viewer;
}
}
Implements Hydrator directly (no cache).
One batch RPC call: check_blocked_by(viewer_id, [author_ids]) returns the subset of author_ids that block the viewer. Then for each candidate, check membership.
Whole-batch error handling: if the social graph call fails, every candidate gets the same error, so a transient SG failure poisons the entire batch's author_blocks_viewer field. Presumably the failed candidates keep author_blocks_viewer = None, and AuthorSocialgraphFilter (which uses it) reads it with .unwrap_or(false) (Session 05), so an SG outage results in no posts being filtered for being blocked-by. That is a slight risk of showing posts the viewer shouldn't see for the duration of the outage, but an acceptable degradation compared with blocking the entire feed.
err_msg.clone() is needed because each Err variant takes ownership of the string. We can't share &err_msg across the iterator.
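The fan-out and its downstream effect can be modelled in a few lines. fan_out mirrors the hydrator's two paths; is_dropped encodes the assumed filter behaviour (a failed hydration leaves the field None, read with unwrap_or(false)). Both names are hypothetical.

```rust
use std::collections::HashSet;

/// Fans a single batch RPC result out to one result per author:
/// one failure becomes the same error for every candidate.
pub fn fan_out(rpc: Result<HashSet<u64>, String>, author_ids: &[u64]) -> Vec<Result<bool, String>> {
    match rpc {
        Ok(blockers) => author_ids.iter().map(|a| Ok(blockers.contains(a))).collect(),
        // Each Err needs its own owned String, hence the clone per candidate.
        Err(msg) => author_ids.iter().map(|_| Err(msg.clone())).collect(),
    }
}

/// Assumed filter behaviour: a missing value counts as "not blocked".
pub fn is_dropped(author_blocks_viewer: Option<bool>) -> bool {
    author_blocks_viewer.unwrap_or(false)
}
```

The failure mode is fail-open: during an outage every candidate survives the blocked-by check.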
has_media_hydrator.rs (85 lines)
Calls TES to learn whether a post has media (image / video / GIF). Output: has_media: Option<bool>.
impl CachedHydrator<ScoredPostsQuery, PostCandidate> for HasMediaHydrator {
type CacheKey = u64;
type CacheValue = Option<bool>;
fn enable(&self, query: &ScoredPostsQuery) -> bool {
(query.params.get(EnableHasMediaHydration) || query.is_shadow_traffic)
&& !query.has_cached_posts
}
The enable gate has an extra condition: only run if the EnableHasMediaHydration param is on OR the request is shadow traffic (and, as usual, the query has no cached posts). This looks like a dark launch: unless the param is turned on, production requests skip it while shadow-traffic requests always run it, so its impact can be measured before rolling out.
fn cache_key(&self, candidate: &PostCandidate) -> Self::CacheKey {
candidate.get_original_tweet_id()
}
Uses get_original_tweet_id() — not tweet_id. The original tweet is the retweeted one for retweets, the post itself otherwise. This is correct: a retweet's media is the original's media; cache by the right key for hit rate.
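The body of get_original_tweet_id() isn't shown in this session. A minimal sketch, assuming it falls back from retweeted_tweet_id to tweet_id as the description above suggests; Candidate and original_tweet_id are hypothetical names.

```rust
/// Hypothetical minimal candidate; the real PostCandidate has many more fields.
#[derive(Default)]
pub struct Candidate {
    pub tweet_id: u64,
    pub retweeted_tweet_id: Option<u64>,
}

impl Candidate {
    /// Assumed behaviour: the retweeted tweet's ID for a retweet,
    /// the post's own ID otherwise.
    pub fn original_tweet_id(&self) -> u64 {
        self.retweeted_tweet_id.unwrap_or(self.tweet_id)
    }
}
```

Under this assumption a retweet and its source map to the same cache key, so the second one to arrive is a cache hit.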
async fn hydrate_from_client(
&self,
_query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let client = &self.tes_client;
let tweet_ids: Vec<u64> = candidates
.iter()
.map(|c| c.get_original_tweet_id())
.collect();
let has_media_results = client.get_has_media(tweet_ids.clone()).await;
…
The familiar batch RPC pattern. Get media flags for the original tweet IDs.
The rest is the standard "iterate results, handle three cases" loop. Same as we've seen — push Ok(partial) for hits, Err for missing or RPC error.
language_code_hydrator.rs (84 lines)
The language code of the tweet (English, Spanish, Japanese, etc.). Output: language_code: Option<String>.
impl CachedHydrator<ScoredPostsQuery, PostCandidate> for LanguageCodeHydrator {
type CacheKey = u64;
type CacheValue = Option<String>;
fn enable(&self, query: &ScoredPostsQuery) -> bool {
!query.has_cached_posts
}
fn cache_key(&self, candidate: &PostCandidate) -> Self::CacheKey {
candidate.get_original_tweet_id()
}
…
Same shape. Option<String> cache value. Uses get_original_tweet_id() (retweets have the same language as the original).
The full implementation is structurally identical to has_media_hydrator with language_code substituted. Skipping the body since it would be a near-verbatim copy.
video_duration_candidate_hydrator.rs (85 lines)
Video duration in ms. Output: min_video_duration_ms: Option<i32>.
impl CachedHydrator<ScoredPostsQuery, PostCandidate> for VideoDurationCandidateHydrator {
type CacheKey = u64;
type CacheValue = Option<i32>;
…
fn cache_key(&self, candidate: &PostCandidate) -> Self::CacheKey {
candidate.get_original_tweet_id()
}
…
async fn hydrate_from_client(
&self,
_query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let client = &self.tes_client;
let tweet_ids: Vec<u64> = candidates
.iter()
.map(|c| c.get_original_tweet_id())
.collect();
let durations = client.get_min_video_durations(tweet_ids.clone()).await;
let mut hydrated_candidates = Vec::with_capacity(candidates.len());
for tweet_id in tweet_ids {
let hydrated = match durations.get(&tweet_id) {
Some(Ok(min_video_duration_ms)) => Ok(PostCandidate {
min_video_duration_ms: min_video_duration_ms.map(|v| v as i32),
..Default::default()
}),
None => Err(format!(
"Missing min video duration for tweet_id={}",
tweet_id
)),
Some(Err(err)) => Err(err.to_string()),
};
hydrated_candidates.push(hydrated);
}
hydrated_candidates
}
…
}
Same structure as the previous TES-backed hydrators. The TES API presumably returns the duration as Option<i64>, and we cast it to i32, which truncates for durations of 2^31 ms (about 24.9 days) or more. No tweet carries 24 days of video, so this is safe.
The "min" prefix: TES probably reports the minimum duration when a post has multiple media items (e.g., a gallery with two videos). We use the shorter one for downstream "is this a long video?" checks.
What we've learned
Hydrator anatomy: every hydrator is one of two shapes:
- Plain hydrator (InNetworkCandidateHydrator, BlockedByHydrator): no cache, just call the service or compute from query state.
- Cached hydrator (the other 6): implement CachedHydrator, get cache integration for free via Session 01's blanket impl.
Cache backend: MokaCache, an in-process cache with a TinyLFU-based policy, one per hydrator. Each hydrator has its own cache with default TTL + size.
Cache key choice:
- tweet_id for fields read off the exact post being ranked (core data, subscription).
- original_tweet_id for fields that are the same on retweets and originals (has_media, language_code, video_duration).
- Composite struct (author_id, retweeted_user_id) when the data depends on multiple user IDs (Gizmoduck).
- No cache when the result depends on the viewer (blocked-by).
The has_cached_posts gate: every hydrator here includes !query.has_cached_posts in its enable gate. When the cached-posts source has provided a fully-scored set of candidates from a prior request, re-hydration is skipped, which should make follow-up requests much cheaper.
Three-way result handling:
- Some(Ok(Some(val))) → hit.
- None | Some(Ok(None)) → either "not in response" (no entry for this ID) or "explicitly absent" (e.g., tweet deleted). Sometimes Ok(default), sometimes Err, depending on the field's semantics.
- Some(Err(err)) → upstream error → propagate as Err.
Whole-batch error vs. per-item error: BlockedByHydrator returns Err for every candidate when the single RPC fails. CoreDataCandidateHydrator handles per-tweet errors. The difference: batched-but-not-keyed APIs vs. batched-keyed APIs.
Sources own author_id: CoreDataCandidateHydrator::update deliberately doesn't copy author_id. The source must set it; this hydrator fills the rest. If the source set author_id = 0, CoreDataHydrationFilter will drop the candidate.
u64 ↔ i64 casts everywhere: home-mixer uses u64 for IDs internally; many client APIs use i64 for historical reasons. Each hydrator that talks to a downstream service does the cast. Safe in practice (IDs are positive), but loose from a type-safety perspective: an as cast silently reinterprets an out-of-range value instead of failing.
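One way to tighten this is a newtype with checked conversions at the service boundary. UserId is hypothetical (home-mixer itself uses bare u64); the sketch relies only on the standard TryFrom impls between u64 and i64.

```rust
use std::convert::TryFrom;
use std::num::TryFromIntError;

/// Hypothetical ID newtype; home-mixer itself passes bare u64s.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct UserId(pub u64);

impl UserId {
    /// Checked conversion for client APIs that take i64: fails instead of wrapping.
    pub fn to_i64(self) -> Result<i64, TryFromIntError> {
        i64::try_from(self.0)
    }
}

impl TryFrom<i64> for UserId {
    type Error = TryFromIntError;

    /// Rejects negative IDs coming back from i64-based APIs.
    fn try_from(raw: i64) -> Result<Self, Self::Error> {
        u64::try_from(raw).map(UserId)
    }
}
```

The trade-off is an explicit error path at every boundary, which the current code avoids by relying on IDs being positive.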
Shadow-traffic-only features: has_media_hydrator runs only on shadow traffic (or when flag flipped on). Pattern: build a feature, ship the code, enable on shadow first, monitor metrics, ramp to production.
Dedup the fetch list: gizmoduck_hydrator does Vec::dedup() on its user-IDs-to-fetch list — best-effort (only consecutive duplicates). Acceptable since one duplicate adds a single small extra entry in the batch request.
Next session
Session 08: Candidate hydrators, part 2. Covers the remaining hydrators (9 files, ~1,265 LOC by the counts below):
- engagement_counts_hydrator.rs (113): fav/reply/repost/quote counts
- ads_brand_safety_hydrator.rs (176): brand-safety labels for ads pacing
- ads_brand_safety_vf_hydrator.rs (108): VF-side brand-safety labels
- tweet_type_metrics_hydrator.rs (180): packed tweet-type metric blob
- quote_hydrator.rs (176): quote-tweet metadata
- vf_candidate_hydrator.rs (162): visibility-filtering service call
- mutual_follow_jaccard_hydrator.rs (118): Jaccard similarity feature
- filtered_topics_hydrator.rs (134): topic classifications
- following_replied_users_hydrator.rs (98): "people you follow replied" detection