X For You algorithm, line by line — Part 8: Candidate hydrators (part 2 of 2)
Part 8 of the deep dive into xai-org/x-algorithm. The semantic candidate hydrators: engagement counts with tweet-age-based TTLs, two-arm A/B-tested brand safety, the packed-bitset tweet_type_metrics, quote-tweet expansion with parallel I/O, two-safety-level visibility filtering, MinHash Jaccard similarity, topic taxonomy lookups, and the facepile.
The second half of the 16 hydrators wired into the Phoenix pipeline. This batch is more semantically interesting — the previous batch was mostly the same "TES batch fetch + cache" pattern. These hydrators do real signal aggregation: engagement counts, brand safety verdicts, tweet-type feature bitsets, quote-tweet expansion, visibility filtering, MinHash similarity, and the topic taxonomy lookup.
Files covered (~1,265 LOC across 9 files):
home-mixer/candidate_hydrators/
├── engagement_counts_hydrator.rs (113)
├── ads_brand_safety_hydrator.rs (176)
├── ads_brand_safety_vf_hydrator.rs (108)
├── tweet_type_metrics_hydrator.rs (180)
├── quote_hydrator.rs (176)
├── vf_candidate_hydrator.rs (162)
├── mutual_follow_jaccard_hydrator.rs (118)
├── filtered_topics_hydrator.rs (134)
└── following_replied_users_hydrator.rs (98)
engagement_counts_hydrator.rs (113 lines)
Fetches fav_count, reply_count, repost_count, quote_count for each post. Used as features by the Phoenix scorer (counts as a signal of "this post is going viral").
#[derive(Clone, Debug)]
pub struct CachedCounts {
fav_count: Option<i64>,
reply_count: Option<i64>,
repost_count: Option<i64>,
quote_count: Option<i64>,
}
pub struct EngagementCountsHydrator {
pub tes_client: Arc<dyn TESClient + Send + Sync>,
cache: MokaCache<u64, CachedCounts>,
}
The first hydrator we've seen with a custom cache expiry policy:
impl EngagementCountsHydrator {
pub async fn new(tes_client: Arc<dyn TESClient + Send + Sync>) -> Self {
let cache = build_moka_cache_tweet_age(
1_000_000,
TweetAgeExpiry {
age_threshold: Duration::from_secs(30 * 60),
new_tweet_ttl: Duration::from_secs(5 * 60),
old_tweet_ttl: Duration::from_secs(10 * 60),
},
);
Self { tes_client, cache }
}
}
build_moka_cache_tweet_age — different TTL based on tweet age. New posts (< 30 min old): cache for 5 min. Older posts: cache for 10 min. The rationale: engagement counts on new posts change rapidly (a tweet with 5 likes one minute can have 50 the next), so cache short. Older posts plateau, so we can cache longer.
Two parameters: 1_000_000 is the cache capacity (1M entries); TweetAgeExpiry is the dual-TTL policy.
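To make the dual-TTL rule concrete, here is a minimal sketch of the selection logic. The struct mirrors the fields shown above, but ttl_for_age and engagement_counts_policy are hypothetical names, and the strict less-than comparison at the threshold is an assumption; the source only shows the policy values, not how build_moka_cache_tweet_age applies them.

```rust
use std::time::Duration;

/// Hypothetical mirror of the `TweetAgeExpiry` policy from the excerpt.
#[derive(Clone, Copy, Debug)]
pub struct TweetAgeExpiry {
    pub age_threshold: Duration,
    pub new_tweet_ttl: Duration,
    pub old_tweet_ttl: Duration,
}

/// Pick the cache TTL from the post's age.
/// Assumption: a post strictly younger than the threshold counts as "new".
pub fn ttl_for_age(policy: &TweetAgeExpiry, tweet_age: Duration) -> Duration {
    if tweet_age < policy.age_threshold {
        policy.new_tweet_ttl
    } else {
        policy.old_tweet_ttl
    }
}

/// The values `EngagementCountsHydrator` passes in the excerpt.
pub fn engagement_counts_policy() -> TweetAgeExpiry {
    TweetAgeExpiry {
        age_threshold: Duration::from_secs(30 * 60),
        new_tweet_ttl: Duration::from_secs(5 * 60),
        old_tweet_ttl: Duration::from_secs(10 * 60),
    }
}
```

Under these assumptions a one-minute-old post is cached for 5 minutes and a one-hour-old post for 10.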
fn enable(&self, query: &ScoredPostsQuery) -> bool {
(query.params.get(EnableContextFeatures) || query.is_shadow_traffic)
&& !query.has_cached_posts
}
Gated on a feature flag or shadow traffic, and skipped when the query already has cached posts. Pattern we've seen before: dark-launch on shadow first, ramp up via flag.
The rest follows the standard cached-hydrator pattern. The interesting fields are pulled from counts_results:
tweet_ids
.iter()
.map(|tweet_id| {
let counts = counts_results
.get(tweet_id)
.and_then(|r| r.as_ref().ok())
.and_then(|opt| opt.as_ref());
Ok(PostCandidate {
fav_count: counts.and_then(|c| c.favorite_count),
reply_count: counts.and_then(|c| c.reply_count),
repost_count: counts.and_then(|c| c.retweet_count),
quote_count: counts.and_then(|c| c.quote_count),
..Default::default()
})
})
.collect()
Always Ok — even if the count fetch fails, we don't fail the candidate. Just leave the counts as None. The downstream Phoenix scorer treats None as "no signal," which is fine.
Note the field name mapping: favorite_count → fav_count, retweet_count → repost_count (X renamed "Retweet" to "Repost" in the UI but kept the backend name).
ads_brand_safety_hydrator.rs (176 lines)
The brand-safety verdict computation. Uses compute_verdict from Session 04's models/brand_safety.rs. Post-selection hydrator — only the top-K candidates get this expensive check.
#[derive(Clone, Default)]
pub struct CachedBrandSafety {
verdict: BrandSafetyVerdict,
safety_labels: Vec<SafetyLabelInfo>,
}
pub struct AdsBrandSafetyHydrator {
pub client: Arc<dyn SafetyLabelStoreClient>,
pub cache: MokaCache<u64, CachedBrandSafety>,
}
Cache entry is (verdict, labels). The labels list is needed for the wire response.
impl AdsBrandSafetyHydrator {
pub fn new(client: Arc<dyn SafetyLabelStoreClient>) -> Self {
let cache = build_moka_cache_tweet_age(
CACHE_SIZE,
TweetAgeExpiry {
age_threshold: Duration::from_secs(5 * 60),
new_tweet_ttl: Duration::from_secs(60),
old_tweet_ttl: Duration::from_secs(60 * 60),
},
);
Self { client, cache }
}
}
Aggressive TTL for new tweets: 60s. Brand-safety labels can be added (a previously-OK post gets flagged as NSFW), and we want to pick that up quickly. Older posts (5+ min): 1h cache. Once a post is "settled," its labels don't change much.
fn enable(&self, query: &ScoredPostsQuery) -> bool {
query.params.get(EnableAdsBrandSafetyHydrator)
&& !query
.decider
.as_ref()
.is_some_and(|d| d.enabled("vf_brand_safety_dark_traffic"))
}
Mutually exclusive with the VF brand-safety hydrator below: this hydrator runs only if the decider is NOT enabled for the dark-traffic experiment, and the companion AdsBrandSafetyVfHydrator runs only if it IS enabled. Both also require EnableAdsBrandSafetyHydrator, so with the flag on exactly one of the two runs per request, and with it off neither does. They're A/B test arms, not both-at-once.
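The two gates can be reduced to plain booleans to check how they interact. This is a sketch only: the function names are hypothetical, flag stands in for query.params.get(EnableAdsBrandSafetyHydrator), and dark_traffic stands in for the optional decider's answer to "vf_brand_safety_dark_traffic" (None when no decider is attached).

```rust
/// Gate for the label-store arm: flag on, dark-traffic decider off or absent.
pub fn label_store_arm_enabled(flag: bool, dark_traffic: Option<bool>) -> bool {
    flag && !dark_traffic.is_some_and(|on| on)
}

/// Gate for the VF arm: flag on and dark-traffic decider on.
pub fn vf_arm_enabled(flag: bool, dark_traffic: Option<bool>) -> bool {
    flag && dark_traffic.is_some_and(|on| on)
}
```

Enumerating all six input combinations shows the arms never overlap, and that at least one runs only when the flag is on. A missing decider behaves like "decider off", so the label-store arm is the default.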
async fn hydrate_from_client(
&self,
_query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let mut all_ids: HashSet<u64> = HashSet::new();
for c in candidates {
all_ids.insert(c.retweeted_tweet_id.unwrap_or(c.tweet_id));
if let Some(qt_id) = c.quoted_tweet_id {
all_ids.insert(qt_id);
}
}
Collect IDs to fetch:
- The "primary" ID — for retweets, the original; otherwise the post itself.
- The quoted tweet ID, if any.
HashSet to dedup (multiple candidates might quote the same tweet).
let all_ids_vec: Vec<u64> = all_ids.into_iter().collect();
let all_ids_i64: Vec<i64> = all_ids_vec.iter().map(|&id| id as i64).collect();
let mut label_map: HashMap<u64, xai_safety_label_store::types::SafetyLabelMap> =
HashMap::new();
let mut error_map: HashMap<u64, String> = HashMap::new();
match self.client.batch_get_all_labels(&all_ids_i64).await {
Ok(per_key_results) => {
for (&id, result) in all_ids_vec.iter().zip(per_key_results) {
match result {
Ok(labels) => {
label_map.insert(id, labels);
}
Err(e) => {
error_map.insert(id, e.to_string());
}
}
}
}
Err(e) => {
let err_str = e.to_string();
for &id in &all_ids_vec {
error_map.insert(id, err_str.clone());
}
}
}
Batch RPC, then partition the response into label_map (per-id labels) and error_map (per-id failures). On total batch failure (outer Err), every ID gets the same error.
Why two maps rather than a single HashMap<u64, Result<Labels, Err>>? It keeps the downstream code simple: for each candidate, we check error_map first and then look up labels in label_map.
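The partitioning step generalizes to any positional batch response. Below is a minimal sketch with a hypothetical helper, partition_results. Like the original, it assumes the service returns results in the same order as the requested IDs.

```rust
use std::collections::HashMap;

/// Split positional per-key results into a success map and an error map.
/// Assumption: `results[i]` corresponds to `ids[i]`.
pub fn partition_results<T, E: ToString>(
    ids: &[u64],
    results: Vec<Result<T, E>>,
) -> (HashMap<u64, T>, HashMap<u64, String>) {
    let mut ok_map = HashMap::new();
    let mut err_map = HashMap::new();
    for (&id, result) in ids.iter().zip(results) {
        match result {
            Ok(value) => {
                ok_map.insert(id, value);
            }
            Err(e) => {
                err_map.insert(id, e.to_string());
            }
        }
    }
    (ok_map, err_map)
}
```

Every ID ends up in at most one of the two maps, which is what lets the per-candidate code treat "has an error" and "has labels" as separate lookups.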
candidates
.iter()
.map(|c| {
let primary_id = c.retweeted_tweet_id.unwrap_or(c.tweet_id);
if let Some(err) = error_map.get(&primary_id) {
return Err(format!("safety label lookup error for tweet {primary_id}: {err}"));
}
let empty = HashMap::new();
let primary_labels = label_map.get(&primary_id).unwrap_or(&empty);
let mut verdict = compute_verdict(primary_labels, primary_id);
let mut safety_labels: Vec<SafetyLabelInfo> = primary_labels
.iter()
.map(|(k, v)| SafetyLabelInfo {
label_type: *k,
description: v.source.as_deref().map(truncate_description),
source: botmaker_rule_id_from(v)
.map(|id| botmaker_rule_category(id).to_string()),
})
.collect();
For each candidate:
- If the primary tweet's labels couldn't be fetched, error out for this candidate.
- Otherwise compute the verdict (Session 04's compute_verdict).
- Build a SafetyLabelInfo list with truncated descriptions and BotMaker rule categories.
.as_deref() converts Option<String> to Option<&str>. Then .map(truncate_description) produces Option<String> (truncated to 250 chars).
if let Some(qt_id) = c.quoted_tweet_id {
if error_map.contains_key(&qt_id) {
verdict = worst_verdict(&verdict, &BrandSafetyVerdict::MediumRisk);
} else {
let qt_labels = label_map.get(&qt_id).unwrap_or(&empty);
verdict = worst_verdict(&verdict, &compute_verdict(qt_labels, qt_id));
safety_labels.extend(qt_labels.iter().map(|(k, v)| {
SafetyLabelInfo {
label_type: *k,
description: v.source.as_deref().map(truncate_description),
source: botmaker_rule_id_from(v)
.map(|id| botmaker_rule_category(id).to_string()),
}
}));
}
}
For quote tweets, combine the verdicts pessimistically using worst_verdict (from Session 04). If the quoted tweet's labels couldn't be fetched, assume MediumRisk (conservative default). Then append the quoted tweet's labels to the candidate's label list.
This is transitive safety: a tweet quoting an unsafe tweet inherits the unsafe verdict.
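One way to picture the pessimistic combination is an ordered enum where "worse" means "greater". This is a sketch, not the real type: the source only names MediumRisk, so the other variants, their ordering, and the helper names here are hypothetical.

```rust
/// Hypothetical verdict scale, ordered from best to worst.
/// Only `MediumRisk` appears in the excerpt; the rest is illustrative.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum Verdict {
    Safe,
    LowRisk,
    MediumRisk,
    HighRisk,
}

/// Pessimistic combination: the worse of the two verdicts wins.
pub fn worst(a: Verdict, b: Verdict) -> Verdict {
    a.max(b)
}

/// Fold the quoted tweet's outcome into the primary verdict.
/// `None` means there is no quoted tweet; `Some(Err(_))` means the
/// quoted tweet's labels could not be fetched.
pub fn combine_with_quote(
    primary: Verdict,
    quoted: Option<Result<Verdict, String>>,
) -> Verdict {
    match quoted {
        None => primary,
        Some(Ok(q)) => worst(primary, q),
        // Failed lookup: fall back to the conservative default.
        Some(Err(_)) => worst(primary, Verdict::MediumRisk),
    }
}
```

Note that the fallback only ever raises the verdict: a primary tweet already worse than MediumRisk stays where it is when the quote lookup fails.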
safety_labels.sort_unstable_by_key(|l| i32::from(l.label_type));
safety_labels.dedup_by(|a, b| a.label_type == b.label_type);
Sort + dedup the labels (the candidate and its quoted tweet might share label types).
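A small sketch of the same sort-then-dedup idiom on a stand-in type. LabelInfo and its origin field are hypothetical; the real SafetyLabelInfo has different fields.

```rust
/// Hypothetical stand-in for `SafetyLabelInfo`: a numeric label type plus
/// a note recording which tweet contributed it.
#[derive(Clone, Debug, PartialEq)]
pub struct LabelInfo {
    pub label_type: i32,
    pub origin: &'static str,
}

/// Sort by label type, then drop adjacent entries that share a type.
/// `dedup_by_key` only removes consecutive duplicates, which is why the
/// sort has to come first.
pub fn sort_and_dedup(mut labels: Vec<LabelInfo>) -> Vec<LabelInfo> {
    labels.sort_unstable_by_key(|l| l.label_type);
    labels.dedup_by_key(|l| l.label_type);
    labels
}
```

Because the sort is unstable, which of two same-typed entries survives (the primary tweet's or the quoted tweet's) is unspecified. If the description of the surviving label matters, a stable sort would pin that down.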
Ok(PostCandidate {
brand_safety_verdict: Some(verdict),
safety_labels,
..Default::default()
})
})
.collect()
}
Build the partial candidate.
ads_brand_safety_vf_hydrator.rs (108 lines)
The A/B-test sibling of AdsBrandSafetyHydrator. Uses the VF safety-labels client instead of the safety label store. Otherwise nearly identical logic.
pub struct AdsBrandSafetyVfHydrator {
pub client: Arc<dyn VfClient>,
}
#[async_trait]
impl Hydrator<ScoredPostsQuery, PostCandidate> for AdsBrandSafetyVfHydrator {
fn enable(&self, query: &ScoredPostsQuery) -> bool {
query.params.get(EnableAdsBrandSafetyHydrator)
&& query
.decider
.as_ref()
.is_some_and(|d| d.enabled("vf_brand_safety_dark_traffic"))
}
Mirror gate: enabled only when the dark-traffic decider IS on. So this is the experiment-arm hydrator that exists to test VF as the brand-safety source.
It's a plain Hydrator (not CachedHydrator) — no caching, because the VF client probably already has its own cache layer.
async fn hydrate(
&self,
_query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let mut all_ids: HashSet<u64> = HashSet::new();
for c in candidates {
all_ids.insert(c.retweeted_tweet_id.unwrap_or(c.tweet_id));
if let Some(qt_id) = c.quoted_tweet_id {
all_ids.insert(qt_id);
}
}
let tweet_ids: Vec<u64> = all_ids.into_iter().collect();
let batch = match self.client.get_safety_labels(tweet_ids).await {
Ok(batch) => batch,
Err(e) => {
let err = format!("VF get_safety_labels failed: {e}");
return candidates.iter().map(|_| Err(err.clone())).collect();
}
};
let failed_ids: HashSet<u64> = batch.failures.keys().copied().collect();
let label_map = batch.labels;
…
Same ID-collection logic. Then get_safety_labels returns a batch with both successful labels (batch.labels) and per-id failures (batch.failures). Partition them.
On total RPC failure, return per-candidate error (same as the previous hydrator).
The rest of the function is near-identical to AdsBrandSafetyHydrator — the same per-candidate logic with the same verdict + quote-tweet inheritance + label aggregation. Skipping the rest since it would be a duplicate read.
The two hydrators differ only in:
- Which service they call.
- The shape of the response (one vs. two maps).
- The enable gate (mirror conditions).
This is A/B-test scaffolding: keep both code paths live, switch between them via decider. When the experiment is over and we pick the winner, delete the other one.
tweet_type_metrics_hydrator.rs (180 lines) — packed feature bitset
A pure-compute hydrator that builds a bitset encoding which categorical buckets the candidate falls into. The bitset is later serialized to bytes and shipped in the wire response.
const THIRTY_MINUTES_MS: u64 = 30 * 60 * 1000;
const ONE_HOUR_MS: u64 = 60 * 60 * 1000;
const SIX_HOURS_MS: u64 = 6 * 60 * 60 * 1000;
const TWELVE_HOURS_MS: u64 = 12 * 60 * 60 * 1000;
const TWENTY_FOUR_HOURS_MS: u64 = 24 * 60 * 60 * 1000;
pub struct TweetTypeMetricsHydrator;
Time thresholds for age buckets. No state — unit struct.
pub fn create_tweet_type_bitset(
candidate: &PostCandidate,
query: &ScoredPostsQuery,
) -> HashSet<usize> {
let mut true_tweet_types = HashSet::new();
true_tweet_types.insert(ANY_CANDIDATE);
if candidate.retweeted_tweet_id.is_some() {
true_tweet_types.insert(RETWEET);
}
if candidate.in_reply_to_tweet_id.is_some() {
true_tweet_types.insert(REPLY);
}
if candidate.subscription_author_id.is_some() {
true_tweet_types.insert(SUBSCRIPTION_POST);
}
if let Some(score) = candidate.score
&& score != 0.0
{
true_tweet_types.insert(FULL_SCORING_SUCCEEDED);
}
if !candidate.ancestors.is_empty() {
true_tweet_types.insert(HAS_ANCESTORS);
}
if candidate.in_network.unwrap_or(true) {
true_tweet_types.insert(IN_NETWORK);
}
Set bits in a HashSet<usize> for each category that applies. The constants (ANY_CANDIDATE, RETWEET, etc.) are from util::tweet_type_metrics::* — bit indices.
Each bit represents a categorical feature for downstream analytics: "this candidate was a retweet," "this candidate is a reply," etc. The set is then used to:
- Emit stats (counter per bit).
- Encode in the wire response as a byte array.
Note in_network.unwrap_or(true): defaults to "in network" if the field isn't set. Slightly surprising (most defaults are false); presumably because if InNetworkCandidateHydrator didn't run, we don't want to mass-label candidates as OON.
if let Some(followers) = candidate.author_followers_count {
let followers_u32 = followers as u32;
if followers_u32 < 100 {
true_tweet_types.insert(AUTHOR_FOLLOWERS_0_100);
}
if (100..1000).contains(&followers_u32) {
true_tweet_types.insert(AUTHOR_FOLLOWERS_100_1K);
}
if (1000..10000).contains(&followers_u32) {
true_tweet_types.insert(AUTHOR_FOLLOWERS_1K_10K);
}
if (10000..100000).contains(&followers_u32) {
true_tweet_types.insert(AUTHOR_FOLLOWERS_10K_100K);
}
if (100000..1000000).contains(&followers_u32) {
true_tweet_types.insert(AUTHOR_FOLLOWERS_100K_1M);
}
if followers_u32 >= 1000000 {
true_tweet_types.insert(AUTHOR_FOLLOWERS_1M_PLUS);
}
}
Bucketize author follower count into log-spaced, half-open buckets: [0, 100), [100, 1K), [1K, 10K), [10K, 100K), [100K, 1M), and 1M+. Exactly one of these is set: the if checks are mutually exclusive but listed flat for readability.
This is how you'd analyze "are we showing more from large accounts?" — bucket counts give a histogram.
if candidate.min_video_duration_ms.is_some() {
true_tweet_types.insert(VIDEO);
}
if let Some(duration_ms) = candidate.min_video_duration_ms {
let duration_ms_u32 = duration_ms as u32;
if duration_ms_u32 <= 10000 {
true_tweet_types.insert(VIDEO_LTE_10_SEC);
}
if duration_ms_u32 > 10000 && duration_ms_u32 <= 60000 {
true_tweet_types.insert(VIDEO_BT_10_60_SEC);
}
if duration_ms_u32 > 60000 {
true_tweet_types.insert(VIDEO_GT_60_SEC);
}
}
Same pattern for video durations. The VIDEO bit is set if any video exists; then three sub-buckets (short, medium, long).
if let Some(age) = duration_since_creation_opt(candidate.tweet_id) {
let age_ms = age.as_millis() as u64;
if age_ms <= THIRTY_MINUTES_MS {
true_tweet_types.insert(TWEET_AGE_LTE_30_MINUTES);
}
if age_ms <= ONE_HOUR_MS {
true_tweet_types.insert(TWEET_AGE_LTE_1_HOUR);
}
if age_ms <= SIX_HOURS_MS {
true_tweet_types.insert(TWEET_AGE_LTE_6_HOURS);
}
if age_ms <= TWELVE_HOURS_MS {
true_tweet_types.insert(TWEET_AGE_LTE_12_HOURS);
}
if age_ms >= TWENTY_FOUR_HOURS_MS {
true_tweet_types.insert(TWEET_AGE_GTE_24_HOURS);
}
}
Age buckets. The four LTE buckets are cumulative: a 25-min-old post sets all of LTE_30_MINUTES, LTE_1_HOUR, LTE_6_HOURS, LTE_12_HOURS (and not GTE_24_HOURS). That differs from the follower buckets, which were exclusive, and it makes "fraction of posts ≤ 1h old" easy to compute as a single counter. Note the gap: a post between 12 and 24 hours old sets no age bit at all.
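The cumulative behavior is easiest to see by listing which buckets fire for a given age. The sketch below reuses the thresholds and comparison operators from the excerpt but returns bucket names as strings; age_bucket_names is a hypothetical helper, not part of the source.

```rust
const MINUTE_MS: u64 = 60 * 1000;
const HOUR_MS: u64 = 60 * MINUTE_MS;

/// Return the names of the age buckets a post of the given age would set,
/// using the same thresholds and comparisons as the excerpt.
pub fn age_bucket_names(age_ms: u64) -> Vec<&'static str> {
    let mut names = Vec::new();
    if age_ms <= 30 * MINUTE_MS {
        names.push("TWEET_AGE_LTE_30_MINUTES");
    }
    if age_ms <= HOUR_MS {
        names.push("TWEET_AGE_LTE_1_HOUR");
    }
    if age_ms <= 6 * HOUR_MS {
        names.push("TWEET_AGE_LTE_6_HOURS");
    }
    if age_ms <= 12 * HOUR_MS {
        names.push("TWEET_AGE_LTE_12_HOURS");
    }
    if age_ms >= 24 * HOUR_MS {
        names.push("TWEET_AGE_GTE_24_HOURS");
    }
    names
}
```

A 25-minute-old post gets all four LTE names, a 2-hour-old post gets only the 6-hour and 12-hour ones, an 18-hour-old post gets none, and a 30-hour-old post gets only GTE_24_HOURS.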
let served_size = query.served_ids.len();
if served_size == 0 {
true_tweet_types.insert(EMPTY_REQUEST);
}
if served_size < 3 {
true_tweet_types.insert(NEAR_EMPTY);
}
if served_size < 20 {
true_tweet_types.insert(SERVED_SIZE_LESS_THAN_20);
}
if served_size < 10 {
true_tweet_types.insert(SERVED_SIZE_LESS_THAN_10);
}
if served_size < 5 {
true_tweet_types.insert(SERVED_SIZE_LESS_THAN_5);
}
true_tweet_types
}
Per-request context buckets. Same on every candidate in a request (only depends on query.served_ids.len()). Useful for slicing metrics by "how deep is the user scrolling."
pub fn bitset_to_bytes(bits: &HashSet<usize>) -> Vec<u8> {
if bits.is_empty() {
return Vec::new();
}
let max_bit = bits.iter().max().copied().unwrap_or(0);
let num_bytes = (max_bit / 8) + 1;
let mut bytes = vec![0u8; num_bytes];
for &bit_index in bits {
let byte_index = bit_index / 8;
let bit_offset = bit_index % 8;
bytes[byte_index] |= 1u8 << bit_offset;
}
bytes
}
}
Pack the HashSet<usize> into a byte array. Find the max bit, allocate enough bytes, set each bit.
max_bit / 8 + 1 rounds up to whole bytes. The bit at index bit_index lives in bytes[bit_index / 8] at offset bit_index % 8. Standard bitmap encoding.
The output Vec<u8> is what gets stored in candidate.tweet_type_metrics for the wire response.
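A worked example helps here. The sketch below repeats bitset_to_bytes as a free function and adds a reader, bit_is_set, which is hypothetical: the source does not show how consumers decode the bytes, so this is simply the inverse of the encoding above.

```rust
use std::collections::HashSet;

/// Same packing as the excerpt: bit `i` lives in byte `i / 8` at offset `i % 8`.
pub fn bitset_to_bytes(bits: &HashSet<usize>) -> Vec<u8> {
    if bits.is_empty() {
        return Vec::new();
    }
    let max_bit = bits.iter().max().copied().unwrap_or(0);
    let mut bytes = vec![0u8; (max_bit / 8) + 1];
    for &bit_index in bits {
        bytes[bit_index / 8] |= 1u8 << (bit_index % 8);
    }
    bytes
}

/// Hypothetical decoder: test a single bit in the packed form.
/// Bits beyond the end of the array read as unset.
pub fn bit_is_set(bytes: &[u8], bit_index: usize) -> bool {
    match bytes.get(bit_index / 8) {
        Some(byte) => (byte & (1u8 << (bit_index % 8))) != 0,
        None => false,
    }
}
```

For the set {0, 3, 9}: bits 0 and 3 land in byte 0 as 0b0000_1001 (9), and bit 9 lands in byte 1 at offset 1 as 0b0000_0010 (2), so the output is [9, 2]. The array length depends on the highest set bit, so two candidates can produce byte arrays of different lengths.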
#[async_trait]
impl Hydrator<ScoredPostsQuery, PostCandidate> for TweetTypeMetricsHydrator {
async fn hydrate(
&self,
query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let mut hydrated_candidates = Vec::with_capacity(candidates.len());
for candidate in candidates {
let true_tweet_types = Self::create_tweet_type_bitset(candidate, query);
let tweet_type_metrics = Some(Self::bitset_to_bytes(&true_tweet_types));
let hydrated = PostCandidate {
tweet_type_metrics,
..Default::default()
};
hydrated_candidates.push(Ok(hydrated));
}
hydrated_candidates
}
For each candidate, build the bitset and pack it. No I/O — pure compute.
This is a post-selection hydrator (we saw it in Session 06's pipeline list). Runs only on the top-K candidates because the bitset is only valuable for ones we'll actually serve.
quote_hydrator.rs (176 lines) — quote-tweet expansion
Looks up the quoted tweet's metadata: tweet ID, author ID, whether the quoted author blocks the viewer, and (optionally) the quoted tweet's video duration.
Notable: not a CachedHydrator — it has its own custom caching logic interleaved with the I/O. Plus a tokio::join! for parallel fetching.
pub struct QuoteHydrator {
pub tes_client: Arc<dyn TESClient + Send + Sync>,
pub socialgraph_client: Arc<dyn SocialGraphClientOps>,
pub cache: MokaCache<u64, QuoteCacheValue>,
}
#[derive(Clone, Debug)]
pub struct QuoteCacheValue {
pub quoted_tweet_id: Option<u64>,
pub quoted_user_id: Option<u64>,
}
The cache stores (quoted_tweet_id, quoted_user_id). The block check is NOT cached (it depends on the viewer), and the video-duration call is keyed on the quoted tweet ID (it would hit a separate cache via TES). Only the structural relationship is cached.
async fn get_quoted_video_durations(
&self,
quoted_tweet_ids: Vec<u64>,
) -> HashMap<u64, Option<i32>> {
if quoted_tweet_ids.is_empty() {
return HashMap::new();
}
let result = tokio::time::timeout(
std::time::Duration::from_millis(200),
self.tes_client.get_min_video_durations(quoted_tweet_ids),
)
.await;
match result {
Ok(durations) => durations
.into_iter()
.filter_map(|(id, result)| result.ok().map(|d| (id, d.map(|v| v as i32))))
.collect(),
Err(_) => HashMap::new(),
}
}
Helper with 200ms timeout. The video-duration call is optional — better to skip it than block the whole pipeline if TES is slow.
filter_map(|(id, result)| result.ok().map(|d| (id, d.map(|v| v as i32)))):
- result.ok(): keep only successful per-id results.
- .map(|d| (id, d.map(|v| v as i32))): convert i64 → i32 inside the option.
Returns an empty map on timeout. Downstream treats None as "no video duration."
async fn get_blocked_by(&self, viewer_id: u64, quoted_user_ids: Vec<u64>) -> HashSet<u64> {
if quoted_user_ids.is_empty() {
return HashSet::new();
}
self.socialgraph_client
.check_blocked_by(viewer_id, &quoted_user_ids)
.await
.unwrap_or_default()
}
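Calls the social graph to check which quoted authors block the viewer. unwrap_or_default(): on any error, return an empty set (treat as "no one blocks you"). This fails open: if the social graph call fails, a quote of an author who blocks the viewer can slip through, but the hydrator never marks a candidate as blocked because of the error.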
async fn hydrate(
&self,
query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let tweet_ids: Vec<u64> = candidates.iter().map(|c| c.tweet_id).collect();
let mut cache_misses: Vec<u64> = Vec::new();
let mut resolved: Vec<(u64, Option<u64>, Option<u64>)> =
Vec::with_capacity(tweet_ids.len());
for &tweet_id in &tweet_ids {
if let Some(cached) = self.cache.get(&tweet_id).await {
resolved.push((tweet_id, cached.quoted_tweet_id, cached.quoted_user_id));
} else {
cache_misses.push(tweet_id);
resolved.push((tweet_id, None, None));
}
}
Manual cache traversal. Iterate candidates, look up each in cache. Cache hits go directly into resolved; misses are placeholders for now.
This is essentially what the CachedHydrator blanket impl does, but inlined here because the post-cache flow needs more than just "fetch the missing ones" — it needs to also fetch (block-by, video-duration) in parallel.
if !cache_misses.is_empty() {
let quoted_tweets = self
.tes_client
.get_quoted_tweets(cache_misses.clone())
.await;
for entry in resolved.iter_mut() {
let tweet_id = entry.0;
if !cache_misses.contains(&tweet_id) {
continue;
}
let (qt_tweet_id, qt_user_id) = match quoted_tweets.get(&tweet_id) {
Some(Ok(Some(qt))) => (Some(qt.tweet_id), Some(qt.user_id)),
_ => (None, None),
};
entry.1 = qt_tweet_id;
entry.2 = qt_user_id;
self.cache
.insert(
tweet_id,
QuoteCacheValue {
quoted_tweet_id: qt_tweet_id,
quoted_user_id: qt_user_id,
},
)
.await;
}
}
Fill in the cache misses via TES, update the resolved tuples in place, write to cache.
Note cache_misses.contains(&tweet_id) — O(n) linear scan. Could be O(1) with a HashSet but the misses list is typically small.
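If the miss list ever grew large, the membership test could be moved to a HashSet. Here is a sketch of that variant on simplified data: fill_misses is a hypothetical helper, the tuple is cut down to (tweet_id, quoted_tweet_id), and fetched stands in for the TES response.

```rust
use std::collections::{HashMap, HashSet};

/// Fill in entries that were cache misses, using a set for O(1) membership.
/// Entries that were cache hits are left untouched.
pub fn fill_misses(
    resolved: &mut [(u64, Option<u64>)],
    cache_misses: &[u64],
    fetched: &HashMap<u64, u64>,
) {
    let miss_set: HashSet<u64> = cache_misses.iter().copied().collect();
    for entry in resolved.iter_mut() {
        if !miss_set.contains(&entry.0) {
            continue;
        }
        // A miss with no fetched value stays `None`.
        entry.1 = fetched.get(&entry.0).copied();
    }
}
```

The behavior matches the linear-scan version; only the lookup cost changes, at the price of building the set once per call.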
let quoted_user_ids: Vec<u64> = resolved
.iter()
.filter_map(|(_, _, uid)| *uid)
.collect::<HashSet<u64>>()
.into_iter()
.collect();
let fetch_quoted_duration = query.params.get(EnableQuotedVqvDurationCheck);
let quoted_tweet_ids: Vec<u64> = if fetch_quoted_duration {
resolved
.iter()
.filter_map(|(_, qt_id, _)| *qt_id)
.collect::<HashSet<u64>>()
.into_iter()
.collect()
} else {
Vec::new()
};
Collect IDs for the two follow-up fetches:
- Quoted user IDs (dedup via HashSet).
- Quoted tweet IDs (only if the feature flag is on — duration check is opt-in).
let (blocked_by, quoted_durations) = tokio::join!(
self.get_blocked_by(query.user_id, quoted_user_ids),
self.get_quoted_video_durations(quoted_tweet_ids),
);
Two parallel fetches. The block check and duration check run concurrently. Saves a round trip vs serial.
resolved
.iter()
.map(|(_, qt_tweet_id, qt_user_id)| {
let quoted_author_blocks_viewer = qt_user_id
.map(|uid| blocked_by.contains(&uid))
.unwrap_or(false);
let quoted_video_duration_ms = qt_tweet_id
.and_then(|id| quoted_durations.get(&id).copied())
.flatten();
Ok(PostCandidate {
quoted_tweet_id: *qt_tweet_id,
quoted_user_id: *qt_user_id,
quoted_author_blocks_viewer: Some(quoted_author_blocks_viewer),
quoted_video_duration_ms,
..Default::default()
})
})
.collect()
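Combine everything per-candidate. The qt_tweet_id.and_then(|id| quoted_durations.get(&id).copied()).flatten() chain collapses nested options: the quote may be absent, the map lookup may miss, and the stored duration may itself be None. and_then leaves an Option<Option<i32>>, which flatten reduces to a single Option<i32>.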
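The option chain can be pulled out and tested on its own. A minimal sketch, with quoted_duration as a hypothetical name for the extracted expression:

```rust
use std::collections::HashMap;

/// Resolve the quoted tweet's video duration, collapsing three ways of
/// having "no value" into a single `None`:
/// no quoted tweet, no map entry, or an entry that is itself `None`.
pub fn quoted_duration(
    qt_tweet_id: Option<u64>,
    durations: &HashMap<u64, Option<i32>>,
) -> Option<i32> {
    qt_tweet_id
        .and_then(|id| durations.get(&id).copied()) // Option<Option<i32>>
        .flatten() // Option<i32>
}
```

Only a quoted tweet whose ID maps to Some(duration) yields a value; every other case comes out as None, which is what the downstream code treats as "no video duration."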
vf_candidate_hydrator.rs (162 lines) — visibility filtering
The big VF (Visibility Filtering) call. Splits candidates into in-network and OON, runs two parallel VF queries at different safety levels, and joins results.
pub struct VFCandidateHydrator {
pub vf_client: Arc<dyn VisibilityFilteringClient + Send + Sync>,
}
async fn fetch_vf_results(
client: &Arc<dyn VisibilityFilteringClient + Send + Sync>,
tweet_ids: Vec<u64>,
safety_level: SafetyLevel,
for_user_id: u64,
context: Option<TwitterContextViewer>,
) -> HashMap<u64, Result<Option<FilteredReason>>> {
if tweet_ids.is_empty() {
return HashMap::new();
}
client
.get_result(tweet_ids, safety_level, for_user_id, context)
.await
}
Helper that calls VF for a list of tweet IDs at a given safety level. Returns per-id results. Early-return empty map for empty input.
async fn hydrate(
&self,
query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let context = query.get_viewer();
let user_id = query.user_id;
let client = &self.vf_client;
let mut in_network_ids: Vec<u64> = Vec::new();
let mut oon_ids: Vec<u64> = Vec::new();
for candidate in candidates.iter() {
if candidate.in_network.unwrap_or(false) {
in_network_ids.push(candidate.tweet_id);
} else {
oon_ids.push(candidate.tweet_id);
}
for &ancestor_id in &candidate.ancestors {
oon_ids.push(ancestor_id);
}
if let Some(quoted_id) = candidate.quoted_tweet_id {
oon_ids.push(quoted_id);
}
if let Some(retweeted_id) = candidate.retweeted_tweet_id {
in_network_ids.push(retweeted_id);
}
}
oon_ids.sort_unstable();
oon_ids.dedup();
Partition tweet IDs by safety level:
- In-network: candidates flagged in-network, plus the source tweet of every retweet (the retweeted tweet is always checked at the in-network level, whatever the candidate's own status).
- OON: everything else, plus all ancestors and quoted tweets.
Why different safety levels? VF has stricter rules for OON content (Recommendations level) than for follows (TimelineHome level). Posts from people you follow get more lenient safety treatment than algorithmic recommendations — the rationale is that you opted into following someone, so we trust their content more.
Sort+dedup OON IDs (multiple candidates might share an ancestor). In-network IDs are not deduped — slight inefficiency.
let in_network_future = Self::fetch_vf_results(
client,
in_network_ids,
TimelineHome,
user_id,
context.clone(),
);
let oon_future = Self::fetch_vf_results(
client,
oon_ids,
TimelineHomeRecommendations,
user_id,
context,
);
let (in_network_result, oon_result) = join(in_network_future, oon_future).await;
let mut all_results: HashMap<u64, Result<Option<FilteredReason>>> = HashMap::new();
all_results.extend(in_network_result);
all_results.extend(oon_result);
Two parallel VF calls. futures::future::join here (note: not tokio::join!) — for exactly two futures, both work; join is from futures and returns a Future you await.
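Merge the results into one all_results map. Because the OON results are extended last, an ID that landed in both lists ends up with its OON result.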
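The merge order matters when the same ID was queried at both safety levels, for example a retweeted source tweet that is also another candidate's ancestor. A sketch with a hypothetical generic helper, merge_vf_results, shows the overwrite behavior of HashMap::extend:

```rust
use std::collections::HashMap;

/// Merge two per-ID result maps the same way the hydrator does:
/// in-network first, then out-of-network.
pub fn merge_vf_results<V>(
    in_network: HashMap<u64, V>,
    oon: HashMap<u64, V>,
) -> HashMap<u64, V> {
    let mut all = HashMap::new();
    all.extend(in_network);
    // `extend` overwrites existing keys, so OON wins on overlap.
    all.extend(oon);
    all
}
```

With ID 7 present in both inputs, the merged map holds the OON value for 7 and the untouched values for every other ID.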
let mut hydrated_candidates = Vec::with_capacity(candidates.len());
for candidate in candidates {
let primary_result = all_results.get(&candidate.tweet_id);
let visibility_reason = match primary_result {
Some(Ok(Some(reason))) => Some(reason.clone()),
_ => None,
};
let drop_ancillary = should_drop_ancillary(candidate, &all_results);
let hydrated = match primary_result {
Some(Err(err)) => Err(err.to_string()),
_ => Ok(PostCandidate {
visibility_reason,
drop_ancillary_posts: Some(drop_ancillary),
..Default::default()
}),
};
hydrated_candidates.push(hydrated);
}
hydrated_candidates
}
Per-candidate:
- Look up the primary tweet's VF result. If there's a FilteredReason, that's the candidate's visibility_reason.
- Compute drop_ancillary based on whether any related tweet (ancestor, quoted, retweeted) should be dropped.
- If the primary lookup errored → return Err for this candidate. Otherwise → Ok.
fn should_drop_ancillary(
candidate: &PostCandidate,
vf_results: &HashMap<u64, Result<Option<FilteredReason>>>,
) -> bool {
for &ancestor_id in &candidate.ancestors {
if let Some(Ok(Some(reason))) = vf_results.get(&ancestor_id)
&& should_drop_reason(reason)
{
return true;
}
}
if let Some(quoted_id) = candidate.quoted_tweet_id
&& let Some(Ok(Some(reason))) = vf_results.get(&quoted_id)
&& should_drop_reason(reason)
{
return true;
}
if let Some(retweeted_id) = candidate.retweeted_tweet_id
&& let Some(Ok(Some(reason))) = vf_results.get(&retweeted_id)
&& should_drop_reason(reason)
{
return true;
}
false
}
Check three relationships for ancillary-drop conditions:
- Any ancestor with Drop action.
- The quoted tweet, if any, with Drop.
- The retweeted tweet, if any, with Drop.
If any of those return drop, set drop_ancillary_posts = true. The downstream AncillaryVFFilter will drop the candidate.
if let ... && ... && ... — let chains, multiple boolean conditions in one if. Modern Rust syntax.
fn should_drop_reason(reason: &FilteredReason) -> bool {
match reason {
FilteredReason::SafetyResult(safety_result) => {
matches!(safety_result.action, Action::Drop(_))
}
_ => true,
}
}
Same decision logic as VFFilter from Session 05: Action::Drop for safety results, drop for any other reason type.
mutual_follow_jaccard_hydrator.rs (118 lines) — MinHash similarity
Computes the MinHash Jaccard similarity between the viewer's mutual-follow set and each author's mutual-follow set. A high value means "this author has lots of mutual follows with people you follow" — a strong social-graph relevance signal.
const MIN_HASHES: usize = 256;
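The MinHash signature length. With k independent hashes the estimator's standard error is sqrt(J(1 − J)/k), which for k = 256 is at most about 3.1% (reached at J = 0.5); the looser 1/sqrt(k) rule of thumb gives roughly 6%. Good tradeoff.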
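The error figures for a given signature length can be computed directly. This sketch assumes the textbook MinHash model, where each of the k positions matches independently with probability equal to the true Jaccard similarity J; both function names are hypothetical.

```rust
/// Standard error of the MinHash Jaccard estimate under the textbook
/// model: each of `num_hashes` positions is an independent Bernoulli(J).
pub fn minhash_std_error(jaccard: f64, num_hashes: usize) -> f64 {
    (jaccard * (1.0 - jaccard) / num_hashes as f64).sqrt()
}

/// The common 1/sqrt(k) rule of thumb, which is twice the worst case
/// of `minhash_std_error` (the worst case is at J = 0.5).
pub fn rule_of_thumb_error(num_hashes: usize) -> f64 {
    1.0 / (num_hashes as f64).sqrt()
}
```

For 256 hashes the worst-case standard error is 0.03125 and the rule-of-thumb figure is 0.0625. The error shrinks toward zero as J approaches 0 or 1, so very dissimilar and very similar pairs are estimated more tightly than middling ones.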
fn jaccard_from_minhash(a: &[i64], b: &[i64]) -> f64 {
let len = a.len().min(b.len());
if len == 0 {
return 0.0;
}
let matching = a.iter().zip(b.iter()).filter(|(x, y)| x == y).count();
matching as f64 / len as f64
}
The MinHash Jaccard estimator. Given two equal-length signatures (or the shorter length), count matching positions. Estimated Jaccard = matches / total. O(N) compare, no set arithmetic — that's the MinHash advantage.
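A toy run makes the estimator's behavior concrete. The function below is the one from the excerpt; the signatures used in the note are made-up values, far shorter than the 256 entries the hydrator requires.

```rust
/// Position-wise MinHash estimator, as in the excerpt: the fraction of
/// matching positions over the shorter signature's length.
pub fn jaccard_from_minhash(a: &[i64], b: &[i64]) -> f64 {
    let len = a.len().min(b.len());
    if len == 0 {
        return 0.0;
    }
    // `zip` stops at the shorter slice, so extra trailing entries are ignored.
    let matching = a.iter().zip(b.iter()).filter(|(x, y)| x == y).count();
    matching as f64 / len as f64
}
```

With a = [11, 22, 33, 44] and b = [11, 99, 33, 88, 55], the compared length is 4, positions 0 and 2 match, and the estimate is 0.5. The fifth entry of b never participates. An empty signature on either side yields 0.0 rather than a division by zero.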
fn enable(&self, query: &ScoredPostsQuery) -> bool {
query.params.get(EnableMutualFollowJaccardHydration) && query.viewer_minhash.is_some()
}
Feature-flag-gated + requires viewer's MinHash to be populated (by a query hydrator earlier).
async fn hydrate(
&self,
query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let viewer_minhash = match &query.viewer_minhash {
Some(mh) if mh.len() >= MIN_HASHES => mh,
_ => {
return candidates
.iter()
.map(|_| {
Ok(PostCandidate {
mutual_follow_jaccard: None,
..Default::default()
})
})
.collect();
}
};
Validate viewer MinHash length. If the viewer doesn't have enough hashes (e.g., new user with sparse signature), return None for every candidate — graceful no-op.
let unique_author_ids: Vec<i64> = candidates
.iter()
.map(|c| c.author_id as i64)
.collect::<HashSet<_>>()
.into_iter()
.collect();
let results = self
.strato_client
.batch_get_minhash_with_count(&unique_author_ids)
.await;
Dedup author IDs via HashSet, then batch-fetch all author MinHashes from Strato.
The _count in the response name suggests Strato returns a (minhash, follower_count) tuple. Probably for downstream use — but here we ignore the count.
let mut author_result: HashMap<i64, Result<Option<Vec<i64>>, String>> = HashMap::new();
for (uid, result) in unique_author_ids.iter().zip(results) {
match result {
Ok(Some((minhash, _count))) if minhash.len() >= MIN_HASHES => {
author_result.insert(*uid, Ok(Some(minhash)));
}
Ok(Some((minhash, _))) => {
author_result.insert(
*uid,
Err(format!(
"Invalid minhash length {} (need >= {}) for author_id={}",
minhash.len(),
MIN_HASHES,
uid,
)),
);
}
Ok(None) => {
author_result.insert(*uid, Ok(None));
}
Err(e) => {
author_result.insert(*uid, Err(e.to_string()));
}
}
}
Per-author result handling:
- Valid MinHash (≥256 hashes): Ok(Some(mh)).
- MinHash too short: Err(invalid).
- No MinHash in store: Ok(None). The author doesn't have one, so return a None jaccard.
- RPC error: Err.
The "min hash too short" branch is interesting: it's not just absence — it's actively rejected. Probably because partial MinHashes give wildly inaccurate Jaccard estimates.
candidates
.iter()
.map(|c| {
let author_id = c.author_id as i64;
match author_result.get(&author_id) {
Some(Ok(Some(author_mh))) => Ok(PostCandidate {
mutual_follow_jaccard: Some(jaccard_from_minhash(
viewer_minhash,
author_mh,
)),
..Default::default()
}),
Some(Ok(None)) => Ok(PostCandidate {
mutual_follow_jaccard: None,
..Default::default()
}),
Some(Err(err)) => Err(err.clone()),
None => Err(format!(
"Missing minhash fetch result for author_id={}",
author_id,
)),
}
})
.collect()
Per-candidate: compute Jaccard or propagate the appropriate state.
The Jaccard goes onto mutual_follow_jaccard: Option<f64> — used downstream as a feature in scoring.
filtered_topics_hydrator.rs (134 lines) — topic taxonomy lookup
Calls Strato to fetch the per-post topic IDs under the active filtering experiment. Used by TopicIdsFilter and NewUserTopicIdsFilter.
fn decode_topics_pair(
result: &Result<Vec<u8>, Box<dyn std::error::Error>>,
experiment: TopicFilteringExperiment,
need_unfiltered: bool,
) -> (Option<Vec<i64>>, Option<Vec<i64>>) {
match result {
Ok(bytes) if !bytes.is_empty() => {
let decoded: StratoResult<StratoValue<FilteredTopicsByExperiment>> = decode(bytes);
match decoded {
StratoResult::Ok(v) => {
let ft = v.v;
let exp_topics = ft
.as_ref()
.and_then(|ft| ft.topic_ids_for_experiment(experiment).cloned());
let unf_topics = if need_unfiltered {
ft.as_ref().and_then(|ft| {
ft.topic_ids_for_experiment(TopicFilteringExperiment::Unfiltered)
.cloned()
})
} else {
None
};
(exp_topics, unf_topics)
}
StratoResult::Err(_) => (None, None),
}
}
Ok(_) => (None, None),
Err(e) => {
warn!("FilteredTopicsHydrator: strato fetch error: {}", e);
(None, None)
}
}
}
Helper that decodes a Strato raw-bytes response into (filtered_topic_ids, unfiltered_topic_ids). The Strato value is a FilteredTopicsByExperiment (from Session 04) which holds topic lists per experiment.
For each result:
- Decode bytes → struct.
- Extract topics for the requested experiment.
- Optionally also extract Unfiltered topics (needed for path-2 supertopic matching from Session 05's TopicIdsFilter).
On any failure (empty bytes, decode error, RPC error) — return (None, None).
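The "every failure collapses to (None, None)" shape can be sketched without the Strato types. Everything here is an assumption for illustration: TopicsByExperiment is a simplified stand-in for FilteredTopicsByExperiment keyed by experiment name, the input is treated as already decoded, and topics_pair is a hypothetical name.

```rust
use std::collections::HashMap;

/// Simplified stand-in (assumption): experiment name -> topic IDs.
type TopicsByExperiment = HashMap<String, Vec<i64>>;

/// Return (experiment topics, unfiltered topics) for one fetch result.
/// Missing values and errors both collapse to (None, None).
fn topics_pair(
    fetched: &Result<Option<TopicsByExperiment>, String>,
    experiment: &str,
    need_unfiltered: bool,
) -> (Option<Vec<i64>>, Option<Vec<i64>>) {
    match fetched {
        Ok(Some(by_exp)) => {
            let exp = by_exp.get(experiment).cloned();
            // Only pay for the second lookup when the caller needs it.
            let unf = if need_unfiltered {
                by_exp.get("unfiltered").cloned()
            } else {
                None
            };
            (exp, unf)
        }
        Ok(None) | Err(_) => (None, None),
    }
}
```

Because the caller cannot tell "no topics" from "lookup failed", a failed fetch behaves like an unclassified post for whatever filter reads these fields.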
pub struct FilteredTopicsHydrator {
pub strato_client: Arc<dyn StratoClient + Send + Sync>,
}
#[async_trait]
impl Hydrator<ScoredPostsQuery, PostCandidate> for FilteredTopicsHydrator {
fn enable(&self, query: &ScoredPostsQuery) -> bool {
query.is_topic_request()
|| query.has_excluded_topics()
|| (query.params.get(EnableNewUserTopicFiltering) && query.has_new_user_topic_ids())
}
Enable only when topic filtering will actually be applied. No reason to fetch topic info if we're not going to use it.
async fn hydrate(
&self,
query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let experiment = if query.is_bulk_topic_request() || query.has_excluded_topics() {
TopicFilteringExperiment::Unfiltered
} else {
let default_experiment =
TopicFilteringExperiment::parse(&query.params.get(TopicFilteringId));
let override_map =
TopicFilteringOverrideMap::parse(&query.params.get(TopicFilteringOverrides));
override_map.resolve(&query.topic_ids, default_experiment)
};
Pick the experiment to use:
- Bulk topic requests (≥7 topics) or excluded-topics requests → always Unfiltered.
- Otherwise, start with the default experiment from feature switches, then apply per-topic overrides (the parser we read in Session 05).
TopicFilteringOverrideMap::resolve looks up the first topic in the query that has an override.
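The selection rule can be sketched as two small functions. This is a simplified model under stated assumptions: the Experiment variants other than Unfiltered are invented, the override map is modeled as a plain HashMap from topic ID to experiment, and pick_experiment and resolve are hypothetical names.

```rust
use std::collections::HashMap;

/// Hypothetical variants; only `Unfiltered` is named in the source.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Experiment {
    Unfiltered,
    ArmA,
    ArmB,
}

/// First topic in the query that has an override wins; else the default.
fn resolve(
    overrides: &HashMap<i64, Experiment>,
    topic_ids: &[i64],
    default: Experiment,
) -> Experiment {
    topic_ids
        .iter()
        .find_map(|t| overrides.get(t).copied())
        .unwrap_or(default)
}

/// Bulk or excluded-topics requests bypass experiments entirely.
fn pick_experiment(
    is_bulk: bool,
    has_excluded: bool,
    overrides: &HashMap<i64, Experiment>,
    topic_ids: &[i64],
    default: Experiment,
) -> Experiment {
    if is_bulk || has_excluded {
        Experiment::Unfiltered
    } else {
        resolve(overrides, topic_ids, default)
    }
}
```

With an override for topic 42 and a query for topics [7, 42], the result is the override for 42. Under this first-match rule, the order of topic IDs in the query decides which override applies when several topics have one.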
let client = &self.strato_client;
let need_unfiltered = experiment != TopicFilteringExperiment::Unfiltered;
let mut all_ids: Vec<u64> = candidates.iter().map(|c| c.tweet_id).collect();
let retweet_offset = all_ids.len();
for c in candidates {
if let Some(rt_id) = c.retweeted_tweet_id {
all_ids.push(rt_id);
}
}
need_unfiltered: only fetch unfiltered topics if the experiment isn't already unfiltered (avoid duplicate work).
Build the fetch list: all primary tweet IDs first, then retweeted tweet IDs at the end. Keep track of where retweets start via retweet_offset.
let all_results = client
.batch_get_filtered_topics_by_experiment(&all_ids)
.await;
let mut retweet_topics: HashMap<u64, Vec<i64>> = HashMap::new();
let mut retweet_unfiltered: HashMap<u64, Vec<i64>> = HashMap::new();
let mut rt_idx = retweet_offset;
for c in candidates {
if let Some(rt_id) = c.retweeted_tweet_id {
let (exp, unf) =
decode_topics_pair(&all_results[rt_idx], experiment, need_unfiltered);
if let Some(topics) = exp {
retweet_topics.insert(rt_id, topics);
}
if let Some(topics) = unf {
retweet_unfiltered.insert(rt_id, topics);
}
rt_idx += 1;
}
}
Process the retweet results first into a separate map. For retweets, we want the original tweet's topics, not the retweet's (retweets have no independent topic classification).
candidates
.iter()
.enumerate()
.map(|(i, c)| {
let (topics, unf_topics) = if let Some(rt_id) = c.retweeted_tweet_id {
(
retweet_topics.get(&rt_id).cloned(),
retweet_unfiltered.get(&rt_id).cloned(),
)
} else {
decode_topics_pair(&all_results[i], experiment, need_unfiltered)
};
Ok(PostCandidate {
filtered_topic_ids: topics,
unfiltered_topic_ids: unf_topics,
..Default::default()
})
})
.collect()
}
Per-candidate: if a retweet, look up the retweeted tweet's topics from the map. Otherwise, decode the result at the candidate's index.
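The all_results[i] indexing works because all_ids begins with the candidates' own tweet IDs in candidate order (positions 0..candidates.len()), so the result at index i belongs to candidate i. Both this and the rt_idx walk rely on the batch call returning exactly one result per requested ID, in request order.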
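The index layout is easier to see with concrete data. The sketch below is a simplified model, not the hydrator: Candidate carries only the two ID fields, each batch result is reduced to an Option<Vec<i64>>, and both function names are hypothetical.

```rust
use std::collections::HashMap;

struct Candidate {
    tweet_id: u64,
    retweeted_tweet_id: Option<u64>,
}

/// Primary IDs first, retweeted IDs appended; returns (ids, retweet offset).
fn build_fetch_list(candidates: &[Candidate]) -> (Vec<u64>, usize) {
    let mut all_ids: Vec<u64> = candidates.iter().map(|c| c.tweet_id).collect();
    let offset = all_ids.len();
    all_ids.extend(candidates.iter().filter_map(|c| c.retweeted_tweet_id));
    (all_ids, offset)
}

/// Assumes results[i] answers all_ids[i]. Retweets take the source tweet's topics.
fn topics_per_candidate(
    candidates: &[Candidate],
    results: &[Option<Vec<i64>>],
    offset: usize,
) -> Vec<Option<Vec<i64>>> {
    // Walk the tail of `results` in the same order the retweet IDs were pushed.
    let mut rt_idx = offset;
    let mut rt_topics: HashMap<u64, Vec<i64>> = HashMap::new();
    for c in candidates {
        if let Some(rt_id) = c.retweeted_tweet_id {
            if let Some(t) = &results[rt_idx] {
                rt_topics.insert(rt_id, t.clone());
            }
            rt_idx += 1;
        }
    }
    candidates
        .iter()
        .enumerate()
        .map(|(i, c)| match c.retweeted_tweet_id {
            Some(rt_id) => rt_topics.get(&rt_id).cloned(),
            None => results[i].clone(),
        })
        .collect()
}
```

For candidates 10, 11 (a retweet of 99) and 12, the fetch list is [10, 11, 12, 99] with offset 3. Candidate 11 reads slot 3, and its own slot 1 is fetched but never used.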
following_replied_users_hydrator.rs (98 lines) — the facepile
Populates the facepile: the list of users-you-follow who replied to a post. Shown in the UI as "Alice and 3 others replied."
const VIEWER_FOLLOWERS_THRESHOLD: i64 = 1000;
A threshold: only run for viewers who themselves have ≥ 1000 followers. The source doesn't state the reason. One plausible reading is that the facepile is most valuable for discovery outside tight networks: a user with 1000+ followers likely has enough graph density that "people you follow replied to this" is a meaningful signal, while for smaller networks the data is too sparse.
fn build_reply_author_map(replies: &[InNetworkReply]) -> HashMap<u64, Vec<u64>> {
let mut map: HashMap<u64, Vec<u64>> = HashMap::new();
for reply in replies {
map.entry(reply.in_reply_to_tweet_id)
.or_default()
.push(reply.author_id);
}
for authors in map.values_mut() {
authors.sort_unstable();
authors.dedup();
}
map
}
Helper: build a HashMap<replied_to_tweet_id, Vec<author_id>> from the precomputed InNetworkReplies list. Sort+dedup per entry — the same author might have replied multiple times.
fn enable(&self, query: &ScoredPostsQuery) -> bool {
let has_enough_followers = query
.user_features
.follower_count
.is_some_and(|c| c >= VIEWER_FOLLOWERS_THRESHOLD);
has_enough_followers && query.params.get(EnableFollowingRepliedUsersFacepile)
}
Two gates: enough followers + feature flag.
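One detail worth spelling out is how is_some_and treats a missing follower count: None fails the gate, the same as a count below the threshold. A minimal sketch, with facepile_enabled as a hypothetical free function and the feature flag reduced to a bool:

```rust
const VIEWER_FOLLOWERS_THRESHOLD: i64 = 1000;

/// Both gates must pass. An unknown follower count counts as "not enough".
fn facepile_enabled(follower_count: Option<i64>, flag_on: bool) -> bool {
    follower_count.is_some_and(|c| c >= VIEWER_FOLLOWERS_THRESHOLD) && flag_on
}
```

So a viewer whose user features failed to load gets no facepile even when the flag is on.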
async fn hydrate(
&self,
query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Vec<Result<PostCandidate, String>> {
let min_users = query
.params
.get(FollowingRepliedUsersFacepileMinUsers)
.max(0) as usize;
let max_posts = query
.params
.get(FollowingRepliedUsersFacepileMaxPosts)
.max(0) as usize;
let empty = Vec::new();
let replies = query.in_network_replies.get().unwrap_or(&empty);
let reply_author_map = Self::build_reply_author_map(replies);
Pull config: min_users (e.g., 3 — only show facepile if at least 3 followed users replied), max_posts (e.g., 5 — cap how many posts in a response get a facepile to avoid clutter).
The query.in_network_replies is the OnceLock<Vec<InNetworkReply>> from Session 04. Get the inner value or default to empty. Build the reply→authors map.
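The .max(0) before the cast matters because a signed-to-unsigned cast in Rust does not saturate: a negative value reinterprets as a very large usize. The sketch assumes the parameter is an i64 (its type is not shown in this section), and to_limit is a hypothetical name.

```rust
/// Clamp a signed config value to a usable count.
/// Assumes the parameter type is i64, which this section does not confirm.
fn to_limit(raw: i64) -> usize {
    // Without the clamp, `-3_i64 as usize` would wrap to a huge number
    // and effectively disable the limit.
    raw.max(0) as usize
}
```

A misconfigured negative max_posts therefore turns the facepile off (a cap of 0) instead of removing the cap.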
let mut results = Vec::with_capacity(candidates.len());
let mut selected_count: usize = 0;
for candidate in candidates {
let is_root_tweet = candidate.in_reply_to_tweet_id.is_none();
let authors: Vec<u64> = reply_author_map
.get(&candidate.tweet_id)
.map(|ids| {
ids.iter()
.copied()
.filter(|&aid| aid != candidate.author_id)
.collect()
})
.unwrap_or_default();
let eligible = is_root_tweet && authors.len() >= min_users;
let user_ids = if eligible && selected_count < max_posts {
selected_count += 1;
authors
} else {
vec![]
};
results.push(Ok(PostCandidate {
following_replied_user_ids: user_ids,
..Default::default()
}));
}
results
}
Per-candidate:
- Only root tweets get facepiles. Candidates that are themselves replies don't get them (would be confusing UX).
- Pull the authors who replied to this tweet, filter out self-replies (aid != candidate.author_id).
- Eligibility: enough followed-author replies, and we haven't hit the max_posts cap yet.
- If eligible, count it (selected_count += 1) and populate the user list. Otherwise empty.
The max_posts cap is global across the batch — we maintain a running counter. The first max_posts eligible candidates (in iteration order) get facepiles; the rest don't. This means order matters here: candidates earlier in the list (likely higher-scored, since this is post-selection) get priority.
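The running-counter behavior can be reproduced in a few lines. This is a simplified model under stated assumptions: Candidate holds only the three fields the loop reads, the reply map is passed in prebuilt, and select_facepiles is a hypothetical name that returns just the user-ID lists.

```rust
use std::collections::HashMap;

struct Candidate {
    tweet_id: u64,
    author_id: u64,
    in_reply_to_tweet_id: Option<u64>,
}

/// One Vec of followed repliers per candidate; empty when not selected.
fn select_facepiles(
    candidates: &[Candidate],
    reply_authors: &HashMap<u64, Vec<u64>>,
    min_users: usize,
    max_posts: usize,
) -> Vec<Vec<u64>> {
    let mut selected = 0usize; // shared across the whole batch
    candidates
        .iter()
        .map(|c| {
            // Followed users who replied, minus the post's own author.
            let authors: Vec<u64> = reply_authors
                .get(&c.tweet_id)
                .map(|ids| ids.iter().copied().filter(|&a| a != c.author_id).collect())
                .unwrap_or_default();
            let eligible = c.in_reply_to_tweet_id.is_none() && authors.len() >= min_users;
            if eligible && selected < max_posts {
                selected += 1;
                authors
            } else {
                Vec::new()
            }
        })
        .collect()
}
```

With two eligible root tweets and max_posts = 1, only the first one in iteration order keeps its list. One edge case follows from the same logic: if min_users is 0, every root tweet is eligible, including those with no repliers, and each still consumes a slot.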
What we've learned
Two-arm A/B test scaffolding: AdsBrandSafetyHydrator and AdsBrandSafetyVfHydrator are mirror gates — exactly one runs per request based on a decider flag. The non-redundant code is the choice of upstream service (safety label store vs VF safety labels). When the experiment concludes, one of the two gets deleted.
Transitive safety: brand-safety verdicts inherit from quoted tweets via worst_verdict. A safe post quoting an unsafe post → unsafe. Pessimistic combination.
Custom cache TTLs by tweet age: build_moka_cache_tweet_age(...) configures different TTLs for new vs. old posts. Engagement counts use 5/10 min, brand safety uses 1/60 min. The tighter window for new posts catches rapid label changes.
Bitsets as feature blobs: TweetTypeMetricsHydrator builds a HashSet<usize> of categorical features, then packs into bytes for the wire. Cumulative vs. exclusive bit groups encode different semantics (age buckets cumulative, follower buckets exclusive). The receiver decodes by bit position.
Manual cache + parallel I/O: QuoteHydrator doesn't use the CachedHydrator blanket impl because it has additional I/O after the primary cache lookup (block-by check + video duration). The pattern is: manually traverse cache, fetch misses, then tokio::join! other dependent fetches in parallel.
Two-level visibility filtering: VFCandidateHydrator calls VF with TimelineHome for in-network and TimelineHomeRecommendations for OON. Different safety levels for different content sources — follows get more lenient treatment than algorithmic recommendations.
MinHash for Jaccard estimation: mutual_follow_jaccard_hydrator uses 256-hash MinHash signatures. Comparing two signatures is one pass over a fixed number of slots, whatever the size of the underlying sets, instead of set arithmetic. Strict validation (256-hash minimum): too few hashes makes the estimate too noisy.
Retweet topic inheritance: filtered_topics_hydrator fetches the retweeted tweet's topics when the candidate is a retweet. Retweets don't have independent classifications.
Facepile rate-limiting: FollowingRepliedUsersHydrator caps how many posts in a single response can carry a facepile (max_posts). Maintains a running counter across the batch. Earlier-in-batch candidates win priority.
Permissive vs strict failure modes:
- EngagementCountsHydrator: always Ok, never fails (None counts are fine).
- AdsBrandSafetyHydrator: fails the candidate on lookup error (safety is critical).
- QuoteHydrator::get_blocked_by: unwrap_or_default() (treat error as "no one blocks you", the permissive choice).
- BlockedByHydrator (Session 07): all-batch error on RPC failure.
Each hydrator chooses the failure mode that matches the field's downstream importance.
Bookkeeping flags:
- query.is_shadow_traffic → run extra hydrators for shadow logging only.
- query.params.get(EnableXxx) → ramp via feature flag.
- query.decider.is_some_and(|d| d.enabled("xxx")) → A/B testing per-user.
Next session
Session 09 — Query hydrators. The 16 hydrators that populate ScoredPostsQuery (vs. PostCandidate). About 1,235 LOC across files like scoring_sequence_query_hydrator.rs, mutual_follow_query_hydrator.rs, user_action_seq_query_hydrator.rs, the bloom filter hydrator, the cached posts hydrator, etc.