X For You algorithm, line by line — Part 5: Home-Mixer filters
Part 5 of the deep dive into xai-org/x-algorithm. All 17 filters in home-mixer/filters/ (18 files counting mod.rs): duplicates, age, self-tweet, retweet dedup, ineligible subscription, three flavors of seen-post filtering (bloom + impressed + served), muted keyword tokenizer matching, 6-relationship socialgraph filter, conversation dedup, and the 571-LOC topic taxonomy filter.
We move from the orchestration shell into the first batch of pipeline components: filters. Filters are the synchronous, no-I/O stage of the pipeline; they receive a Vec<PostCandidate> and partition it into kept/removed sets. From Session 01 we know they run sequentially, each filter receiving the previous filter's kept set as input.
This session covers all 18 files in the directory (1,118 LOC): mod.rs plus 17 filters. Most filters are 20-60 lines of simple logic: a partition on a predicate. The exceptions are topic_ids_filter (571 LOC, the giant) and muted_keyword_filter (57 LOC, but built on an interesting tokenizer/matcher), which we'll spend more time on.
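To make the contract concrete, here is a minimal sketch of what a filter trait and a sequential runner could look like. The names FilterResult, Filter, and run_filters are stand-ins written for this example, and the default enable returning true is an assumption; the real definitions live in xai_candidate_pipeline and may differ.

```rust
/// Hypothetical stand-in for the pipeline's result type.
pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

/// Hypothetical stand-in for the `Filter` trait. The default `enable`
/// is an assumption, based on filters that never override it.
pub trait Filter<Q, C> {
    fn enable(&self, _query: &Q) -> bool {
        true
    }
    fn filter(&self, query: &Q, candidates: Vec<C>) -> FilterResult<C>;
}

/// Runs filters in order; each one sees only the previous filter's kept set.
pub fn run_filters<Q, C>(
    filters: &[Box<dyn Filter<Q, C>>],
    query: &Q,
    mut candidates: Vec<C>,
) -> FilterResult<C> {
    let mut removed = Vec::new();
    for f in filters {
        if !f.enable(query) {
            continue;
        }
        let result = f.filter(query, candidates);
        candidates = result.kept;
        removed.extend(result.removed);
    }
    FilterResult { kept: candidates, removed }
}

/// Toy filter used only to exercise the runner: keeps even numbers.
pub struct KeepEven;
impl Filter<(), u64> for KeepEven {
    fn filter(&self, _query: &(), candidates: Vec<u64>) -> FilterResult<u64> {
        let (kept, removed) = candidates.into_iter().partition(|n| n % 2 == 0);
        FilterResult { kept, removed }
    }
}

/// Toy filter: keeps numbers below a limit.
pub struct Below(pub u64);
impl Filter<(), u64> for Below {
    fn filter(&self, _query: &(), candidates: Vec<u64>) -> FilterResult<u64> {
        let (kept, removed) = candidates.into_iter().partition(|n| *n < self.0);
        FilterResult { kept, removed }
    }
}
```

Chaining KeepEven and Below(7) over [1, 2, 8, 4, 9, 6] leaves [2, 4, 6] in kept; the second filter never sees the odd numbers the first one removed.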
Files covered:
home-mixer/filters/
├── mod.rs (18)
├── drop_duplicates_filter.rs (28) simplest: dedup by tweet_id
├── core_data_hydration_filter.rs (16) drop rows with author_id=0
├── age_filter.rs (36) drop posts older than threshold
├── self_tweet_filter.rs (21) drop the viewer's own posts
├── retweet_deduplication_filter.rs (31) dedup by original-or-self tweet_id
├── ineligible_subscription_filter.rs (32) drop paywalled posts
├── previously_seen_posts_filter.rs (34) seen_ids + bloom filters
├── previously_seen_posts_backup_filter.rs (29) impressed_post_ids fallback
├── previously_served_posts_filter.rs (33) served_ids check
├── muted_keyword_filter.rs (57) tokenizer-based keyword match
├── author_socialgraph_filter.rs (61) blocks/mutes from any direction
├── new_user_topic_ids_filter.rs (31) onboarding topic filter
├── video_filter.rs (23) drop videos when excluded
├── vf_filter.rs (30) visibility filter action=Drop
├── ancillary_vf_filter.rs (19) drop ancillary candidates
├── dedup_conversation_filter.rs (48) one post per conversation
└── topic_ids_filter.rs (571) topic taxonomy + supertopic mapping
mod.rs (18 lines)
pub mod age_filter;
pub mod ancillary_vf_filter;
pub mod author_socialgraph_filter;
pub mod core_data_hydration_filter;
pub mod dedup_conversation_filter;
pub mod drop_duplicates_filter;
pub mod ineligible_subscription_filter;
pub mod muted_keyword_filter;
pub mod new_user_topic_ids_filter;
pub mod previously_seen_posts_backup_filter;
pub mod previously_seen_posts_filter;
pub mod previously_served_posts_filter;
pub mod retweet_deduplication_filter;
pub mod self_tweet_filter;
pub mod topic_ids_filter;
pub mod vf_filter;
pub mod video_filter;
Module declarations: 17 pub mod lines plus one blank line make up the 18 lines. The blank line at position 7 doesn't correspond to anything semantic (it does not separate pre-scoring filters from post-scoring ones); it's just whitespace, and both groups contain a mix of pre- and post-scoring filters.
drop_duplicates_filter.rs (28 lines)
The simplest filter. Dedup by tweet_id.
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use std::collections::HashSet;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
pub struct DropDuplicatesFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for DropDuplicatesFilter {
fn filter(
&self,
_query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let mut seen_ids = HashSet::new();
let mut kept = Vec::new();
let mut removed = Vec::new();
for candidate in candidates {
if seen_ids.insert(candidate.tweet_id) {
kept.push(candidate);
} else {
removed.push(candidate);
}
}
FilterResult { kept, removed }
}
}
A unit struct (no fields) implementing Filter<Q, C> where Q = ScoredPostsQuery and C = PostCandidate. The unused query param is _query (the underscore convention).
seen_ids.insert(...) returns true if the value was newly inserted (not already present). So the first occurrence of each tweet_id is kept; duplicates go to removed. Order-preserving for kept (we maintain input order), which matters because later filters might be order-sensitive.
This filter exists because the same post can show up from multiple sources — e.g. once from Thunder (in-network) and once from Phoenix retrieval (OON). Source-stage dedup would require coordination; pushing it to a filter is cleaner.
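The first-occurrence-wins behaviour is easy to check in isolation. The sketch below uses a (tweet_id, source) tuple as a stand-in for PostCandidate; the source labels are illustrative only.

```rust
use std::collections::HashSet;

/// Hypothetical minimal candidate: (tweet_id, source label).
pub type Sourced = (u64, &'static str);

/// Splits candidates into first occurrences (kept) and repeats (removed),
/// preserving input order in both vectors.
pub fn dedup_first_wins(candidates: Vec<Sourced>) -> (Vec<Sourced>, Vec<Sourced>) {
    let mut seen = HashSet::new();
    let mut kept = Vec::new();
    let mut removed = Vec::new();
    for candidate in candidates {
        // `insert` returns true only when the ID was not already present.
        if seen.insert(candidate.0) {
            kept.push(candidate);
        } else {
            removed.push(candidate);
        }
    }
    (kept, removed)
}
```

If post 10 arrives once from "thunder" and later from "phoenix", the "thunder" copy is the one that stays.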
core_data_hydration_filter.rs (16 lines)
The smallest filter. Drop rows that failed core hydration.
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
pub struct CoreDataHydrationFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for CoreDataHydrationFilter {
fn filter(
&self,
_query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let (kept, removed) = candidates.into_iter().partition(|c| c.author_id != 0);
FilterResult { kept, removed }
}
}
If author_id == 0, the row didn't hydrate (the core-data hydrator failed to find the post or it's invalid). Drop it. author_id = 0 is the sentinel for "no data."
Iterator::partition returns (true_collection, false_collection). Cleaner than the explicit loop for simple predicates.
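A small standalone sketch of the two ways partition shows up in these filters: with a "keep" predicate, and with a "drop" predicate where the tuple is read in the opposite order. The function names are made up for the example.

```rust
/// Keeps hydrated rows; 0 plays the role of the "no data" sentinel.
pub fn split_hydrated(author_ids: Vec<u64>) -> (Vec<u64>, Vec<u64>) {
    // First vector: predicate true. Second vector: predicate false.
    author_ids.into_iter().partition(|&id| id != 0)
}

/// Same split with the predicate inverted ("should drop"),
/// so the partition result reads (removed, kept).
pub fn split_by_drop_predicate(author_ids: Vec<u64>) -> (Vec<u64>, Vec<u64>) {
    let (removed, kept): (Vec<u64>, Vec<u64>) =
        author_ids.into_iter().partition(|&id| id == 0);
    (kept, removed)
}
```

Both functions return the same (kept, removed) pair; only the direction of the predicate differs.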
age_filter.rs (36 lines)
Drop posts older than a configured Duration.
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use std::time::Duration;
use xai_candidate_pipeline::component_library::utils::duration_since_creation_opt;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
/// Filter that removes tweets older than a specified duration.
pub struct AgeFilter {
pub max_age: Duration,
}
impl AgeFilter {
pub fn new(max_age: Duration) -> Self {
Self { max_age }
}
fn is_within_age(&self, tweet_id: u64) -> bool {
duration_since_creation_opt(tweet_id)
.map(|age| age <= self.max_age)
.unwrap_or(false)
}
}
impl Filter<ScoredPostsQuery, PostCandidate> for AgeFilter {
fn filter(
&self,
_query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let (kept, removed): (Vec<_>, Vec<_>) = candidates
.into_iter()
.partition(|c| self.is_within_age(c.tweet_id));
FilterResult { kept, removed }
}
}
The AgeFilter holds a max_age (typically configured to ~3 days for the feed). The age is computed from the tweet ID alone — duration_since_creation_opt(tweet_id) from the component library extracts the timestamp portion of the Snowflake-style ID.
Trick: if duration_since_creation_opt returns None (somehow the tweet ID is malformed), .unwrap_or(false) defaults to "not within age" → drop. Defensive: invalid IDs shouldn't slip through.
Note that is_within_age is a method, called via self.is_within_age(...), rather than a free function. Both would work; the method form keeps the comparison next to the max_age field it reads, so the threshold never has to be passed around as an argument.
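For intuition, here is a sketch of how an age can be derived from a Snowflake-style ID. It assumes the publicly documented Twitter layout (timestamp in the bits above the low 22, offset from the 2010 epoch); the body of duration_since_creation_opt is not shown in this session, so treat this as an illustration rather than its implementation. The now_ms parameter is added here so the function is deterministic.

```rust
use std::time::Duration;

/// Twitter's Snowflake epoch (2010-11-04T01:42:54.657Z) in Unix milliseconds.
pub const SNOWFLAKE_EPOCH_MS: u64 = 1_288_834_974_657;
/// The low 22 bits hold worker and sequence numbers; the rest is the timestamp.
pub const TIMESTAMP_SHIFT: u32 = 22;

/// Creation time encoded in a Snowflake ID, in Unix milliseconds.
pub fn snowflake_unix_ms(tweet_id: u64) -> u64 {
    (tweet_id >> TIMESTAMP_SHIFT) + SNOWFLAKE_EPOCH_MS
}

/// Age at `now_ms`; None if the ID claims a creation time in the future.
pub fn age_at(tweet_id: u64, now_ms: u64) -> Option<Duration> {
    now_ms
        .checked_sub(snowflake_unix_ms(tweet_id))
        .map(Duration::from_millis)
}

/// Mirrors the shape of `is_within_age`: an unknown age means "drop".
pub fn is_within_age(tweet_id: u64, now_ms: u64, max_age: Duration) -> bool {
    age_at(tweet_id, now_ms)
        .map(|age| age <= max_age)
        .unwrap_or(false)
}
```

The design point carries over from the filter: when the age cannot be computed, the candidate is treated as too old rather than let through.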
self_tweet_filter.rs (21 lines)
Drop the viewer's own posts. No one wants their own tweets in their feed.
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
/// Filter that removes tweets where the author is the viewer.
pub struct SelfTweetFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for SelfTweetFilter {
fn filter(
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let viewer_id = query.user_id;
let (kept, removed): (Vec<_>, Vec<_>) = candidates
.into_iter()
.partition(|c| c.author_id != viewer_id);
FilterResult { kept, removed }
}
}
Pull the viewer's user_id from the query. Partition by "is author the viewer." Trivial.
Note this doesn't drop retweets-of-self — only direct authorship. If the viewer retweets something, the retweeted post passes through unless filtered by RetweetDeduplicationFilter (which doesn't know about viewer ID). For "don't show retweets the viewer themselves did" semantics, see Thunder's get_posts_from_map filter we read in Session 02.
retweet_deduplication_filter.rs (31 lines)
Dedup such that the content of each post appears at most once (regardless of whether it's an original or a retweet of it).
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use std::collections::HashSet;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
/// Deduplicates retweets, keeping only the first occurrence of a tweet
/// (whether as an original or as a retweet).
pub struct RetweetDeduplicationFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for RetweetDeduplicationFilter {
fn filter(
&self,
_query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let mut seen_tweet_ids: HashSet<u64> = HashSet::new();
let mut kept = Vec::new();
let mut removed = Vec::new();
for candidate in candidates {
let dedup_id = candidate.retweeted_tweet_id.unwrap_or(candidate.tweet_id);
if seen_tweet_ids.insert(dedup_id) {
kept.push(candidate);
} else {
removed.push(candidate);
}
}
FilterResult { kept, removed }
}
}
The dedup key: retweeted_tweet_id.unwrap_or(tweet_id). So retweets dedup against their original. If we have both the original and a retweet of it, whichever arrives first wins.
Order matters: if Alice's post comes before Bob-retweeting-Alice in the candidate list, Alice's is kept. If Bob's retweet comes first, Bob's retweet is kept and Alice's later original is dropped. This filter runs pre-scoring, so the order is not score order; it is whatever order the sources produced.
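The order dependence can be shown with two candidates. The Candidate struct below is a hypothetical two-field stand-in for PostCandidate, keeping only what the dedup key needs.

```rust
use std::collections::HashSet;

/// Hypothetical minimal candidate: just the two IDs the dedup key needs.
#[derive(Debug, Clone, PartialEq)]
pub struct Candidate {
    pub tweet_id: u64,
    pub retweeted_tweet_id: Option<u64>,
}

/// Returns the tweet_ids that survive retweet dedup, in input order.
pub fn surviving_ids(candidates: &[Candidate]) -> Vec<u64> {
    let mut seen = HashSet::new();
    candidates
        .iter()
        // Dedup key: the retweeted post if there is one, else the post itself.
        .filter(|c| seen.insert(c.retweeted_tweet_id.unwrap_or(c.tweet_id)))
        .map(|c| c.tweet_id)
        .collect()
}
```

With an original (tweet_id 100) and a retweet of it (tweet_id 200), the survivor is whichever comes first in the input.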
ineligible_subscription_filter.rs (32 lines)
Filter out subscription-only posts the viewer can't access.
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use std::collections::HashSet;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
/// Filters out subscription-only posts from authors the viewer is not subscribed to.
pub struct IneligibleSubscriptionFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for IneligibleSubscriptionFilter {
fn filter(
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let subscribed_user_ids: HashSet<u64> = query
.user_features
.subscribed_user_ids
.iter()
.map(|id| *id as u64)
.collect();
let (kept, removed): (Vec<_>, Vec<_>) =
candidates
.into_iter()
.partition(|candidate| match candidate.subscription_author_id {
Some(author_id) => subscribed_user_ids.contains(&author_id),
None => true,
});
FilterResult { kept, removed }
}
}
Pull the viewer's subscribed-user list (from the hydrated user_features). Convert from Vec<i64> to HashSet<u64> for O(1) lookup.
For each candidate:
- If subscription_author_id is Some(author_id) (post is gated behind a subscription), keep only if the viewer is subscribed to author_id.
- If None (free post), keep unconditionally.
So this is paywall enforcement at the feed level — we never even score paywalled content the user can't read.
The i64 → u64 conversion: subscribed_user_ids are stored as i64 (from Thrift), but candidate user IDs are u64. Cast each one. (User IDs are always positive in practice, so no information loss.)
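A quick sketch of what the `as` cast does at the edges, next to a checked alternative. Neither helper exists in the codebase; they are written here only to show the difference.

```rust
use std::collections::HashSet;
use std::convert::TryFrom;

/// Wrapping conversion, as in the filter: -1 becomes u64::MAX.
pub fn to_u64_set_wrapping(ids: &[i64]) -> HashSet<u64> {
    ids.iter().map(|id| *id as u64).collect()
}

/// Checked alternative: negative IDs are skipped instead of wrapped.
pub fn to_u64_set_checked(ids: &[i64]) -> HashSet<u64> {
    ids.iter().filter_map(|id| u64::try_from(*id).ok()).collect()
}
```

For positive IDs the two agree, which is why the plain cast is good enough as long as IDs really are always positive.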
previously_seen_posts_filter.rs (34 lines)
The first non-trivial filter. Drops posts the viewer has already seen, using two parallel signals: explicit seen_ids from the client + Bloom filters of long-term seen history.
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use crate::util::candidates_util::get_related_post_ids;
use xai_candidate_pipeline::component_library::utils::BloomFilter;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
/// Filter out previously seen posts using a Bloom Filter and
/// the seen IDs sent in the request directly from the client
pub struct PreviouslySeenPostsFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for PreviouslySeenPostsFilter {
fn filter(
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let bloom_filters = query
.bloom_filter_entries
.iter()
.map(|e| BloomFilter::from_parts(e.size_cap, e.false_positive_rate, &e.bloom_filter))
.collect::<Vec<_>>();
let (removed, kept): (Vec<_>, Vec<_>) = candidates.into_iter().partition(|c| {
get_related_post_ids(c).iter().any(|&post_id| {
query.seen_ids.contains(&post_id)
|| bloom_filters
.iter()
.any(|filter| filter.may_contain(post_id))
})
});
FilterResult { kept, removed }
}
}
A few things to note:
- get_related_post_ids(c): a util that returns all post IDs associated with this candidate: the post itself, the retweeted post (if any), the quoted post (if any). Why all of them? Because if the user already saw the original of a retweet, we shouldn't show the retweet either. Conversely, if they saw a retweet, we shouldn't re-show the original.
- BloomFilter::from_parts: reconstructs the Bloom filter from the wire-transmitted parts (size_cap, false_positive_rate, raw bits). The hydrator (see Session 09) is what fetches these from the impression-history store.
- filter.may_contain(post_id): Bloom filter check. May return false positives (drop a never-seen post) but never false negatives. We accept the false positives because the alternative (storing every seen post ID for a heavy user) explodes memory.
- The partition call is (removed, kept) (not (kept, removed)): partition gives (matched, unmatched), where matched means "predicate returned true." The predicate here is "any related ID is in seen_ids or a bloom filter", i.e. "we've seen this." So matched = removed and unmatched = kept. Pay attention to the order.
The nested any calls cost O(K × B × R) in the worst case, where K = candidates, B = bloom filters per request, R = related IDs per candidate. K is up to ~5000, B is 2-3, R is 1-3 typically, so a request makes on the order of tens of thousands of Bloom-filter checks (5000 × 3 × 3 = 45,000 at the upper bound, fewer whenever any short-circuits on a hit). Each check is a few hash lookups, which is fast enough.
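To see why may_contain can produce false positives but never false negatives, here is a toy Bloom filter. It is not the component library's BloomFilter: the ToyBloom name, the bit-vector sizing, and the seeded DefaultHasher scheme are all assumptions chosen to keep the sketch short.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy Bloom filter over u64 IDs (illustrative sizing and hashing).
pub struct ToyBloom {
    bits: Vec<bool>,
    num_hashes: u64,
}

impl ToyBloom {
    pub fn new(num_bits: usize, num_hashes: u64) -> Self {
        Self { bits: vec![false; num_bits.max(1)], num_hashes }
    }

    /// Derives one bit position by hashing (seed, id) together.
    fn position(&self, id: u64, seed: u64) -> usize {
        let mut hasher = DefaultHasher::new();
        (seed, id).hash(&mut hasher);
        (hasher.finish() % self.bits.len() as u64) as usize
    }

    pub fn insert(&mut self, id: u64) {
        for seed in 0..self.num_hashes {
            let pos = self.position(id, seed);
            self.bits[pos] = true;
        }
    }

    /// false: definitely never inserted. true: inserted, or a collision.
    pub fn may_contain(&self, id: u64) -> bool {
        (0..self.num_hashes).all(|seed| self.bits[self.position(id, seed)])
    }
}
```

Every inserted ID sets all of its bits, so a later lookup for that ID always finds them set. A never-inserted ID can still collide with bits set by others, which is the false-positive case the filter accepts.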
previously_seen_posts_backup_filter.rs (29 lines)
A simpler fallback for when Bloom filters aren't available.
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use crate::util::candidates_util::get_related_post_ids;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
pub struct PreviouslySeenPostsBackupFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for PreviouslySeenPostsBackupFilter {
fn filter(
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
if query.impressed_post_ids.is_empty() {
return FilterResult {
kept: candidates,
removed: Vec::new(),
};
}
let (removed, kept): (Vec<_>, Vec<_>) = candidates.into_iter().partition(|c| {
get_related_post_ids(c)
.iter()
.any(|id| query.impressed_post_ids.contains(id))
});
FilterResult { kept, removed }
}
}
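Uses impressed_post_ids (a Vec of post IDs carried on the query): a candidate is removed if any of its related post IDs appears in that list.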
The early bail if query.impressed_post_ids.is_empty() is critical: if the impressed-post-ids hydrator hasn't run (or returned empty), don't waste time. No-op return.
The "backup" naming is because this runs even when the primary PreviouslySeenPostsFilter ran — both filters are configured in the pipeline, and each catches different cases (Bloom filters cover long-term history; impressed_post_ids covers recent sessions).
previously_served_posts_filter.rs (33 lines)
The third "already seen" variant. Uses served_ids (posts we sent down to this user in prior responses) rather than what the client says it has displayed.
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use crate::params::EnableServedFilterAllRequests;
use crate::util::candidates_util::get_related_post_ids;
use xai_candidate_pipeline::component_library::utils::client_utils::RequestContext::{
self, ForegroundTruncate,
};
use xai_candidate_pipeline::filter::{Filter, FilterResult};
pub struct PreviouslyServedPostsFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for PreviouslyServedPostsFilter {
fn enable(&self, query: &ScoredPostsQuery) -> bool {
let req_context = RequestContext::parse(&query.request_context);
let enable_all = query.params.get(EnableServedFilterAllRequests);
enable_all || (query.is_bottom_request && req_context != ForegroundTruncate)
}
fn filter(
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let (removed, kept): (Vec<_>, Vec<_>) = candidates.into_iter().partition(|c| {
get_related_post_ids(c)
.iter()
.any(|id| query.served_ids.contains(id))
});
FilterResult { kept, removed }
}
}
First filter we've seen with a non-default enable method. The filter runs only if:
- The feature flag EnableServedFilterAllRequests is true (rollout switch), OR
- It's a bottom request (user scrolling down, asking for older posts) AND not a ForegroundTruncate context (which means "user just refreshed; let them see fresh stuff even if we've served it before").
So: served-history filtering is opt-in by request type. For a top-of-feed refresh, we don't re-filter — let recent posts re-appear. For pagination down, we do filter — don't repeat what they just saw scrolling.
The filtering logic itself is the same as the backup filter — partition by "any related ID is in served_ids."
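The gating condition is small enough to pull out as a pure function and walk through its cases. The two-variant RequestContext enum below is a stand-in; the real enum in client_utils presumably has more variants.

```rust
/// Hypothetical stand-in for the parsed request context.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RequestContext {
    ForegroundTruncate,
    Other,
}

/// Same boolean shape as `enable`: the flag forces the filter on;
/// otherwise only bottom requests outside ForegroundTruncate qualify.
pub fn served_filter_enabled(
    enable_all: bool,
    is_bottom_request: bool,
    context: RequestContext,
) -> bool {
    enable_all || (is_bottom_request && context != RequestContext::ForegroundTruncate)
}
```

The flag dominates: with enable_all set, even a ForegroundTruncate top-of-feed request is filtered. Without it, a bottom request in a ForegroundTruncate context is the one bottom-request case that skips the filter.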
muted_keyword_filter.rs (57 lines)
A non-trivial filter that uses a proper tokenizer + matcher, not a simple substring search.
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use std::sync::Arc;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
use xai_post_text::{MatchTweetGroup, TokenSequence, TweetTokenizer, UserMutes};
pub struct MutedKeywordFilter {
pub tokenizer: Arc<TweetTokenizer>,
}
impl MutedKeywordFilter {
pub fn new() -> Self {
let tokenizer = TweetTokenizer::new();
Self {
tokenizer: Arc::new(tokenizer),
}
}
}
The struct holds an Arc<TweetTokenizer> — shared once, used many times. TweetTokenizer is X's tokenizer for tweet text (handles hashtags, mentions, URLs, etc. specially).
impl Filter<ScoredPostsQuery, PostCandidate> for MutedKeywordFilter {
fn filter(
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let muted_keywords = query.user_features.muted_keywords.clone();
if muted_keywords.is_empty() {
return FilterResult {
kept: candidates,
removed: vec![],
};
}
let tokenizer = self.tokenizer.clone();
tokio::task::block_in_place(|| {
let tokenized = muted_keywords.iter().map(|k| tokenizer.tokenize(k));
let token_sequences: Vec<TokenSequence> = tokenized.collect::<Vec<_>>();
let user_mutes = UserMutes::new(token_sequences);
let matcher = MatchTweetGroup::new(user_mutes);
let mut kept = Vec::new();
let mut removed = Vec::new();
for candidate in candidates {
let tweet_text_token_sequence = tokenizer.tokenize(&candidate.tweet_text);
if matcher.matches(&tweet_text_token_sequence) {
removed.push(candidate);
} else {
kept.push(candidate);
}
}
FilterResult { kept, removed }
})
}
}
The flow:
- Pull muted keywords from user_features (hydrated by a query hydrator).
- Early bail if no muted keywords.
- tokio::task::block_in_place: declares that the closure is going to be CPU-heavy. It tells Tokio's runtime that the current worker thread is about to block, so the runtime can hand that worker's other tasks to another thread while the closure runs synchronously in place. Filters are sync in the trait, but block_in_place is the explicit way to mark "I'll be slow."
- Tokenize each muted keyword once into a TokenSequence.
- Build the matcher: UserMutes::new(token_sequences) packages the keyword token sequences; MatchTweetGroup::new(user_mutes) builds the matcher state.
- For each candidate, tokenize the tweet text and check matcher.matches(...). If it matches, the candidate is removed.
Why tokenization? Because muting "tesla" should match Tesla, tesla.com, #Tesla, but probably not cybertruck (a related product, but a different token), and it shouldn't fire on a longer word that merely contains the muted letters, the way a substring search for "art" would also hit "party". Token-level matching gets the semantics right.
The Arc<TweetTokenizer> is cloned on every filter call. Arc::clone is cheap (a refcount bump), but the clone isn't strictly required: unlike tokio::task::spawn_blocking, block_in_place places no 'static bound on its closure, so the closure could borrow self.tokenizer directly. As written, the closure borrows the local tokenizer and muted_keywords and takes ownership of candidates. Either way, the pattern compiles.
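The difference between token matching and substring matching can be sketched with a deliberately naive tokenizer. The rules below (lowercase, split on non-alphanumeric characters) are an assumption for illustration; TweetTokenizer and MatchTweetGroup in xai_post_text are not shown in this session and likely handle hashtags, mentions, and URLs with more care.

```rust
/// Naive tokenizer: lowercase, split on anything that is not alphanumeric.
pub fn tokenize(text: &str) -> Vec<String> {
    text.to_lowercase()
        .split(|c: char| !c.is_alphanumeric())
        .filter(|t| !t.is_empty())
        .map(str::to_string)
        .collect()
}

/// True if the keyword's tokens appear as a contiguous run in the text's tokens.
pub fn token_match(text: &str, keyword: &str) -> bool {
    let needle = tokenize(keyword);
    if needle.is_empty() {
        return false;
    }
    tokenize(text)
        .windows(needle.len())
        .any(|w| w == needle.as_slice())
}

/// Plain substring search, for contrast.
pub fn substring_match(text: &str, keyword: &str) -> bool {
    text.to_lowercase().contains(&keyword.to_lowercase())
}
```

Under these rules, "tesla" matches "#Tesla" and "tesla.com" because the punctuation becomes a token boundary, while "art" does not match "party" even though a substring search would.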
author_socialgraph_filter.rs (61 lines)
The catch-all "block/mute graph" filter. Six different reasons a candidate gets dropped.
use std::collections::HashSet;
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
// Remove candidates that are blocked or muted by the viewer
pub struct AuthorSocialgraphFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for AuthorSocialgraphFilter {
fn filter(
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let viewer_blocked_user_ids: HashSet<i64> = query
.user_features
.blocked_user_ids
.iter()
.copied()
.collect();
let viewer_muted_user_ids: HashSet<i64> =
query.user_features.muted_user_ids.iter().copied().collect();
Build two HashSets for O(1) lookups: who the viewer has blocked, who they've muted.
let mut kept: Vec<PostCandidate> = Vec::new();
let mut removed: Vec<PostCandidate> = Vec::new();
for candidate in candidates {
let author_id = candidate.author_id as i64;
let muted = viewer_muted_user_ids.contains(&author_id);
let blocked = viewer_blocked_user_ids.contains(&author_id);
let author_blocks_viewer = candidate.author_blocks_viewer.unwrap_or(false);
let quoted_author_blocks_viewer =
candidate.quoted_author_blocks_viewer.unwrap_or(false);
let viewer_blocks_quoted_author = candidate
.quoted_user_id
.map(|uid| viewer_blocked_user_ids.contains(&(uid as i64)))
.unwrap_or(false);
let viewer_blocks_retweeted_user = candidate
.retweeted_user_id
.map(|uid| viewer_blocked_user_ids.contains(&(uid as i64)))
.unwrap_or(false);
Six block/mute predicates per candidate:
- viewer mutes author: direct mute.
- viewer blocks author: direct block.
- author blocks viewer: reverse block (hydrated by blocked_by_hydrator).
- quoted author blocks viewer: if it's a quote tweet of someone who blocked the viewer.
- viewer blocks quoted author: if the viewer blocked the person being quoted.
- viewer blocks retweeted user: if the viewer blocked the original author of a retweet.
if muted
|| blocked
|| author_blocks_viewer
|| quoted_author_blocks_viewer
|| viewer_blocks_quoted_author
|| viewer_blocks_retweeted_user
{
removed.push(candidate);
} else {
kept.push(candidate);
}
}
FilterResult { kept, removed }
}
}
OR them all together. If any is true, drop.
This is the only filter that touches multiple block/mute relationships at once. Doing it as one filter (rather than six small filters) is faster: one pass over candidates instead of six.
Note muting is asymmetric: the viewer mutes someone → don't show their posts. But "user X mutes viewer" doesn't affect the viewer's feed. Hence only the viewer→muted direction is checked. Blocking, on the other hand, is symmetric in feed effect: if either party blocked the other, hide.
new_user_topic_ids_filter.rs (31 lines)
A specialized filter for the new-user onboarding experience.
use crate::filters::topic_ids_filter::TopicIdExpansion;
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use crate::params::EnableNewUserTopicFiltering;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
pub struct NewUserTopicIdsFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for NewUserTopicIdsFilter {
fn enable(&self, query: &ScoredPostsQuery) -> bool {
query.params.get(EnableNewUserTopicFiltering)
&& query.has_new_user_topic_ids()
&& !query.is_topic_request()
}
fn filter(
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let expanded =
TopicIdExpansion::expand(&query.new_user_topic_ids.iter().copied().collect());
let (kept, removed) = candidates.into_iter().partition(|c| {
c.in_network == Some(true)
|| matches!(&c.filtered_topic_ids, Some(t) if !t.is_empty() && t.iter().any(|tid| expanded.contains(tid)))
});
FilterResult { kept, removed }
}
}
Enabled when:
- Feature flag is on (EnableNewUserTopicFiltering)
- User has selected new-user topics (the onboarding "what are you interested in?" picker)
- This is not an explicit topic request (different filtering for topic feeds)
The filter:
- Always keep in-network posts (in_network == Some(true)).
- For OON posts: keep only if filtered_topic_ids overlaps with the expanded new-user topics.
TopicIdExpansion::expand (defined later in topic_ids_filter.rs) maps each topic to its sub-topics. If a new user said "interested in Sports," that expands to specific sports (NBA, NFL, etc.). So this is strict relevance filtering for OON: new users see only their declared topics from OON sources, but all in-network from their follow graph.
This shapes the first-week experience: new users get a focused feed from their declared interests rather than a generic OON firehose.
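matches!(... if guard): a matches! macro with a guard. Equivalent to a match whose arm Some(t) if !t.is_empty() && ... yields true, with a catch-all arm yielding false. Concise. The !t.is_empty() check is redundant for correctness, since any on an empty list already returns false.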
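Side by side, the guarded matches! and its explicit match expansion look like this. The function names and the Option<Vec<i64>> parameter type are chosen for the example; the real field types on PostCandidate are not shown here.

```rust
use std::collections::HashSet;

/// `matches!` with a guard, in the same shape as the filter's predicate.
pub fn overlaps_with_macro(topics: &Option<Vec<i64>>, wanted: &HashSet<i64>) -> bool {
    matches!(topics, Some(t) if !t.is_empty() && t.iter().any(|id| wanted.contains(id)))
}

/// The same logic written as an explicit match.
pub fn overlaps_with_match(topics: &Option<Vec<i64>>, wanted: &HashSet<i64>) -> bool {
    match topics {
        Some(t) if !t.is_empty() && t.iter().any(|id| wanted.contains(id)) => true,
        _ => false,
    }
}
```

Both return false for None and for an empty list, and true only when at least one topic is in the wanted set.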
video_filter.rs (23 lines)
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
pub struct VideoFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for VideoFilter {
fn enable(&self, query: &ScoredPostsQuery) -> bool {
query.exclude_videos
}
fn filter(
&self,
_query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let (kept, removed): (Vec<_>, Vec<_>) = candidates
.into_iter()
.partition(|c| c.min_video_duration_ms.is_none());
FilterResult { kept, removed }
}
}
Enabled only when query.exclude_videos. Drops anything with min_video_duration_ms = Some(...) (i.e. has a video).
Used by clients that don't render video (e.g. text-only embedded scenarios).
vf_filter.rs (30 lines) — visibility filtering
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
use xai_visibility_filtering::models::{Action, FilteredReason};
pub struct VFFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for VFFilter {
fn filter(
&self,
_query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let (removed, kept): (Vec<_>, Vec<_>) = candidates
.into_iter()
.partition(|c| should_drop(&c.visibility_reason));
FilterResult { kept, removed }
}
}
fn should_drop(reason: &Option<FilteredReason>) -> bool {
match reason {
Some(FilteredReason::SafetyResult(safety_result)) => {
matches!(safety_result.action, Action::Drop(_))
}
Some(_) => true,
None => false,
}
}
Visibility Filtering (VF). The visibility_reason field on the candidate is populated by the VF hydrator. If it's None, the post is visible. If it's Some(reason), we check what kind:
- Some(SafetyResult(r)): a safety service decision. Look at the action: Drop(_) means hard-drop (gore, NSFW, etc.); other actions (warn, blur, etc.) leave the post in for downstream UI to handle.
- Some(_) (any other reason): drop. Other reasons include "deleted," "spam," "user banned." All hard-drops.
- None: keep.
This filter typically runs post-selection — VF is expensive (involves remote calls), so we only run it on the top-K candidates.
ancillary_vf_filter.rs (19 lines)
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
pub struct AncillaryVFFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for AncillaryVFFilter {
fn filter(
&self,
_query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let (removed, kept): (Vec<_>, Vec<_>) = candidates
.into_iter()
.partition(|c| c.drop_ancillary_posts.unwrap_or(false));
FilterResult { kept, removed }
}
}
drop_ancillary_posts is an optional boolean flag set by some upstream stage indicating "this post is an ancillary signal (e.g. quoted post for context) but isn't itself a primary candidate, so drop the standalone version." An unset flag (None) is treated as false, so the post is kept. Trivial filter; the logic is all in the upstream hydrator that sets the flag.
dedup_conversation_filter.rs (48 lines) — keep best per thread
The first filter that uses the score field. Among multiple posts in the same conversation thread, keep only the one with the highest score.
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use std::collections::HashMap;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
pub struct DedupConversationFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for DedupConversationFilter {
fn filter(
&self,
_query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let mut kept: Vec<PostCandidate> = Vec::new();
let mut removed: Vec<PostCandidate> = Vec::new();
let mut best_per_convo: HashMap<u64, (usize, f64)> = HashMap::new();
for candidate in candidates {
let conversation_id = get_conversation_id(&candidate);
let score = candidate.score.unwrap_or(0.0);
if let Some((kept_idx, best_score)) = best_per_convo.get_mut(&conversation_id) {
if score > *best_score {
let previous = std::mem::replace(&mut kept[*kept_idx], candidate);
removed.push(previous);
*best_score = score;
} else {
removed.push(candidate);
}
} else {
let idx = kept.len();
best_per_convo.insert(conversation_id, (idx, score));
kept.push(candidate);
}
}
FilterResult { kept, removed }
}
}
fn get_conversation_id(candidate: &PostCandidate) -> u64 {
candidate
.ancestors
.iter()
.copied()
.min()
.unwrap_or(candidate.tweet_id)
}
The data structure: best_per_convo: HashMap<conversation_id, (kept_index, best_score)> — for each conversation we've seen, track which position in kept holds it and what its score is.
For each new candidate:
- Compute its conversation_id via get_conversation_id. This uses ancestors.iter().min(): the smallest ancestor ID is the root of the conversation (since post IDs are time-ordered Snowflakes, the smallest = oldest = root). Fall back to the post's own ID if no ancestors.
- If we've seen this conversation:
  - If the new candidate scores higher, swap it into kept[idx] via std::mem::replace. Push the displaced one to removed. Update best_score.
  - Else, drop the new candidate.
- If it's a new conversation, add it.
The std::mem::replace is the idiom for "swap a value, get the old one." Cheap: it moves the two structs in place, with no clone and no heap allocation.
This filter runs post-selection (after scoring). It needs scores to make the "best in thread" decision.
Notice that a replacement takes over the slot of the candidate it displaces, so kept never shrinks or reorders: each conversation stays at the position of its first-seen candidate, even when a later, higher-scoring post wins. Ties go to the earlier candidate because the comparison is a strict >. The order of removed reflects rejection order, not original order, but removed is mostly used for accounting, not display.
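The slot-reuse behaviour is easiest to see on a reduced example. The sketch below follows the same control flow as the filter but uses a (tweet_id, conversation_id, score) tuple as a hypothetical stand-in for PostCandidate, with the conversation ID supplied directly instead of derived from ancestors.

```rust
use std::collections::HashMap;

/// Hypothetical minimal candidate: (tweet_id, conversation_id, score).
pub type Scored = (u64, u64, f64);

/// Keeps the highest-scoring candidate per conversation. A winner takes
/// over the slot of the candidate it displaces, so `kept` never reorders.
pub fn best_per_conversation(candidates: Vec<Scored>) -> (Vec<Scored>, Vec<Scored>) {
    let mut kept: Vec<Scored> = Vec::new();
    let mut removed: Vec<Scored> = Vec::new();
    // conversation_id -> (index into kept, best score so far)
    let mut best: HashMap<u64, (usize, f64)> = HashMap::new();

    for candidate in candidates {
        let (_, conversation_id, score) = candidate;
        if let Some((idx, best_score)) = best.get_mut(&conversation_id) {
            if score > *best_score {
                // Put the new winner in the slot and take the old value out.
                let previous = std::mem::replace(&mut kept[*idx], candidate);
                removed.push(previous);
                *best_score = score;
            } else {
                removed.push(candidate);
            }
        } else {
            best.insert(conversation_id, (kept.len(), score));
            kept.push(candidate);
        }
    }
    (kept, removed)
}
```

Given posts 1, 3, and 4 in conversation 10 with scores 0.5, 0.8, and 0.8, plus post 2 in conversation 20, post 3 ends up in slot 0 (where post 1 was) and post 4 loses the tie.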
topic_ids_filter.rs (571 lines) — the giant
This is the longest filter in the codebase. It has three pieces:
- TopicIdsFilter: the actual filter (~100 LOC).
- TopicIdExpansion: the topic taxonomy (the bulk, ~410 LOC of category data).
- TopicFilteringOverrideMap: a string-to-experiment parser (~55 LOC).
We'll read each in turn.
The filter itself
use crate::models::candidate::PostCandidate;
use crate::models::candidate_features::TopicFilteringExperiment;
use crate::models::query::ScoredPostsQuery;
use crate::params::topics::*;
use std::collections::HashMap;
use std::collections::HashSet;
use tracing::warn;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
use xai_stats_receiver::global_stats_receiver;
const EXCLUDED_TOPIC_METRIC: &str = "TopicIdsFilter.excluded_topic_id";
pub struct TopicIdsFilter;
impl Filter<ScoredPostsQuery, PostCandidate> for TopicIdsFilter {
fn enable(&self, query: &ScoredPostsQuery) -> bool {
query.is_topic_request() || query.has_excluded_topics()
}
Enabled when the query includes specific topics or excludes specific topics. Default For You requests skip this filter entirely.
fn filter(
&self,
query: &ScoredPostsQuery,
candidates: Vec<PostCandidate>,
) -> FilterResult<PostCandidate> {
let (mut kept, mut removed) = if query.is_topic_request() {
let query_topic_ids: HashSet<i64> = query.topic_ids.iter().copied().collect();
let expanded = TopicIdExpansion::expand(&query_topic_ids);
if query.is_bulk_topic_request() {
let all_topic_ids = TopicIdExpansion::all_production_topic_ids();
let excluded: HashSet<i64> = all_topic_ids.difference(&expanded).copied().collect();
let (kept, removed): (Vec<_>, Vec<_>) =
candidates
.into_iter()
.partition(|c| match &c.filtered_topic_ids {
Some(candidate_topics) if !candidate_topics.is_empty() => {
!candidate_topics.iter().all(|tid| excluded.contains(tid))
}
_ => true,
});
(kept, removed)
} else {
Two paths for topic requests:
Bulk topic request (≥7 topics, is_bulk_topic_request): Treat as "drop posts whose topics all fall in the other, unrequested topics."
- Compute excluded = all_known_topics ∖ expanded_query_topics.
- Keep a candidate if it has no filtered_topic_ids (None or empty), or if its topics are not all in the excluded set. In other words, at least one of its topics is either in the expanded query topics or unknown to the production taxonomy.
- This is a "subtract-the-rest" pattern, easier when the query selects most topics.
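The bulk-path predicate can be isolated into a small sketch. The Candidate struct and both function names are hypothetical; only the match shape mirrors the code quoted above.

```rust
use std::collections::HashSet;

// Hypothetical stand-in: only the field the bulk path reads.
pub struct Candidate {
    pub filtered_topic_ids: Option<Vec<i64>>,
}

// excluded = all production topics minus the expanded query topics.
pub fn excluded_set(all: &HashSet<i64>, expanded_query: &HashSet<i64>) -> HashSet<i64> {
    all.difference(expanded_query).copied().collect()
}

// Drop a candidate only when every one of its topics is excluded.
// Candidates with no topics (None or empty) are kept.
pub fn keep_in_bulk_request(c: &Candidate, excluded: &HashSet<i64>) -> bool {
    match &c.filtered_topic_ids {
        Some(topics) if !topics.is_empty() => !topics.iter().all(|t| excluded.contains(t)),
        _ => true,
    }
}
```

Note that a candidate tagged only with an ID outside the production set also survives, because that ID can never appear in excluded.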
let supertopic_expanded: HashMap<i64, HashSet<i64>> = query
.topic_ids
.iter()
.map(|&tid| (tid, TopicIdExpansion::expand_supertopic(tid)))
.collect();
let (kept, removed): (Vec<_>, Vec<_>) = candidates.into_iter().partition(|c| {
let exp_topics = c.filtered_topic_ids.as_deref().unwrap_or(&[]);
let unf_topics = c.unfiltered_topic_ids.as_deref().unwrap_or(&[]);
query.topic_ids.iter().any(|&tid| {
let tid_expanded = match TopicIdExpansion::category_ids(tid) {
Some(ids) => ids,
None => std::slice::from_ref(&tid),
};
let path1 = exp_topics.iter().any(|et| tid_expanded.contains(et));
if path1 {
return true;
}
if let Some(st_ids) = supertopic_expanded.get(&tid) {
let unf_has_topic =
unf_topics.iter().any(|ut| tid_expanded.contains(ut));
let exp_has_supertopic =
exp_topics.iter().any(|et| st_ids.contains(et));
unf_has_topic && exp_has_supertopic
} else {
false
}
})
});
(kept, removed)
}
} else {
(candidates, vec![])
};
Non-bulk topic request (1-6 topics): A more careful per-topic match. A candidate is kept if any requested topic ID matches it through one of two paths:
- Path 1, direct match: Get tid_expanded (the topic's category sub-topics, or just the topic itself if it has no expansion). If the candidate's filtered_topic_ids overlaps it → keep.
- Path 2, supertopic match: If path 1 fails, look up the requested topic's supertopic family. Keep if both hold:
  - The candidate's unfiltered topics overlap tid_expanded (suggesting relevance), AND
  - The candidate's filtered topics include any topic in the supertopic family.
So path 2 says: "If the candidate has the requested topic in its unfiltered topics (but was filtered out by some classifier), but its filtered topics include a sibling under the same supertopic, accept it." This is a fallback for partially-classified posts — give them a chance based on supertopic affinity.
std::slice::from_ref(&tid) — creates a &[i64] of length 1 from a &i64. Standard library trick when you have an Option<&[T]> and want a uniform interface for "the resolved topics."
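To make the two paths and the from_ref fallback concrete, here is a small sketch for a single requested topic. The topic constants, resolve, and matches_requested_topic are invented for this example; the real filter inlines this logic inside the partition closure.

```rust
use std::collections::HashSet;

// Toy topic IDs, invented for this example only.
pub const MUSIC: i64 = 1;
pub const K_POP: i64 = 2;
pub const ROCK: i64 = 3;
pub const COOKING: i64 = 4;

// Uniform "resolved topics" slice: the category list if there is one,
// otherwise a one-element slice borrowed from the topic ID itself.
pub fn resolve<'a>(tid: &'a i64, category: Option<&'a [i64]>) -> &'a [i64] {
    match category {
        Some(ids) => ids,
        None => std::slice::from_ref(tid),
    }
}

pub fn matches_requested_topic(
    tid_expanded: &[i64],
    supertopic_family: &HashSet<i64>,
    filtered: &[i64],
    unfiltered: &[i64],
) -> bool {
    // Path 1: the filtered topics hit the requested topic directly.
    if filtered.iter().any(|t| tid_expanded.contains(t)) {
        return true;
    }
    // Path 2: the unfiltered topics mention the requested topic AND the
    // filtered topics land somewhere in the same supertopic family.
    let unf_has_topic = unfiltered.iter().any(|t| tid_expanded.contains(t));
    let exp_has_supertopic = filtered.iter().any(|t| supertopic_family.contains(t));
    unf_has_topic && exp_has_supertopic
}
```

For a K_POP request with a family of {MUSIC, K_POP, ROCK}, a candidate whose filtered topics are [ROCK] is kept only if K_POP also appears in its unfiltered topics; a candidate filtered as [COOKING] is dropped either way.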
If no topic request, no filtering on this path: (candidates, vec![]).
if query.has_excluded_topics() {
if let Some(receiver) = global_stats_receiver() {
for &tid in &query.excluded_topic_ids {
let tid_str = tid.to_string();
receiver.incr(EXCLUDED_TOPIC_METRIC, &[("topic_id", &tid_str)], 1);
}
}
let excluded_ids: HashSet<i64> = query.excluded_topic_ids.iter().copied().collect();
let mut excluded_expanded = TopicIdExpansion::expand(&excluded_ids);
excluded_expanded.extend(&excluded_ids);
let (new_kept, new_removed): (Vec<_>, Vec<_>) =
kept.into_iter().partition(|c| match &c.filtered_topic_ids {
Some(candidate_topics) if !candidate_topics.is_empty() => !candidate_topics
.iter()
.any(|tid| excluded_expanded.contains(tid)),
_ => false,
});
kept = new_kept;
removed.extend(new_removed);
}
FilterResult { kept, removed }
}
}
After the topic-inclusion logic, apply topic-exclusion (the snoozed-topics case):
- Emit per-topic-id metrics.
- Build excluded_expanded = expand(excluded) ∪ excluded.
- Drop any candidate whose topics include any excluded topic.
- Drop candidates with no topics at all: _ => false (note the false in the catch-all). This is harsh: if you've snoozed any topics, posts without classified topics are dropped too. Defensive: don't accidentally show snoozed content via uncategorized posts.
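The excluded_expanded.extend(&excluded_ids) is interesting. expand(...) replaces each ID that has a category_ids arm with that arm's list, and for client-level categories the list holds only the XAI_* sub-topics, not the category ID itself (see the SCIENCE_TECHNOLOGY and ENTERTAINMENT arms quoted below). Adding the original IDs back in means both the supertopic ID (like "Sports") and its sub-topic IDs (like "Soccer") are excluded when you snooze "Sports."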
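The exclusion predicate differs from the bulk-inclusion one in two places: any instead of all, and false instead of true in the catch-all. A minimal sketch, with survives_exclusion as a hypothetical helper name:

```rust
use std::collections::HashSet;

// Keep a candidate only if it has at least one classified topic and none of
// its topics is in the expanded exclusion set.
pub fn survives_exclusion(
    filtered_topic_ids: Option<&[i64]>,
    excluded_expanded: &HashSet<i64>,
) -> bool {
    match filtered_topic_ids {
        Some(topics) if !topics.is_empty() => {
            !topics.iter().any(|t| excluded_expanded.contains(t))
        }
        // No topics at all: dropped while any exclusion is active.
        _ => false,
    }
}
```

A single excluded topic is enough to drop a post here, whereas the bulk path drops a post only when all of its topics are excluded.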
TopicIdExpansion::category_ids
pub struct TopicIdExpansion;
impl TopicIdExpansion {
pub fn category_ids(topic_id: i64) -> Option<&'static [i64]> {
match topic_id {
SCIENCE_TECHNOLOGY => Some(&[
XAI_SCIENCE,
XAI_TECHNOLOGY,
XAI_AI,
XAI_SOFTWARE_DEVELOPMENT,
XAI_ROBOTICS,
XAI_SPACE,
XAI_BIOTECH,
XAI_ELECTRONICS,
]),
ENTERTAINMENT => Some(&[
XAI_MOVIES_TV,
XAI_STREAMING,
XAI_MUSIC,
XAI_DANCE,
XAI_CELEBRITY,
XAI_GAMING,
XAI_ANIME,
]),
…
The category data. Each top-level category (like SCIENCE_TECHNOLOGY) expands to a static array of sub-topics. The full match has ~80 arms covering the entire topic taxonomy.
The match return type is Option<&'static [i64]> — referencing static data. Cheap return (no allocation).
The taxonomy is a flat-ish hierarchy:
- SCIENCE_TECHNOLOGY → 8 subtopics
- ENTERTAINMENT → 7 subtopics
- BUSINESS_FINANCE → 6 subtopics
- SPORTS → 30+ subtopics (every individual sport)
- Plus leaf nodes (where the category just maps to itself: XAI_POLITICS => Some(&[XAI_POLITICS])).
The leaf nodes (single-element arrays mapping to themselves) seem redundant — _ => None would do the same thing for callers since they can fall back to the original ID. But returning Some(&[X]) makes the call site uniform (caller always sees a slice).
I'll quote a representative sample of the categories rather than reproduce all 80 arms:
XAI_RACING => Some(&[XAI_RACING, XAI_FORMULA1, XAI_NASCAR]),
…
XAI_ICE_HOCKEY => Some(&[XAI_ICE_HOCKEY, XAI_NHL]),
…
XAI_SKIING => Some(&[XAI_SKIING, XAI_SNOWBOARDING]),
XAI_HEALTH_FITNESS => Some(&[
XAI_HEALTH_FITNESS,
XAI_NUTRITION,
XAI_WORKOUTS,
]),
XAI_TRAVEL => Some(&[XAI_TRAVEL, XAI_AVIATION]),
XAI_FOOD => Some(&[
XAI_FOOD,
XAI_COOKING,
XAI_BAKING,
XAI_RESTAURANTS,
XAI_DRINKS,
]),
…
XAI_PETS => Some(&[XAI_PETS, XAI_CATS, XAI_DOGS]),
…
_ => None,
}
}
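So XAI_RACING expands to itself plus F1 and NASCAR. XAI_PETS expands to itself plus cats and dogs. The pattern for these XAI_* arms: the supertopic includes itself + all its children. (The client-level arms quoted earlier, such as SCIENCE_TECHNOLOGY, list only their children.)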
The _ => None catch-all is for any topic ID not in the table — these don't have a defined expansion (so callers treat them as leaf nodes).
expand and all_production_topic_ids
pub fn expand(topic_ids: &HashSet<i64>) -> HashSet<i64> {
let mut expanded = HashSet::new();
for &topic_id in topic_ids {
match Self::category_ids(topic_id) {
Some(ids) => {
for &id in ids {
expanded.insert(id);
}
}
None => {
expanded.insert(topic_id);
}
}
}
expanded
}
Walk each input ID and insert its category sub-topics into a HashSet; an ID with no category_ids arm is inserted unchanged. The set auto-deduplicates if the same sub-topic appears under multiple categories.
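A toy version shows the two behaviors worth remembering: unknown IDs pass through unchanged, and overlapping inputs collapse. The four constants and the one-arm taxonomy below are invented for illustration and do not correspond to real values in params::topics.

```rust
use std::collections::HashSet;

// Toy topic IDs, invented for this example only.
pub const PETS: i64 = 10;
pub const CATS: i64 = 11;
pub const DOGS: i64 = 12;
pub const CHESS: i64 = 20;

// One-arm stand-in for the real category table.
fn category_ids(topic_id: i64) -> Option<&'static [i64]> {
    match topic_id {
        PETS => Some(&[PETS, CATS, DOGS]),
        _ => None,
    }
}

pub fn expand(topic_ids: &HashSet<i64>) -> HashSet<i64> {
    let mut expanded = HashSet::new();
    for &id in topic_ids {
        match category_ids(id) {
            // Known category: add every listed sub-topic.
            Some(ids) => expanded.extend(ids.iter().copied()),
            // No arm in the table: the ID stands for itself.
            None => {
                expanded.insert(id);
            }
        }
    }
    expanded
}
```

Expanding {PETS, CATS} yields the same three-element set as expanding {PETS} alone, since CATS is already part of the PETS list.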
pub fn all_production_topic_ids() -> HashSet<i64> {
let all_client_topics: HashSet<i64> = [
SCIENCE_TECHNOLOGY,
ENTERTAINMENT,
BUSINESS_FINANCE,
SPORTS,
XAI_POLITICS,
…
XAI_STOCKS_ECONOMY,
]
.into();
Self::expand(&all_client_topics)
}
The "everything we serve" set: takes a hardcoded list of all client-visible topic IDs and expands them. ~80 IDs, expanded.
Used by the bulk-topic-request path: "all production topics ∖ query topics = topics to exclude."
resolve_first and supertopic
pub fn resolve_first(topic_id: i64) -> i64 {
topic_id
}
A stub. Returns the input. Probably intended for canonicalization (some topic IDs might be aliases for others) but currently a no-op. The function exists for future canonical resolution without changing call sites.
pub fn supertopic(topic_id: i64) -> i64 {
match topic_id {
XAI_POP
| XAI_K_POP
| XAI_COUNTRY_MUSIC
| XAI_ELECTRONIC
| XAI_J_POP
| XAI_ROCK
| XAI_HIP_HOP
| XAI_JAZZ
| XAI_CONCERTS => XAI_MUSIC,
XAI_DESIGN | XAI_DIGITAL_ART | XAI_PHOTOGRAPHY => XAI_ART,
…
XAI_CRYPTOCURRENCY
| XAI_STOCKS
| XAI_ENTREPRENEURSHIP
| XAI_REAL_ESTATE
| XAI_PERSONAL_FINANCE => XAI_BUSINESS_FINANCE_REAL,
XAI_AI
| XAI_SOFTWARE_DEVELOPMENT
| XAI_ROBOTICS
| XAI_ELECTRONICS => XAI_TECHNOLOGY,
XAI_BIOTECH | XAI_SPACE => XAI_SCIENCE,
…
_ => topic_id,
}
}
The inverse of category_ids. Given a leaf topic (like K-Pop), return its parent (Music). For top-level topics that have no parent, return themselves (_ => topic_id).
This is used by expand_supertopic:
pub fn expand_supertopic(topic_id: i64) -> HashSet<i64> {
let st = Self::supertopic(topic_id);
match Self::category_ids(st) {
Some(ids) => ids.iter().copied().collect(),
None => [st].into(),
}
}
}
Given a topic, find its supertopic, then expand to the full supertopic family. So expand_supertopic(K_POP) → {POP, K_POP, COUNTRY_MUSIC, ELECTRONIC, J_POP, ROCK, HIP_HOP, JAZZ, CONCERTS, MUSIC} (a HashSet, so the order is not meaningful). It returns the whole family, including sibling sub-topics.
Used in the path-2 logic of the filter: "candidate has K-Pop in unfiltered; classifier-derived topics include Rock; both are in the Music family → relevance via supertopic."
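The child-to-parent-to-family round trip can be reproduced on a toy taxonomy. The constants and the two one-arm tables are assumptions made for this sketch; only the body of expand_supertopic follows the code quoted above.

```rust
use std::collections::HashSet;

// Toy topic IDs, invented for this example only.
pub const MUSIC: i64 = 1;
pub const K_POP: i64 = 2;
pub const ROCK: i64 = 3;
pub const CHESS: i64 = 9;

// Parent -> family (includes the parent itself).
fn category_ids(topic_id: i64) -> Option<&'static [i64]> {
    match topic_id {
        MUSIC => Some(&[MUSIC, K_POP, ROCK]),
        _ => None,
    }
}

// Child -> parent; topics without a parent map to themselves.
fn supertopic(topic_id: i64) -> i64 {
    match topic_id {
        K_POP | ROCK => MUSIC,
        _ => topic_id,
    }
}

pub fn expand_supertopic(topic_id: i64) -> HashSet<i64> {
    let st = supertopic(topic_id);
    match category_ids(st) {
        Some(ids) => ids.iter().copied().collect(),
        None => [st].into(),
    }
}
```

A child and its parent expand to the same family, while a topic with neither a parent nor a category arm expands to a one-element set containing itself.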
TopicFilteringOverrideMap
#[derive(Debug, Clone, Default)]
pub struct TopicFilteringOverrideMap {
overrides: HashMap<i64, TopicFilteringExperiment>,
}
impl TopicFilteringOverrideMap {
pub fn parse(raw: &str) -> Self {
let mut overrides = HashMap::new();
if raw.is_empty() {
return Self { overrides };
}
for entry in raw.split(',') {
let entry = entry.trim();
if entry.is_empty() {
continue;
}
if let Some((topic_str, experiment_str)) = entry.split_once('=') {
match topic_str.trim().parse::<i64>() {
Ok(topic_id) => {
let experiment = TopicFilteringExperiment::parse(experiment_str.trim());
overrides.insert(topic_id, experiment);
}
Err(_) => {
warn!(
"TopicFilteringOverrides: invalid topic_id '{}' in entry '{}'",
topic_str, entry
);
}
}
} else {
warn!(
"TopicFilteringOverrides: malformed entry '{}', expected 'topic_id=ExperimentId'",
entry
);
}
}
Self { overrides }
}
pub fn resolve(
&self,
query_topic_ids: &[i64],
default: TopicFilteringExperiment,
) -> TopicFilteringExperiment {
if self.overrides.is_empty() {
return default;
}
for &tid in query_topic_ids {
if let Some(&exp) = self.overrides.get(&tid) {
return exp;
}
}
default
}
}
A small DSL parser. Reads a string like "-1234567890=CuratedV0,9876543210=PostBased75Pct" (feature-switch value) and parses it into a HashMap<topic_id, experiment>.
split(',') → entries. Each entry.split_once('=') → (topic_str, experiment_str). Parse the topic ID and look up the experiment via TopicFilteringExperiment::parse (we saw this enum in Session 04's candidate_features.rs).
Both error paths (invalid topic ID, malformed entry) just log a warning and skip that entry; one bad entry never invalidates the rest of the string. That also keeps the parser forward-compatible if someone adds a new entry format.
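resolve(query_topic_ids, default) walks the query's topics in order and returns the experiment of the first one that has an override in the map; if none does, it returns the default. This gives per-topic experiment arm assignment: different experiments can target different topics. When two query topics carry different overrides, the one listed first in query_topic_ids wins.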
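The parse-and-resolve behavior can be sketched with iterator adapters. The Experiment enum here is a hypothetical stand-in for TopicFilteringExperiment: the Control variant and the fallback for unknown names are assumptions, and this version skips bad entries silently instead of logging a warning.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for TopicFilteringExperiment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Experiment {
    Control,
    CuratedV0,
    PostBased75Pct,
}

impl Experiment {
    // Assumption: unknown names fall back to Control.
    fn parse(s: &str) -> Self {
        match s {
            "CuratedV0" => Experiment::CuratedV0,
            "PostBased75Pct" => Experiment::PostBased75Pct,
            _ => Experiment::Control,
        }
    }
}

// Parse "topic_id=ExperimentId,..." and skip anything malformed.
pub fn parse_overrides(raw: &str) -> HashMap<i64, Experiment> {
    raw.split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty())
        .filter_map(|entry| {
            let (topic, exp) = entry.split_once('=')?;
            let id = topic.trim().parse::<i64>().ok()?;
            Some((id, Experiment::parse(exp.trim())))
        })
        .collect()
}

// First query topic with an override wins; otherwise use the default.
pub fn resolve(
    overrides: &HashMap<i64, Experiment>,
    query_topic_ids: &[i64],
    default: Experiment,
) -> Experiment {
    query_topic_ids
        .iter()
        .find_map(|tid| overrides.get(tid).copied())
        .unwrap_or(default)
}
```

Because resolve scans query_topic_ids rather than the map, the result is deterministic even though HashMap iteration order is not.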
End of topic_ids_filter.rs and end of the filter module.
What we've learned
The 80/20 of filtering: most filters are 20-40 lines of pure logic. The trait abstraction gives them automatic instrumentation, stats, and span integration. We get a lot of telemetry per LOC.
Two-stage filtering:
- Pre-scoring: drop candidates that can never be shown (blocked, muted, self-tweets, too old, already seen, duplicates). Avoids wasted scoring compute.
- Post-scoring / post-selection: drop candidates that fail expensive checks (VF, conversation dedup). Uses the score field to make decisions (which post in a thread to keep).
Filter dependencies (implicit):
- DedupConversationFilter needs score field populated → runs after scoring.
- VFFilter needs visibility_reason field → runs after VF hydrator.
- PreviouslySeenPostsFilter needs bloom_filter_entries on query → runs after the impression-bloom hydrator (a query hydrator).
- MutedKeywordFilter needs muted_keywords on user_features → runs after user_features_query_hydrator.
- IneligibleSubscriptionFilter needs subscription_author_id on candidate → runs after subscription hydrator.
The pipeline ordering matters. Stages that produce required fields must run before stages that consume them.
partition vs explicit loops: simple filters use Iterator::partition for one-liner clarity. More complex ones (with index tracking like DedupConversationFilter) use explicit loops for control.
Watch the partition output order: partition returns (true_collection, false_collection). The predicate convention varies:
- Predicate "should keep?" → (kept, removed).
- Predicate "should drop?" → (removed, kept).
The codebase mixes both; pay attention to the destructure names, not the positional ordering.
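A short sketch of the two conventions side by side, using plain u64 IDs and a hypothetical blocked list rather than real candidates. Both functions return (kept, removed) and produce identical results; only the predicate polarity and the destructure order differ.

```rust
// Predicate answers "should keep?": the true bucket comes first and is `kept`.
pub fn keep_convention(ids: Vec<u64>, blocked: &[u64]) -> (Vec<u64>, Vec<u64>) {
    let (kept, removed): (Vec<u64>, Vec<u64>) =
        ids.into_iter().partition(|id| !blocked.contains(id));
    (kept, removed)
}

// Predicate answers "should drop?": the true bucket comes first and is `removed`.
pub fn drop_convention(ids: Vec<u64>, blocked: &[u64]) -> (Vec<u64>, Vec<u64>) {
    let (removed, kept): (Vec<u64>, Vec<u64>) =
        ids.into_iter().partition(|id| blocked.contains(id));
    (kept, removed)
}
```

Swapping the names in the destructure without negating the predicate compiles cleanly and silently inverts the filter, which is why the names deserve the attention.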
Topic taxonomy as code: topic_ids_filter.rs embeds 80+ topic categories in match statements. It's tied to the constants in params::topics::* (not visible in the open release). Every taxonomy change requires a code release — there's no runtime configurability. Trade-off: simpler, faster, but tighter coupling.
Multi-layer "seen" filtering: three filters cover different aspects:
- PreviouslySeenPostsFilter: Bloom filters (long history, fuzzy)
- PreviouslySeenPostsBackupFilter: explicit IDs (recent session)
- PreviouslyServedPostsFilter: what we sent down in prior responses (conditional on request type)
Defensive overlap: each catches different cases, with the union providing strong "don't repeat content" guarantees.
Block/mute is asymmetric: muting is one-way (viewer → muted). Blocking blocks both directions. AuthorSocialgraphFilter correctly handles 6 distinct relationships.
Next session
Session 06 — Home-Mixer's candidate_pipeline/ sub-module (~1,199 LOC). This is the concrete pipeline configuration — finally we see what gets plugged into the CandidatePipeline trait. The two key files:
- phoenix_candidate_pipeline.rs (772 LOC) declares the full pipeline: which hydrators, filters, sources, scorers, side-effects.
- for_you_candidate_pipeline.rs (278 LOC) is the outer pipeline that wraps phoenix_candidate_pipeline plus URT decoration.
After Session 06 you'll have a full picture of what runs in a real feed request — every component, in order.