X For You algorithm, line by line · Part 1

X For You algorithm, line by line — Part 1: Architecture & the candidate-pipeline framework

Deep dive into xai-org/x-algorithm. Part 1 walks the system architecture and every line of the candidate-pipeline Rust crate (1,031 LOC, 10 files) — Source, Hydrator, Filter, Scorer, Selector, SideEffect, and the master orchestrator.

May 14, 2026·47 min read

This is the first article in a 22-part line-by-line tour of the xai-org/x-algorithm release, the source for the recommendation system that powers the "For You" feed on X. The repo is approximately 25,000 lines of Rust and Python; this article covers the architectural framing plus the foundational Rust crate, candidate-pipeline (1,031 LOC, 10 files).

We start here because every Rust service in the repo (home-mixer, parts of thunder) plugs into the abstractions defined in this crate. Understanding it once means you understand the shape of every other Rust file we'll read.


Part 1 — The Big Picture

What the feed has to do

When you open X and look at "For You," the system has to decide, in under a second, which ~50 posts to show you out of a corpus of millions per minute. To do that, it runs this rough pipeline:

  1. Look up who you are. Pull your engagement history, your follow graph, your impression history, your IP, your demographics, your muted keywords, etc.
  2. Find candidate posts. Two big buckets:
    • In-Network (Thunder): posts authored recently by accounts you follow. This is just an in-memory index keyed by author.
    • Out-of-Network (Phoenix Retrieval): posts found via a two-tower ML model that embeds you and embeds posts, then does nearest-neighbour search.
  3. Enrich each candidate. Hydration calls go out to other services for post text, author info, video metadata, brand-safety signals, etc.
  4. Filter. Drop duplicates, your own posts, posts from blocked accounts, posts you've already seen, paywalled content you can't read, etc.
  5. Score. Run the Phoenix transformer to predict probabilities for ~15 actions (P(like), P(reply), P(repost), P(report), …). Combine those into a single weighted score where negative actions (block, mute, report) push the score down.
  6. Select. Sort by score, take top-K.
  7. Post-filter. Final pass for safety / dedup of conversation threads.
  8. Mix in ads.
  9. Fire off side-effects asynchronously: log to Kafka for training data, update served-history caches, etc.
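
The score combination in step 5 can be pictured as a weighted sum over the predicted probabilities. What follows is a minimal sketch only: the action names, the weights, and the weighted_score function are illustrative assumptions, not values taken from the repo.

```rust
/// Predicted probability for one engagement action (hypothetical shape).
pub struct ActionPrediction {
    pub action: &'static str,
    pub probability: f64,
}

/// Illustrative weights only: positive actions add, negative ones subtract.
pub fn weight_for(action: &str) -> f64 {
    match action {
        "like" => 1.0,
        "reply" => 2.0,
        "repost" => 1.5,
        "report" => -10.0,
        "block" => -10.0,
        _ => 0.0, // unknown actions contribute nothing
    }
}

/// Combine per-action probabilities into a single score.
pub fn weighted_score(predictions: &[ActionPrediction]) -> f64 {
    predictions
        .iter()
        .map(|p| weight_for(p.action) * p.probability)
        .sum()
}
```

With these made-up weights, a post with P(like) = 0.5 and P(report) = 0.1 ends up with a negative score, which is the "negative actions push the score down" behaviour described above.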

The five modules

The repo splits this into five top-level directories:

x-algorithm/
├── candidate-pipeline/   ← Rust trait framework (this article)
├── thunder/              ← Rust in-memory store of recent posts + Kafka ingest
├── home-mixer/           ← Rust service that orchestrates the entire For-You pipeline
├── phoenix/              ← Python ML: two-tower retrieval + transformer ranking
└── grox/                 ← Python content-understanding pipeline (Grok-driven)

The two Rust services (thunder, home-mixer) talk over gRPC. Phoenix is the model code (PyTorch/JAX-style). Grox is an LLM-driven content-classification service that produces labels (spam, safety, banger, …) that the rest of the system consumes.

Everything orchestration-related on the Rust side is structured as a CandidatePipeline — a generic, type-parameterised state machine with named stages. That's what we're going to read in detail now.


Part 2 — The candidate-pipeline crate, file by file

The crate has 10 files. We'll go through them in order from "trivial helper" to "main orchestrator":

candidate-pipeline/
├── lib.rs              (9 lines)    module declarations
├── util.rs             (3 lines)    one helper fn
├── source.rs           (39)         Source trait
├── query_hydrator.rs   (41)         QueryHydrator trait
├── filter.rs           (70)         Filter trait + FilterResult struct
├── scorer.rs           (65)         Scorer trait
├── selector.rs         (85)         Selector trait + SelectResult struct
├── side_effect.rs      (37)         SideEffect trait + SideEffectInput struct
├── hydrator.rs         (189)        Hydrator trait + CachedHydrator + blanket impl
└── candidate_pipeline.rs (493)      PipelineQuery/PipelineCandidate traits, CandidatePipeline trait,
                                     PipelineResult, PipelineStage, execute() orchestrator

The mental model: every stage of the feed pipeline is one of the trait types (Source, Hydrator, Filter, Scorer, Selector, SideEffect, QueryHydrator). The CandidatePipeline trait wires them together — concrete pipelines (in home-mixer/) just declare which trait objects belong in which slot, and the default execute() method runs the whole show.


lib.rs (9 lines)

pub mod candidate_pipeline;
pub mod filter;
pub mod hydrator;
pub mod query_hydrator;
pub mod scorer;
pub mod selector;
pub mod side_effect;
pub mod source;
pub mod util;

The crate root. It contains only pub mod declarations, one per sibling file, which make each module public. There are no pub use re-exports and no prelude, so callers always reach in via the full path (candidate_pipeline::filter::Filter, etc.). That looks intentional: it forces concrete pipelines to be explicit about which traits they're consuming.

Nothing more to say here. Moving on.


util.rs (3 lines)

pub fn short_type_name(full: &'static str) -> &'static str {
    full.rsplit("::").next().unwrap_or(full)
}

One function. Given a fully qualified Rust type name like home_mixer::filters::age_filter::AgeFilter, return just the last segment: AgeFilter.

How it works:

  • full.rsplit("::") returns an iterator that yields path segments from right to left, splitting on "::".
  • .next() pulls the first element of that iterator — i.e. the last segment of the path.
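  • .unwrap_or(full) is a defensive fallback. rsplit always yields at least one item (the whole string when there is no "::" in it), so next() never returns None here and the fallback never actually fires.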

This is used everywhere as the display name of a trait object. The traits define a default fn name(&self) like:

fn name(&self) -> &'static str {
    util::short_type_name(type_name_of_val(self))
}

std::any::type_name_of_val(self) gives the full path (self is already a reference, so no extra & is needed), and short_type_name trims it. So AgeFilter shows up as "AgeFilter" in tracing spans and metrics rather than "home_mixer::filters::age_filter::AgeFilter". Subtle but important: dashboards group by metric name, and short_type_name is what keeps those names stable when files move around.
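
To see the helper's edge cases, here is a small self-contained sketch. short_type_name is copied from the crate; AgeFilter and display_name are hypothetical stand-ins added for illustration.

```rust
use std::any::type_name_of_val;

pub fn short_type_name(full: &'static str) -> &'static str {
    // `rsplit` always yields at least one item, so `unwrap_or` is only a safeguard.
    full.rsplit("::").next().unwrap_or(full)
}

/// Hypothetical stand-in for a concrete filter type.
pub struct AgeFilter;

/// Mirrors the default `name()` body: `value` is already a reference.
pub fn display_name<T: ?Sized>(value: &T) -> &'static str {
    short_type_name(type_name_of_val(value))
}
```

Two things are worth keeping in mind. The standard library does not guarantee the exact string that type_name_of_val returns, so the short name is best treated as a diagnostic label. And because the split is purely textual, a generic type whose arguments contain "::" gets cut inside the angle brackets: "a::Wrapper<b::Inner>" becomes "Inner>".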


source.rs (39 lines) — the Source trait

A Source is "something that produces a list of candidate posts." Thunder is a source (gives in-network posts). Phoenix Retrieval is a source (gives ML-retrieved out-of-network posts). Ads are a source. Etc. They all run in parallel during the fetch stage.

use std::any::{Any, type_name_of_val};
use tonic::async_trait;

use crate::candidate_pipeline::{PipelineCandidate, PipelineQuery};
use crate::util;
use tracing::{error, info};

Imports:

  • Any and type_name_of_val from std::any. Any lets us require trait objects be downcastable; type_name_of_val gets the full type name string.
  • tonic::async_trait. We need #[async_trait] because a trait with native async fns is not dyn-compatible, and the pipeline stores sources as Box<dyn Source<…>>. The macro rewrites each async fn to return a boxed future, which makes the trait usable as a trait object. tonic (the gRPC crate) re-exports async-trait, so we use its version for consistency with the gRPC code.
  • The two pipeline marker traits (PipelineCandidate, PipelineQuery) — these constrain Q and C to be the kinds of types the pipeline can carry around.
  • util::short_type_name (we just read it).
  • tracing::{error, info} for structured logging.

#[async_trait]
pub trait Source<Q, C>: Any + Send + Sync
where
    Q: PipelineQuery,
    C: PipelineCandidate,
{

The trait declaration. Two type parameters:

  • Q is the query type — i.e. the bundle of "everything we know about this request" (user ID, follow list, engagement history, params, etc.).
  • C is the candidate type — a post + its metadata.

Bounds:

  • Any so trait objects can be downcast (useful for tests / introspection).
  • Send + Sync because everything runs on a multi-threaded Tokio runtime and pipelines hold these as Box<dyn Source<…>>.

The where clause constrains the two type parameters: Q must implement PipelineQuery and C must implement PipelineCandidate, so only types the pipeline knows how to carry can be plugged in.

    /// Decide if this source should run for the given query
    fn enable(&self, _query: &Q) -> bool {
        true
    }

enable is a per-request opt-out. Every source can be conditionally turned off, e.g. run the "who-to-follow" source only for users whose timelines are empty. Default is true (always on). The orchestrator calls this before scheduling the source's future.

The underscore on _query is just because the default impl ignores it; concrete sources frequently use it to check feature switches on the query.

    #[xai_stats_macro::receive_stats(size=Bucket500To1000)]
    #[tracing::instrument(skip_all, name = "source", fields(name = self.name()))]
    async fn run(&self, query: &Q) -> Result<Vec<C>, String> {
        match self.source(query).await {
            Ok(candidates) => {
                info!("Fetched {} candidates", candidates.len());
                Ok(candidates)
            }
            Err(err) => {
                error!("Failed: {}", err);
                Err(err)
            }
        }
    }

This is the public entry point. The orchestrator calls source.run(query), never source.source(query).

Why the indirection? Look at the two attributes on top:

  1. #[xai_stats_macro::receive_stats(size=Bucket500To1000)] is a proc macro from the (private) xai_stats_macro crate that wraps the function body to emit metrics. size=Bucket500To1000 tells the macro which histogram buckets to use for the return size (the number of candidates fetched). The macro's source isn't published, so the exact metric name is a guess, but presumably it is something like "<SourceName>.run.size" bucketed over the 500–1000 range.
  2. #[tracing::instrument(skip_all, name = "source", fields(name = self.name()))] — creates a tracing span named "source" with a name=… field set to the concrete source's name. skip_all means "don't include any function args in the span." That keeps the span lightweight (queries can be big).

Inside the body:

  • self.source(query).await calls the user-supplied implementation. Both self and the query are shared references, so a source cannot mutate either one without interior mutability.
  • On Ok: info! logs the count, then we return.
  • On Err: error! logs, then we propagate. The error is a String, which is unconventional (usually you'd use a proper error type), but String lets every source bubble up whatever message it wants without forcing a shared enum.

    async fn source(&self, query: &Q) -> Result<Vec<C>, String>;

The implementation hook. Concrete sources implement this. Note it's required (no default body).

    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }
}

Default name method — returns the short type name (via util::short_type_name(type_name_of_val(self))). Concrete sources can override but rarely need to. This name is what shows up in tracing spans and metric labels.

So the deal with Source: implement source(), get instrumentation, gating, and logging for free.
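
The shape of that deal is the classic template-method pattern: one required hook, wrapped by provided methods. Below is a minimal synchronous sketch of it. SimpleSource, DemoQuery, FollowedAuthorsSource, and fetch are hypothetical names, eprintln! stands in for tracing, and the real trait is async and carries the stats macro, all of which is left out here.

```rust
/// Simplified, synchronous stand-in for the async `Source` trait.
pub trait SimpleSource<Q, C>: Send + Sync {
    /// Per-request gate; on by default.
    fn enable(&self, _query: &Q) -> bool {
        true
    }

    /// Wrapper the orchestrator calls: logging lives here, once.
    fn run(&self, query: &Q) -> Result<Vec<C>, String> {
        match self.source(query) {
            Ok(candidates) => {
                eprintln!("{}: fetched {} candidates", self.name(), candidates.len());
                Ok(candidates)
            }
            Err(err) => {
                eprintln!("{}: failed: {}", self.name(), err);
                Err(err)
            }
        }
    }

    /// The only method an implementer must write.
    fn source(&self, query: &Q) -> Result<Vec<C>, String>;

    fn name(&self) -> &'static str {
        "source"
    }
}

/// Hypothetical query used only for this illustration.
pub struct DemoQuery {
    pub followed_ids: Vec<u64>,
}

pub struct FollowedAuthorsSource;

impl SimpleSource<DemoQuery, u64> for FollowedAuthorsSource {
    fn enable(&self, query: &DemoQuery) -> bool {
        // Skip the source entirely when the user follows nobody.
        !query.followed_ids.is_empty()
    }

    fn source(&self, query: &DemoQuery) -> Result<Vec<u64>, String> {
        // Pretend each followed author has one recent post with id = author id * 10.
        Ok(query.followed_ids.iter().map(|id| id * 10).collect())
    }

    fn name(&self) -> &'static str {
        "FollowedAuthorsSource"
    }
}

/// What an orchestrator might do with one source: gate, then run.
pub fn fetch<Q, C, S>(source: &S, query: &Q) -> Vec<C>
where
    S: SimpleSource<Q, C> + ?Sized,
{
    if !source.enable(query) {
        return Vec::new();
    }
    // A failed source contributes no candidates instead of failing the request.
    source.run(query).unwrap_or_default()
}
```

The implementer wrote two short methods; the gating call site and the logging came from code they never touched. Treating a failed source as an empty list is an assumption of this sketch, not something the trait itself dictates.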


query_hydrator.rs (41 lines) — the QueryHydrator trait

Mirror of Source but for queries instead of candidates. A query hydrator takes the request and returns a hydrated copy — i.e. it fills in fields by calling some other service. For example, FollowedUserIdsQueryHydrator calls the social graph service to fetch your follow list and stuffs it into the query.

use std::any::{Any, type_name_of_val};
use tonic::async_trait;

use crate::candidate_pipeline::PipelineQuery;
use crate::util;
use tracing::error;

Same imports as source.rs, minus PipelineCandidate (we only deal with queries here) and info (we don't log a count, just errors).

#[async_trait]
pub trait QueryHydrator<Q>: Any + Send + Sync
where
    Q: PipelineQuery,
{
    /// Decide if this query hydrator should run for the given query
    fn enable(&self, _query: &Q) -> bool {
        true
    }

    #[xai_stats_macro::receive_stats]
    #[tracing::instrument(skip_all, name = "query_hydrator", fields(name = self.name()))]
    async fn run(&self, query: &Q) -> Result<Q, String> {
        match self.hydrate(query).await {
            Ok(hydrated) => Ok(hydrated),
            Err(err) => {
                error!("Failed: {}", err);
                Err(err)
            }
        }
    }

Single type parameter Q — query hydrators don't operate on candidates. Same structure as Source::run: instrumentation, gating-via-enable, error-logged-and-propagated.

    /// Hydrate the query by performing async operations.
    /// Returns a new query with this hydrator's fields populated.
    async fn hydrate(&self, query: &Q) -> Result<Q, String>;

    /// Update the query with the hydrated fields.
    /// Only the fields this hydrator is responsible for should be copied.
    fn update(&self, query: &mut Q, hydrated: Q);

Here's the interesting design choice. The hydrator does two things:

  1. hydrate(&self, query) -> Q — produces a new query with this hydrator's fields filled in (and everything else default / empty).
  2. update(&self, query: &mut Q, hydrated: Q) copies just this hydrator's fields from the new query into the existing one.

Why? Because all the query hydrators run in parallel. They cannot all hold &mut Q at once (the borrow checker forbids it), so mutating the shared query in place would require a lock. Instead, each hydrator reads the shared query through &Q, computes the fields it owns, and returns them in a fresh Q; a serial pass then calls update to merge each result back into the master query. Each hydrator's update is responsible for copying only its own fields, so different hydrators don't stomp on each other.
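
Here is a sketch of that "parallel compute, serial merge" shape using only the standard library. Scoped threads stand in for the async tasks the real orchestrator uses, and Query, FollowsHydrator, MutedKeywordsHydrator, and hydrate_query are hypothetical names invented for the example.

```rust
use std::thread;

/// Hypothetical query with two independently hydrated fields.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Query {
    pub user_id: u64,
    pub followed_ids: Vec<u64>,
    pub muted_keywords: Vec<String>,
}

/// Synchronous stand-in for the async `QueryHydrator` trait.
pub trait QueryHydrator: Send + Sync {
    fn hydrate(&self, query: &Query) -> Result<Query, String>;
    fn update(&self, query: &mut Query, hydrated: Query);
}

pub struct FollowsHydrator;
impl QueryHydrator for FollowsHydrator {
    fn hydrate(&self, query: &Query) -> Result<Query, String> {
        // Stand-in for a social-graph lookup.
        let followed_ids = vec![query.user_id + 1, query.user_id + 2];
        Ok(Query { followed_ids, ..Query::default() })
    }
    fn update(&self, query: &mut Query, hydrated: Query) {
        query.followed_ids = hydrated.followed_ids; // copy only the owned field
    }
}

pub struct MutedKeywordsHydrator;
impl QueryHydrator for MutedKeywordsHydrator {
    fn hydrate(&self, _query: &Query) -> Result<Query, String> {
        Ok(Query { muted_keywords: vec!["spoilers".to_string()], ..Query::default() })
    }
    fn update(&self, query: &mut Query, hydrated: Query) {
        query.muted_keywords = hydrated.muted_keywords;
    }
}

pub fn hydrate_query(mut query: Query, hydrators: &[Box<dyn QueryHydrator>]) -> Query {
    // Parallel phase: every hydrator only reads the shared query.
    let results: Vec<Result<Query, String>> = thread::scope(|scope| {
        let handles: Vec<_> = hydrators
            .iter()
            .map(|hydrator| {
                let shared = &query;
                scope.spawn(move || hydrator.hydrate(shared))
            })
            .collect();
        handles
            .into_iter()
            .map(|handle| {
                handle
                    .join()
                    .unwrap_or_else(|_| Err("hydrator panicked".to_string()))
            })
            .collect()
    });

    // Serial phase: merge each successful result; failures leave fields untouched.
    for (hydrator, result) in hydrators.iter().zip(results) {
        if let Ok(hydrated) = result {
            hydrator.update(&mut query, hydrated);
        }
    }
    query
}
```

No lock appears anywhere: the shared query is read-only while the threads run, and the single mutable borrow only begins once they have all been joined.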

    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }
}

Same name default. End of trait.


filter.rs (70 lines) — the Filter trait + FilterResult

A filter takes a list of candidates and partitions them into kept and removed.

use crate::candidate_pipeline::{PipelineCandidate, PipelineQuery};
use crate::util;
use std::any::{Any, type_name_of_val};
use tracing::{Span, field::Empty};
use xai_stats_receiver::global_stats_receiver;

Same boilerplate, plus two new things:

  • tracing::{Span, field::Empty}. Empty is a tracing sentinel that means "this field exists but hasn't been recorded yet"; Span::current() gives a handle to the active span so we can fill those fields in later.
  • xai_stats_receiver::global_stats_receiver: a global handle to the stats backend. Used here for incr (counter) operations.

const KEPT_SCOPE: [(&str, &str); 1] = [("requests", "kept")];
const REMOVED_SCOPE: [(&str, &str); 1] = [("requests", "removed")];

These are tag arrays for the stats system. Every metric is keyed by a name + a list of (tag, value) pairs. So a Filter::kept metric will have tag requests=kept, and Filter::removed will have requests=removed. Defining them as const arrays of (&str, &str) keeps them allocation-free and reusable.

pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

The result type. Two vectors: candidates that passed (kept) and candidates that failed (removed). Both are kept — the removed list isn't thrown away, it's tracked downstream so we can log why candidates were dropped (for debugging and training data).

Note: no enable or telemetry on this struct — it's a pure data carrier.

/// Filters run sequentially and partition candidates into kept and removed sets
pub trait Filter<Q, C>: Any + Send + Sync
where
    Q: PipelineQuery,
    C: PipelineCandidate,
{
    /// Decide if this filter should run for the given query
    fn enable(&self, _query: &Q) -> bool {
        true
    }

Trait header is now familiar. The doc comment is important: filters run sequentially, not in parallel. That's the difference from hydrators. Why? Because each filter receives the kept output of the previous filter, so they form a chain, not a fan-out. Ordering also matters for cost: placing a cheap, high-rejection filter early (e.g. dropping posts from blocked users) shrinks the list before any more expensive check runs.

Note this trait is NOT #[async_trait]. Filters are synchronous. That saves the boxed-future allocation #[async_trait] adds to every call and makes them easier to reason about. The trade-off is that you can't await anything inside a filter, so no RPC or HTTP call. If you need remote data, fetch it in a hydrator first and stuff the result on the candidate, then filter against the candidate field.
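
To make the chaining concrete, here is a stripped-down sketch of sequential filtering. SimpleFilter, Post, BlockedAuthorFilter, SeenFilter, and run_filters are hypothetical; the real trait also takes the query and adds tracing and stats. The author_blocked flag plays the role of a field an earlier hydrator filled in.

```rust
pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

/// Synchronous filter, reduced to the one required hook.
pub trait SimpleFilter<C> {
    fn filter(&self, candidates: Vec<C>) -> FilterResult<C>;
}

/// Hypothetical candidate: a post id plus a previously hydrated flag.
#[derive(Debug, Clone, PartialEq)]
pub struct Post {
    pub id: u64,
    pub author_blocked: bool,
}

pub struct BlockedAuthorFilter;
impl SimpleFilter<Post> for BlockedAuthorFilter {
    fn filter(&self, candidates: Vec<Post>) -> FilterResult<Post> {
        // `partition` returns (items matching the predicate, the rest).
        let (kept, removed): (Vec<Post>, Vec<Post>) =
            candidates.into_iter().partition(|post| !post.author_blocked);
        FilterResult { kept, removed }
    }
}

pub struct SeenFilter {
    pub seen_ids: Vec<u64>,
}
impl SimpleFilter<Post> for SeenFilter {
    fn filter(&self, candidates: Vec<Post>) -> FilterResult<Post> {
        let (kept, removed): (Vec<Post>, Vec<Post>) = candidates
            .into_iter()
            .partition(|post| !self.seen_ids.contains(&post.id));
        FilterResult { kept, removed }
    }
}

/// Run filters in order; each stage sees only what the previous one kept.
pub fn run_filters<C>(stages: &[&dyn SimpleFilter<C>], candidates: Vec<C>) -> FilterResult<C> {
    let mut kept = candidates;
    let mut removed = Vec::new();
    for stage in stages {
        let result = stage.filter(kept);
        kept = result.kept;
        removed.extend(result.removed); // removed items are accumulated, not discarded
    }
    FilterResult { kept, removed }
}
```

Because the Vec is moved from stage to stage, no candidate is cloned along the way, and the second filter never looks at posts the first one already rejected.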

    #[xai_stats_macro::receive_stats(latency=Bucket0To50)]
    #[tracing::instrument(skip_all, name = "filter", fields(
        name = self.name(),
        input_count = candidates.len(),
        kept_count = Empty,
        removed_count = Empty,
        filter_rate = Empty,
    ))]
    fn run(&self, query: &Q, candidates: Vec<C>) -> FilterResult<C> {

run is the entry point. The tracing::instrument here is more elaborate than on Source:

  • input_count = candidates.len() — recorded immediately (it's the length of the vec we just received).
  • kept_count, removed_count, filter_rate — declared as Empty so the span has slots for them, but they'll be filled in after the filter runs.

receive_stats(latency=Bucket0To50) says "histogram this function's latency using the 0–50 bucket range" (presumably milliseconds; the macro is private, so the unit is an inference from the name). Filters are expected to be very fast.

        let result = self.filter(query, candidates);
        let total = result.kept.len() + result.removed.len();
        let rate = if total > 0 {
            result.removed.len() as f64 / total as f64
        } else {
            0.0
        };
        let span = Span::current();
        span.record("kept_count", result.kept.len());
        span.record("removed_count", result.removed.len());
        span.record("filter_rate", format!("{:.3}", rate).as_str());
        self.stat(&result);
        result
    }

Body:

  1. Call the implementation hook self.filter(query, candidates).
  2. Compute the removal rate.
  3. Grab the current tracing span (the one created by the #[instrument] attribute) and fill in the previously-Empty fields.
  4. Call self.stat(&result) to emit the kept/removed counters.
  5. Return.

Data flows out two ways: the structured fields on the span (which a tracing-subscriber can dump as JSON for log search) and the counters (which go to a metrics backend like Prometheus). Both are maintained because dashboards want one shape and incident debugging wants the other.

    /// Filter candidates by evaluating each against some criteria.
    /// Returns a FilterResult containing kept candidates (which continue to the next stage)
    /// and removed candidates (which are excluded from further processing).
    fn filter(&self, query: &Q, candidates: Vec<C>) -> FilterResult<C>;

    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }

The required implementation hook and the default name.

    fn stat(&self, result: &FilterResult<C>) {
        if let Some(receiver) = global_stats_receiver() {
            let metric_name = format!("{}.run", self.name());
            receiver.incr(metric_name.as_str(), &KEPT_SCOPE, result.kept.len() as u64);
            receiver.incr(
                metric_name.as_str(),
                &REMOVED_SCOPE,
                result.removed.len() as u64,
            );
        }
    }
}

The default stat impl:

  • If a global stats receiver is configured (returns Some), build a metric name like "AgeFilter.run" and emit two counter increments: one for kept count tagged requests=kept, one for removed count tagged requests=removed.
  • If no receiver is configured (None — e.g. in tests), silently no-op.

This means you can graph "rate of posts removed per filter, per request" without each filter needing to write its own metric code.
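
Since xai_stats_receiver is private, here is a hedged sketch of how an optional receiver of this kind can behave. The StatsReceiver trait, InMemoryStats, and emit_filter_stats are all hypothetical, and the receiver is passed in explicitly instead of being fetched from a global so the example stays self-contained.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

const KEPT_SCOPE: [(&str, &str); 1] = [("requests", "kept")];
const REMOVED_SCOPE: [(&str, &str); 1] = [("requests", "removed")];

/// Hypothetical counter interface; the real receiver's API is not public.
pub trait StatsReceiver {
    fn incr(&self, name: &str, tags: &[(&str, &str)], value: u64);
}

/// In-memory receiver keyed by "name{tag=value}", handy for tests.
#[derive(Default)]
pub struct InMemoryStats {
    pub counters: Mutex<HashMap<String, u64>>,
}

impl StatsReceiver for InMemoryStats {
    fn incr(&self, name: &str, tags: &[(&str, &str)], value: u64) {
        let tag_text: Vec<String> = tags.iter().map(|(k, v)| format!("{k}={v}")).collect();
        let key = format!("{}{{{}}}", name, tag_text.join(","));
        *self.counters.lock().unwrap().entry(key).or_insert(0) += value;
    }
}

/// Same shape as the default `stat` body: two increments, or nothing at all.
pub fn emit_filter_stats(
    receiver: Option<&dyn StatsReceiver>,
    filter_name: &str,
    kept: usize,
    removed: usize,
) {
    if let Some(receiver) = receiver {
        let metric_name = format!("{}.run", filter_name);
        receiver.incr(&metric_name, &KEPT_SCOPE, kept as u64);
        receiver.incr(&metric_name, &REMOVED_SCOPE, removed as u64);
    }
}
```

Both counters share one metric name and differ only in their tag, which is what lets a dashboard divide the removed series by the sum of both to get a per-filter removal rate.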


scorer.rs (65 lines) — the Scorer trait

A scorer assigns a numerical score to each candidate. They run sequentially (not parallel), each operating on the output of the previous one. This lets a later scorer override an earlier one (e.g. author-diversity scorer attenuates the Phoenix model score).

use crate::candidate_pipeline::{PipelineCandidate, PipelineQuery};
use crate::util;
use std::any::type_name_of_val;
use tonic::async_trait;
use tracing::warn;

/// Scorers update candidate fields (like a score field) and run sequentially
#[async_trait]
pub trait Scorer<Q, C>: Send + Sync
where
    Q: PipelineQuery,
    C: PipelineCandidate,
{
    /// Decide if this scorer should run for the given query
    fn enable(&self, _query: &Q) -> bool {
        true
    }

Standard preamble. Note this is #[async_trait] (unlike Filter) — scorers can call out (e.g. the Phoenix scorer makes a model-inference RPC).

    #[xai_stats_macro::receive_stats]
    #[tracing::instrument(skip_all, name = "scorer", fields(name = self.name()))]
    async fn run(&self, query: &Q, candidates: &[C]) -> Vec<Result<C, String>> {
        let scored = self.score(query, candidates).await;
        let expected_len = candidates.len();
        if scored.len() == expected_len {
            scored
        } else {
            let message = format!(
                "Scorer length_mismatch expected={} got={}",
                expected_len,
                scored.len()
            );
            warn!(
                "Skipped: length_mismatch expected={} got={}",
                expected_len,
                scored.len()
            );
            vec![Err(message); expected_len]
        }
    }

run wraps score with a length guard. The return type is Vec<Result<C, String>> — one Result per input candidate. This is the contract for scorers: same number of outputs as inputs, in the same order.

If score violates that contract (returns a vector of the wrong length), we log a warning and return a vec of Errs of the correct length. We never crash, and we never let mismatched-length data flow downstream and risk corrupting the candidate-index alignment.
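
The guard is easy to isolate. A minimal sketch as a free function follows; enforce_length is a hypothetical name, and the logic mirrors the body of run without the warn! call.

```rust
/// Enforce the "same length as the input" contract on a scorer's output.
pub fn enforce_length<C: Clone>(
    scored: Vec<Result<C, String>>,
    expected_len: usize,
) -> Vec<Result<C, String>> {
    if scored.len() == expected_len {
        return scored;
    }
    let message = format!(
        "Scorer length_mismatch expected={} got={}",
        expected_len,
        scored.len()
    );
    // One `Err` per input keeps positions aligned with the candidate slice.
    vec![Err(message); expected_len]
}
```

Note the C: Clone bound: vec![value; n] clones its element, so the Result (and therefore the candidate type) has to be cloneable for this to compile.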

    /// Score candidates by performing async operations.
    /// Returns candidates with this scorer's fields populated.
    ///
    /// IMPORTANT: The returned vector must have the same candidates in the same order as the input.
    /// Dropping candidates in a hydrator is not allowed - use a filter stage instead.
    async fn score(&self, query: &Q, candidates: &[C]) -> Vec<Result<C, String>>;

The hook. The doc comment reinforces the contract: implementers must not drop candidates here. It says "hydrator" where you'd expect "scorer", most likely because it was copied from hydrator.rs, where the identical sentence appears; the constraint itself applies equally to scorers.

    /// Update a single candidate with the scored fields.
    /// Only the fields this scorer is responsible for should be copied.
    fn update(&self, candidate: &mut C, scored: C);

Same pattern as QueryHydrator::update: the scorer returns "scored" copies of candidates, and update is responsible for copying just the right fields onto the originals.

This split (return-by-value, merge-via-update) is the same idea repeated: it allows parallel/independent computation to happen on copies, then a serial merge to keep the canonical candidate consistent. Even for scorers (which run serially), the pattern is preserved for consistency with hydrators.

    /// Update all successfully scored candidates with the fields from `scored`.
    /// Default implementation iterates and calls `update` for each pair.
    fn update_all(&self, candidates: &mut [C], scored: Vec<Result<C, String>>) {
        for (candidate, scored) in candidates.iter_mut().zip(scored) {
            if let Ok(scored) = scored {
                self.update(candidate, scored);
            }
        }
    }

    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }
}

update_all is the bulk version. It zips two iterators (in-place candidates and the scored vec), and for each pair that scored successfully, calls update. Failed candidates are left untouched — they keep whatever previous score they had. This is graceful degradation: a model-inference failure on one row doesn't taint the whole batch.

Notice the choice: iter_mut().zip(scored). If scored is shorter than candidates, the zip stops short (the trailing candidates are left untouched). But we've already verified lengths in run via the length-mismatch check, so scored.len() == candidates.len() is guaranteed if we got here.
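
A small sketch of that graceful degradation, with the trait machinery removed. Candidate and the free functions update and update_all are hypothetical stand-ins for a concrete scorer's methods.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct Candidate {
    pub id: u64,
    pub score: Option<f64>,
}

/// Copy only the field this hypothetical scorer owns.
pub fn update(candidate: &mut Candidate, scored: Candidate) {
    candidate.score = scored.score;
}

/// Merge scored copies back positionally; rows that failed are skipped.
pub fn update_all(candidates: &mut [Candidate], scored: Vec<Result<Candidate, String>>) {
    for (candidate, scored) in candidates.iter_mut().zip(scored) {
        if let Ok(scored) = scored {
            update(candidate, scored);
        }
    }
}
```

If the middle row of a three-row batch comes back as Err, the first and third candidates pick up their new scores and the middle one keeps whatever it had before.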


selector.rs (85 lines) — the Selector trait + SelectResult

A selector sorts candidates by some score and keeps the top K. There's exactly one selector per pipeline.

use crate::candidate_pipeline::{PipelineCandidate, PipelineQuery};
use crate::util;
use std::any::type_name_of_val;
use tracing::{Span, field::Empty};

pub struct SelectResult<C> {
    pub selected: Vec<C>,
    pub non_selected: Vec<C>,
}

The result struct mirrors FilterResult — selected vs. non-selected. Same reason: non-selected candidates aren't thrown away; they may still feed into side-effects (e.g. to log "we considered this but didn't show it" as a negative training signal).

impl<C> SelectResult<C> {
    pub fn len(&self) -> usize {
        self.selected.len()
    }

    pub fn is_empty(&self) -> bool {
        self.selected.is_empty() && self.non_selected.is_empty()
    }
}

Two helper methods. Note len() reports only selected.len() — i.e. how many made the cut — but is_empty() checks both are empty. So you can distinguish "we selected 0 from 10 candidates" (len=0, is_empty=false) from "there were no candidates to select from at all" (len=0, is_empty=true).
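
The distinction is easier to see when both methods are used together. In this sketch the struct and its two methods are copied from the crate, while describe is a hypothetical helper added for illustration.

```rust
pub struct SelectResult<C> {
    pub selected: Vec<C>,
    pub non_selected: Vec<C>,
}

impl<C> SelectResult<C> {
    pub fn len(&self) -> usize {
        self.selected.len()
    }

    pub fn is_empty(&self) -> bool {
        self.selected.is_empty() && self.non_selected.is_empty()
    }
}

/// Tell the three situations apart using only `len` and `is_empty`.
pub fn describe<C>(result: &SelectResult<C>) -> &'static str {
    match (result.len(), result.is_empty()) {
        (0, true) => "no candidates reached the selector",
        (0, false) => "candidates arrived but none were selected",
        _ => "at least one candidate was selected",
    }
}
```

One caveat for callers: this pairing breaks the usual Rust convention that is_empty() is equivalent to len() == 0, so code that treats the two as interchangeable would misread the "selected nothing from something" case.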

pub trait Selector<Q, C>: Send + Sync
where
    Q: PipelineQuery,
    C: PipelineCandidate,
{
    /// Decide if this selector should run for the given query
    fn enable(&self, _query: &Q) -> bool {
        true
    }

Standard. No async_trait — selection is just sorting and truncation, no I/O.

    #[xai_stats_macro::receive_stats(latency=Bucket0To50, size=Bucket0To50)]
    #[tracing::instrument(skip_all, name = "selector", fields(
        name = self.name(),
        input_count = candidates.len(),
        selected_count = Empty,
        non_selected_count = Empty,
    ))]
    fn run(&self, query: &Q, candidates: Vec<C>) -> SelectResult<C> {
        let result = self.select(query, candidates);
        let span = Span::current();
        span.record("selected_count", result.selected.len());
        span.record("non_selected_count", result.non_selected.len());
        result
    }

run wraps select with tracing. Same pattern as Filter::run: record counts after the fact, the Empty placeholders get filled in. latency=Bucket0To50, size=Bucket0To50 — fast and bounded.

    // Returns (selected, non_selected).
    fn select(&self, _query: &Q, candidates: Vec<C>) -> SelectResult<C> {
        let mut sorted = self.sort(candidates);
        if let Some(limit) = self.size() {
            let non_selected = sorted.split_off(limit.min(sorted.len()));
            SelectResult {
                selected: sorted,
                non_selected,
            }
        } else {
            SelectResult {
                selected: sorted,
                non_selected: vec![],
            }
        }
    }

Now this is interesting — select has a default body. The default is "sort the candidates, then keep the first size() if a size is configured."

split_off(idx) is a Vec method that splits a vector at index idx, returning a new Vec containing elements from idx onwards. The original Vec keeps [0..idx]. So:

  • We sort the candidates (descending by score; see sort below).
  • We split at limit.min(sorted.len()) — i.e. clamp to the actual length so we don't panic on short inputs.
  • Everything before the split: selected (the top-K).
  • Everything after: non_selected.

If size() returns None, we keep everything. Default behaviour, no truncation.
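
The truncation step on its own, as a sketch: take_top_k is a hypothetical free function that mirrors the default select body once sorting has already happened.

```rust
/// Split an already-sorted list into (top-k, rest); `None` keeps everything.
pub fn take_top_k<C>(mut sorted: Vec<C>, limit: Option<usize>) -> (Vec<C>, Vec<C>) {
    match limit {
        Some(k) => {
            // Clamp so `split_off` never receives an index past the end,
            // which would make it panic.
            let rest = sorted.split_off(k.min(sorted.len()));
            (sorted, rest)
        }
        None => (sorted, Vec::new()),
    }
}
```

With the input [9, 7, 5, 3] and a limit of 2, the first Vec is [9, 7] and the second is [5, 3]. A limit larger than the input simply returns everything as selected.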

Concrete pipelines can override select if they want custom logic (e.g. round-robin between sources, fairness constraints, …). The blender_selector in home-mixer/selectors/ is one such override.

    /// Extract the score from a candidate to use for sorting.
    fn score(&self, candidate: &C) -> f64;

    /// Sort candidates by their scores in descending order.
    fn sort(&self, candidates: Vec<C>) -> Vec<C> {
        let mut sorted = candidates;
        sorted.sort_by(|a, b| {
            self.score(b)
                .partial_cmp(&self.score(a))
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        sorted
    }

The required hook is score(candidate) -> f64. Given a candidate, return its final score for sorting purposes.

The default sort uses Vec::sort_by with a comparator that:

  1. Takes two candidates a and b.
  2. Compares self.score(b) against self.score(a) — note the order: b.partial_cmp(&a), not the other way around, which gives descending order (highest score first).
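  3. partial_cmp on f64 returns Option<Ordering> because of NaN. We fall back to Equal when either score is NaN. That avoids a panic on unwrap, but it does not pin NaN-scored candidates in place: a comparator that treats NaN as equal to everything is not a total order, so where those candidates (and their neighbours) end up is unspecified, and the standard library documents that sort_by may panic when the comparator is not a total order. In practice this relies on scorers not producing NaN.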

Cosmetic detail: let mut sorted = candidates; rebinds the vec mutably. Could just as well be let mut sorted: Vec<C> = candidates; or even let mut candidates = candidates;. The pattern of taking the vec by value and returning a sorted version is idiomatic — no in-place magic, no shared mutation.
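
If NaN scores were ever a real possibility, a comparator with a total order would make the result deterministic. The sketch below is an alternative, not what the crate does: it uses f64::total_cmp and first maps NaN to negative infinity so unscored rows sink to the bottom. Scored, sort_key, and sort_descending are hypothetical names.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct Scored {
    pub id: u64,
    pub score: f64,
}

/// Map NaN to negative infinity so NaN-scored rows sort last.
fn sort_key(score: f64) -> f64 {
    if score.is_nan() {
        f64::NEG_INFINITY
    } else {
        score
    }
}

/// Descending sort with a total order; ties keep their input order
/// because `sort_by` is a stable sort.
pub fn sort_descending(mut candidates: Vec<Scored>) -> Vec<Scored> {
    candidates.sort_by(|a, b| sort_key(b.score).total_cmp(&sort_key(a.score)));
    candidates
}
```

The remapping matters: under total_cmp alone, a positive NaN compares greater than every other value, so a descending sort would put NaN-scored candidates at the very top of the feed.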

    /// Optionally provide a size to select. Defaults to no truncation if not overridden.
    fn size(&self) -> Option<usize> {
        None
    }

    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }
}

Default size returns None (don't truncate). Concrete selectors override it with the desired K. name is the usual short-type-name pattern.


side_effect.rs (37 lines) — the SideEffect trait + SideEffectInput

Side effects fire after the pipeline has produced its result. They don't affect the response. Used for: logging served candidates to Kafka (for training), updating cache state, recording impressions, etc.

use crate::candidate_pipeline::{PipelineCandidate, PipelineQuery};
use crate::util;
use std::any::type_name_of_val;
use std::sync::Arc;
use tonic::async_trait;

// A side-effect is an action run that doesn't affect the pipeline result from being returned

#[derive(Clone)]
pub struct SideEffectInput<Q, C> {
    pub query: Arc<Q>,
    pub selected_candidates: Vec<C>,
    pub non_selected_candidates: Vec<C>,
}

The input struct carries everything a side-effect could possibly want:

  • The hydrated query, wrapped in Arc so multiple side-effects share without cloning.
  • The candidates that were ultimately selected.
  • The candidates that weren't (filtered + non-selected by selector).

#[derive(Clone)] because Tokio tasks may want their own owned SideEffectInput (though in practice it's used as Arc<SideEffectInput>; see the orchestrator). Cloning the query is cheap because it only bumps the Arc's reference count; cloning the two Vecs is O(n) in the number of candidates, which is typically small at this point.

#[async_trait]
pub trait SideEffect<Q, C>: Send + Sync
where
    Q: PipelineQuery,
    C: PipelineCandidate,
{
    /// Decide if this side-effect should be run
    fn enable(&self, _query: Arc<Q>) -> bool {
        true
    }

#[async_trait] because side-effects do I/O (Kafka writes, cache writes). Note enable takes Arc<Q> here, not &Q — because side-effects are intended to be spawned into futures that may outlive the calling stack frame, so Arc ownership keeps the borrow checker happy when the closure that calls enable is moved into a Tokio task.

    #[xai_stats_macro::receive_stats]
    async fn run(&self, input: Arc<SideEffectInput<Q, C>>) -> Result<(), String> {
        self.side_effect(input).await
    }

    async fn side_effect(&self, input: Arc<SideEffectInput<Q, C>>) -> Result<(), String>;

run is a thin wrapper (just for the stats macro). side_effect is the hook. Both take Arc<SideEffectInput<Q, C>>, again so this can be cheaply moved across task boundaries.
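
A sketch of why Arc makes that cheap. Plain std threads stand in for Tokio tasks here, and this trimmed SideEffectInput plus the fan_out function are hypothetical.

```rust
use std::sync::Arc;
use std::thread;

/// Trimmed-down input: the real struct also carries the query.
pub struct SideEffectInput {
    pub selected_candidates: Vec<u64>,
    pub non_selected_candidates: Vec<u64>,
}

/// Spawn one thread per hypothetical side-effect; each gets its own Arc handle.
pub fn fan_out(input: Arc<SideEffectInput>, side_effect_count: usize) -> Vec<usize> {
    let handles: Vec<_> = (0..side_effect_count)
        .map(|_| {
            // Bumps a reference count; neither Vec is copied.
            let shared = Arc::clone(&input);
            thread::spawn(move || shared.selected_candidates.len())
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("side-effect thread panicked"))
        .collect()
}
```

Each spawned closure owns its handle outright, which is what satisfies the 'static requirement on thread::spawn (tokio::spawn has the same requirement) without copying the candidate lists.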

Note there's no tracing::instrument here. Side-effects are fire-and-forget and never influence the response, so the wrapper only records stats; any finer-grained logging is left to each implementation.

    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }
}

Same name pattern. End.


hydrator.rs (189 lines) — the Hydrator trait + CachedHydrator + blanket impl

The biggest of the per-stage files. Hydrators enrich candidates with extra fields by calling out to other services in parallel. There's also a CachedHydrator variant that wraps the hydration in a cache layer.

use crate::candidate_pipeline::{PipelineCandidate, PipelineQuery};
use crate::util;
use std::any::{Any, type_name_of_val};
use std::hash::Hash;
use tonic::async_trait;
use tracing::warn;
use xai_stats_receiver::global_stats_receiver;

Standard imports plus std::hash::Hash for the cache-key type bound (we'll see why below) and global_stats_receiver for cache hit/miss counters.

// Hydrators run in parallel and update candidate fields
#[async_trait]
pub trait Hydrator<Q, C>: Any + Send + Sync
where
    Q: PipelineQuery,
    C: PipelineCandidate,
{
    /// Decide if this hydrator should run for the given query
    fn enable(&self, _query: &Q) -> bool {
        true
    }

The trait header. Doc comment notes: hydrators run in parallel — opposite of filters and scorers (sequential). That's because hydrators are independent: each one fills different fields on the candidate, so order doesn't matter. The orchestrator fans them all out, awaits all of them, then sequentially calls update_all to merge.

    /// Hydrate candidates by performing async operations.
    /// Returns candidates with this hydrator's fields populated.
    ///
    /// IMPORTANT: The returned vector must have the same candidates in the same order as the input.
    /// Dropping candidates in a hydrator is not allowed - use a filter stage instead.
    async fn hydrate(&self, query: &Q, candidates: &[C]) -> Vec<Result<C, String>>;

The hook. Same contract as scorers: vec-in, vec-out, same length, same order. Implementers fan out their batch RPC, collect results positionally, return.

    #[xai_stats_macro::receive_stats(latency=Bucket50To500, size=Bucket500To2500)]
    #[tracing::instrument(skip_all, name = "hydrator", fields(name = self.name()))]
    async fn run(&self, query: &Q, candidates: &[C]) -> Vec<Result<C, String>> {
        let hydrated = self.hydrate(query, candidates).await;
        let expected_len = candidates.len();
        if hydrated.len() == expected_len {
            hydrated
        } else {
            let message = format!(
                "Hydrator length_mismatch expected={} got={}",
                expected_len,
                hydrated.len()
            );
            warn!(
                "Skipped: length_mismatch expected={} got={}",
                expected_len,
                hydrated.len()
            );
            vec![Err(message); expected_len]
        }
    }

run wraps hydrate with the same length-mismatch guard we saw in Scorer::run. The stats macro buckets are different: latency 50–500ms (hydration is usually a network call so slower than filters/scorers' internal compute), size 500–2500 (a hydration call might process up to several thousand candidates).
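
The guard itself is easy to isolate. The sketch below is a synchronous, standalone version of the same check; enforce_len is a hypothetical helper name, and the message text is shortened for illustration.

```rust
/// Returns `results` unchanged when its length matches `expected_len`;
/// otherwise returns `expected_len` copies of a length-mismatch error.
fn enforce_len<T: Clone>(
    results: Vec<Result<T, String>>,
    expected_len: usize,
) -> Vec<Result<T, String>> {
    if results.len() == expected_len {
        results
    } else {
        let message = format!(
            "length_mismatch expected={} got={}",
            expected_len,
            results.len()
        );
        // `vec![x; n]` clones `x`, which is why `T: Clone` is required.
        vec![Err(message); expected_len]
    }
}
```

The important property is that the output always has exactly expected_len entries, so the positional merge that follows can never pair a candidate with the wrong result.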

    /// Update a single candidate with the hydrated fields.
    /// Only the fields this hydrator is responsible for should be copied.
    fn update(&self, candidate: &mut C, hydrated: C);

    /// Update all successfully hydrated candidates with the fields from `hydrated`.
    /// Default implementation iterates and calls `update` for each pair.
    fn update_all(&self, candidates: &mut [C], hydrated: Vec<Result<C, String>>) {
        for (candidate, hydrated) in candidates.iter_mut().zip(hydrated) {
            if let Ok(hydrated) = hydrated {
                self.update(candidate, hydrated);
            }
        }
    }

    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }
}

Same update/update_all/name triple we saw on Scorer. The merge pattern is identical — failed hydrations leave the candidate's existing fields intact.
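
A small sketch of that merge behaviour, assuming a hypothetical Post candidate with two optional fields and a hypothetical TextHydrator that owns only one of them:

```rust
#[derive(Clone, Debug, Default, PartialEq)]
struct Post {
    id: u64,
    text: Option<String>,
    author: Option<String>,
}

/// Hypothetical hydrator that owns only the `text` field.
struct TextHydrator;

impl TextHydrator {
    // Copy only the field this hydrator is responsible for.
    fn update(&self, candidate: &mut Post, hydrated: Post) {
        candidate.text = hydrated.text;
    }

    // Same shape as the default `update_all`: zip, skip failures.
    fn update_all(&self, candidates: &mut [Post], hydrated: Vec<Result<Post, String>>) {
        for (candidate, hydrated) in candidates.iter_mut().zip(hydrated) {
            if let Ok(hydrated) = hydrated {
                self.update(candidate, hydrated);
            }
        }
    }
}

fn demo_merge() -> Vec<Post> {
    let mut candidates = vec![
        Post { id: 1, author: Some("a".into()), ..Default::default() },
        Post { id: 2, author: Some("b".into()), ..Default::default() },
    ];
    let hydrated = vec![
        Ok(Post { id: 1, text: Some("hello".into()), ..Default::default() }),
        Err("timeout".to_string()),
    ];
    TextHydrator.update_all(&mut candidates, hydrated);
    candidates
}
```

The first post gains its text and keeps its author, even though the hydrated copy had no author. The second post, whose hydration failed, is left exactly as it was.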

So far we have a vanilla hydrator. Now the cached variant:

const CACHE_HIT_SCOPE: [(&str, &str); 1] = [("requests", "cache_hit")];
const CACHE_MISS_SCOPE: [(&str, &str); 1] = [("requests", "cache_miss")];

#[async_trait]
pub trait CacheStore<K, V>: Send + Sync {
    async fn get(&self, key: &K) -> Option<V>;
    async fn insert(&self, key: K, value: V);
}

Two more stats scope constants, and a CacheStore<K, V> trait. The cache trait is intentionally minimal: just get (returning Option, no error type — cache lookups never fail visibly, they just miss) and insert (which is also Result-free — writes are best-effort).

The trait is async to allow Redis-backed caches as well as in-memory ones.
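
For a feel of what an implementation might look like, here is a sketch of an in-memory store. It is a synchronous simplification (the real trait is async), and InMemoryCache is a hypothetical type, not something from the repo.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Mutex;

// Synchronous simplification of the article's async trait.
trait CacheStore<K, V>: Send + Sync {
    fn get(&self, key: &K) -> Option<V>;
    fn insert(&self, key: K, value: V);
}

/// Hypothetical in-memory store: a HashMap behind a Mutex.
struct InMemoryCache<K, V> {
    entries: Mutex<HashMap<K, V>>,
}

impl<K, V> InMemoryCache<K, V> {
    fn new() -> Self {
        Self { entries: Mutex::new(HashMap::new()) }
    }
}

impl<K, V> CacheStore<K, V> for InMemoryCache<K, V>
where
    K: Eq + Hash + Send,
    V: Clone + Send,
{
    fn get(&self, key: &K) -> Option<V> {
        // Clone the value out so the lock is released immediately.
        self.entries.lock().unwrap().get(key).cloned()
    }

    fn insert(&self, key: K, value: V) {
        self.entries.lock().unwrap().insert(key, value);
    }
}
```

Note how the bounds fall out naturally: the key needs Eq + Hash for the map lookup, and the value needs Clone because get returns an owned copy while the entry stays in the cache.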

#[async_trait]
pub trait CachedHydrator<Q, C>: Any + Send + Sync
where
    Q: PipelineQuery,
    C: PipelineCandidate,
{
    type CacheKey: Eq + Hash + Send + Sync + 'static;
    type CacheValue: Clone + Send + Sync + 'static;

A CachedHydrator is a hydrator that has a cache layer in front of it. It has two associated types:

  • CacheKey — must be Eq + Hash (for hashmap lookup), Send + Sync + 'static (for async).
  • CacheValue — must be Clone (so cached results can be returned without consuming the entry), Send + Sync + 'static.

    fn enable(&self, _query: &Q) -> bool {
        true
    }

    fn cache_store(&self) -> &dyn CacheStore<Self::CacheKey, Self::CacheValue>;
    fn cache_key(&self, candidate: &C) -> Self::CacheKey;
    fn cache_value(&self, hydrated: &C) -> Self::CacheValue;

    fn hydrate_from_cache(&self, value: Self::CacheValue) -> C;
    async fn hydrate_from_client(&self, query: &Q, candidates: &[C]) -> Vec<Result<C, String>>;

    fn update(&self, candidate: &mut C, hydrated: C);

    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }

Required methods:

  • cache_store() — returns a reference to the cache the hydrator uses.
  • cache_key(candidate) — derives a key from a candidate (e.g. candidate.tweet_id).
  • cache_value(hydrated) — extracts the to-be-cached value out of a fully-hydrated candidate.
  • hydrate_from_cache(value) — given a cached value, construct a partial hydrated candidate (with just this hydrator's fields populated).
  • hydrate_from_client(query, candidates) — the actual hydration logic, called only for candidates that missed the cache.
  • update(candidate, hydrated) — same merge pattern as before.

This is enough surface area to define caching once and have every cached hydrator benefit.

    fn stat_cache(&self, cache_hits: usize, cache_misses: usize) {
        if let Some(receiver) = global_stats_receiver() {
            let metric_name = format!("{}.cache", self.name());
            if cache_hits > 0 {
                receiver.incr(metric_name.as_str(), &CACHE_HIT_SCOPE, cache_hits as u64);
            }
            if cache_misses > 0 {
                receiver.incr(metric_name.as_str(), &CACHE_MISS_SCOPE, cache_misses as u64);
            }
        }
    }
}

Telemetry method: report hits/misses to the stats backend. Conditional if hits > 0 etc. — avoid emitting useless zeros. Metric naming: "FooCachedHydrator.cache" with requests=cache_hit or requests=cache_miss tag.

Now the cool part — the blanket impl:

#[async_trait]
impl<Q, C, T> Hydrator<Q, C> for T
where
    Q: PipelineQuery,
    C: PipelineCandidate,
    T: CachedHydrator<Q, C> + ?Sized,
{

For any T that implements CachedHydrator<Q, C>, we automatically implement Hydrator<Q, C>. That means concrete code only ever has to implement CachedHydrator — it gets the Hydrator for free, and the orchestrator (which only knows about Hydrator) can use it. ?Sized allows trait objects (dyn CachedHydrator) to also benefit.

    fn enable(&self, query: &Q) -> bool {
        CachedHydrator::enable(self, query)
    }

Disambiguate the call. Both Hydrator and CachedHydrator have an enable method, and inside this impl T implements both. The fully-qualified form CachedHydrator::enable(self, query) states explicitly that we delegate to the CachedHydrator version rather than to the Hydrator method being defined, so nobody has to reason about method resolution or worry about accidental self-recursion.
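
The blanket-impl-plus-delegation pattern is easier to see on a toy pair of traits. In the sketch below, CachedGreeter and Greeter are hypothetical stand-ins for CachedHydrator and Hydrator. Everything is synchronous and nothing here is taken from the repo.

```rust
// Specialised trait: what concrete types actually implement.
trait CachedGreeter {
    fn enable(&self) -> bool {
        true
    }
    fn greet(&self) -> String;
}

// General trait: the only one the caller (the "orchestrator") knows about.
trait Greeter {
    fn enable(&self) -> bool;
    fn greet(&self) -> String;
}

// Blanket impl: every CachedGreeter, sized or not, is also a Greeter.
impl<T: CachedGreeter + ?Sized> Greeter for T {
    fn enable(&self) -> bool {
        // Both traits define `enable`; name the one we delegate to.
        CachedGreeter::enable(self)
    }
    fn greet(&self) -> String {
        format!("[cached] {}", CachedGreeter::greet(self))
    }
}

struct English;
impl CachedGreeter for English {
    fn greet(&self) -> String {
        "hello".to_string()
    }
}

struct Disabled;
impl CachedGreeter for Disabled {
    fn enable(&self) -> bool {
        false
    }
    fn greet(&self) -> String {
        "unused".to_string()
    }
}

/// Collects greetings from enabled greeters through the general trait only.
fn run_all(greeters: &[Box<dyn Greeter>]) -> Vec<String> {
    greeters.iter().filter(|g| g.enable()).map(|g| g.greet()).collect()
}

fn demo_blanket() -> Vec<String> {
    let greeters: Vec<Box<dyn Greeter>> = vec![Box::new(English), Box::new(Disabled)];
    run_all(&greeters)
}
```

English and Disabled only implement CachedGreeter, yet both can be boxed as dyn Greeter. The wrapper logic in the blanket impl (here, the "[cached]" prefix) is written once and applies to every implementer.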

    async fn hydrate(&self, query: &Q, candidates: &[C]) -> Vec<Result<C, String>> {
        let mut results = vec![None; candidates.len()];
        let mut missing_candidates = Vec::new();
        let mut missing_keys = Vec::new();
        let mut missing_indices = Vec::new();
        let mut cache_hits = 0usize;
        let mut cache_misses = 0usize;

The cached hydration logic.

  • results: Vec<Option<Result<C, String>>> of the right size, all None to start. We'll fill positions by index. The double-layered type means "this position is uncomputed" (None) vs "this position computed successfully" (Some(Ok)) vs "computed and failed" (Some(Err)).
  • missing_candidates / missing_keys / missing_indices: three parallel vecs tracking what we need to hydrate from the client (cache misses).

        for (index, candidate) in candidates.iter().enumerate() {
            let key = self.cache_key(candidate);
            match self.cache_store().get(&key).await {
                Some(value) => {
                    results[index] = Some(Ok(self.hydrate_from_cache(value)));
                    cache_hits += 1;
                }
                None => {
                    missing_candidates.push(candidate.clone());
                    missing_keys.push(key);
                    missing_indices.push(index);
                    cache_misses += 1;
                }
            }
        }

        self.stat_cache(cache_hits, cache_misses);

Pass 1: iterate candidates, look each up in the cache.

  • Cache hit: build a partial candidate from the cached value and store at results[index].
  • Cache miss: clone the candidate, push the cache key, remember the index, increment counter.

After the loop, emit cache hit/miss stats.

Subtle bit: cache lookups happen sequentially (for loop, .await per get). For an in-memory cache that's fine. For a Redis cache this could be slow — but in practice you'd batch via a custom impl. The default-blanket impl optimises for clarity, not throughput.

        if !missing_candidates.is_empty() {
            let hydrated_missing = self.hydrate_from_client(query, &missing_candidates).await;
            if hydrated_missing.len() != missing_candidates.len() {
                let message = format!(
                    "CachedHydrator length_mismatch expected={} got={}",
                    missing_candidates.len(),
                    hydrated_missing.len()
                );
                return vec![Err(message); candidates.len()];
            }

If we have any cache misses, call hydrate_from_client for them only. Then enforce the same length invariant. On mismatch, return all-errors for the entire input batch — defensive: don't return some hits and some errors.

            for ((index, key), hydrated) in missing_indices
                .into_iter()
                .zip(missing_keys.into_iter())
                .zip(hydrated_missing.into_iter())
            {
                if let Ok(ref hydrated_candidate) = hydrated {
                    let value = self.cache_value(hydrated_candidate);
                    self.cache_store().insert(key, value).await;
                }
                results[index] = Some(hydrated);
            }
        }

Pass 2: for each missing-candidate result:

  • Zip three iterators together: indices, keys, hydrated results. The nested .zip chain produces tuples ((index, key), hydrated).
  • If the hydration succeeded, extract the cache value and store it. (Note the ref keyword in if let Ok(ref hydrated_candidate) — we borrow rather than consume, so we can still move hydrated into results[index] afterwards.)
  • Either way, write the result into results[index]. Successful hydrations get cached; failed ones don't (good — failed hydration is usually transient, we don't want to cache the failure).

Cache writes are also sequential per-miss. Same observation as reads: an in-memory cache is fine; for Redis you'd batch.

        results
            .into_iter()
            .map(|result| {
                result.unwrap_or_else(|| Err("Missing hydration result for candidate".to_string()))
            })
            .collect()
    }

Final transformation: unwrap each Option<Result<C, String>> into Result<C, String>. If any position is still None (which would be a logic bug), substitute an error. Defensive again.

    fn update(&self, candidate: &mut C, hydrated: C) {
        CachedHydrator::update(self, candidate, hydrated);
    }
}

Same disambiguation as enable. End of impl.

So: the cached-hydrator blanket impl is a small lookup-loop + a single batched client call for misses + a write-back loop. ~60 lines of code, applied uniformly to every concrete cached hydrator. The whole hydrator-cache pattern is encapsulated here.
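
To see the whole flow in one place, here is a synchronous sketch of the same three steps over a plain HashMap. The keys are u64 and the values are String purely for illustration, and hydrate_with_cache and its fetch parameter are hypothetical names standing in for the blanket impl and hydrate_from_client.

```rust
use std::collections::HashMap;

/// Synchronous sketch of the lookup / batched-miss / write-back flow.
/// `fetch` is called at most once, with only the keys that missed the cache.
fn hydrate_with_cache<F>(
    cache: &mut HashMap<u64, String>,
    keys: &[u64],
    fetch: F,
) -> Vec<Result<String, String>>
where
    F: FnOnce(&[u64]) -> Vec<Result<String, String>>,
{
    let mut results: Vec<Option<Result<String, String>>> = vec![None; keys.len()];
    let mut missing_keys = Vec::new();
    let mut missing_indices = Vec::new();

    // Pass 1: fill hits by position, remember where the misses were.
    for (index, key) in keys.iter().enumerate() {
        match cache.get(key) {
            Some(value) => results[index] = Some(Ok(value.clone())),
            None => {
                missing_keys.push(*key);
                missing_indices.push(index);
            }
        }
    }

    if !missing_keys.is_empty() {
        let fetched = fetch(&missing_keys);
        if fetched.len() != missing_keys.len() {
            let message = format!(
                "length_mismatch expected={} got={}",
                missing_keys.len(),
                fetched.len()
            );
            return vec![Err(message); keys.len()];
        }
        // Pass 2: cache successes only, then slot every result into place.
        for ((index, key), result) in missing_indices.into_iter().zip(missing_keys).zip(fetched) {
            if let Ok(ref value) = result {
                cache.insert(key, value.clone());
            }
            results[index] = Some(result);
        }
    }

    results
        .into_iter()
        .map(|r| r.unwrap_or_else(|| Err("missing result".to_string())))
        .collect()
}

fn demo_cached_hydration() -> (Vec<Result<String, String>>, Vec<u64>, bool) {
    let mut cache = HashMap::from([(1u64, "cached-1".to_string())]);
    let mut requested = Vec::new();
    let out = hydrate_with_cache(&mut cache, &[1, 2, 3], |missing| {
        requested = missing.to_vec(); // record what the "client" was asked for
        vec![Ok("fresh-2".to_string()), Err("boom".to_string())]
    });
    (out, requested, cache.contains_key(&3))
}
```

In the demo, key 1 is served from the cache, keys 2 and 3 go to the client in a single call, the success for key 2 is written back, and the failure for key 3 is returned but not cached.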


candidate_pipeline.rs (493 lines) — the orchestrator

The big one. Defines:

  • PipelineStage enum (names of the lifecycle stages)
  • PipelineComponents struct (for introspection)
  • PipelineResult struct (return value)
  • PipelineQuery trait (constraints on query types)
  • PipelineCandidate trait (constraints on candidate types)
  • CandidatePipeline trait (the master trait — define a concrete pipeline by impl'ing this) with a default execute() that runs every stage in the right order

We'll walk it top to bottom.

use crate::filter::Filter;
use crate::hydrator::Hydrator;
use crate::query_hydrator::QueryHydrator;
use crate::scorer::Scorer;
use crate::selector::SelectResult;
use crate::selector::Selector;
use crate::side_effect::{SideEffect, SideEffectInput};
use crate::source::Source;
use crate::util;
use futures::future::join_all;
use std::any::type_name_of_val;
use std::sync::Arc;
use std::time::Instant;
use tonic::async_trait;
use tracing::{Span, field::Empty, info};
use xai_stats_receiver::{HistogramBuckets, global_stats_receiver};

The import block pulls in:

  • Every stage trait from the sibling modules (Filter, Hydrator, QueryHydrator, Scorer, Selector + SelectResult, Source, SideEffect + SideEffectInput).
  • util (for short_type_name).
  • futures::future::join_all — the parallel-fanout primitive: takes an iterator of futures, returns a future that resolves when all are done.
  • Arc, Instant (for elapsed-time logging).
  • tonic::async_trait.
  • tracing bits.
  • Stats: HistogramBuckets (the bucket-enum type) and global_stats_receiver.

const FINAL_RESULT_SIZE_SCOPE: [(&str, &str); 1] = [("requests", "result_size")];
const FINAL_RESULT_EMPTY_SCOPE: [(&str, &str); 1] = [("requests", "result_empty")];

Two more stats-tag arrays, used at the very end of execute to record final result size and "did we return an empty response."

#[derive(Copy, Clone, Debug)]
pub enum PipelineStage {
    QueryHydrator,
    DependentQueryHydrator,
    Source,
    Hydrator,
    PostSelectionHydrator,
    Filter,
    PostSelectionFilter,
    Scorer,
    Selector,
    SideEffect,
}

The lifecycle stages. Notice there are ten stages, more than the per-stage files we've walked through. Three are new:

  • DependentQueryHydrator: a second wave of query hydrators that runs after the first wave. Used when later hydrators depend on earlier ones — e.g. "fetch impression bloom filter" might need "user features" populated first.
  • PostSelectionHydrator: hydrators that run after selection (top-K). Why? Some hydration is expensive enough that you don't want to pay for candidates you'll throw away. So you hydrate cheap stuff first → score → select top-K → only then hydrate the expensive stuff (e.g. brand-safety classifier, video metadata).
  • PostSelectionFilter: filters that run after selection — final visibility checks (deleted? spam? gore?) that need the post-selection hydrated fields.
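
The saving from post-selection hydration is easy to demonstrate. The sketch below is a toy model under stated assumptions: Post, select_top_k, and hydrate_expensive are hypothetical, scores are precomputed, and the "expensive" call just counts how many posts it was asked to process.

```rust
#[derive(Clone, Debug)]
struct Post {
    id: u64,
    score: f64,
    video_meta: Option<String>, // hypothetical "expensive" field
}

/// Scores are assumed to be already computed; keeps the `k` highest.
fn select_top_k(mut posts: Vec<Post>, k: usize) -> Vec<Post> {
    posts.sort_by(|a, b| b.score.total_cmp(&a.score));
    posts.truncate(k);
    posts
}

/// Stand-in for an expensive RPC; returns how many posts it had to process.
fn hydrate_expensive(posts: &mut [Post]) -> usize {
    for post in posts.iter_mut() {
        post.video_meta = Some(format!("meta-for-{}", post.id));
    }
    posts.len()
}

fn demo_post_selection() -> (usize, Vec<u64>, bool) {
    let retrieved: Vec<Post> = (1..=1000)
        .map(|id| Post { id, score: id as f64, video_meta: None })
        .collect();
    // Select first, then pay for the expensive fields on the survivors only.
    let mut selected = select_top_k(retrieved, 3);
    let processed = hydrate_expensive(&mut selected);
    let all_hydrated = selected.iter().all(|p| p.video_meta.is_some());
    (processed, selected.iter().map(|p| p.id).collect(), all_hydrated)
}
```

Out of 1,000 retrieved posts, the expensive step touches only the 3 that were selected.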

#[derive(Copy, Clone, Debug)]: Copy because this is a unit-only enum, free to copy.

pub struct PipelineComponents {
    pub stage: PipelineStage,
    pub components: Vec<String>,
}

For pipeline introspection — given a concrete pipeline, you can ask "what components do you have at each stage?" Useful for service health endpoints / admin UIs.

pub struct PipelineResult<Q, C> {
    pub retrieved_candidates: Vec<C>,
    pub filtered_candidates: Vec<C>,
    pub selected_candidates: Vec<C>,
    pub query: Arc<Q>,
}

The result struct. Three vecs of candidates and the (hydrated) query:

  • retrieved_candidates: everything the sources returned, after hydration but before filtering.
  • filtered_candidates: everything that was filtered out (both pre-scoring and post-selection).
  • selected_candidates: the final survivors.
  • query: shared Arc so callers don't need to clone.

Notice these don't reflect order through the pipeline — you can't tell from this struct whether a candidate was filtered before or after scoring. That information is lost. Callers wanting it have to query the spans/metrics.

impl<Q: Default, C> PipelineResult<Q, C> {
    /// Create an empty result with a default query. Useful for short-circuiting
    /// requests (e.g. test users) without running the pipeline.
    pub fn empty() -> Self {
        Self {
            retrieved_candidates: vec![],
            filtered_candidates: vec![],
            selected_candidates: vec![],
            query: Arc::new(Q::default()),
        }
    }
}

Convenience constructor for an empty result — when you want to bail out early (e.g. user is on a do-not-recommend list) without running the pipeline at all. Requires Q: Default so we can synthesize a default query.

pub trait PipelineQuery: Clone + Send + Sync + 'static {
    fn params(&self) -> &xai_feature_switches::Params;
    fn decider(&self) -> Option<&xai_decider::Decider>;
}

The PipelineQuery trait. Any concrete query type must:

  • Be Clone so query hydrators can return owned copies.
  • Be Send + Sync + 'static for async usage.
  • Expose params() returning a reference to feature-switch parameters (server-side flags; how and when they are resolved is up to the external crate).
  • Expose decider() returning an optional reference to a Decider (an experiments / rollout decision client).

xai_feature_switches and xai_decider are external crates not included here. Conceptually:

  • params() lets every stage check feature flags (e.g. "is the OON scorer enabled for this request?").
  • decider() lets stages check experiment membership (e.g. "is this user in the new-ranking-formula treatment group?").

These are the two universal escape hatches every stage gets. Everything else lives on Q itself (the user ID, the follow list, etc.) and is concrete-pipeline-specific.

pub trait PipelineCandidate: Clone + Send + Sync + 'static {}
impl<T> PipelineCandidate for T where T: Clone + Send + Sync + 'static {}

PipelineCandidate is a marker trait: no methods, just the same four bounds (Clone + Send + Sync + 'static). The blanket impl says "anything that satisfies those bounds is a PipelineCandidate." So you can use any type as a candidate, no opt-in needed.

Why have the marker trait at all? Because it's more readable to write where C: PipelineCandidate in 20 places than to write the four-bound constraint each time. And it gives a clear extension point: if the future requires candidates to expose a unique ID method, we can add it here without changing every call site.
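
A short sketch of what the blanket impl buys you. The trait definition is copied from above; Post and count_candidates are hypothetical and exist only to exercise the bound.

```rust
trait PipelineCandidate: Clone + Send + Sync + 'static {}
impl<T> PipelineCandidate for T where T: Clone + Send + Sync + 'static {}

// Any function can now use the short bound instead of four.
fn count_candidates<C: PipelineCandidate>(candidates: &[C]) -> usize {
    candidates.len()
}

// Hypothetical candidate type; note there is no explicit impl for it.
#[derive(Clone)]
struct Post {
    id: u64,
}

fn demo_marker() -> (usize, usize) {
    let posts = vec![Post { id: 1 }, Post { id: 2 }];
    let ids = vec![10u64, 20, 30]; // plain integers qualify as well
    (count_candidates(&posts), count_candidates(&ids))
}
```

A type that is not Send, such as std::rc::Rc<Post>, would be rejected at the call site, which is exactly the check the four bounds are there to enforce.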

Now the main trait:

#[async_trait]
pub trait CandidatePipeline<Q, C>: Send + Sync
where
    Q: PipelineQuery,
    C: PipelineCandidate,
{
    fn query_hydrators(&self) -> &[Box<dyn QueryHydrator<Q>>];
    fn dependent_query_hydrators(&self) -> &[Box<dyn QueryHydrator<Q>>] {
        &[]
    }
    fn sources(&self) -> &[Box<dyn Source<Q, C>>];
    fn hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
    fn filters(&self) -> &[Box<dyn Filter<Q, C>>];
    fn scorers(&self) -> &[Box<dyn Scorer<Q, C>>];
    fn selector(&self) -> &dyn Selector<Q, C>;
    fn post_selection_hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
    fn post_selection_filters(&self) -> &[Box<dyn Filter<Q, C>>];
    fn side_effects(&self) -> Arc<Vec<Box<dyn SideEffect<Q, C>>>>;
    fn result_size(&self) -> usize;
    fn finalize(&self, _query: &Q, _candidates: &mut Vec<C>) {}

Slot definitions. To create a pipeline, you implement these methods and return your stage components:

  • query_hydrators(), dependent_query_hydrators(): slices of boxed trait objects. Default for the dependent ones is &[] (empty).
  • sources(), hydrators(), filters(), scorers(), post_selection_hydrators(), post_selection_filters(): slices.
  • selector(): a single trait object (not a slice).
  • side_effects(): returns Arc<Vec<…>> so it can be moved into a Tokio task by cloning the Arc rather than the vec.
  • result_size(): how many results to return.
  • finalize(query, candidates): optional final pass on selected candidates (e.g. shuffle, ad-injection, reorder). Default is a no-op.

Why Box<dyn …> everywhere? Because at this layer we don't know the concrete types of the stage components — only the trait. Boxing lets us put differently-typed stages in the same vec. Slices (&[…]) rather than vecs because the orchestrator only reads; the pipeline impl owns the vec.
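
Here is the ownership split in miniature. The sketch assumes a hypothetical synchronous Filter trait over i64 values and a hypothetical Pipeline struct. It only illustrates the "owner holds a Vec, reader gets a slice" arrangement.

```rust
trait Filter {
    fn keep(&self, value: i64) -> bool;
}

struct DropNegative;
impl Filter for DropNegative {
    fn keep(&self, value: i64) -> bool {
        value >= 0
    }
}

struct DropAbove(i64);
impl Filter for DropAbove {
    fn keep(&self, value: i64) -> bool {
        value <= self.0
    }
}

/// Hypothetical pipeline that owns its stages and lends them out as a slice.
struct Pipeline {
    filters: Vec<Box<dyn Filter>>,
}

impl Pipeline {
    fn filters(&self) -> &[Box<dyn Filter>] {
        &self.filters
    }

    // The "orchestrator" side: it only reads the slice.
    fn run(&self, values: Vec<i64>) -> Vec<i64> {
        values
            .into_iter()
            .filter(|v| self.filters().iter().all(|f| f.keep(*v)))
            .collect()
    }
}

fn demo_boxed_stages() -> Vec<i64> {
    let pipeline = Pipeline {
        filters: vec![Box::new(DropNegative), Box::new(DropAbove(10))],
    };
    pipeline.run(vec![-5, 3, 42, 10])
}
```

DropNegative and DropAbove are different types with different sizes, yet they sit side by side in one vec because each is behind a Box<dyn Filter>.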

Now execute():

    #[xai_stats_macro::receive_stats(latency=Bucket500To2500)]
    async fn execute(&self, query: Q) -> PipelineResult<Q, C> {
        let hydrated_query = self.hydrate_query(query).await;
        let hydrated_query = self.hydrate_dependent_query(hydrated_query).await;

execute is the orchestrator entry point. The stats macro uses the 500–2500ms latency buckets, a wider range than the 50–500ms used for a single hydrator, which fits a method that spans the whole request rather than one stage.

First two lines: run query hydrators (wave 1), then dependent query hydrators (wave 2). Each one consumes and returns Q by value. The variable shadowing is fine — we replace hydrated_query with the further-hydrated version.

        let candidates = self.fetch_candidates(&hydrated_query).await;

        let hydrated_candidates = self.hydrate(&hydrated_query, candidates).await;

        let (kept_candidates, mut filtered_candidates) =
            self.filter(&hydrated_query, hydrated_candidates.clone());

        let scored_candidates = self.score(&hydrated_query, kept_candidates).await;

The candidate pipeline body:

  1. Fetch candidates from all enabled sources in parallel.
  2. Hydrate them in parallel.
  3. Filter them sequentially. We clone() the hydrated candidates because we want to keep the full hydrated list in retrieved_candidates for the result struct. kept_candidates continues; filtered_candidates is held aside for later return.
  4. Score them sequentially (each scorer updates the candidate, sees the previous scorer's output).

        let SelectResult {
            selected: selected_candidates,
            non_selected: mut non_selected_candidates,
        } = self.select(&hydrated_query, scored_candidates);

  5. Select top-K. Destructure the SelectResult into two named vecs.

        let post_selection_hydrated_candidates = self
            .hydrate_post_selection(&hydrated_query, selected_candidates)
            .await;

        let (mut final_candidates, post_selection_filtered_candidates) =
            self.filter_post_selection(&hydrated_query, post_selection_hydrated_candidates);
        filtered_candidates.extend(post_selection_filtered_candidates);

  6. Run the post-selection hydrators (expensive ones) only on the top-K.
  7. Run post-selection filters. Anything dropped here gets appended to filtered_candidates (so the result reflects "everything ever filtered out, no matter at what stage").

extend rather than push — it consumes the second vec and appends all elements.

        let truncated_candidates =
            final_candidates.split_off(self.result_size().min(final_candidates.len()));
        non_selected_candidates.extend(truncated_candidates);

  8. Truncate to the configured result size. Note this happens after post-selection filtering, so if post-selection filters removed some candidates, the response can be smaller than result_size(). We use split_off(min(size, len)) to be safe with short outputs (no panic).

The candidates we cut off go into non_selected_candidates (used as input to side-effects, so they can be logged as "we considered but didn't show").
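
The clamp is worth seeing in isolation, because Vec::split_off panics when the index is past the end. A minimal sketch, with truncate_to as a hypothetical helper over u64 ids:

```rust
/// Keeps at most `result_size` items in `final_candidates` and returns the overflow.
fn truncate_to(final_candidates: &mut Vec<u64>, result_size: usize) -> Vec<u64> {
    // `split_off(at)` panics when `at > len`, hence the clamp.
    let at = result_size.min(final_candidates.len());
    final_candidates.split_off(at)
}
```

With five candidates and a result size of three, the last two come back as overflow. With two candidates and a result size of three, nothing is cut and the overflow is empty.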

        self.finalize(&hydrated_query, &mut final_candidates);

        self.stat_result_size(&final_candidates);

  9. Call finalize for any pipeline-specific final processing (default: no-op).
  10. Emit final-result-size histogram + zero-counter if empty.

        let arc_hydrated_query = Arc::new(hydrated_query);
        let input = Arc::new(SideEffectInput {
            query: arc_hydrated_query.clone(),
            selected_candidates: final_candidates.clone(),
            non_selected_candidates, // candidates are moved so we don't need to clone them
        });
        self.run_side_effects(input);

  11. Build the side-effect input. Arc the query (clones share the same underlying memory). Clone the selected candidates so we can return them while side-effects also get a copy. Move (not clone) non_selected_candidates; the code comment is explicit about why.

run_side_effects spawns a Tokio task that fires them all in parallel without awaiting completion — see below.

        PipelineResult {
            retrieved_candidates: hydrated_candidates,
            filtered_candidates,
            selected_candidates: final_candidates,
            query: arc_hydrated_query,
        }
    }

  12. Return the result. Note this happens before the side-effects finish, by design: the response is sent to the client as soon as candidates are ready, and side-effects run in the background.

That's execute — about 50 lines, the entire feed-request lifecycle.
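
To check our understanding of the bookkeeping, here is a toy synchronous model of the same stage order over plain integers. Everything in it is an assumption made for illustration: the value doubles as the score, the filter rules are arbitrary, and Outcome, partition, and execute are hypothetical names. It leaves out hydration, side-effects, and stats entirely.

```rust
#[derive(Debug, PartialEq)]
struct Outcome {
    retrieved: Vec<i64>,
    filtered: Vec<i64>,
    selected: Vec<i64>,
    non_selected: Vec<i64>,
}

/// Splits `items` into (kept, removed) according to `keep`.
fn partition(items: Vec<i64>, keep: impl Fn(&i64) -> bool) -> (Vec<i64>, Vec<i64>) {
    items.into_iter().partition(|x| keep(x))
}

fn execute(sources: Vec<Vec<i64>>, top_k: usize, result_size: usize) -> Outcome {
    // Fetch: concatenate every source.
    let retrieved: Vec<i64> = sources.into_iter().flatten().collect();
    // Filter (toy rule: drop negatives); clone so `retrieved` survives for the result.
    let (kept, mut filtered) = partition(retrieved.clone(), |x| *x >= 0);
    // Score + select: the value itself is the score; keep the top_k.
    let mut scored = kept;
    scored.sort_by(|a, b| b.cmp(a));
    let mut non_selected = scored.split_off(top_k.min(scored.len()));
    // Post-selection filter (toy rule: drop multiples of 10).
    let (mut selected, post_filtered) = partition(scored, |x| *x % 10 != 0);
    filtered.extend(post_filtered);
    // Truncate to the response size; overflow joins the non-selected pile.
    let overflow = selected.split_off(result_size.min(selected.len()));
    non_selected.extend(overflow);
    Outcome { retrieved, filtered, selected, non_selected }
}
```

Running execute(vec![vec![5, -1, 30], vec![20, 7, 1]], 4, 1) shows where each candidate ends up: -1 is filtered before scoring, 30 and 20 are filtered after selection, 7 is returned, and 1 and 5 land in the non-selected pile for two different reasons (not in the top 4, and cut by the result size).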

Now the helper methods:

    /// Return all configured components grouped by stage.
    fn components(&self) -> Vec<PipelineComponents> {
        fn stage<T: ?Sized>(
            stage: PipelineStage,
            items: &[Box<T>],
            name: impl Fn(&T) -> &str,
        ) -> PipelineComponents {
            PipelineComponents {
                stage,
                components: items
                    .iter()
                    .map(|item| name(item.as_ref()).to_string())
                    .collect(),
            }
        }

Local nested function stage(...): packages "for each item in a slice, get its name, collect them" into a reusable helper. Note T: ?Sized so it works with dyn Trait types.

items.iter() iterates &Box<T>. name(item.as_ref()) calls the supplied name-extractor on &T. .to_string() clones to a String.
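
The same helper shape works outside the pipeline. In this sketch, Named, Alpha, Beta, and names_of are hypothetical; the point is only that T: ?Sized lets T be a trait-object type.

```rust
trait Named {
    fn name(&self) -> &'static str;
}

struct Alpha;
impl Named for Alpha {
    fn name(&self) -> &'static str {
        "Alpha"
    }
}

struct Beta;
impl Named for Beta {
    fn name(&self) -> &'static str {
        "Beta"
    }
}

/// Collects names from a slice of boxed values.
/// `T: ?Sized` is what lets `T` be `dyn Named`.
fn names_of<T: ?Sized>(items: &[Box<T>], name: impl Fn(&T) -> &str) -> Vec<String> {
    items
        .iter()
        .map(|item| name(item.as_ref()).to_string())
        .collect()
}

fn demo_names() -> Vec<String> {
    let items: Vec<Box<dyn Named>> = vec![Box::new(Alpha), Box::new(Beta)];
    names_of(&items, |n| n.name())
}
```

Without the ?Sized relaxation, T would carry an implicit Sized bound and the call with Box<dyn Named> would not type-check.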

        vec![
            stage(PipelineStage::QueryHydrator, self.query_hydrators(), |h| {
                h.name()
            }),
            stage(
                PipelineStage::DependentQueryHydrator,
                self.dependent_query_hydrators(),
                |h| h.name(),
            ),
            stage(PipelineStage::Source, self.sources(), |s| s.name()),
            stage(PipelineStage::Hydrator, self.hydrators(), |h| h.name()),
            stage(PipelineStage::Filter, self.filters(), |f| f.name()),
            stage(PipelineStage::Scorer, self.scorers(), |s| s.name()),
            PipelineComponents {
                stage: PipelineStage::Selector,
                components: vec![self.selector().name().to_string()],
            },
            stage(
                PipelineStage::PostSelectionHydrator,
                self.post_selection_hydrators(),
                |h| h.name(),
            ),
            stage(
                PipelineStage::PostSelectionFilter,
                self.post_selection_filters(),
                |f| f.name(),
            ),
            stage(
                PipelineStage::SideEffect,
                self.side_effects().as_ref(),
                |s| s.name(),
            ),
        ]
    }

Build the vec of PipelineComponents, one per stage. The selector is the only single-value stage so we construct it inline; everything else goes through stage.

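The closures |h| h.name() etc. look redundant. Couldn't they just be a method path? A path would have to spell out the trait-object type at every call site (something like <dyn Source<Q, C>>::name), so the closure, which lets the compiler infer the types, is the shorter and more readable option.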

    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }

The pipeline itself has a name, too (the short type name of its concrete type — e.g. "ForYouCandidatePipeline").

Now the per-stage runners:

    // -------------------------- Pipeline Execution --------------------------

    /// Run all query hydrators in parallel and merge results into the query.
    #[tracing::instrument(skip_all, name = "query_hydrators", fields(
        total_count = Empty,
        enabled_count = Empty,
        disabled = Empty,
    ))]
    async fn hydrate_query(&self, query: Q) -> Q {
        let start = Instant::now();
        let all = self.query_hydrators();
        Self::record_enabled_components(all.iter(), |h| h.enable(&query), |h| h.name());
        let hydrators: Vec<_> = all.iter().filter(|h| h.enable(&query)).collect();
        let hydrate_futures = hydrators.iter().map(|h| h.run(&query));
        let results = join_all(hydrate_futures).await;

        let mut hydrated_query = query;
        for (hydrator, result) in hydrators.iter().zip(results) {
            if let Ok(hydrated) = result {
                hydrator.update(&mut hydrated_query, hydrated);
            }
        }
        self.log_stage(start);
        hydrated_query
    }

The wave-1 query hydration loop:

  1. let start = Instant::now() — for elapsed-time logging.
  2. self.query_hydrators() to get the slice.
  3. record_enabled_components(...) records total_count, enabled_count, and (if non-empty) the disabled field on the tracing span. This lets you see in logs exactly which hydrators were skipped per request.
  4. Filter to enabled hydrators.
  5. Build a vec of futures (one per hydrator's run call), each operating on &query.
  6. join_all(...).await — fan out, await all in parallel.
  7. Iterate enabled hydrators alongside their results. For each successful one, call hydrator.update(&mut hydrated_query, hydrated) to merge its fields back.
  8. log_stage(start) emits an info! with elapsed ms.
  9. Return the merged query.

This is the canonical "fan-out / sequential-merge" pattern. The merge has to be sequential because we're mutating hydrated_query, and we can't easily share &mut across futures. But the fan-out is cheap because &query is shared, so all hydrators can read the same query in parallel.

Side note: let hydrators: Vec<_> = all.iter().filter(...).collect() — collecting into a vec lets us iterate twice (once to build futures, once to zip with results). If we tried to do it in a single chain, we'd consume the iterator.
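
The fan-out / sequential-merge shape can be reproduced with the standard library alone. The sketch below swaps join_all for std::thread::scope, so it is a thread-based analogue rather than the async code above. The Query fields and the two hydrators are hypothetical.

```rust
use std::thread;

#[derive(Clone, Debug, Default, PartialEq)]
struct Query {
    user_id: u64,
    follows: Vec<u64>,
    muted_words: Vec<String>,
}

// Simplified, synchronous stand-in for the QueryHydrator trait.
trait QueryHydrator: Sync {
    fn hydrate(&self, query: &Query) -> Result<Query, String>;
    fn update(&self, query: &mut Query, hydrated: Query);
}

struct FollowsHydrator;
impl QueryHydrator for FollowsHydrator {
    fn hydrate(&self, query: &Query) -> Result<Query, String> {
        Ok(Query { follows: vec![query.user_id + 1, query.user_id + 2], ..Default::default() })
    }
    fn update(&self, query: &mut Query, hydrated: Query) {
        query.follows = hydrated.follows;
    }
}

struct MutedWordsHydrator;
impl QueryHydrator for MutedWordsHydrator {
    fn hydrate(&self, _query: &Query) -> Result<Query, String> {
        Err("mute service unavailable".to_string())
    }
    fn update(&self, query: &mut Query, hydrated: Query) {
        query.muted_words = hydrated.muted_words;
    }
}

fn hydrate_query(query: Query, hydrators: &[Box<dyn QueryHydrator>]) -> Query {
    // Fan-out: every hydrator reads the same shared `&Query`.
    let shared = &query;
    let results: Vec<Result<Query, String>> = thread::scope(|s| {
        let handles: Vec<_> = hydrators
            .iter()
            .map(|h| s.spawn(move || h.hydrate(shared)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Sequential merge: the shared borrow has ended, so we can mutate.
    let mut hydrated_query = query;
    for (hydrator, result) in hydrators.iter().zip(results) {
        if let Ok(hydrated) = result {
            hydrator.update(&mut hydrated_query, hydrated);
        }
    }
    hydrated_query
}

fn demo_fan_out() -> Query {
    let hydrators: Vec<Box<dyn QueryHydrator>> =
        vec![Box::new(FollowsHydrator), Box::new(MutedWordsHydrator)];
    let query = Query { user_id: 7, muted_words: vec!["spoiler".to_string()], ..Default::default() };
    hydrate_query(query, &hydrators)
}
```

The successful hydrator's field is merged in, while the failed one leaves the existing muted_words untouched, matching the "skip Err, keep going" behaviour of the loop above.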

    /// Run dependent query hydrators in parallel and merge results into the query.
    ///
    /// This stage runs **after** [`hydrate_query`], so the incoming query
    /// already has all initial features populated.
    #[tracing::instrument(skip_all, name = "dependent_query_hydrators", fields(
        total_count = Empty,
        enabled_count = Empty,
        disabled = Empty,
    ))]
    async fn hydrate_dependent_query(&self, query: Q) -> Q {
        let all = self.dependent_query_hydrators();
        if all.is_empty() {
            return query;
        }
        let start = Instant::now();
        Self::record_enabled_components(all.iter(), |h| h.enable(&query), |h| h.name());
        let hydrators: Vec<_> = all.iter().filter(|h| h.enable(&query)).collect();
        let hydrate_futures = hydrators.iter().map(|h| h.run(&query));
        let results = join_all(hydrate_futures).await;

        let mut hydrated_query = query;
        for (hydrator, result) in hydrators.iter().zip(results) {
            if let Ok(hydrated) = result {
                hydrator.update(&mut hydrated_query, hydrated);
            }
        }
        self.log_stage(start);
        hydrated_query
    }

Wave-2 query hydration. Identical to wave 1 except for an early bail: if there are no dependent hydrators (the default, since dependent_query_hydrators() returns &[] unless overridden), return the query immediately without starting the timer, recording span fields, or logging the stage. The saving per request is tiny, but it applies to every request that has no dependent hydrators.

The body duplicates wave 1 almost line for line. A shared helper would remove that, but the methods are short and sit next to each other, so the duplication is a pragmatic choice.

    /// Run all candidate sources in parallel and collect results.
    #[tracing::instrument(skip_all, name = "sources", fields(
        total_count = Empty,
        enabled_count = Empty,
        disabled = Empty,
        candidate_count = Empty,
    ))]
    async fn fetch_candidates(&self, query: &Q) -> Vec<C> {
        let start = Instant::now();
        let all = self.sources();
        Self::record_enabled_components(all.iter(), |s| s.enable(query), |s| s.name());
        let sources: Vec<_> = all.iter().filter(|s| s.enable(query)).collect();
        let source_futures = sources.iter().map(|s| s.run(query));
        let results = join_all(source_futures).await;

        let mut collected = Vec::new();
        for mut candidates in results.into_iter().flatten() {
            collected.append(&mut candidates);
        }
        Span::current().record("candidate_count", collected.len());
        self.log_stage_size(start, collected.len());
        collected
    }

Candidate sourcing. Same fan-out pattern, but the merge is "concatenate all vecs into one big vec":

  • results.into_iter(): iterate the Vec<Result<Vec<C>, String>> by value.
  • .flatten(): turns Iterator<Item = Result<Vec<C>, …>> into Iterator<Item = Vec<C>>. Result implements IntoIterator, yielding its value once for Ok and nothing for Err, so flatten unwraps each Ok and discards each Err. Failed sources are therefore silently dropped from the candidate pool; by design, partial failures degrade gracefully.
  • For each Vec<C>, collected.append(&mut candidates) moves all elements into collected without cloning them, leaving candidates empty.

Then we record the total candidate count on the span and log.
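The flatten-then-append merge is easy to verify in isolation. Here is a small sketch using u64 post ids in place of the generic C; collect_candidates is a hypothetical name, not a function in the crate.

```rust
/// Concatenate the successful source results, skipping failed sources.
pub fn collect_candidates(results: Vec<Result<Vec<u64>, String>>) -> Vec<u64> {
    let mut collected = Vec::new();
    // `Result` implements `IntoIterator`: `Ok(v)` yields `v` once,
    // `Err(_)` yields nothing, so `flatten` keeps only the successes.
    for mut candidates in results.into_iter().flatten() {
        // `append` moves every element out of `candidates`, leaving it empty.
        collected.append(&mut candidates);
    }
    collected
}
```

Feeding it Ok(vec![1, 2]), an Err, and Ok(vec![2, 3]) gives [1, 2, 2, 3]: the error disappears without a trace, and the duplicate 2 survives.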

Note on duplicates: there's no de-duplication here. If two sources return the same post, we'll have it twice. A later filter (DropDuplicatesFilter) handles that. Pushing dedup to a downstream filter rather than doing it here keeps the source stage stateless.
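To make the downstream dedup concrete, here is one way such a filter could work. This is a sketch only: it is not the repo's DropDuplicatesFilter, and the Candidate struct and drop_duplicates function are assumptions made up for illustration.

```rust
use std::collections::HashSet;

// Hypothetical candidate type: a post id plus the source it came from.
#[derive(Debug, Clone, PartialEq)]
pub struct Candidate {
    pub post_id: u64,
    pub source: &'static str,
}

/// Keep the first occurrence of each post id; return (kept, removed).
pub fn drop_duplicates(candidates: Vec<Candidate>) -> (Vec<Candidate>, Vec<Candidate>) {
    let mut seen = HashSet::new();
    // `HashSet::insert` returns false when the id was already present,
    // which sends the repeat into the `removed` half of the partition.
    candidates.into_iter().partition(|c| seen.insert(c.post_id))
}
```

Because partition preserves input order, whichever source listed a post first wins, and the repeat lands in the removed list.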

    /// Run all candidate hydrators in parallel and merge results into candidates.
    #[tracing::instrument(skip_all, name = "hydrators", fields(
        total_count = Empty,
        enabled_count = Empty,
        disabled = Empty,
    ))]
    async fn hydrate(&self, query: &Q, candidates: Vec<C>) -> Vec<C> {
        self.run_hydrators(query, candidates, self.hydrators(), PipelineStage::Hydrator)
            .await
    }

    /// Run post-selection candidate hydrators in parallel and merge results into candidates.
    #[tracing::instrument(skip_all, name = "post_selection_hydrators", fields(
        total_count = Empty,
        enabled_count = Empty,
        disabled = Empty,
    ))]
    async fn hydrate_post_selection(&self, query: &Q, candidates: Vec<C>) -> Vec<C> {
        self.run_hydrators(
            query,
            candidates,
            self.post_selection_hydrators(),
            PipelineStage::PostSelectionHydrator,
        )
        .await
    }

Two thin wrappers that delegate to run_hydrators with different parameters. The wrappers exist so each call gets its own tracing::instrument (with different span names — "hydrators" vs "post_selection_hydrators").

    /// Shared helper to hydrate with a provided hydrator list.
    async fn run_hydrators(
        &self,
        query: &Q,
        mut candidates: Vec<C>,
        hydrators: &[Box<dyn Hydrator<Q, C>>],
        _stage: PipelineStage,
    ) -> Vec<C> {
        let start = Instant::now();
        Self::record_enabled_components(hydrators.iter(), |h| h.enable(query), |h| h.name());
        let hydrators: Vec<_> = hydrators.iter().filter(|h| h.enable(query)).collect();
        let hydrate_futures = hydrators.iter().map(|h| h.run(query, &candidates));
        let results = join_all(hydrate_futures).await;
        for (hydrator, result) in hydrators.iter().zip(results) {
            hydrator.update_all(&mut candidates, result);
        }
        self.log_stage_size(start, candidates.len());
        candidates
    }

The shared hydrator runner. Same fan-out / sequential-merge pattern as the query hydrators, but now we're updating candidates via update_all instead of the query via update.

The _stage: PipelineStage parameter is unused (underscore prefix). It was probably intended for stage-specific logic or stats, but in the current code, the only difference between pre- and post-selection hydration is the span name from the wrapper, so it's vestigial. Or future-proofing.

Why mut candidates: Vec<C> (taking ownership) rather than &mut Vec<C>? Two reasons: (1) the caller has no further use for the vec in its pre-hydration state, and (2) the function returns the vec, so ownership simply moves in, through, and back out to the caller.

    /// Run all filters sequentially. Each filter partitions candidates into kept and removed.
    #[tracing::instrument(skip_all, name = "filters", fields(
        total_count = Empty,
        enabled_count = Empty,
        disabled = Empty,
        input_count = candidates.len(),
        kept_count = Empty,
        removed_count = Empty,
        filter_rate = Empty,
    ))]
    fn filter(&self, query: &Q, candidates: Vec<C>) -> (Vec<C>, Vec<C>) {
        self.run_filters(query, candidates, self.filters(), PipelineStage::Filter)
    }

    /// Run post-scoring filters sequentially on already-scored candidates.
    #[tracing::instrument(skip_all, name = "post_selection_filters", fields(
        total_count = Empty,
        enabled_count = Empty,
        disabled = Empty,
        input_count = candidates.len(),
        kept_count = Empty,
        removed_count = Empty,
        filter_rate = Empty,
    ))]
    fn filter_post_selection(&self, query: &Q, candidates: Vec<C>) -> (Vec<C>, Vec<C>) {
        self.run_filters(
            query,
            candidates,
            self.post_selection_filters(),
            PipelineStage::PostSelectionFilter,
        )
    }

Same wrapping pattern for filters: two thin wrappers (pre / post) each with their own span, delegating to run_filters.

    // Shared helper to run filters sequentially from a provided filter list.
    fn run_filters(
        &self,
        query: &Q,
        mut candidates: Vec<C>,
        filters: &[Box<dyn Filter<Q, C>>],
        _stage: PipelineStage,
    ) -> (Vec<C>, Vec<C>) {
        Self::record_enabled_components(filters.iter(), |f| f.enable(query), |f| f.name());
        let mut all_removed = Vec::new();
        let mut removed_per_filter: Vec<(String, usize)> = Vec::new();
        for filter in filters.iter().filter(|f| f.enable(query)) {
            let result = filter.run(query, candidates);
            if !result.removed.is_empty() {
                removed_per_filter.push((filter.name().to_string(), result.removed.len()));
            }
            candidates = result.kept;
            all_removed.extend(result.removed);
        }
        let kept = candidates.len();
        let removed = all_removed.len();
        let total = kept + removed;
        let rate = if total > 0 {
            removed as f64 / total as f64
        } else {
            0.0
        };
        Span::current().record("kept_count", kept);
        Span::current().record("removed_count", removed);
        Span::current().record("filter_rate", format!("{:.3}", rate).as_str());
        self.log_filters(kept, removed, &removed_per_filter);
        (candidates, all_removed)
    }

The sequential filter runner.

  • all_removed accumulates every dropped candidate across all filters.
  • removed_per_filter records (filter_name, count) for any filter that actually dropped something (skipped if it dropped zero — keeps logs clean).
  • For each enabled filter, in order, run it; pull result.kept into candidates (becomes input to the next filter); push result.removed into all_removed.

After the loop, compute aggregate stats, record on span, log, return (kept, removed).

run_filters returns a plain (kept, removed) tuple rather than a FilterResult, so its shape differs from the per-filter return. Why? Because the orchestrator aggregates across filters into a flat kept/removed split; the per-filter breakdown only goes to the log. Different concerns at different layers.
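The sequential runner boils down to a fold over the filter list. Here is a stripped-down sketch without the query, enable() gating, or tracing; the Filter trait is simplified, and MinValue and EvenOnly are hypothetical filters invented for the example.

```rust
pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

// Simplified stand-in for the crate's Filter trait (no query parameter).
pub trait Filter<C> {
    fn name(&self) -> &str;
    fn run(&self, candidates: Vec<C>) -> FilterResult<C>;
}

pub struct MinValue(pub i64);
impl Filter<i64> for MinValue {
    fn name(&self) -> &str {
        "MinValue"
    }
    fn run(&self, candidates: Vec<i64>) -> FilterResult<i64> {
        let (kept, removed): (Vec<i64>, Vec<i64>) =
            candidates.into_iter().partition(|c| *c >= self.0);
        FilterResult { kept, removed }
    }
}

pub struct EvenOnly;
impl Filter<i64> for EvenOnly {
    fn name(&self) -> &str {
        "EvenOnly"
    }
    fn run(&self, candidates: Vec<i64>) -> FilterResult<i64> {
        let (kept, removed): (Vec<i64>, Vec<i64>) =
            candidates.into_iter().partition(|c| *c % 2 == 0);
        FilterResult { kept, removed }
    }
}

/// Returns (kept, all_removed, removed_per_filter).
pub fn run_filters<C>(
    filters: &[Box<dyn Filter<C>>],
    mut candidates: Vec<C>,
) -> (Vec<C>, Vec<C>, Vec<(String, usize)>) {
    let mut all_removed = Vec::new();
    let mut removed_per_filter = Vec::new();
    for filter in filters {
        // Each filter consumes the survivors of the previous one.
        let result = filter.run(candidates);
        if !result.removed.is_empty() {
            removed_per_filter.push((filter.name().to_string(), result.removed.len()));
        }
        candidates = result.kept;
        all_removed.extend(result.removed);
    }
    (candidates, all_removed, removed_per_filter)
}
```

Running MinValue(3), then EvenOnly, then MinValue(0) over 1..=6 keeps [4, 6] and removes [1, 2, 3, 5]. The last filter drops nothing, so it never shows up in the per-filter breakdown.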

    /// Run all scorers sequentially and apply their results to candidates.
    #[tracing::instrument(skip_all, name = "scorers", fields(
        total_count = Empty,
        enabled_count = Empty,
        disabled = Empty,
    ))]
    async fn score(&self, query: &Q, mut candidates: Vec<C>) -> Vec<C> {
        let start = Instant::now();
        let all = self.scorers();
        Self::record_enabled_components(all.iter(), |s| s.enable(query), |s| s.name());
        for scorer in all.iter().filter(|s| s.enable(query)) {
            let scored = scorer.run(query, &candidates).await;
            scorer.update_all(&mut candidates, scored);
        }
        self.log_stage_size(start, candidates.len());
        candidates
    }

The scoring runner. Sequential. For each enabled scorer, in order:

  1. Call run(query, &candidates) — operates on a borrow, returns a Vec<Result<C, …>> of the same length.
  2. Call update_all(&mut candidates, scored) — merges results into the live candidates.

The reason we don't use join_all here is that the order matters — later scorers can see earlier scorers' results. The author-diversity scorer needs to know the Phoenix scorer's output to attenuate it.
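A toy version shows why the order is load-bearing. In this sketch the scorers are synchronous and infallible (a simplification of the crate's run), and Candidate, ConstantScore and AuthorDiversity are hypothetical types; the halving rule is an assumption chosen to keep the numbers exact, not the real attenuation formula.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub struct Candidate {
    pub author_id: u64,
    pub score: f64,
}

// Simplified scorer: one scored copy per input, in the same order.
pub trait Scorer {
    fn run(&self, candidates: &[Candidate]) -> Vec<Candidate>;
    fn update(&self, candidate: &mut Candidate, scored: Candidate) {
        candidate.score = scored.score;
    }
}

/// Stand-in for a model scorer: gives every candidate the same score.
pub struct ConstantScore(pub f64);
impl Scorer for ConstantScore {
    fn run(&self, candidates: &[Candidate]) -> Vec<Candidate> {
        candidates
            .iter()
            .map(|c| Candidate { score: self.0, ..c.clone() })
            .collect()
    }
}

/// Multiplies the existing score by decay^n for an author's n-th repeat.
pub struct AuthorDiversity {
    pub decay: f64,
}
impl Scorer for AuthorDiversity {
    fn run(&self, candidates: &[Candidate]) -> Vec<Candidate> {
        let mut seen: HashMap<u64, i32> = HashMap::new();
        candidates
            .iter()
            .map(|c| {
                let n = seen.entry(c.author_id).or_insert(0);
                // Reads the score written by whichever scorer ran before.
                let scored = Candidate { score: c.score * self.decay.powi(*n), ..c.clone() };
                *n += 1;
                scored
            })
            .collect()
    }
}

pub fn score(scorers: &[Box<dyn Scorer>], mut candidates: Vec<Candidate>) -> Vec<Candidate> {
    for scorer in scorers {
        let scored = scorer.run(&candidates);
        for (c, s) in candidates.iter_mut().zip(scored) {
            scorer.update(c, s);
        }
    }
    candidates
}
```

With authors [7, 7, 9, 7], running ConstantScore(1.0) and then AuthorDiversity with decay 0.5 yields scores [1.0, 0.5, 1.0, 0.25]. Swap the two scorers and every score ends up 1.0, because the attenuation is overwritten.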

    /// Select (sort/truncate) candidates using the configured selector
    fn select(&self, query: &Q, candidates: Vec<C>) -> SelectResult<C> {
        if self.selector().enable(query) {
            self.selector().run(query, candidates)
        } else {
            SelectResult {
                selected: candidates,
                non_selected: vec![],
            }
        }
    }

Selector runner. If the selector is enabled, run it. If not, pass all candidates through as "selected" without truncation. Simple.
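For a feel of what a selector behind this runner might do, here is a sketch of a top-k selector with the same pass-through when disabled. It assumes candidates are (post_id, score) pairs; select_top_k is a hypothetical helper, and the crate's own selectors may sort or truncate differently.

```rust
pub struct SelectResult<C> {
    pub selected: Vec<C>,
    pub non_selected: Vec<C>,
}

/// Sort by score (descending) and keep the first `k`; pass through if disabled.
pub fn select_top_k(
    mut candidates: Vec<(u64, f64)>,
    k: usize,
    enabled: bool,
) -> SelectResult<(u64, f64)> {
    if !enabled {
        // Same fallback as the runner: everything counts as selected.
        return SelectResult { selected: candidates, non_selected: vec![] };
    }
    // `total_cmp` gives f64 a total order, so the sort cannot panic on NaN.
    candidates.sort_by(|a, b| b.1.total_cmp(&a.1));
    // `split_off` panics if the index exceeds the length, so clamp it first.
    let at = k.min(candidates.len());
    let non_selected = candidates.split_off(at);
    SelectResult { selected: candidates, non_selected }
}
```

With scores 0.2, 0.9 and 0.5 and k = 2, the posts scoring 0.9 and 0.5 are selected and the 0.2 post goes to non_selected. With enabled = false, all three come back untouched and in their original order.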

    // Run all side effects in parallel
    fn run_side_effects(&self, input: Arc<SideEffectInput<Q, C>>) {
        let side_effects = self.side_effects();
        tokio::spawn(async move {
            let futures = side_effects
                .iter()
                .filter(|se| se.enable(input.query.clone()))
                .map(|se| se.run(input.clone()));
            let _ = join_all(futures).await;
        });
    }

Side effects! Two important things:

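  1. tokio::spawn(async move { … }) spawns a detached task. The JoinHandle it returns is never bound or awaited; it is dropped when the statement ends, and dropping a JoinHandle detaches the task rather than cancelling it. The orchestrator does not await side-effects; they run in the background, and the pipeline response goes back to the client without waiting for them.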
  2. Within the spawned task, side-effects fan out via join_all — they run in parallel, since they're independent. (One side-effect writing to Kafka shouldn't block another updating Redis.)

The async move block takes ownership of side_effects (an Arc<Vec<…>>) and input (an Arc<SideEffectInput>). Inside, for each enabled side-effect, we build a future. input.query.clone() clones the Arc<Q> (cheap: it bumps a refcount). input.clone() clones the Arc<SideEffectInput> (also cheap).

Note: this is why self.side_effects() returns Arc<Vec<…>> from the trait. A spawned task must be 'static, so it cannot borrow from &self; it needs its own handle to the vec, and an Arc makes handing one over cheap.

The trailing let _ = join_all(futures).await; discards the per-future results. If a side-effect fails, the failure is logged inside the side-effect impl (or by the receive_stats macro), but the orchestrator doesn't surface it.
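The same fire-and-forget shape can be reproduced with plain threads, which makes the ownership story easy to poke at. This sketch substitutes std::thread for tokio (so it is an analogy, not the crate's mechanism); SideEffectInput is reduced to two fields, and the SideEffect alias is a hypothetical closure type standing in for the trait.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical, reduced version of the input handed to every side effect.
pub struct SideEffectInput {
    pub query: Arc<String>,
    pub selected: Vec<u64>,
}

// A side effect here is just a thread-safe closure.
pub type SideEffect = Box<dyn Fn(Arc<SideEffectInput>) + Send + Sync>;

pub fn run_side_effects(side_effects: Arc<Vec<SideEffect>>, input: Arc<SideEffectInput>) {
    // The returned JoinHandle is not bound to anything: it is dropped at the
    // `;`, which detaches the background thread instead of stopping it.
    thread::spawn(move || {
        // Inside the background thread, fan out one worker per side effect.
        thread::scope(|s| {
            for side_effect in side_effects.iter() {
                // Cloning the Arc only bumps a reference count.
                let input = Arc::clone(&input);
                s.spawn(move || side_effect(input));
            }
        });
        // Results are ignored; the caller never hears about failures.
    });
}
```

run_side_effects returns as soon as the background thread is spawned. Both Arcs are moved into that thread, so nothing borrowed from the caller has to outlive the call.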

    // -------------------------- Helpers --------------------------

    /// Iterate components, applying `is_enabled` to each, and record
    /// `total_count`, `enabled_count`, and (if any are disabled) the
    /// comma-joined names of disabled components on the current tracing span.
    fn record_enabled_components<'a, T: 'a>(
        items: impl Iterator<Item = &'a T>,
        is_enabled: impl Fn(&T) -> bool,
        get_name: impl Fn(&T) -> &str,
    ) {
        let mut total = 0usize;
        let mut disabled: Vec<&str> = Vec::new();
        for item in items {
            total += 1;
            if !is_enabled(item) {
                disabled.push(get_name(item));
            }
        }
        let span = Span::current();
        span.record("total_count", total);
        span.record("enabled_count", total - disabled.len());
        if !disabled.is_empty() {
            span.record("disabled", disabled.join(",").as_str());
        }
    }

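The "record enabled components" helper, called at the top of each stage runner that manages a list of components (the selector and side-effect runners skip it). It walks an iterator of components, counts the total, and collects the names of the disabled ones, then records total_count, enabled_count, and (only if something is disabled) disabled on the current tracing span, which is the stage's span, since this is called inside #[instrument]-decorated functions. One consequence: enable() is evaluated twice per component, once here and once in the runner's own .filter(...), so it should be cheap and give the same answer both times.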

Note impl Iterator<Item = &'a T> and impl Fn(&T) -> … for the parameters — anonymous types via impl Trait syntax, no boxing required, monomorphised per call site.
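The generic signature can be exercised without a tracing subscriber by returning the three values instead of recording them. In this sketch the signature is copied from the helper, while EnabledSummary, Component and summarize_enabled are hypothetical names added for the example.

```rust
#[derive(Debug, PartialEq)]
pub struct EnabledSummary {
    pub total: usize,
    pub enabled: usize,
    /// Comma-joined names, or None when nothing is disabled.
    pub disabled: Option<String>,
}

// Hypothetical component type used only to exercise the helper.
pub struct Component {
    pub name: &'static str,
    pub on: bool,
}

pub fn summarize_enabled<'a, T: 'a>(
    items: impl Iterator<Item = &'a T>,
    is_enabled: impl Fn(&T) -> bool,
    get_name: impl Fn(&T) -> &str,
) -> EnabledSummary {
    let mut total = 0usize;
    let mut disabled: Vec<&str> = Vec::new();
    for item in items {
        total += 1;
        if !is_enabled(item) {
            disabled.push(get_name(item));
        }
    }
    EnabledSummary {
        total,
        enabled: total - disabled.len(),
        disabled: if disabled.is_empty() {
            None
        } else {
            Some(disabled.join(","))
        },
    }
}
```

Every call site passes its own closures, so the compiler generates a specialised copy per site and no Box<dyn Fn> is needed.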

    // -------------------------- Logging and Stats --------------------------

    fn log_stage(&self, start: Instant) {
        info!("latency_ms={}", start.elapsed().as_millis());
    }

    fn log_stage_size(&self, start: Instant, size: usize) {
        info!("latency_ms={} size={}", start.elapsed().as_millis(), size);
    }

    fn log_filters(&self, kept: usize, removed: usize, removed_per_filter: &[(String, usize)]) {
        let removed_summary = removed_per_filter
            .iter()
            .map(|(name, removed)| format!("{}={}", name, removed))
            .collect::<Vec<_>>()
            .join(",");
        info!(
            "kept {}, removed {} removed_per_filter [{}]",
            kept, removed, removed_summary,
        );
    }

Three log helpers:

  • log_stage: latency_ms=…
  • log_stage_size: latency_ms=… size=…
  • log_filters: kept …, removed … removed_per_filter [name1=count1,name2=count2,…]

These are info! lines that go into the currently active tracing span — so they're correctly attributed to the stage (and ultimately the request). The removed_summary is built by formatting each (name, count) pair as name=count and joining with commas. So a log line looks like:

    kept 142, removed 38 removed_per_filter [AgeFilter=12,SelfTweetFilter=4,PreviouslySeenPostsFilter=22]

Easy to grep.
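The format is also easy to parse back. Here is a sketch that rebuilds the message with the same format string as log_filters and then recovers the per-filter counts from it. Both function names are hypothetical, and the parser assumes filter names contain no commas or equals signs.

```rust
/// Build the filter log message with the same format string as `log_filters`.
pub fn format_filter_log(kept: usize, removed: usize, removed_per_filter: &[(String, usize)]) -> String {
    let removed_summary = removed_per_filter
        .iter()
        .map(|(name, removed)| format!("{}={}", name, removed))
        .collect::<Vec<_>>()
        .join(",");
    format!(
        "kept {}, removed {} removed_per_filter [{}]",
        kept, removed, removed_summary
    )
}

/// Recover the (name, count) pairs from a log line; malformed pairs are skipped.
pub fn parse_removed_per_filter(line: &str) -> Vec<(String, usize)> {
    let (Some(start), Some(end)) = (line.find('['), line.rfind(']')) else {
        return Vec::new();
    };
    if end <= start {
        return Vec::new();
    }
    line[start + 1..end]
        .split(',')
        .filter_map(|pair| {
            let (name, count) = pair.split_once('=')?;
            Some((name.to_string(), count.parse::<usize>().ok()?))
        })
        .collect()
}
```

Formatting the three counts from the example line and parsing the result returns the same three pairs, which is handy for building a per-filter dashboard from raw logs.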

    fn stat_result_size(&self, final_candidates: &[C]) {
        if let Some(receiver) = global_stats_receiver() {
            let response_size = final_candidates.len();
            let metric_name = format!("{}.execute", self.name());
            receiver.observe(
                metric_name.as_str(),
                &FINAL_RESULT_SIZE_SCOPE,
                response_size as f64,
                HistogramBuckets::Bucket0To50,
            );
            if response_size == 0 {
                receiver.incr(metric_name.as_str(), &FINAL_RESULT_EMPTY_SCOPE, 1u64);
            }
        }
    }
}

Last helper. Emits two metrics at the very end of execute:

  • A histogram "<pipeline-name>.execute" with requests=result_size tag and value = final response size, bucketed 0–50.
  • A counter "<pipeline-name>.execute" with requests=result_empty incremented by 1, but only if the response was empty. So "empty response rate" is just the empty counter divided by the total request count.

This is the last block of the trait. End of file.


Part 3 — What we've learned

After 1,031 lines, we have a complete picture of the orchestration framework. The big mental model:

Every stage type is a trait with a default run() that wraps a required hook (source() / hydrate() / filter() / score() / select() / side_effect()). The wrapper does:

  • Span instrumentation (tracing)
  • Metric emission (stats)
  • Length-invariant checks (for hydrators and scorers)
  • Error logging
  • Gating via enable()

The CandidatePipeline trait pulls them all together with a default execute() that runs:

query hydrators (||)
↓
dependent query hydrators (||)
↓
sources (||) → flatten → all candidates
↓
hydrators (||) → merge → hydrated candidates
↓
filters (sequential) → kept, removed
↓
scorers (sequential) → scored
↓
selector → selected, non_selected
↓
post-selection hydrators (||) → hydrated
↓
post-selection filters (sequential) → final, more_removed
↓
truncate to result_size
↓
finalize()
↓
return PipelineResult
↓
(in background) side_effects (||)

Parallel vs. sequential:

  • Parallel: query hydrators and dependent query hydrators (parallel within each wave; the two waves run one after the other), sources, hydrators, post-selection hydrators, side-effects.
  • Sequential: filters, scorers, post-selection filters.

The split depends on whether order matters, that is, whether a component needs the output of the one before it.

The merge pattern (hydrate-then-update): every parallel stage produces a copy with just its own fields filled in, then a serial merge step calls update to overlay just the right fields onto the canonical query/candidate. This lets all hydrators read the same shared input without locks, while keeping the canonical query/candidate consistent.

Length invariants: hydrators and scorers must return exactly as many results as they got inputs, in the same order. Violations are caught at the run boundary and converted to all-errors. This makes index-based merging safe.
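The invariant and the index-based merge fit in a few lines. This is a sketch of the behaviour described above, not the crate's code; enforce_length and merge_by_index are hypothetical helpers, and the error type is assumed to be String.

```rust
/// If the result count does not match the input count, replace everything
/// with errors so that no result can be applied to the wrong candidate.
pub fn enforce_length<C>(
    input_len: usize,
    results: Vec<Result<C, String>>,
) -> Vec<Result<C, String>> {
    if results.len() == input_len {
        return results;
    }
    let msg = format!("expected {} results, got {}", input_len, results.len());
    (0..input_len).map(|_| Err(msg.clone())).collect()
}

/// Merge by position: result i updates candidate i; errors leave it unchanged.
pub fn merge_by_index<C>(
    candidates: &mut [C],
    results: Vec<Result<C, String>>,
    update: impl Fn(&mut C, C),
) {
    for (candidate, result) in candidates.iter_mut().zip(results) {
        if let Ok(hydrated) = result {
            update(candidate, hydrated);
        }
    }
}
```

A component that returns two results for three candidates gets all three converted to errors, so the merge leaves every candidate unchanged instead of shifting results onto the wrong posts.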

Failure modes: failed sources are silently dropped; failed query hydrators are skipped, leaving the query as it was; failed hydrators/scorers leave the affected candidate unchanged (graceful degradation); filters can't fail (no async, no I/O); failed side-effects are silent.

This is a remarkably elegant design. Roughly 1,000 lines of trait-and-default-impl scaffolding lets every concrete pipeline (we'll see one in session 4) be defined by just listing its stage components.


Next session

Session 02 — Thunder I: entry points and post store. We'll read thunder/lib.rs, main.rs, deserializer.rs, kafka_utils.rs, and the posts/ module (post_store.rs, mod.rs). Thunder is the in-memory post index that serves in-network candidates. About 676 LOC.

After that, Session 03 finishes Thunder with the Kafka listeners and the gRPC service.