X For You algorithm, line by line — Part 4: Home-Mixer core + models

Part 4 of the deep dive into xai-org/x-algorithm. Home-Mixer's binary entry point, QueryBuilder, gRPC service implementations (Scored Posts + For You + URT variant), and the type system — the 50-field ScoredPostsQuery and 30-field PostCandidate plus brand safety verdict computation.

May 15, 2026·43 min read

We're now leaving Thunder. From here on, almost everything we read sits inside home-mixer/ — the orchestration service that actually produces a user's For-You feed. It's the largest module in the repo at 11,695 LOC; this session covers its shell (the binary, the gRPC server, the two service impls) and its type system (the ScoredPostsQuery and PostCandidate structs plus supporting types).

After this session, the next ten or so sessions will fill in the behavior — filters, hydrators, scorers, sources, side-effects, ads. Everything those plug into is defined here.

Files covered (1,551 LOC across 12 files):

home-mixer/
├── lib.rs                          (17)    module roots + public re-exports
├── main.rs                         (63)    binary entry point
├── server.rs                       (421)   HomeMixerServer + QueryBuilder + gRPC service impls
├── for_you_server.rs               (87)    For You feed surface
├── scored_posts_server.rs          (215)   Scored Posts surface + debug JSON
└── models/
    ├── mod.rs                      (7)
    ├── query.rs                    (248)   ScoredPostsQuery
    ├── candidate.rs                (153)   PostCandidate + CandidateHelpers
    ├── candidate_features.rs       (146)   BlockedByUserIds, FilteredTopicsByExperiment
    ├── user_features.rs            (80)    UserFeatures + Thrift codec
    ├── brand_safety.rs             (91)    BrandSafetyVerdict computation
    └── in_network_reply.rs         (23)    InNetworkReply (OnceLock-deferred)

The orchestration verbs (execute, hydrate_query, filter, …) live in the candidate-pipeline crate that we read in Session 01. What we're reading now is how home-mixer plugs into that framework: what Q and C are, what request handlers look like, how the binary starts up.


lib.rs (17 lines)

pub mod ads;
mod candidate_hydrators;
pub mod candidate_pipeline;
mod filters;
mod for_you_server;
pub mod models;
mod query_hydrators;
mod scored_posts_server;
pub mod scorers;
mod selectors;
pub mod server;
mod side_effects;
mod sources;

pub use for_you_server::ForYouFeedServer;
pub use scored_posts_server::ScoredPostsServer;
pub use server::{HomeMixerConfig, HomeMixerServer};

Crate root. Two important things to notice:

  1. Visibility split. ads, candidate_pipeline, models, scorers, server are pub mod (consumed by outside callers — tests, dark-traffic glue, integration). filters, query_hydrators, candidate_hydrators, side_effects, sources, selectors, for_you_server, scored_posts_server are private mod — internal building blocks that only the pipeline declarations reference.
  2. Re-exports. The four public types ForYouFeedServer, ScoredPostsServer, HomeMixerConfig, HomeMixerServer get hoisted to crate root so external callers can use xai_home_mixer::HomeMixerServer instead of digging through submodules.

for_you_server is private but ForYouFeedServer is pub use-exported. This is the standard "implementation in nested module, public surface flat at the root" pattern.
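
To make the pattern concrete, here is a minimal sketch of a private module with a flat re-export. The module body, the name field, and make_server are invented for illustration; only the mod / pub use shape mirrors lib.rs.

```rust
// Hypothetical miniature of the lib.rs layout: the module itself is private,
// but one of its types is re-exported at the crate root.
mod for_you_server {
    // `pub` on the type is what makes the re-export below legal.
    pub struct ForYouFeedServer {
        pub name: &'static str,
    }

    impl ForYouFeedServer {
        pub fn new() -> Self {
            Self { name: "for_you" }
        }
    }
}

// Flat public surface: callers name the type from the root and never
// need to know which submodule defines it.
pub use for_you_server::ForYouFeedServer;

// Illustrative helper (not in the real crate) that uses the root-level name.
pub fn make_server() -> ForYouFeedServer {
    ForYouFeedServer::new()
}
```

The payoff is that the internal module can be renamed or split without breaking callers, because the only path they depend on is the root one.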


main.rs (63 lines)

Thin glue: parse CLI args, build a service-context, hand it to a framework called XServiceBuilder, which does everything else.

use std::time::Duration;

use clap::Parser;
use xai_dark_traffic::RejectDarkTrafficLayer;
use xai_home_mixer::dark_traffic_setup;
use xai_home_mixer::params;
use xai_home_mixer::{HomeMixerConfig, HomeMixerServer};
use xai_home_mixer_proto as pb;
use xai_x_rpc::grpc_client::TlsMode;
use xai_x_rpc::wily_lookup_service::ShardCoordinate;
use xai_x_service_builder::XServiceBuilder;

Imports. Several interesting ones:

  • xai_dark_traffic::RejectDarkTrafficLayer — a tower middleware layer that intercepts shadow / dark-traffic requests. "Dark traffic" is duplicated production traffic sent to a new code path for A/B comparison without affecting real users; this layer rejects them under certain conditions.
  • xai_x_rpc::wily_lookup_service::ShardCoordinate — Wily is the internal service-discovery system; ShardCoordinate tells this process which shard it's running as (e.g. {ordinal: 3, total_size: 500} means "I am shard 3 of 500").
  • XServiceBuilder — internal framework that bundles cross-cutting concerns: TLS, metrics, OTEL, feature switches, deciders, gRPC reflection, profiling endpoints.

#[derive(Parser, Debug)]
#[command(about = "HomeMixer gRPC Server")]
struct Args {
    #[arg(long, default_value_t = 50051u16)]
    grpc_port: u16,
    #[arg(long, default_value_t = 9090u16)]
    metrics_port: u16,
    #[arg(long, default_value_t = -1)]
    shard_coordinate: i16,
    #[arg(long, default_value_t = 500)]
    shard_total_size: u16,
    #[arg(long, default_value = "atla")]
    datacenter: String,
    #[arg(long, default_value = "")]
    otel_endpoint: String,
}

CLI args. grpc_port 50051 is gRPC convention. metrics_port 9090 is Prometheus convention. shard_coordinate=-1 is the "unsharded / not assigned yet" sentinel — converted to Option<ShardCoordinate> below. datacenter="atla" defaults to Atlanta (X's primary datacenter).

fn parse_shard(args: &Args) -> Option<ShardCoordinate> {
    if args.shard_coordinate >= 0 {
        Some(ShardCoordinate {
            ordinal: args.shard_coordinate as u16,
            total_size: args.shard_total_size,
        })
    } else {
        None
    }
}

Convert i16 + u16 into Option<ShardCoordinate>. Any negative value, not just the -1 default, yields None (you can run home-mixer without sharding; some queries route to a single instance).
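
A standalone sketch of the same sentinel-to-Option conversion, for experimenting outside the crate. ShardCoordinate here is a local stand-in with the two fields shown above, and shard_from_raw is a hypothetical name; it uses u16::try_from instead of the `as` cast, which is a stylistic variation rather than what main.rs does.

```rust
use std::convert::TryFrom;

/// Local stand-in for xai_x_rpc's ShardCoordinate (same two fields as in main.rs).
#[derive(Debug, PartialEq)]
pub struct ShardCoordinate {
    pub ordinal: u16,
    pub total_size: u16,
}

/// Same decision as parse_shard, but taking the two raw CLI values directly.
pub fn shard_from_raw(ordinal: i16, total_size: u16) -> Option<ShardCoordinate> {
    // u16::try_from fails for every negative i16, so -1 (and any other
    // negative value) becomes None without needing an `as` cast.
    u16::try_from(ordinal)
        .ok()
        .map(|ordinal| ShardCoordinate { ordinal, total_size })
}
```

With this, shard_from_raw(-1, 500) is None and shard_from_raw(3, 500) is shard 3 of 500.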

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = Args::parse();
    let shard_coordinate = parse_shard(&args);

    xai_stringcenter::init_from_file(params::STRINGCENTER_BUNDLE_PATH);

    XServiceBuilder::new("home-mixer")
        .grpc_port(args.grpc_port)
        .metrics_port(args.metrics_port)
        .datacenter(args.datacenter)
        .otel_endpoint(args.otel_endpoint)
        .with_featureswitches(params::FS_PATH, true)
        .with_decider(params::decider_path(), None)
        .with_tls(TlsMode::server_mtls_from_env()?)
        .with_max_connection_age(Duration::from_secs(300))
        .with_reflection(pb::FILE_DESCRIPTOR_SET)
        .with_layer(dark_traffic_setup::resolve_layer())
        .with_layer(RejectDarkTrafficLayer::from_env())
        .http_routes(xai_profiling::profiling_router())
        .run::<HomeMixerServer>(HomeMixerConfig { shard_coordinate })
        .await
}

The #[tokio::main] macro wraps main in Tokio's multi-threaded async runtime.

The flow:

  1. Parse CLI args.
  2. Compute shard coordinate.
  3. Initialize StringCenter. xai_stringcenter is X's internal localization library; it loads a translations bundle once at startup so subsequent code can look up translations cheaply. Home-mixer doesn't display strings directly, but get_for_you_feed_urt returns URT (Twitter's UI-rendering protocol) responses, which embed strings.
  4. Build the service, a long fluent chain:
    • grpc_port, metrics_port, datacenter, otel_endpoint — wire from args.
    • with_featureswitches(params::FS_PATH, true) — load feature-switch definitions from disk. The true enables hot reload (watch the file).
    • with_decider(params::decider_path(), None) — load deciders (experiment/rollout flags).
    • with_tls(TlsMode::server_mtls_from_env()?) — server-side mTLS (mutual TLS). Cert and key come from env vars.
    • with_max_connection_age(Duration::from_secs(300)) — close gRPC connections after 5 min, forcing client reconnects. Useful for load balancing: long-lived connections "stick" to one instance, max-age forces rotation.
    • with_reflection(pb::FILE_DESCRIPTOR_SET) — enable gRPC reflection (so grpcurl and similar tools can introspect available methods).
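    • Two with_layer calls — tower middleware for dark traffic. One sets up dark-traffic routing; the other (RejectDarkTrafficLayer) rejects dark-traffic requests under conditions that, judging by from_env(), are presumably configured through the environment.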
    • http_routes(xai_profiling::profiling_router()) — mount pprof / async-profiler endpoints on the HTTP server (separate from gRPC).
    • run::<HomeMixerServer>(HomeMixerConfig { shard_coordinate }) — this is the magic. It calls HomeMixerServer::build(...) (we'll see this in server.rs) to construct the server, then runs the gRPC + HTTP listener.

Total binary entry: ~25 substantive lines of work. The whole "build a production gRPC service" boilerplate has been pushed into XServiceBuilder.


server.rs (421 lines) — the heart

The largest file we'll read this session. It defines:

  1. RequestContext — the per-request bundle: trace info, hydrated query, root span.
  2. PipelineOutput — internal output struct.
  3. HomeMixerConfig — the build-time config (just shard_coordinate).
  4. QueryBuilder — turns the incoming proto request into a typed ScoredPostsQuery.
  5. HomeMixerServer — holds the two service handlers as Arcs.
  6. Three tonic::async_trait impls — ScoredPostsService, ForYouFeedService, and XService (the framework adapter).

Walking it top to bottom.

Imports

use crate::candidate_pipeline::for_you_candidate_pipeline::ForYouCandidatePipeline;
use crate::candidate_pipeline::phoenix_candidate_pipeline::PhoenixCandidatePipeline;
use crate::clients::gizmoduck_client::{
    GizmoduckClient, MockGizmoduckClient, ProdGizmoduckClient, ViewerData,
};
use crate::for_you_server::ForYouFeedServer;
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use crate::params;
use crate::scored_posts_server::{ScoredPostsServer, build_debug_json};

  • The two pipelines: PhoenixCandidatePipeline (scored-posts) and ForYouCandidatePipeline (the For You feed that wraps it).
  • GizmoduckClient and friends. Gizmoduck is the historical X user-service. The trait abstracts mock vs. prod so tests can swap in a MockGizmoduckClient. ViewerData is the bundle returned by Gizmoduck for the request's viewer.
  • Models: PostCandidate (the C) and ScoredPostsQuery (the Q).
  • build_debug_json — produces a JSON dump of the pipeline result for debug endpoints.

Note: crate::clients::gizmoduck_client and crate::params are imported here (and main.rs imports xai_home_mixer::dark_traffic_setup), but the lib.rs we just read declares no clients, params, or dark_traffic_setup module. Those declarations are evidently not part of this release — another visible artifact of partial open-sourcing.

use std::collections::HashMap;
use std::sync::Arc;
use tonic::codec::CompressionEncoding;
use tonic::{Request, Response, Status};
use tracing::{Instrument, info_span};
use xai_candidate_pipeline::candidate_pipeline::PipelineResult;
use xai_candidate_pipeline::component_library::utils::{
    days_since_creation, generate_request_id, is_sampled, non_zero, resolve_request_id,
};
use xai_decider::{Decider, DeciderStore};
use xai_feature_switches::{FeatureSwitches, RecipientBuilder};
use xai_home_mixer_proto as pb;
use xai_home_mixer_proto::{
    DebugScoredPostsResponse, ForYouFeedResponse, ForYouFeedUrtResponse, ScoredPost,
    ScoredPostsResponse,
};
use xai_pipeline_tracing::{B3RequestInfo, extract_b3_info};
use xai_recsys_proto::{network_type_string_to_enum, timezone_string_to_enum};
use xai_urt_thrift::cursor_utils;
use xai_urt_thrift::operation::CursorType;
use xai_x_rpc::wily_lookup_service::ShardCoordinate;

Notables:

  • xai_candidate_pipeline::component_library::utils::* — shared helpers from the candidate-pipeline crate's "component library" (a sister set of pre-built components not in the open release).
  • xai_decider::{Decider, DeciderStore} — A/B testing client.
  • xai_feature_switches::{FeatureSwitches, RecipientBuilder} — feature-flag client. A Recipient is "the entity being evaluated" — user ID + country + language + custom keys. match_recipient returns the active feature values for that recipient.
  • xai_pipeline_tracing::{B3RequestInfo, extract_b3_info} — B3 is the Zipkin/Brave trace propagation header set. B3RequestInfo parses them out of gRPC metadata so the request joins the distributed trace.
  • xai_urt_thrift — URT is X's "Unified Rendering Tree" — a frontend rendering format (think: timeline JSON with cards, modules, separators). cursor_utils decodes pagination cursors. We see this in get_for_you_feed_urt.

const VIEWER_ROLES_TIMEOUT_MS: u64 = 200;

A 200ms timeout on the Gizmoduck call. If Gizmoduck is slow, we proceed with default ViewerData — we don't block the request on viewer metadata.

Types

pub struct RequestContext {
    pub b3_info: B3RequestInfo,
    pub query: ScoredPostsQuery,
    pub root_span: tracing::Span,
}

pub(crate) struct PipelineOutput {
    pub scored_posts: Vec<ScoredPost>,
    pub pipeline_result: PipelineResult<ScoredPostsQuery, PostCandidate>,
}

pub struct HomeMixerConfig {
    pub shard_coordinate: Option<ShardCoordinate>,
}

Three structs. RequestContext is what QueryBuilder::build returns. PipelineOutput packages both the wire-format ScoredPosts and the full PipelineResult (for debug). HomeMixerConfig is the build-time config we saw in main.rs.

QueryBuilder

#[derive(Clone)]
pub struct QueryBuilder {
    feature_switches: Arc<FeatureSwitches>,
    decider: Decider,
    datacenter: String,
    gizmoduck_client: Arc<dyn GizmoduckClient + Send + Sync>,
}

The "convert proto request to internal query" helper. Holds the four things needed: feature-switch evaluator, decider client, datacenter name (used as a key in feature-switch evaluation), and the Gizmoduck client (as Arc<dyn …> so it can be swapped in tests).

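#[derive(Clone)] — cloning is cheap but not free: the two Arc fields only bump refcounts, while datacenter (a String) is copied and the Decider is cloned by value. We'll see two copies of QueryBuilder, one for each server.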
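
A reduced sketch of what cloning such a struct does to an Arc<dyn …> field. ViewerClient, MockClient, MiniQueryBuilder, and clone_and_count are all hypothetical stand-ins; the point is only that the clone shares the client allocation instead of duplicating it.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the GizmoduckClient trait.
pub trait ViewerClient {
    fn name(&self) -> &'static str;
}

pub struct MockClient;

impl ViewerClient for MockClient {
    fn name(&self) -> &'static str {
        "mock"
    }
}

/// Reduced QueryBuilder: one shared client plus an owned String.
#[derive(Clone)]
pub struct MiniQueryBuilder {
    pub datacenter: String,
    pub client: Arc<dyn ViewerClient + Send + Sync>,
}

/// Builds one builder, clones it, and reports (handles sharing the client,
/// whether both handles point at the same allocation).
pub fn clone_and_count() -> (usize, bool) {
    let original = MiniQueryBuilder {
        datacenter: "mock".to_string(),
        client: Arc::new(MockClient),
    };
    let copy = original.clone();
    // The clone shares the client; only its refcount changed.
    let shared = Arc::ptr_eq(&original.client, &copy.client);
    (Arc::strong_count(&original.client), shared)
}
```

After one clone the strong count is 2 and both handles point at the same client, which is why handing a QueryBuilder to each server does not duplicate the underlying client.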

impl QueryBuilder {
    pub async fn build(
        &self,
        mut b3_info: B3RequestInfo,
        proto_query: pb::ScoredPostsQuery,
        fs_overrides: std::collections::HashMap<String, String>,
        span_name: &'static str,
    ) -> Result<RequestContext, Status> {
        if proto_query.viewer_id == 0 {
            return Err(Status::invalid_argument("viewer_id must be specified"));
        }
        if params::TRACE_USER_IDS.contains(&proto_query.viewer_id) {
            b3_info.force_sample();
        }

Validation: viewer_id is mandatory. Force-sampling: for a known list of trace users (engineers' accounts), force B3 sampling on so we see every request from these users in the trace UI.

        let viewer_data = self.fetch_viewer_data(proto_query.viewer_id).await;

        let in_network_only =
            proto_query.in_network_only || viewer_data.allow_for_you_recommendations == Some(false);

Fetch viewer data (or get defaults if Gizmoduck timed out or errored). Then resolve in_network_only: honour either the request flag or the user's account preference (a "don't recommend out-of-network (OON) content to me" toggle).
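
The boolean is small enough to check as a truth table. This sketch lifts the expression into a free function (resolve_in_network_only is a name made up here) and assumes, as the comparison against Some(false) suggests, that the preference is an Option<bool>.

```rust
/// Mirrors the expression in QueryBuilder::build.
/// `allow_recs` is None when the preference is unknown, for example when
/// the viewer lookup fell back to default data.
pub fn resolve_in_network_only(request_flag: bool, allow_recs: Option<bool>) -> bool {
    // Only an explicit Some(false) restricts the feed; None does not.
    request_flag || allow_recs == Some(false)
}
```

Note the asymmetry: an unknown preference (None) behaves like "recommendations allowed", so a failed viewer lookup widens the feed rather than narrowing it.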

        let params = self.evaluate_feature_switches(
            &proto_query,
            &viewer_data.roles,
            viewer_data.has_phone_number,
            &fs_overrides,
        );

Compute the feature-switch Params once per request. This is the snapshot of every feature flag value that applies to this request. We pass it through the query so every pipeline stage sees the same values (consistent within a request).

        let push_to_home_post_id = non_zero(proto_query.push_to_home_post_id);
        let device_status = proto_query.device_status.unwrap_or_default();
        let prediction_id = generate_request_id();
        let request_id = resolve_request_id(proto_query.request_id);

A few utility calls:

  • non_zero turns a u64 into Option<u64>, treating 0 as None. Common proto idiom: proto3 scalars declared without optional carry no field presence, so 0 doubles as the "unset" sentinel.
  • generate_request_id() — fresh u64.
  • resolve_request_id(proto.request_id) — use the client-supplied ID if non-zero, else generate one.
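
None of these helpers' bodies are in the release, so the following is a guess at their shape rather than the real code. In particular, the process-local counter behind generate_request_id is purely an assumption made to keep the sketch self-contained.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Treats 0 as "absent", the usual reading of an unset proto3 scalar.
pub fn non_zero(value: u64) -> Option<u64> {
    if value == 0 {
        None
    } else {
        Some(value)
    }
}

// Hypothetical ID source: a process-local counter starting at 1.
// The real generator is not shown and is very likely different.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);

pub fn generate_request_id() -> u64 {
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}

/// Keeps a client-supplied ID if there is one, otherwise mints a fresh one.
pub fn resolve_request_id(client_supplied: u64) -> u64 {
    non_zero(client_supplied).unwrap_or_else(generate_request_id)
}
```

So resolve_request_id(42) stays 42, while resolve_request_id(0) returns a newly generated non-zero ID.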

So prediction_id and request_id are different IDs:

  • request_id is the request identifier — propagated end-to-end, set by the upstream caller.
  • prediction_id is the prediction identifier — set per-pipeline-execution. Used later as the key for joining the served-feed log against the engagement log to build training data.

        let query = ScoredPostsQuery::new(
            proto_query.viewer_id,
            proto_query.client_app_id,
            proto_query.country_code,
            proto_query.language_code,
            proto_query.seen_ids,
            proto_query.served_ids,
            in_network_only,
            proto_query.is_bottom_request,
            params,
            self.decider.with_recipient(proto_query.viewer_id),
            viewer_data.roles,
            viewer_data.muted_keywords,
            viewer_data.follower_count,
            proto_query.topic_ids,
            proto_query.excluded_topic_ids,
            proto_query.exclude_videos,
            request_id,
            prediction_id,
            device_status.ip_address,
            device_status.user_agent,
            timezone_string_to_enum(device_status.time_zone.as_ref()),
            network_type_string_to_enum(device_status.device_network_type.as_ref()),
            device_status.client_version,
            device_status.device_id,
            device_status.mobile_device_id,
            device_status.mobile_device_ad_id,
            viewer_data.subscription_level,
            is_sampled(request_id, 0.5),
            proto_query.is_preview,
            viewer_data.age_in_years,
            push_to_home_post_id,
        );

The ScoredPostsQuery::new constructor takes thirty-one positional arguments. The query is the largest struct in the codebase, and every field has to be plugged in here, in the right order.

A few interesting derivations:

  • self.decider.with_recipient(proto_query.viewer_id) returns a per-user decider (Decider is cheap to clone), so every subsequent .decide("flag") call evaluates against this user.
  • timezone_string_to_enum, network_type_string_to_enum — proto enum lookups from strings.
  • is_sampled(request_id, 0.5) — deterministic 50% sample, presumably something like hash(request_id) & 1 (the helper's body isn't in this release). Used for is_shadow_traffic. Shadow-traffic-tagged requests run extra side effects (full logging) to sample ML training data.

        let root_span = b3_info.root_span(info_span!(
            "request",
            endpoint = span_name,
            trace = %b3_info.trace_id_str,
            user = %query.user_id,
            b3 = %b3_info.b3_sampled,
        ));

        Ok(RequestContext {
            b3_info,
            query,
            root_span,
        })
    }

Create a root tracing span tagged with the endpoint name, trace ID, user ID, and B3 sampling decision. % is tracing's Display-format flag — emits the value via its Display impl.

Return the bundle.
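
One argument in that constructor call deserves a closer look: is_sampled(request_id, 0.5). Its body isn't in the release, so here is one plausible way to get a deterministic, rate-based sample from an ID. The mixer is the finalizer step from SplitMix64, chosen here only because it is fixed and well known; mix64 and sampled_fraction are hypothetical helpers.

```rust
/// Finalizer step from SplitMix64: a fixed bit mixer with no hidden state.
fn mix64(mut x: u64) -> u64 {
    x ^= x >> 30;
    x = x.wrapping_mul(0xBF58476D1CE4E5B9);
    x ^= x >> 27;
    x = x.wrapping_mul(0x94D049BB133111EB);
    x ^= x >> 31;
    x
}

/// Returns true for roughly `rate` of all IDs, and always gives the same
/// answer for the same ID (no random state involved).
pub fn is_sampled(request_id: u64, rate: f64) -> bool {
    // Map the top 53 bits of the mixed value onto [0, 1).
    let unit = (mix64(request_id) >> 11) as f64 / (1u64 << 53) as f64;
    unit < rate
}

/// Fraction of the IDs 0..n that fall in the sample (for eyeballing the rate).
pub fn sampled_fraction(n: u64, rate: f64) -> f64 {
    let hits = (0..n).filter(|&id| is_sampled(id, rate)).count();
    hits as f64 / n as f64
}
```

Because the decision depends only on the ID, every service that sees the same request_id reaches the same sampling verdict without coordinating.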

evaluate_feature_switches

    fn evaluate_feature_switches(
        &self,
        proto_query: &pb::ScoredPostsQuery,
        user_roles: &[String],
        has_phone_number: bool,
        fs_overrides: &std::collections::HashMap<String, String>,
    ) -> xai_feature_switches::Params {
        let recipient = RecipientBuilder::new()
            .user_id(proto_query.viewer_id)
            .country(&proto_query.country_code)
            .language(&proto_query.language_code)
            .client_app_id(proto_query.client_app_id as i64)
            .custom_string("datacenter", &self.datacenter)
            .custom_i64(
                "account_age_days",
                days_since_creation(proto_query.viewer_id),
            )
            .custom_bool("has_phone_number", has_phone_number);
        let recipient = if !user_roles.is_empty() {
            recipient.user_roles(user_roles.iter().cloned())
        } else {
            recipient
        };
        let mut results = self.feature_switches.match_recipient(&recipient.build());

Build a Recipient — the input to feature-switch evaluation. The standard keys: user, country, language, client app. The custom keys: datacenter, account age (from days_since_creation), phone-number presence.

days_since_creation(viewer_id) is interesting — it computes age from the user ID alone. That works because snowflake IDs embed their creation timestamp in the high bits, so presumably the helper just extracts it. (Accounts that predate snowflake have small sequential IDs; how the helper treats those isn't visible here.)
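
For reference, this is how a timestamp comes out of a snowflake ID, assuming the standard layout (milliseconds since the snowflake epoch in the bits above bit 22). Whether days_since_creation works exactly like this is an assumption; the function names and the explicit now_ms parameter are illustrative choices.

```rust
/// Snowflake epoch in Unix milliseconds (2010-11-04T01:42:54.657Z).
const SNOWFLAKE_EPOCH_MS: u64 = 1_288_834_974_657;
const MS_PER_DAY: u64 = 86_400_000;

/// The bits above bit 22 of a snowflake ID hold milliseconds since the epoch.
pub fn snowflake_timestamp_ms(id: u64) -> u64 {
    (id >> 22) + SNOWFLAKE_EPOCH_MS
}

/// Whole days between the ID's embedded timestamp and `now_ms`.
/// Taking `now_ms` as a parameter keeps the function deterministic;
/// a "now" earlier than the creation time saturates to 0 days.
pub fn days_since_creation_at(id: u64, now_ms: u64) -> i64 {
    let created_ms = snowflake_timestamp_ms(id);
    (now_ms.saturating_sub(created_ms) / MS_PER_DAY) as i64
}
```

The lower 22 bits (worker and sequence numbers) are simply shifted away, so they never affect the computed age.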

User roles are added conditionally. If empty, skip the .user_roles(...) call to avoid building an empty iterator.

match_recipient(...) evaluates every feature-switch against this recipient and returns the resulting Params snapshot.

        if !fs_overrides.is_empty() {
            for (key, value) in fs_overrides {
                results.override_fs(key.clone(), value);
            }
            tracing::info!(
                "Applied {} FS overrides: {:?}",
                fs_overrides.len(),
                fs_overrides.keys().collect::<Vec<_>>()
            );
        }

        results.into()
    }

Apply any per-request overrides on top of the computed params. The debug endpoint (get_debug_scored_posts) supports this — an internal tool can pass feature_switch_overrides: {"fs_xyz": "value"} to test a specific flag value without changing the global config.

Log if overrides were applied — for audit.
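
The override step is an overlay: computed values first, per-request values on top. The real Params type isn't shown, so this sketch models the snapshot as a plain HashMap<String, String>, which is an assumption, and apply_overrides is a hypothetical name.

```rust
use std::collections::HashMap;

/// Overlays per-request overrides on a computed snapshot and returns how
/// many overrides were applied (the number the real code logs).
pub fn apply_overrides(
    snapshot: &mut HashMap<String, String>,
    overrides: &HashMap<String, String>,
) -> usize {
    for (key, value) in overrides {
        // An override wins over the computed value; keys that were not in
        // the snapshot are simply added.
        snapshot.insert(key.clone(), value.clone());
    }
    overrides.len()
}
```

An empty override map leaves the snapshot untouched, which matches the production path where get_scored_posts passes Default::default().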

fetch_viewer_data

    async fn fetch_viewer_data(&self, viewer_id: u64) -> ViewerData {
        match tokio::time::timeout(
            std::time::Duration::from_millis(VIEWER_ROLES_TIMEOUT_MS),
            self.gizmoduck_client.get_viewer_data(viewer_id),
        )
        .await
        {
            Ok(Ok(data)) => data,
            Ok(Err(_)) | Err(_) => ViewerData::default(),
        }
    }

A bounded fetch. tokio::time::timeout(200ms, future) returns:

  • Ok(Ok(data)) — fetched successfully within 200ms.
  • Ok(Err(_)) — fetched within 200ms but the client returned an error.
  • Err(_) — timeout fired first.

Both error paths return ViewerData::default() (empty roles, no muted keywords, etc.). Critical for resilience: if Gizmoduck is down, the feed still serves; some personalization is missing but the user still gets a feed.

The pattern Ok(Err(_)) | Err(_) collapses both error branches into one arm.
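
The same nested-Result shape can be reproduced without an async runtime. This is a std-only analogue, not the tokio code: a worker thread plus recv_timeout stands in for tokio::time::timeout, and ViewerData here is a one-field stand-in. One real difference to keep in mind is that tokio drops the timed-out future, whereas this worker thread keeps running after the deadline.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// One-field stand-in for the real ViewerData.
#[derive(Debug, Default, PartialEq)]
pub struct ViewerData {
    pub roles: Vec<String>,
}

/// Runs `fetch` on a worker thread and waits at most `budget` for its answer.
/// Any failure (client error, timeout, worker panic) yields the default.
pub fn fetch_with_budget<F>(budget: Duration, fetch: F) -> ViewerData
where
    F: FnOnce() -> Result<ViewerData, String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may already be gone if we timed out; ignore that.
        let _ = tx.send(fetch());
    });
    match rx.recv_timeout(budget) {
        // Answer arrived in time and the client succeeded.
        Ok(Ok(data)) => data,
        // Client error, or nothing arrived within the budget: serve defaults.
        Ok(Err(_)) | Err(_) => ViewerData::default(),
    }
}
```

The outer Result answers "did anything arrive in time?" and the inner one answers "did the client succeed?", which is exactly the split the three match arms above rely on.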

mock

    pub fn mock() -> Self {
        Self {
            feature_switches: Arc::new(FeatureSwitches::new(vec![]).unwrap()),
            decider: Decider::new(DeciderStore::new(HashMap::new())),
            datacenter: "mock".to_string(),
            gizmoduck_client: Arc::new(MockGizmoduckClient::default()),
        }
    }
}

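Test factory: empty feature-switches, empty decider store, "mock" datacenter, mock Gizmoduck. The .unwrap() on FeatureSwitches::new(vec![]) is tolerable here: the constructor is fallible, but with an empty definition list there is little that could go wrong, and a panic in a test factory surfaces immediately.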

HomeMixerServer

pub struct HomeMixerServer {
    scored_posts: Arc<ScoredPostsServer>,
    for_you: Arc<ForYouFeedServer>,
}

The top-level struct. Just two Arc handles to the two service implementations.

ScoredPostsService impl

#[tonic::async_trait]
impl pb::scored_posts_service_server::ScoredPostsService for ScoredPostsServer {
    #[xai_stats_macro::receive_stats(latency=Bucket500To2500)]
    async fn get_scored_posts(
        &self,
        request: Request<pb::ScoredPostsQuery>,
    ) -> Result<Response<ScoredPostsResponse>, Status> {
        let b3_info = extract_b3_info(request.metadata());
        let ctx = self
            .query_builder
            .build(
                b3_info,
                request.into_inner(),
                Default::default(),
                "scored_posts",
            )
            .await?;
        let RequestContext {
            b3_info,
            query,
            root_span,
        } = ctx;
        let output = self.run_pipeline(query).instrument(root_span).await?;

        let mut response = Response::new(ScoredPostsResponse {
            scored_posts: output.scored_posts,
        });
        b3_info.inject_trace_response_header(&mut response);
        Ok(response)
    }

The "give me scored candidates" RPC. Steps:

  1. Extract B3 trace info from gRPC metadata.
  2. Build the RequestContext (validate, hydrate, etc.).
  3. Destructure the context.
  4. self.run_pipeline(query).instrument(root_span).await? — run the pipeline inside the root tracing span via .instrument(span). This means every nested span (sources, hydrators, filters, …) is parented to this request's root span — so the entire request shows up as one trace.
  5. Build the response.
  6. Inject the trace ID into the response header so the client can echo it in logs.

The receive_stats(latency=Bucket500To2500) is the same stats macro pattern we saw in Session 01 — automatic latency histogram with the right bucket range (500ms–2.5s is a reasonable budget for a full feed request).

    #[xai_stats_macro::receive_stats(latency=Bucket500To2500)]
    async fn get_debug_scored_posts(
        &self,
        request: Request<pb::DebugScoredPostsQuery>,
    ) -> Result<Response<DebugScoredPostsResponse>, Status> {
        let mut b3_info = extract_b3_info(request.metadata());
        b3_info.force_sample();

        let debug_query = request.into_inner();
        let fs_overrides = debug_query.feature_switch_overrides;
        let proto_query = debug_query.query.unwrap_or_default();

        let ctx = self
            .query_builder
            .build(b3_info, proto_query, fs_overrides, "debug_scored_posts")
            .await?;
        let RequestContext {
            b3_info,
            query,
            root_span,
        } = ctx;
        let output = self.run_pipeline(query).instrument(root_span).await?;

        let debug_json = build_debug_json(&output.pipeline_result);

        let mut response = Response::new(DebugScoredPostsResponse {
            scored_posts: output.scored_posts,
            debug_json,
        });
        b3_info.inject_trace_response_header(&mut response);
        Ok(response)
    }
}

Sibling RPC get_debug_scored_posts. Key differences from get_scored_posts:

  • b3_info.force_sample() — always trace debug requests (no sampling).
  • Takes feature_switch_overrides from the request — per-request feature flag overrides for debugging.
  • Returns both the scored posts AND a debug_json field with the full pipeline result serialized.

The build_debug_json function (defined in scored_posts_server.rs, which we'll read next) takes the PipelineResult and produces a JSON string for the response.

ForYouFeedService impl

#[tonic::async_trait]
impl pb::for_you_feed_service_server::ForYouFeedService for ForYouFeedServer {
    #[xai_stats_macro::receive_stats(latency=Bucket500To2500)]
    async fn get_for_you_feed(
        &self,
        request: Request<pb::ForYouFeedQuery>,
    ) -> Result<Response<ForYouFeedResponse>, Status> {
        let b3_info = extract_b3_info(request.metadata());
        let feed_query = request.into_inner();
        let proto_query = feed_query
            .query
            .ok_or_else(|| Status::invalid_argument("query must be specified"))?;
        let ctx = self
            .query_builder
            .build(b3_info, proto_query, Default::default(), "for_you_feed")
            .await?;
        let RequestContext {
            b3_info,
            query,
            root_span,
        } = ctx;
        let output = self.get_for_you_feed(query).instrument(root_span).await?;

        let mut response = Response::new(ForYouFeedResponse {
            items: output.items,
        });
        b3_info.inject_trace_response_header(&mut response);
        Ok(response)
    }

Same pattern as get_scored_posts but for the For You feed. Note the proto has a nested query field — must be unwrapped. Returns Vec<FeedItem> (which can include posts, prompts, who-to-follow modules, ads, etc.) — not just ScoredPosts.

    #[xai_stats_macro::receive_stats(latency=Bucket500To2500)]
    async fn get_for_you_feed_urt(
        &self,
        request: Request<pb::ForYouFeedQuery>,
    ) -> Result<Response<ForYouFeedUrtResponse>, Status> {
        let b3_info = extract_b3_info(request.metadata());
        let feed_query = request.into_inner();
        let proto_query = feed_query
            .query
            .ok_or_else(|| Status::invalid_argument("query must be specified"))?;
        let cursor_str = proto_query.cursor.clone();
        let request_context = proto_query.request_context.clone();
        let is_polling = proto_query.is_polling;

get_for_you_feed_urt is the URT (Unified Rendering Tree) variant. It pre-extracts three fields before moving proto_query into the builder, because we need them later.

The three fields:

  • cursor_str — opaque pagination cursor (a serialized Thrift struct).
  • request_context — a string tag like "foreground_request" set by the client.
  • is_polling — whether this is a background polling request (vs. user-initiated).

        let ctx = self
            .query_builder
            .build(b3_info, proto_query, Default::default(), "for_you_feed_urt")
            .await?;
        let RequestContext {
            b3_info,
            mut query,
            root_span,
        } = ctx;

        query.request_context = request_context;
        query.is_polling = is_polling;
        if !cursor_str.is_empty() {
            match cursor_utils::decode_ordered_cursor(&cursor_str) {
                Ok(Some(c)) => {
                    query.is_bottom_request = c.cursor_type == Some(CursorType::BOTTOM);
                    query.is_top_request = c.cursor_type == Some(CursorType::TOP);
                    query.cursor = Some(c);
                }
                Ok(None) => {}
                Err(e) => {
                    tracing::warn!(cursor_str, error = %e, "failed to decode URT cursor, ignoring");
                }
            }
        }

After building, decorate the query with URT-specific state. Note mut query in the destructure.

The cursor decoding: URT pagination uses an opaque cursor. decode_ordered_cursor parses it; if successful, set the cursor type flags on the query (bottom = "older posts please", top = "newer posts please") and store the cursor itself.

On decode error, log a warning and treat it as no-cursor. Don't fail the request — degraded UX (always-fresh feed) is better than an error.
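
Here is the three-way match in isolation. Real URT cursors are opaque Thrift, so the "top:<n>" / "bottom:<n>" text format, the "none" marker, and every name below are invented purely so the control flow can be run; only the Ok(Some) / Ok(None) / Err handling mirrors the handler.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CursorType {
    Top,
    Bottom,
}

#[derive(Debug, Default, PartialEq)]
pub struct PagingState {
    pub is_top_request: bool,
    pub is_bottom_request: bool,
    pub cursor_value: Option<u64>,
}

/// Hypothetical text format: "top:<n>", "bottom:<n>", or "none".
/// Ok(None) means "well-formed, but carries no position".
pub fn decode_cursor(raw: &str) -> Result<Option<(CursorType, u64)>, String> {
    if raw == "none" {
        return Ok(None);
    }
    let (kind, value) = raw
        .split_once(':')
        .ok_or_else(|| format!("no separator in {:?}", raw))?;
    let cursor_type = match kind {
        "top" => CursorType::Top,
        "bottom" => CursorType::Bottom,
        other => return Err(format!("unknown cursor type {:?}", other)),
    };
    let value = value.parse::<u64>().map_err(|e| e.to_string())?;
    Ok(Some((cursor_type, value)))
}

/// Applies a cursor string to fresh paging state. A bad cursor is logged
/// and ignored, so the caller still gets a usable (cursor-less) state.
pub fn apply_cursor(raw: &str) -> PagingState {
    let mut state = PagingState::default();
    if raw.is_empty() {
        return state;
    }
    match decode_cursor(raw) {
        Ok(Some((cursor_type, value))) => {
            state.is_top_request = cursor_type == CursorType::Top;
            state.is_bottom_request = cursor_type == CursorType::Bottom;
            state.cursor_value = Some(value);
        }
        Ok(None) => {}
        Err(e) => eprintln!("failed to decode cursor, ignoring: {}", e),
    }
    state
}
```

A malformed cursor and an empty cursor end up in the same state, which is the "degrade, don't fail" behaviour described above.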

        let urt = self
            .get_for_you_feed_urt(query)
            .instrument(root_span)
            .await?;

        let mut response = Response::new(ForYouFeedUrtResponse { urt: urt.into() });
        b3_info.inject_trace_response_header(&mut response);
        Ok(response)
    }
}

Run the pipeline; wrap the result. urt is Vec<u8> (serialized binary Thrift); urt.into() converts to Bytes.

XService impl — the bootstrap

#[tonic::async_trait]
impl xai_x_service_builder::XService for HomeMixerServer {
    type Config = HomeMixerConfig;

    async fn build(ctx: xai_x_service_builder::ServiceContext<HomeMixerConfig>) -> Self {
        let xai_x_service_builder::ServiceContext {
            feature_switches,
            decider,
            datacenter,
            config,
        } = ctx;

The XService trait is the adapter that XServiceBuilder (from main.rs) calls. It supplies a ServiceContext with the framework-managed dependencies — feature switches, decider, datacenter, and our custom config — and expects us to produce the server.

        let gizmoduck_client: Arc<dyn GizmoduckClient + Send + Sync> = Arc::new(
            ProdGizmoduckClient::new(
                config.shard_coordinate,
                &datacenter,
                Some("home-mixer.prod".to_string()),
            )
            .await
            .expect("Failed to create Gizmoduck client"),
        );

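Create the production Gizmoduck client. .expect("…") panics on failure, so the service can't start if the client can't be constructed. Contrast this with the per-request path in fetch_viewer_data, where a Gizmoduck failure just falls back to defaults.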

        let query_builder = QueryBuilder {
            feature_switches,
            decider,
            datacenter: datacenter.clone(),
            gizmoduck_client,
        };

        let phoenix_candidate_pipeline =
            Arc::new(PhoenixCandidatePipeline::prod(config.shard_coordinate, &datacenter).await);

        let scored_posts = Arc::new(ScoredPostsServer::new(
            query_builder.clone(),
            phoenix_candidate_pipeline,
        ));

        let for_you_pipeline =
            ForYouCandidatePipeline::new(Arc::clone(&scored_posts), &datacenter).await;

        let for_you = Arc::new(ForYouFeedServer::new(query_builder, for_you_pipeline));

        HomeMixerServer {
            scored_posts,
            for_you,
        }
    }

The construction graph. Read top-to-bottom:

  1. Build QueryBuilder.
  2. Build PhoenixCandidatePipeline — the inner ML scoring pipeline.
  3. Build ScoredPostsServer, wrapping the QueryBuilder and the Phoenix pipeline.
  4. Build ForYouCandidatePipeline, which wraps the ScoredPostsServer as one of its candidate sources. (Note this is why ScoredPostsServer is Arc'd before ForYouFeedServer is built — the For You server holds an Arc to the scored-posts server inside its pipeline.)
  5. Build ForYouFeedServer.
  6. Return the HomeMixerServer struct.

So the architecture is nested: For You sits on top of Scored Posts. The Scored Posts service does pure ranking (post in, score out). The For You service does ranking + post-processing (URT decoration, blending with prompts, ads, who-to-follow modules).
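The ownership shape of that nesting can be sketched with stand-in types. Everything here is hypothetical (ScoredPosts, ForYouPipeline, ForYou, Server and build are illustration names, not the repo's types); the sketch only shows why the inner server has to sit behind an Arc before the outer one is built.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for ScoredPostsServer, ForYouCandidatePipeline,
// ForYouFeedServer and HomeMixerServer. Only the ownership shape is modeled.
struct ScoredPosts {
    name: &'static str,
}

struct ForYouPipeline {
    // The outer pipeline keeps its own handle to the inner server.
    scored_posts: Arc<ScoredPosts>,
}

struct ForYou {
    pipeline: ForYouPipeline,
}

struct Server {
    scored_posts: Arc<ScoredPosts>,
    for_you: Arc<ForYou>,
}

fn build() -> Server {
    // Inner server first, already behind an Arc.
    let scored_posts = Arc::new(ScoredPosts { name: "scored_posts" });
    // The outer pipeline clones the Arc: a refcount bump, not a deep copy.
    let pipeline = ForYouPipeline {
        scored_posts: Arc::clone(&scored_posts),
    };
    let for_you = Arc::new(ForYou { pipeline });
    Server { scored_posts, for_you }
}
```

After build() there are two handles to the same inner server: one registered directly as a gRPC service, one reachable through the For You pipeline.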

    fn register(self: Arc<Self>, routes: &mut tonic::service::RoutesBuilder) {
        routes.add_service(
            pb::scored_posts_service_server::ScoredPostsServiceServer::from_arc(Arc::clone(
                &self.scored_posts,
            ))
            .max_decoding_message_size(params::MAX_GRPC_MESSAGE_SIZE)
            .max_encoding_message_size(params::MAX_GRPC_MESSAGE_SIZE)
            .accept_compressed(CompressionEncoding::Gzip)
            .accept_compressed(CompressionEncoding::Zstd)
            .send_compressed(CompressionEncoding::Gzip)
            .send_compressed(CompressionEncoding::Zstd),
        );
        routes.add_service(
            pb::for_you_feed_service_server::ForYouFeedServiceServer::from_arc(Arc::clone(
                &self.for_you,
            ))
            .max_decoding_message_size(params::MAX_GRPC_MESSAGE_SIZE)
            .max_encoding_message_size(params::MAX_GRPC_MESSAGE_SIZE)
            .accept_compressed(CompressionEncoding::Gzip)
            .accept_compressed(CompressionEncoding::Zstd)
            .send_compressed(CompressionEncoding::Gzip)
            .send_compressed(CompressionEncoding::Zstd),
        );
    }
}

Register both services with the gRPC router. Each gets:

  • A larger-than-default message size cap.
  • Accept and send compression via both Gzip and Zstd. The client picks whichever it prefers; the server supports both.

End of server.rs.


for_you_server.rs (87 lines)

The thin For You wrapper. Mostly delegates to ForYouCandidatePipeline, with the URT serialization layer added.

use crate::candidate_pipeline::for_you_candidate_pipeline::ForYouCandidatePipeline;
use crate::models::query::ScoredPostsQuery;
use crate::params;
use crate::server::QueryBuilder;
use crate::util::urt;
use tonic::Status;
use tracing::info;
use xai_candidate_pipeline::candidate_pipeline::CandidatePipeline;
use xai_home_mixer_proto::FeedItem;

pub(crate) struct ForYouFeedOutput {
    pub items: Vec<FeedItem>,
}

pub struct ForYouFeedServer {
    pub(crate) query_builder: QueryBuilder,
    pipeline: ForYouCandidatePipeline,
}

util::urt is another module reference that's not in the open release. The FeedItem is the URT proto wire-type.

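ForYouFeedServer holds a QueryBuilder (presumably so it can build queries from inside debug endpoints if needed) and the ForYouCandidatePipeline. Both are held by value, not behind an Arc: the pipeline has a single owner, and the QueryBuilder is this server's own copy (ScoredPostsServer received a clone in build).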

impl ForYouFeedServer {
    pub fn new(query_builder: QueryBuilder, pipeline: ForYouCandidatePipeline) -> Self {
        Self {
            query_builder,
            pipeline,
        }
    }

    pub(crate) async fn get_for_you_feed(
        &self,
        query: ScoredPostsQuery,
    ) -> Result<ForYouFeedOutput, Status> {
        if params::TEST_USER_IDS.contains(&query.user_id) {
            return Ok(ForYouFeedOutput { items: vec![] });
        }

        let result = self.pipeline.execute(query).await;

        Ok(ForYouFeedOutput {
            items: result.selected_candidates,
        })
    }

The body of For You. Short-circuit for test users: if the request is from a known test user (e.g. internal monitoring synthetic users), return empty. We don't want to waste compute on synthetic users and we don't want their bogus activity polluting metrics or training data.

Otherwise: run the pipeline and return the selected candidates.
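As a minimal sketch of that guard-clause shape: the ID list and the run_pipeline stand-in below are made up for illustration, since the real params::TEST_USER_IDS values and the pipeline are not part of this file.

```rust
// Hypothetical allow-list; the real params::TEST_USER_IDS values are not shown.
const TEST_USER_IDS: &[u64] = &[1, 2, 3];

// Stand-in for the pipeline call; returns fake item ids.
fn run_pipeline(user_id: u64) -> Vec<u64> {
    vec![user_id * 10, user_id * 10 + 1]
}

fn get_feed(user_id: u64) -> Vec<u64> {
    // Guard clause: synthetic users never reach the pipeline.
    if TEST_USER_IDS.contains(&user_id) {
        return Vec::new();
    }
    run_pipeline(user_id)
}
```

The early return hands back a successful, empty response rather than an error, so monitoring clients see a healthy service without any ranking work being done.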

Note that result.selected_candidates here is Vec<FeedItem>, not Vec<PostCandidate> as you might expect from Session 01. The For You pipeline uses FeedItem as its candidate type because it mixes posts with prompts, who-to-follow, ads, etc. The Scored Posts pipeline uses PostCandidate. Different Cs for different pipelines.

    pub(crate) async fn get_for_you_feed_urt(
        &self,
        query: ScoredPostsQuery,
    ) -> Result<Vec<u8>, Status> {
        log_request_info(&query);

        let cursor = query.cursor.clone();
        let request_context = query.request_context.clone();
        let client_app_id = query.client_app_id;
        let viewer_id = query.user_id;
        let language_code = query.language_code.clone();
        let country_code = query.country_code.clone();

        let output = self.get_for_you_feed(query).await?;

URT variant. Extract a bunch of fields before moving query into get_for_you_feed — these are needed for URT serialization but the query gets consumed.
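The extract-then-move pattern in isolation, as a sketch with a hypothetical trimmed-down Query (the real ScoredPostsQuery has far more fields):

```rust
// Hypothetical, trimmed-down query: just enough fields to show the move.
#[derive(Clone, Debug)]
struct Query {
    user_id: u64,
    language_code: String,
    seen_ids: Vec<u64>,
}

// Takes the query by value, so the caller loses access to it afterwards.
fn run(query: Query) -> usize {
    query.seen_ids.len()
}

fn handle(query: Query) -> (u64, String, usize) {
    // Copy types are copied and Strings are cloned, all before the move below.
    let viewer_id = query.user_id;
    let language_code = query.language_code.clone();

    let item_count = run(query); // `query` is moved here

    // Reading `query.language_code` at this point would not compile (E0382).
    (viewer_id, language_code, item_count)
}
```

Cloning a handful of small fields is cheaper than cloning the whole query, which is the likely reason the handler does it this way.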

        let timeline_response = urt::make_urt_timeline(
            &output.items,
            cursor.as_ref(),
            &request_context,
            client_app_id,
            viewer_id,
            &language_code,
            if country_code.is_empty() {
                None
            } else {
                Some(&country_code)
            },
        );

        xai_urt_thrift::serialize_binary(&timeline_response)
            .map_err(|e| Status::internal(format!("failed to serialize URT: {e}")))
    }
}

Build the URT timeline from the items, serialize to binary Thrift, return as Vec<u8>. URT serialization is the thing that produces the rich client-side rendering — cards, separators, modules. The conversion from Vec<FeedItem> to URT is in util::urt::make_urt_timeline (not in the open release).

The empty country_code handling: an empty string converts to None. Otherwise pass as Some(&str). Probably so URT can decide on country-specific renderings only when known.
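The same empty-means-absent rule can be written as a tiny helper. non_empty is a hypothetical name; the source inlines the if/else instead.

```rust
// Hypothetical helper: treat an empty string as "not provided".
fn non_empty(value: &str) -> Option<&str> {
    if value.is_empty() {
        None
    } else {
        Some(value)
    }
}
```

Proto3 string fields default to the empty string, so "empty" is usually the only signal a server gets that the client sent nothing.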

fn log_request_info(query: &ScoredPostsQuery) {
    info!(
        request_id = query.request_id,
        user_id = query.user_id,
        app_id = query.client_app_id,
        request_context = %query.request_context,
        cursor = ?query.cursor,
        seen_ids = query.seen_ids.len(),
        "For You Feed URT request -"
    );
}

Structured log helper. ? for Debug formatting, % for Display. Each field is recorded as a separate key rather than interpolated into the message; with a structured formatter such as tracing-subscriber's JSON formatter they end up as separate JSON keys, great for filtering in log search.

End of file.


scored_posts_server.rs (215 lines)

The Scored Posts server implementation, plus the build_debug_json helper.

use crate::candidate_pipeline::phoenix_candidate_pipeline::PhoenixCandidatePipeline;
use crate::models::brand_safety::BrandSafetyVerdict;
use crate::models::candidate::CandidateHelpers;
use crate::models::candidate::PostCandidate;
use crate::models::query::ScoredPostsQuery;
use crate::params;
use crate::server::{PipelineOutput, QueryBuilder};
use bytes::Bytes;
use std::sync::Arc;
use std::time::Instant;
use tonic::Status;
use tracing::info;
use xai_candidate_pipeline::candidate_pipeline::{CandidatePipeline, PipelineResult};
use xai_home_mixer_proto::ScoredPost;
use xai_stats_receiver::global_stats_receiver;
use xai_x_thrift::tweet_safety_label::SafetyLabelType;

const PRODUCT_SURFACE_METRIC: &str = "ScoredPostsServer.product_surface";
const TOPIC_METRIC: &str = "ScoredPosts.topic";
const SURFACE_RANKED_FOLLOWING: &str = "ranked_following";
const SURFACE_TOPICS: &str = "topics";
const SURFACE_FOR_YOU: &str = "for_you";
const SURFACE_FOR_YOU_WITH_SNOOZED_TOPICS: &str = "for_you_with_snoozed_topics";

Four "surface" constants. The ScoredPosts service has multiple product surfaces:

  • ranked_following — when in_network_only = true, this is the "Following" tab.
  • topics — when topic_ids is non-empty, this is the per-topic feed.
  • for_you_with_snoozed_topics — For You feed with some topics temporarily muted.
  • for_you — default, everything-goes.

Tracking which surface a request is for matters for metrics: For You and Following have different latency / cache / experiment profiles.

pub struct ScoredPostsServer {
    pub(crate) phoenix_candidate_pipeline: Arc<PhoenixCandidatePipeline>,
    pub(crate) query_builder: QueryBuilder,
}

impl ScoredPostsServer {
    pub fn new(
        query_builder: QueryBuilder,
        phoenix_candidate_pipeline: Arc<PhoenixCandidatePipeline>,
    ) -> Self {
        Self {
            phoenix_candidate_pipeline,
            query_builder,
        }
    }

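Two fields: the QueryBuilder (a clone of the one handed to the For You server) and the Phoenix pipeline, held in an Arc. Note that in build the For You pipeline receives an Arc to the whole ScoredPostsServer rather than to the Phoenix pipeline directly, so the Arc on the pipeline presumably serves holders that this file doesn't show.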

    pub(crate) async fn run_pipeline(
        &self,
        query: ScoredPostsQuery,
    ) -> Result<PipelineOutput, Status> {
        if params::TEST_USER_IDS.contains(&query.user_id) {
            return Ok(PipelineOutput {
                scored_posts: vec![],
                pipeline_result: PipelineResult::empty(),
            });
        }

        log_request_info(&query);

        let start = Instant::now();

        let pipeline_result = self.phoenix_candidate_pipeline.execute(query).await;

        info!(
            "Scored Posts response - request_id {} - {} posts ({} ms)",
            pipeline_result.query.request_id,
            pipeline_result.selected_candidates.len(),
            start.elapsed().as_millis()
        );

        log_response_stats(&pipeline_result);

        let scored_posts = candidates_to_scored_posts(&pipeline_result.selected_candidates);

        Ok(PipelineOutput {
            scored_posts,
            pipeline_result,
        })
    }
}

The pipeline runner.

Same test-user short-circuit as For You. Then log the request, run the pipeline, log the response with latency, emit stats, convert candidates to wire format.

PipelineResult::empty() was the convenience constructor we noted in Session 01. It produces an empty result with a default Q, which requires Q: Default. ScoredPostsQuery derives Default, so this works.

candidates_to_scored_posts

fn candidates_to_scored_posts(candidates: &[PostCandidate]) -> Vec<ScoredPost> {
    candidates
        .iter()
        .map(|candidate| {
            let screen_names = candidate.get_screen_names();
            ScoredPost {
                tweet_id: candidate.tweet_id,
                author_id: candidate.author_id,
                retweeted_tweet_id: candidate.retweeted_tweet_id.unwrap_or(0),
                retweeted_user_id: candidate.retweeted_user_id.unwrap_or(0),
                in_reply_to_tweet_id: candidate.in_reply_to_tweet_id.unwrap_or(0),
                score: candidate.score.unwrap_or(0.0) as f32,
                in_network: candidate.in_network.unwrap_or(false),
                served_type: candidate.served_type.map(|t| t as i32).unwrap_or_default(),
                last_scored_timestamp_ms: candidate.last_scored_at_ms.unwrap_or(0),
                prediction_request_id: candidate.prediction_request_id.unwrap_or(0),
                ancestors: candidate.ancestors.clone(),
                screen_names,
                visibility_reason: candidate.visibility_reason.clone().map(|r| r.into()),
                tweet_type_metrics: Bytes::from(
                    candidate.tweet_type_metrics.clone().unwrap_or_default(),
                ),
                following_replied_user_ids: candidate.following_replied_user_ids.clone(),
                brand_safety_verdict: candidate
                    .brand_safety_verdict
                    .unwrap_or(BrandSafetyVerdict::MediumRisk)
                    as i32,
                safety_label_types: candidate
                    .safety_labels
                    .iter()
                    .filter_map(|l| safety_label_to_proto(l.label_type))
                    .collect(),
                tweet_text: candidate.tweet_text.clone(),
            }
        })
        .collect()
}

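The internal-to-wire conversion. Almost every Option<T> gets unwrapped with a default suited to the proto wire format: 0 for IDs and timestamps, 0.0 for the score, false for in_network, and MediumRisk for brand safety. The exception is visibility_reason, which stays optional and is only converted with .into().

Bytes::from(candidate.tweet_type_metrics.clone().unwrap_or_default()): tweet_type_metrics is Option<Vec<u8>>, a pre-encoded blob (probably a Thrift struct). The .clone() copies the bytes once because we only hold a &PostCandidate; Bytes::from then takes ownership of that Vec without copying it again.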

Note brand safety defaults to MediumRisk if unknown. The conservative default: if we don't know, assume the post is medium risk and let downstream ad-pacing handle it.
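A reduced sketch of the same unwrap-with-default conversion. Candidate, WirePost and Verdict are hypothetical cut-down types, and the numeric values of the verdict variants are invented for illustration.

```rust
// Hypothetical verdict enum; the real discriminant values are not shown.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Verdict {
    LowRisk = 0,
    MediumRisk = 1,
}

// Hypothetical, trimmed-down internal candidate.
struct Candidate {
    tweet_id: u64,
    retweeted_tweet_id: Option<u64>,
    score: Option<f64>,
    verdict: Option<Verdict>,
}

// Hypothetical, trimmed-down wire message: no Options, proto3 style.
#[derive(Debug, PartialEq)]
struct WirePost {
    tweet_id: u64,
    retweeted_tweet_id: u64,
    score: f32,
    verdict: i32,
}

fn to_wire(c: &Candidate) -> WirePost {
    WirePost {
        tweet_id: c.tweet_id,
        // Absent ids collapse to 0 on the wire.
        retweeted_tweet_id: c.retweeted_tweet_id.unwrap_or(0),
        // f64 internally, narrowed to f32 for the wire.
        score: c.score.unwrap_or(0.0) as f32,
        // Unknown brand safety is reported as MediumRisk, not LowRisk.
        verdict: c.verdict.unwrap_or(Verdict::MediumRisk) as i32,
    }
}
```

One consequence worth keeping in mind: after this conversion a consumer cannot tell "no retweet" from "retweet of ID 0", or "unscored" from "scored 0.0". That is only safe because 0 is not a valid ID.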

build_debug_json

pub(crate) fn build_debug_json(
    pipeline_result: &PipelineResult<ScoredPostsQuery, PostCandidate>,
) -> String {
    let debug = serde_json::json!({
        "query": pipeline_result.query.as_ref(),
        "retrieved_candidates": &pipeline_result.retrieved_candidates,
        "filtered_candidates": &pipeline_result.filtered_candidates,
        "selected_candidates": &pipeline_result.selected_candidates,
        "stats": {
            "retrieved_count": pipeline_result.retrieved_candidates.len(),
            "filtered_count": pipeline_result.filtered_candidates.len(),
            "selected_count": pipeline_result.selected_candidates.len(),
        },
    });

    serde_json::to_string(&debug)
        .unwrap_or_else(|e| format!("{{\"error\": \"Failed to serialize debug info: {}\"}}", e))
}

The debug serializer. Builds a JSON value with the full pipeline result and a summary stats object.

pipeline_result.query.as_ref(): query is Arc<Q>; .as_ref() gets &Q so serde_json::json! can serialize it via the derived Serialize impl.

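The fallback unwrap_or_else(|e| ...) produces a hand-rolled JSON error object on serialization failure. Should never happen in practice, but defensive. Note that the error text is interpolated without escaping, so a message containing a double quote would make the fallback itself invalid JSON.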

log_request_info — surface dispatch

fn log_request_info(query: &ScoredPostsQuery) {
    let seen_ids_count = query.seen_ids.len();
    let surface = if query.in_network_only {
        SURFACE_RANKED_FOLLOWING
    } else if !query.topic_ids.is_empty() {
        SURFACE_TOPICS
    } else if query.has_excluded_topics() {
        SURFACE_FOR_YOU_WITH_SNOOZED_TOPICS
    } else {
        SURFACE_FOR_YOU
    };

    if surface == SURFACE_TOPICS {
        info!(
            "Scored Posts request - {} - request_id {} - seen_ids {} - topic_ids {:?}",
            surface, query.request_id, seen_ids_count, query.topic_ids
        );
    } else if query.has_excluded_topics() {
        info!(
            "Scored Posts request - {} - request_id {} - seen_ids {} - excluded_topic_ids {:?}",
            surface, query.request_id, seen_ids_count, query.excluded_topic_ids
        );
    } else {
        info!(
            "Scored Posts request - {} - request_id {} - seen_ids {}",
            surface, query.request_id, seen_ids_count
        );
    }

    if let Some(receiver) = global_stats_receiver() {
        receiver.incr(PRODUCT_SURFACE_METRIC, &[("surface", surface)], 1);
    }
}

Compute the surface name (the cascading if/else if), log accordingly, and emit a counter tagged with the surface.
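Pulled out as a pure function, the cascade makes the precedence easy to check. This is a sketch with a hypothetical three-field Query; the string values are the ones from the constants above.

```rust
// Hypothetical, trimmed-down query with only the fields the dispatch reads.
struct Query {
    in_network_only: bool,
    topic_ids: Vec<i64>,
    excluded_topic_ids: Vec<i64>,
}

// First matching branch wins, in the same order as log_request_info.
fn surface(q: &Query) -> &'static str {
    if q.in_network_only {
        "ranked_following"
    } else if !q.topic_ids.is_empty() {
        "topics"
    } else if !q.excluded_topic_ids.is_empty() {
        "for_you_with_snoozed_topics"
    } else {
        "for_you"
    }
}
```

Order matters: an in-network request that also carries topic_ids is counted as ranked_following, and a topic request with exclusions is counted as topics.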

fn log_response_stats(pipeline_result: &PipelineResult<ScoredPostsQuery, PostCandidate>) {
    if pipeline_result.query.topic_ids.len() == 1
        && let Some(receiver) = global_stats_receiver()
    {
        let tid_str = pipeline_result.query.topic_ids[0].to_string();
        receiver.incr(
            TOPIC_METRIC,
            &[("type", "request"), ("topic_id", &tid_str)],
            1,
        );
        if pipeline_result.selected_candidates.is_empty() {
            receiver.incr(
                TOPIC_METRIC,
                &[("type", "empty"), ("topic_id", &tid_str)],
                1,
            );
        }
    }
}

Per-topic stats. Only fires when there's exactly one topic in the request (any multi-topic request is excluded from this dashboard, not just bulk ones, presumably because they'd noise it up). Counts every request and every empty response per topic. Useful for spotting topics that consistently return zero results.

if cond && let Some(receiver) = … : let chains again.
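Let chains need a recent toolchain (as far as we know they were stabilized for the 2024 edition). For readers on older compilers, the nested form below behaves the same way. receiver and should_record are hypothetical stand-ins, not functions from the repo.

```rust
// Hypothetical stand-in for global_stats_receiver(): Some when stats are on.
fn receiver(enabled: bool) -> Option<&'static str> {
    if enabled {
        Some("stats")
    } else {
        None
    }
}

// Nested equivalent of:
//   if topic_ids.len() == 1 && let Some(r) = receiver(..) { .. }
fn should_record(topic_ids: &[i64], stats_enabled: bool) -> bool {
    if topic_ids.len() == 1 {
        // Only evaluated when the first condition held, as with `&&`.
        if let Some(_r) = receiver(stats_enabled) {
            return true;
        }
    }
    false
}
```

The chain form saves one level of indentation and keeps the short-circuit order: the receiver lookup is skipped entirely for multi-topic requests.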

safety_label_to_proto

fn safety_label_to_proto(label: SafetyLabelType) -> Option<i32> {
    use xai_home_mixer_proto::SafetyLabelType as HM;
    let v = match label {
        SafetyLabelType::NSFW_HIGH_PRECISION => HM::NsfwHighPrecision,
        SafetyLabelType::NSFW_HIGH_RECALL => HM::NsfwHighRecall,
        SafetyLabelType::NSFA_HIGH_PRECISION => HM::NsfaHighPrecision,
        SafetyLabelType::NSFA_KEYWORDS_HIGH_PRECISION => HM::NsfaKeywordsHighPrecision,
        SafetyLabelType::GORE_AND_VIOLENCE_HIGH_PRECISION => HM::GoreAndViolenceHighPrecision,
        SafetyLabelType::NSFW_REPORTED_HEURISTICS => HM::NsfwReportedHeuristics,
        SafetyLabelType::GORE_AND_VIOLENCE_REPORTED_HEURISTICS => {
            HM::GoreAndViolenceReportedHeuristics
        }
        SafetyLabelType::NSFW_CARD_IMAGE => HM::NsfwCardImage,
        SafetyLabelType::DO_NOT_AMPLIFY => HM::DoNotAmplify,
        SafetyLabelType::NSFA_COMMUNITY_NOTE => HM::NsfaCommunityNote,
        SafetyLabelType::PDNA => HM::Pdna,
        SafetyLabelType::EGREGIOUS_NSFW => HM::EgregiousNsfw,
        SafetyLabelType::GROK_NSFA => HM::GrokNsfa,
        SafetyLabelType::NSFW_TEXT => HM::NsfwText,
        SafetyLabelType::NSFA_LIMITED_INVENTORY => HM::NsfaLimitedInventory,
        SafetyLabelType::GROK_NSFA_LIMITED => HM::GrokNsfaLimited,
        SafetyLabelType::NSFA_HIGH_RECALL => HM::NsfaHighRecall,
        SafetyLabelType::GROK_SFA => HM::GrokSfa,
        _ => return None,
    };
    Some(v.into())
}

Map the internal safety label enum (from xai_x_thrift::tweet_safety_label) to the wire safety label enum (the xai_home_mixer_proto::SafetyLabelType). Why the duplication? Because:

  • The internal enum has hundreds of variants (every label X has ever produced).
  • The wire enum only includes labels relevant to downstream consumers of the home-mixer service.

Unmapped labels return None and get filtered out in the filter_map above. The wire response only includes labels the consumer cares about.
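The drop-what-you-can't-map pattern, reduced to two made-up enums. InternalLabel, WireLabel and their variants and numbers are hypothetical; only the match-plus-filter_map shape mirrors the source.

```rust
// Hypothetical internal enum with more variants than the wire enum exposes.
#[derive(Clone, Copy)]
enum InternalLabel {
    NsfwHighPrecision,
    DoNotAmplify,
    SomeInternalOnlyLabel,
}

// Hypothetical wire enum: a strict subset, with explicit wire numbers.
#[derive(Clone, Copy)]
enum WireLabel {
    NsfwHighPrecision = 1,
    DoNotAmplify = 2,
}

fn to_wire(label: InternalLabel) -> Option<i32> {
    let v = match label {
        InternalLabel::NsfwHighPrecision => WireLabel::NsfwHighPrecision,
        InternalLabel::DoNotAmplify => WireLabel::DoNotAmplify,
        // Anything without a wire counterpart is dropped by the caller.
        _ => return None,
    };
    Some(v as i32)
}

fn wire_labels(labels: &[InternalLabel]) -> Vec<i32> {
    // filter_map keeps the Some values and silently discards the None ones.
    labels.iter().filter_map(|l| to_wire(*l)).collect()
}
```

The wildcard arm is what keeps this forward-compatible: a new internal variant compiles without touching this function and simply never reaches consumers until someone adds a mapping.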

NSFW / NSFA terminology:

  • NSFW = Not Safe For Work (sexual content).
  • NSFA = Not Suitable For Advertisers (content that's allowed on the platform but ads shouldn't run against — gore, hate, etc.).
  • PDNA = Photo DNA — child-safety hash match.
  • PTOS = Post Terms Of Service. (Reviewed = a human review passed.)
  • Grok-{SFA, NSFA} = labels assigned by the Grok-based classifier (which we'll read in the grox/ sessions later).

End of scored_posts_server.rs.


models/mod.rs (7 lines)

pub mod brand_safety;
pub mod candidate;
pub mod candidate_features;

pub mod in_network_reply;
pub mod query;
pub mod user_features;

Public module declarations. Nothing else.


models/query.rs (248 lines) — ScoredPostsQuery

The query type — what flows from the gRPC handler through every pipeline stage. The most-touched struct in the whole codebase.

use crate::models::candidate::PostCandidate;
use crate::models::in_network_reply::{InNetworkReplies, serialize_in_network_replies};
use crate::models::user_features::UserFeatures;
use serde::Serialize;
use std::time::{SystemTime, UNIX_EPOCH};
use xai_candidate_pipeline::candidate_pipeline::PipelineQuery;
use xai_core_entities::entities::SubscriptionLevel;
use xai_decider::Decider;
use xai_feature_switches::Params;
use xai_recsys_proto::gender_prediction::InferredGenderLabel;
use xai_recsys_proto::{DeviceNetworkType, Timezone};
use xai_twittercontext_proto::{GetTwitterContextViewer, TwitterContextViewer};
use xai_urt_thrift::cursor::UrtOrderedCursor;
use xai_x_thrift::non_polling_timestamps::NonPollingTimestamps;
use xai_x_thrift::served_history::ServedHistory;

#[derive(Clone, Debug)]
pub struct ImpressionBloomFilterEntry {
    pub bloom_filter: Vec<u64>,
    pub size_cap: i32,
    pub false_positive_rate: f64,
}

Imports + a small helper struct. ImpressionBloomFilterEntry represents one slice of impression history: Bloom filters are used to dedupe against posts the user has already seen (a membership check costs a fixed number of hash probes, with no false negatives and a bounded false-positive rate). The query holds a Vec of entries, so the history is presumably split across several filters (recent vs. older, perhaps); they're hydrated by a query hydrator we'll read later.
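For readers who haven't met Bloom filters, here is a minimal sketch over a Vec<u64> of bit words. The bit layout and hash scheme are assumptions for illustration; the source doesn't show how the real filter is encoded or probed.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical Bloom filter over 64-bit words. The layout and hashing used by
// the real ImpressionBloomFilterEntry are not shown in the source.
struct Bloom {
    words: Vec<u64>,
    num_hashes: u64,
}

impl Bloom {
    // num_words must be at least 1, otherwise bit_index divides by zero.
    fn new(num_words: usize, num_hashes: u64) -> Self {
        Bloom { words: vec![0; num_words], num_hashes }
    }

    // Derive one bit position by hashing (seed, id) together.
    fn bit_index(&self, id: u64, seed: u64) -> usize {
        let mut h = DefaultHasher::new();
        seed.hash(&mut h);
        id.hash(&mut h);
        (h.finish() % (self.words.len() as u64 * 64)) as usize
    }

    fn insert(&mut self, id: u64) {
        for seed in 0..self.num_hashes {
            let bit = self.bit_index(id, seed);
            self.words[bit / 64] |= 1u64 << (bit % 64);
        }
    }

    // false means "definitely not seen"; true means "probably seen".
    fn may_contain(&self, id: u64) -> bool {
        (0..self.num_hashes).all(|seed| {
            let bit = self.bit_index(id, seed);
            self.words[bit / 64] & (1u64 << (bit % 64)) != 0
        })
    }
}
```

For a feed, the asymmetry is the right way round: a false positive hides one post the user hasn't actually seen, while a false negative (which cannot happen) would show a repeat.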

#[derive(Clone, Default, Debug, Serialize)]
pub struct ScoredPostsQuery {
    pub user_id: u64,
    pub client_app_id: i32,
    pub country_code: String,
    pub language_code: String,
    pub seen_ids: Vec<u64>,
    pub served_ids: Vec<u64>,
    pub in_network_only: bool,
    pub is_bottom_request: bool,
    pub is_top_request: bool,
    #[serde(skip)]
    pub bloom_filter_entries: Vec<ImpressionBloomFilterEntry>,
    #[serde(skip)]
    pub scoring_sequence: Option<xai_recsys_proto::UserActionSequence>,
    #[serde(skip)]
    pub columnar_scoring_sequence: Option<bytes::Bytes>,
    #[serde(skip)]
    pub retrieval_sequence: Option<xai_recsys_proto::UserActionSequence>,
    #[serde(skip)]
    pub columnar_retrieval_sequence: Option<bytes::Bytes>,
    pub user_features: UserFeatures,
    pub user_roles: Vec<String>,
    pub params: Params,
    #[serde(skip)]
    pub decider: Option<Decider>,
    pub request_id: u64,
    pub prediction_id: u64,
    pub request_time_ms: i64,
    pub cached_posts: Vec<PostCandidate>,
    pub has_cached_posts: bool,
    pub topic_ids: Vec<i64>,
    pub excluded_topic_ids: Vec<i64>,
    pub new_user_topic_ids: Vec<i64>,
    pub exclude_videos: bool,
    #[serde(serialize_with = "serialize_in_network_replies")]
    pub in_network_replies: InNetworkReplies,
    pub viewer_minhash: Option<Vec<i64>>,
    pub ip_address: String,
    pub user_agent: String,
    #[serde(serialize_with = "serialize_debug")]
    pub time_zone: Timezone,
    #[serde(serialize_with = "serialize_debug")]
    pub device_network_type: DeviceNetworkType,
    pub client_version: String,
    pub device_id: String,
    pub mobile_device_id: String,
    pub mobile_device_ad_id: String,
    pub user_demographics: Option<UserDemographics>,
    pub ip_location: Option<xai_geo_ip::LocationInfo>,
    pub user_age_in_years: Option<i32>,
    #[serde(serialize_with = "serialize_debug")]
    pub user_inferred_gender: Option<InferredGenderLabel>,
    pub user_inferred_gender_score: Option<f32>,
    pub followed_grok_topics: Option<[bool; 32]>,
    pub inferred_grok_topics: Option<[bool; 32]>,
    pub followed_starter_packs: Option<[bool; 20]>,
    pub subscription_level: Option<SubscriptionLevel>,
    pub is_shadow_traffic: bool,
    pub is_preview: bool,
    pub is_polling: bool,
    #[serde(serialize_with = "serialize_debug")]
    pub cursor: Option<UrtOrderedCursor>,
    pub request_context: String,
    #[serde(skip)]
    pub served_history: Vec<ServedHistory>,
    pub who_to_follow_eligible: bool,
    #[serde(serialize_with = "serialize_debug")]
    pub non_polling_timestamps: Option<NonPollingTimestamps>,
    pub impressed_post_ids: Vec<u64>,
    pub push_to_home_post_id: Option<u64>,
}

The mega-struct. Roughly 50 fields. Let's group them:

Identity & request metadata:

  • user_id, client_app_id, country_code, language_code — who's asking and from where/what app.
  • request_id, prediction_id — for joining served-feed logs against engagement logs.
  • request_time_ms — when the request started (set in ScoredPostsQuery::new).
  • request_context — opaque client-provided tag.
  • client_version, device_id, mobile_device_id, mobile_device_ad_id — device identifiers.
  • ip_address, user_agent, time_zone, device_network_type — network/device state.

Mode flags:

  • in_network_only — only show posts from accounts the user follows.
  • is_bottom_request / is_top_request — pagination direction (older posts / newer posts).
  • is_polling — background polling vs. user-initiated request.
  • is_shadow_traffic — sampled for shadow logging (separate from B3 tracing).
  • is_preview — pre-fetched (not yet shown to user).

Personalization signals:

  • seen_ids, served_ids, impressed_post_ids — three flavors of "already shown to this user" (different granularities / sources).
  • bloom_filter_entries — bloom-filter version of seen IDs (more memory-efficient for long histories).
  • served_history — typed version with timestamps.
  • scoring_sequence / retrieval_sequence: the user-action sequence inputs for Phoenix (engagement history as features). Each comes in two forms, a typed UserActionSequence and a columnar Bytes blob. The columnar version is presumably pre-encoded for fast Phoenix consumption; what the typed version is kept for isn't visible from this file (both are skipped in the debug JSON).
  • user_features — muted keywords + blocked / muted / followed / subscribed user IDs (hydrated by user_features_query_hydrator).
  • user_demographics, user_age_in_years, user_inferred_gender(_score) — demographic features for personalization.
  • viewer_minhash — MinHash signature of user's interests (used for OON retrieval similarity).
  • followed_grok_topics, inferred_grok_topics, followed_starter_packs — fixed-size bool arrays ([bool; 32] and [bool; 20]) encoding feature flags for topics / starter packs.

Request-specific content filters:

  • topic_ids, excluded_topic_ids, new_user_topic_ids — topic restrictions.
  • exclude_videos — skip videos (some clients want text-only).
  • in_network_replies: OnceLock-backed deferred field (the replies are computed lazily on first access).
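The compute-once behavior of a OnceLock-backed field can be sketched as follows. DeferredReplies and its methods are hypothetical; the real InNetworkReplies type is covered when we reach in_network_reply.rs.

```rust
use std::sync::OnceLock;

// Hypothetical deferred field: computed at most once, on first access.
#[derive(Default)]
struct DeferredReplies {
    cell: OnceLock<Vec<u64>>,
}

impl DeferredReplies {
    // Runs `compute` only if the cell is still empty; later calls reuse the
    // stored value and never invoke their closure.
    fn get_or_compute(&self, compute: impl FnOnce() -> Vec<u64>) -> &Vec<u64> {
        self.cell.get_or_init(compute)
    }

    fn is_computed(&self) -> bool {
        self.cell.get().is_some()
    }
}
```

Because get_or_init takes &self, the value can be filled in through a shared reference. That matters here, since the query is handed around behind an Arc once the pipeline is running.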

Subscription / monetization:

  • subscription_level — free / Premium / Premium+ tier.
  • who_to_follow_eligible — whether to include the "Who to follow" module.

Cursor / URT:

  • cursor — pagination cursor.
  • non_polling_timestamps — when the user last did a real (non-polling) request, used by URT.
  • push_to_home_post_id — if the user clicked a notification, "pin" that post to the top.

Cached posts:

  • cached_posts, has_cached_posts — fast path: re-use scoring from a recent prior request.

Pipeline plumbing:

  • params — feature-switch values (snapshot).
  • decider: A/B-testing client scoped to this user. It is Option<Decider> so that the derived Default can fill in None (Decider presumably has no Default impl of its own).
  • user_roles — strings like "admin", "verified" — used in feature-switch evaluation.

This struct is the complete personalization state for one request. Every hydrator and filter reads from it; many hydrators write into it (the update pattern from Session 01).

A few #[serde(skip)] fields: bloom filters, action sequences, served history, decider. These are too big to serialize into the debug JSON (or are non-serializable, like Decider with its internal Arc<…>). The debug JSON shows the user-visible state.

A few #[serde(serialize_with = "serialize_debug")] fields: types that don't have Serialize impls. The custom serializer formats them via Debug instead (see below).

pub use xai_candidate_pipeline::component_library::clients::strato_client::UserDemographics;

Re-export of UserDemographics from the component library. Used as a field type.

Constructor

impl ScoredPostsQuery {
    pub fn new(
        user_id: u64,
        client_app_id: i32,
        country_code: String,
        language_code: String,
        seen_ids: Vec<u64>,
        served_ids: Vec<u64>,
        in_network_only: bool,
        is_bottom_request: bool,
        params: Params,
        decider: Decider,
        user_roles: Vec<String>,
        muted_keywords: Vec<String>,
        follower_count: Option<i64>,
        topic_ids: Vec<i64>,
        excluded_topic_ids: Vec<i64>,
        exclude_videos: bool,
        request_id: u64,
        prediction_id: u64,
        ip_address: String,
        user_agent: String,
        time_zone: Timezone,
        device_network_type: DeviceNetworkType,
        client_version: String,
        device_id: String,
        mobile_device_id: String,
        mobile_device_ad_id: String,
        subscription_level: Option<SubscriptionLevel>,
        is_shadow_traffic: bool,
        is_preview: bool,
        age_in_years: Option<i32>,
        push_to_home_post_id: Option<u64>,
    ) -> Self {

31 parameters. This is the constructor we already saw being called from QueryBuilder::build.

        Self {
            user_id,
            client_app_id,
            country_code,
            language_code,
            seen_ids,
            served_ids,
            in_network_only,
            is_bottom_request,
            is_top_request: false,
            bloom_filter_entries: vec![],
            scoring_sequence: None,
            columnar_scoring_sequence: None,
            retrieval_sequence: None,
            columnar_retrieval_sequence: None,
            user_features: UserFeatures {
                muted_keywords,
                follower_count,
                ..Default::default()
            },
            user_roles,
            params,
            decider: Some(decider),
            request_id,
            prediction_id,
            request_time_ms: current_time_ms(),
            cached_posts: vec![],
            has_cached_posts: false,
            topic_ids,
            excluded_topic_ids,
            new_user_topic_ids: vec![],
            exclude_videos,
            in_network_replies: Default::default(),
            viewer_minhash: None,
            ip_address,
            user_agent,
            time_zone,
            device_network_type,
            client_version,
            device_id,
            mobile_device_id,
            mobile_device_ad_id,
            user_demographics: None,
            ip_location: None,
            user_age_in_years: age_in_years,
            user_inferred_gender: None,
            user_inferred_gender_score: None,
            followed_grok_topics: None,
            inferred_grok_topics: None,
            followed_starter_packs: None,
            subscription_level,
            is_shadow_traffic,
            is_preview,
            is_polling: false,
            cursor: None,
            request_context: String::new(),
            served_history: vec![],
            who_to_follow_eligible: false,
            non_polling_timestamps: None,
            impressed_post_ids: Vec::new(),
            push_to_home_post_id,
        }
    }

The struct literal. All "hydrated-later" fields start as None or empty. Notice:

  • user_features: UserFeatures { muted_keywords, follower_count, ..Default::default() } — only two fields are populated at construction; the rest (blocked / muted / followed / subscribed user IDs) get filled by the user_features_query_hydrator.
  • request_time_ms: current_time_ms() (set to now). Pipeline stages can compute "how long has this request been alive" by comparing against this.

    pub fn is_topic_request(&self) -> bool {
        !self.topic_ids.is_empty()
    }

    pub fn is_bulk_topic_request(&self) -> bool {
        self.topic_ids.len() > 6
    }

    pub fn has_excluded_topics(&self) -> bool {
        !self.excluded_topic_ids.is_empty()
    }

    pub fn has_new_user_topic_ids(&self) -> bool {
        !self.new_user_topic_ids.is_empty()
    }
}

Convenience predicates over the topic fields. is_bulk_topic_request uses the threshold 6: a request with 7+ topics is "bulk" (presumably triggers a different code path because it's likely a backend/admin call rather than a user-facing topic feed).

impl GetTwitterContextViewer for ScoredPostsQuery {
    fn get_viewer(&self) -> Option<TwitterContextViewer> {
        Some(TwitterContextViewer {
            user_id: self.user_id as i64,
            client_application_id: self.client_app_id as i64,
            request_country_code: self.country_code.clone(),
            request_language_code: self.language_code.clone(),
            ..Default::default()
        })
    }
}

GetTwitterContextViewer is a trait from xai_twittercontext_proto. Implementing it makes ScoredPostsQuery usable wherever "twitter context viewer" is required (probably a Thrift/proto bridge in client libraries). It's an adapter.

impl PipelineQuery for ScoredPostsQuery {
    fn params(&self) -> &Params {
        &self.params
    }

    fn decider(&self) -> Option<&Decider> {
        self.decider.as_ref()
    }
}

The PipelineQuery impl — the trait we read in Session 01. Two methods: expose feature-switch params and (optional) decider. This is what makes ScoredPostsQuery usable as the Q parameter of CandidatePipeline<Q, C>.

pub fn current_time_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as i64
}

fn serialize_debug<S, T: std::fmt::Debug>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    serializer.serialize_str(&format!("{:?}", value))
}

Two helpers:

  • current_time_ms(): Unix time in milliseconds. .unwrap_or_default() is defensive: it returns 0 if the system clock reads earlier than the epoch, which shouldn't happen on a sanely configured host.
  • serialize_debug — the custom serializer used for fields like Timezone, DeviceNetworkType, etc. that don't impl Serialize. Falls back to Debug.
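To make the .unwrap_or_default() fallback concrete, here is a minimal sketch. Both helpers are hypothetical and written for illustration, not taken from the repo: millis_since_epoch mirrors current_time_ms but takes the clock reading as a parameter so the pre-epoch branch can be exercised, and request_age_ms shows how a stage might use request_time_ms.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Same conversion as `current_time_ms`, but with the clock reading passed in
/// (hypothetical helper, so the pre-epoch case is testable).
fn millis_since_epoch(t: SystemTime) -> i64 {
    // `duration_since` returns Err when `t` is before the epoch;
    // `unwrap_or_default` turns that into a zero Duration.
    t.duration_since(UNIX_EPOCH).unwrap_or_default().as_millis() as i64
}

/// How long a request has been alive, given the `request_time_ms` stored on the query.
/// Clamping at zero is an assumption of this sketch (guards against clock steps).
fn request_age_ms(request_time_ms: i64, now_ms: i64) -> i64 {
    (now_ms - request_time_ms).max(0)
}
```

A clock reading five seconds before the epoch comes back as 0 rather than panicking, which is exactly the behavior the original helper buys with one method call.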

End of query.rs.


models/candidate.rs (153 lines) — PostCandidate

The other half of the type system: what flows through hydrators / filters / scorers.

use crate::models::brand_safety::BrandSafetyVerdict;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
pub use xai_candidate_pipeline::component_library::models::PhoenixScores;
use xai_home_mixer_proto as pb;
use xai_visibility_filtering::models as vf;

#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PostCandidate {
    pub tweet_id: u64,
    pub author_id: u64,
    pub tweet_text: String,
    pub in_reply_to_tweet_id: Option<u64>,
    pub retweeted_tweet_id: Option<u64>,
    pub retweeted_user_id: Option<u64>,
    pub quoted_tweet_id: Option<u64>,
    pub quoted_user_id: Option<u64>,
    pub phoenix_scores: PhoenixScores,
    pub prediction_request_id: Option<u64>,
    pub last_scored_at_ms: Option<u64>,
    pub weighted_score: Option<f64>,
    pub score: Option<f64>,

Header fields. Two key things:

  1. PhoenixScores is re-exported from the component library. It's the per-action probability distribution (P(like), P(reply), P(repost), …) we read about in Session 01's overview.
  2. Three score fields: phoenix_scores (raw model output), weighted_score (after combining via weights), and score (final, after attenuation by author diversity / OON adjustment / etc.).

The various *_tweet_id and *_user_id fields represent the post's structural relationships:

  • Reply: in_reply_to_tweet_id, in_reply_to_user_id (the latter is in tweet_type_metrics further down)
  • Retweet: retweeted_tweet_id, retweeted_user_id
  • Quote: quoted_tweet_id, quoted_user_id

All three can be combined: a quote-reply-retweet exists.

    #[serde(
        serialize_with = "serialize_served_type",
        deserialize_with = "deserialize_served_type"
    )]
    pub served_type: Option<pb::ServedType>,
    pub in_network: Option<bool>,
    pub ancestors: Vec<u64>,
    pub min_video_duration_ms: Option<i32>,
    pub quoted_video_duration_ms: Option<i32>,
    pub author_followers_count: Option<i32>,
    pub author_screen_name: Option<String>,
    pub retweeted_screen_name: Option<String>,
    pub visibility_reason: Option<vf::FilteredReason>,
    pub drop_ancillary_posts: Option<bool>,
    pub subscription_author_id: Option<u64>,
    pub tweet_type_metrics: Option<Vec<u8>>,
    pub author_blocks_viewer: Option<bool>,
    pub quoted_author_blocks_viewer: Option<bool>,
    pub filtered_topic_ids: Option<Vec<i64>>,
    pub unfiltered_topic_ids: Option<Vec<i64>>,
    #[serde(default)]
    pub following_replied_user_ids: Vec<u64>,
    pub has_media: Option<bool>,
    pub language_code: Option<String>,
    pub fav_count: Option<i64>,
    pub reply_count: Option<i64>,
    pub repost_count: Option<i64>,
    pub quote_count: Option<i64>,
    pub mutual_follow_jaccard: Option<f64>,
    pub brand_safety_verdict: Option<BrandSafetyVerdict>,
    #[serde(default)]
    pub safety_labels: Vec<SafetyLabelInfo>,
}

More fields. Categorically:

Identity / source:

  • served_type: Tweet, RetweetedTweet, QuotedTweet, etc.
  • in_network — was this post sourced from in-network (Thunder) or out-of-network (Phoenix retrieval)?

Conversation graph:

  • ancestors — list of post IDs forming the conversation chain leading to this post.

Video / media:

  • min_video_duration_ms, quoted_video_duration_ms — video durations.
  • has_media — bool.

Author metadata (hydrated by gizmoduck_hydrator):

  • author_followers_count, author_screen_name, retweeted_screen_name.

Visibility / safety (hydrated by various):

  • visibility_reason — non-None means "this post should be filtered by VF stage."
  • brand_safety_verdict, safety_labels — for ads pacing.
  • author_blocks_viewer, quoted_author_blocks_viewer — block-graph state.
  • drop_ancillary_posts — drop the followed-replied / "people you follow replied" rendering.

Topics:

  • filtered_topic_ids, unfiltered_topic_ids: the post's topic associations after and before topic filtering, respectively.

Engagement counts:

  • fav_count, reply_count, repost_count, quote_count — for feature lookup.

Personalization signals:

  • mutual_follow_jaccard — Jaccard similarity between the viewer's follow set and the author's follow set. High = friend-of-friend; useful as a relevance feature.
  • subscription_author_id — if this post is paid-subscription-gated, the ID of the gating account.
  • following_replied_user_ids — list of users the viewer follows who have replied to this post.
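For readers who haven't met Jaccard similarity, the textbook definition is |A ∩ B| / |A ∪ B|. The sketch below applies it to two follow sets; how the repo actually computes mutual_follow_jaccard is not shown in this file, so treat the function name and the empty-set convention as assumptions.

```rust
use std::collections::HashSet;

/// Jaccard similarity |A ∩ B| / |A ∪ B| between two follow sets.
/// Returns 0.0 when both sets are empty (a convention chosen for this sketch).
fn jaccard(a: &HashSet<u64>, b: &HashSet<u64>) -> f64 {
    let intersection = a.intersection(b).count();
    // Inclusion-exclusion: |A ∪ B| = |A| + |B| - |A ∩ B|.
    let union = a.len() + b.len() - intersection;
    if union == 0 {
        0.0
    } else {
        intersection as f64 / union as f64
    }
}
```

With a viewer following {1, 2, 3, 4} and an author following {3, 4, 5, 6}, the overlap is 2 accounts out of 6 distinct ones, so the score is 1/3.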

Pre-encoded blob:

  • tweet_type_metrics: Option<Vec<u8>> — opaque Thrift / proto blob containing tweet-type metrics for downstream consumers.

The #[serde(default)] on following_replied_user_ids and safety_labels is for deserialization — if the JSON omits these fields (e.g. older cached versions), default to empty Vec.

SafetyLabelInfo

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SafetyLabelInfo {
    #[serde(with = "xai_safety_label_store::types::serde_label_type")]
    pub label_type: xai_x_thrift::tweet_safety_label::SafetyLabelType,
    pub description: Option<String>,
    pub source: Option<String>,
}

Per-label info. The #[serde(with = "...")] references a serde module from xai_safety_label_store — necessary because SafetyLabelType (a Thrift enum) doesn't have native serde impls.

served_type serde

fn serialize_served_type<S>(
    served_type: &Option<pb::ServedType>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    served_type.map(|value| value as i32).serialize(serializer)
}

fn deserialize_served_type<'de, D>(deserializer: D) -> Result<Option<pb::ServedType>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    let value = Option::<i32>::deserialize(deserializer)?;
    match value {
        None => Ok(None),
        Some(raw_value) => pb::ServedType::try_from(raw_value)
            .map(Some)
            .map_err(|_| serde::de::Error::custom("invalid ServedType value")),
    }
}

Custom serde for the proto enum. prost-generated enums convert to i32 (with as i32 or i32::from) and implement TryFrom<i32> for the reverse direction, but they don't implement Serialize/Deserialize out of the box. So the value round-trips via i32.

pb::ServedType::try_from(i32) returns Result<ServedType, …>. We map errors to a serde error.
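The round trip can be sketched without serde or prost. The enum below is a stand-in for pb::ServedType: its variant names and numeric values are placeholders, and the hand-written TryFrom plays the role of the impl prost would generate. encode and decode are hypothetical names for the two halves of the custom serde pair.

```rust
use std::convert::TryFrom;

/// Stand-in for a prost-generated enum (placeholder variants and values).
#[derive(Clone, Copy, Debug, PartialEq)]
#[repr(i32)]
enum ServedType {
    Unspecified = 0,
    Tweet = 1,
    RetweetedTweet = 2,
}

impl TryFrom<i32> for ServedType {
    type Error = String;

    fn try_from(raw: i32) -> Result<Self, Self::Error> {
        match raw {
            0 => Ok(ServedType::Unspecified),
            1 => Ok(ServedType::Tweet),
            2 => Ok(ServedType::RetweetedTweet),
            other => Err(format!("invalid ServedType value: {other}")),
        }
    }
}

/// Encode: the same `map(|value| value as i32)` step the serializer uses.
fn encode(served_type: Option<ServedType>) -> Option<i32> {
    served_type.map(|value| value as i32)
}

/// Decode: `None` passes through; `Some(raw)` must be a known discriminant.
fn decode(raw: Option<i32>) -> Result<Option<ServedType>, String> {
    raw.map(ServedType::try_from).transpose()
}
```

The transpose call does the same job as the explicit match in deserialize_served_type: it flips Option<Result<..>> into Result<Option<..>>, so an absent value is fine and an unknown integer is an error.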

CandidateHelpers trait

pub trait CandidateHelpers {
    fn get_screen_names(&self) -> HashMap<u64, String>;
    fn get_original_tweet_id(&self) -> u64;
    fn get_original_author_id(&self) -> u64;
    fn as_tweet_info(&self, is_followed_by_viewer: bool) -> xai_recsys_proto::TweetInfo;
}

impl CandidateHelpers for PostCandidate {
    fn get_screen_names(&self) -> HashMap<u64, String> {
        let mut screen_names = HashMap::<u64, String>::new();
        if let Some(author_screen_name) = self.author_screen_name.clone() {
            screen_names.insert(self.author_id, author_screen_name);
        }
        if let (Some(retweeted_screen_name), Some(retweeted_user_id)) =
            (self.retweeted_screen_name.clone(), self.retweeted_user_id)
        {
            screen_names.insert(retweeted_user_id, retweeted_screen_name);
        }
        screen_names
    }

A handful of derived getters bundled into a trait. The trait pattern (instead of inherent methods) is unusual. Likely because the helpers need to be implemented on multiple candidate-like types (e.g. a FeedItem containing a PostCandidate and an AdCandidate), so a trait gives a uniform interface.

get_screen_names builds a {user_id: screen_name} map for the author and (if a retweet) the retweeted user. The wire response can use this for rendering.

    fn get_original_tweet_id(&self) -> u64 {
        self.retweeted_tweet_id.unwrap_or(self.tweet_id)
    }

    fn get_original_author_id(&self) -> u64 {
        self.retweeted_user_id.unwrap_or(self.author_id)
    }

For retweets: the "original" is the retweeted post. For non-retweets: it's the post itself. This is the canonical "post content I'm interested in" — for ranking, ads-pacing, etc.

    fn as_tweet_info(&self, is_followed_by_viewer: bool) -> xai_recsys_proto::TweetInfo {
        xai_recsys_proto::TweetInfo {
            tweet_id: self.get_original_tweet_id(),
            author_id: self.get_original_author_id(),
            retweeting_tweet_id: if self.retweeted_tweet_id.is_some() {
                self.tweet_id
            } else {
                0
            },
            retweeting_author_id: if self.retweeted_user_id.is_some() {
                self.author_id
            } else {
                0
            },
            quoted_tweet_id: self.quoted_tweet_id.unwrap_or(0),
            quoted_author_id: self.quoted_user_id.unwrap_or(0),
            in_reply_to_tweet_id: self.in_reply_to_tweet_id.unwrap_or(0),
            is_author_followed_by_user: is_followed_by_viewer,
            min_video_duration_ms: self.min_video_duration_ms.map(|ms| ms as u64).unwrap_or(0),
            fav_count: self.fav_count.unwrap_or(0) as u64,
            retweet_count: self.repost_count.unwrap_or(0) as u64,
            quote_count: self.quote_count.unwrap_or(0) as u64,
            reply_count: self.reply_count.unwrap_or(0) as u64,
            language_code: xai_recsys_proto::language_code_string_to_enum(
                self.language_code.as_deref().unwrap_or(""),
            ) as i32,
            tweet_bool_features: Some(xai_recsys_proto::TweetBoolFeatures {
                has_media: self.has_media.unwrap_or(false),
                is_retweet: self.retweeted_tweet_id.is_some(),
                is_quote: self.quoted_tweet_id.is_some(),
                is_reply: self.in_reply_to_tweet_id.is_some(),
                ..Default::default()
            }),
            ..Default::default()
        }
    }
}

Convert PostCandidate to xai_recsys_proto::TweetInfo — the proto representation used by Phoenix as input. Note the is_followed_by_viewer parameter is passed in (not stored on the candidate) — it depends on the viewer, so it's per-request not per-post.

Several normalizations:

  • For retweets: tweet_id becomes the retweeted tweet's ID (the original), retweeting_tweet_id and retweeting_author_id get the retweet's own IDs.
  • language_code_string_to_enum(...) — proto enum lookup.
  • tweet_bool_features packs four bools into a sub-struct.
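The retweet normalization is easy to get backwards, so here is a reduced sketch of just the four ID fields. Post and normalized_ids are hypothetical names; the logic is copied from get_original_tweet_id, get_original_author_id, and the two retweeting_* branches of as_tweet_info.

```rust
/// Minimal stand-in for the ID fields of `PostCandidate`.
struct Post {
    tweet_id: u64,
    author_id: u64,
    retweeted_tweet_id: Option<u64>,
    retweeted_user_id: Option<u64>,
}

/// The four ID fields `as_tweet_info` derives, as a tuple:
/// (tweet_id, author_id, retweeting_tweet_id, retweeting_author_id).
fn normalized_ids(p: &Post) -> (u64, u64, u64, u64) {
    (
        // "Original" post: the retweeted one if present, else the post itself.
        p.retweeted_tweet_id.unwrap_or(p.tweet_id),
        p.retweeted_user_id.unwrap_or(p.author_id),
        // The retweet's own IDs, or 0 when this is not a retweet.
        if p.retweeted_tweet_id.is_some() { p.tweet_id } else { 0 },
        if p.retweeted_user_id.is_some() { p.author_id } else { 0 },
    )
}
```

For a plain post (tweet 100 by author 7) this yields (100, 7, 0, 0). For a retweet of it (tweet 200 by author 9) it yields (100, 7, 200, 9): the original moves into the primary slots and the retweet's own IDs move to the retweeting_* slots.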

End of candidate.rs.


models/candidate_features.rs (146 lines)

Helper types and Thrift codecs for two cache-stored shapes: BlockedByUserIds (who blocks the viewer) and FilteredTopicsByExperiment (per-experiment topic lists).

use serde::{Deserialize, Serialize};
use thrift::protocol::{TInputProtocol, TOutputProtocol, TType};
use xai_strato::MValCodec;

pub use xai_core_entities::entities::{
    ExclusiveTweetControl, GizmoduckMuteSettings, GizmoduckMutedKeyword, GizmoduckUser,
    GizmoduckUserCounts, GizmoduckUserProfile, GizmoduckUserResult, MediaEntities, MediaEntity,
    MediaInfo, MuteSurface, PureCoreData, Reply, Share, VideoInfo,
};

Re-export a bunch of types from xai_core_entities. These are the core Gizmoduck types (user profile, mute settings, etc.). The re-export pattern means consumers can use crate::models::candidate_features::GizmoduckUser without knowing the underlying crate.

MValCodec is the Thrift codec trait used for Strato (X's data-fetching system) values. Implementing it lets a type be stored in / retrieved from Strato.

BlockedByUserIds

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "camelCase")]
pub struct BlockedByUserIds {
    pub blocked_by_user_ids: Vec<i64>,
}

const BLOCKED_BY_USER_IDS_FIELD: i16 = -29174;

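The list of users who have blocked the viewer. Note the negative field ID, -29174. Thrift field IDs are signed 16-bit integers, so negative values are legal on the wire. The release doesn't say how this ID was assigned; a value this arbitrary looks machine-generated rather than hand-numbered, which would also keep it clear of a hand-assigned 1, 2, 3 range.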

impl MValCodec for BlockedByUserIds {
    fn thrift_type() -> TType {
        TType::Struct
    }

    fn from_thrift(proto: &mut dyn TInputProtocol) -> Self {
        proto.read_struct_begin().unwrap();
        let mut blocked_by_user_ids: Vec<i64> = Vec::new();
        loop {
            let field = proto.read_field_begin().unwrap();
            if field.field_type == TType::Stop {
                break;
            }
            match field.id {
                Some(BLOCKED_BY_USER_IDS_FIELD) => {
                    blocked_by_user_ids = Vec::<i64>::from_thrift(proto);
                }
                _ => {
                    proto.skip(field.field_type).unwrap();
                }
            }
            proto.read_field_end().unwrap();
        }
        proto.read_struct_end().unwrap();
        BlockedByUserIds {
            blocked_by_user_ids,
        }
    }

    fn to_thrift(&self, _proto: &mut dyn TOutputProtocol) {
        panic!("Not implemented: to_thrift for BlockedByUserIds")
    }
}

The standard Thrift codec pattern:

  • read_struct_begin / loop reading fields / read_struct_end.
  • On each field: check ID, branch.
  • Unknown fields: skip (forward compatibility — newer schemas can add fields without breaking older readers).

.unwrap() everywhere assumes well-formed Thrift; corrupt input panics. Defensible since this comes from trusted internal infrastructure (Strato).

to_thrift panics — this codec is read-only. Home-mixer reads BlockedByUserIds from Strato; it never writes back.
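The control flow of from_thrift (read until Stop, keep known IDs, skip the rest) can be shown with a toy model. The Field enum below is not the thrift crate's API; it stands in for an already-decoded field stream, and read_blocked_by is a hypothetical name.

```rust
/// Toy stand-in for a decoded Thrift field: an ID plus a payload.
/// This is NOT the `thrift` crate's API, just enough to show the control flow.
enum Field {
    I64List(i16, Vec<i64>),
    Text(i16, String),
    Stop,
}

const BLOCKED_BY_USER_IDS_FIELD: i16 = -29174;

/// Walk the fields until `Stop`, keep the one we know, ignore the rest.
fn read_blocked_by(fields: &[Field]) -> Vec<i64> {
    let mut blocked_by_user_ids = Vec::new();
    for field in fields {
        match field {
            // `Stop` ends the struct; anything after it is not ours to read.
            Field::Stop => break,
            Field::I64List(id, values) if *id == BLOCKED_BY_USER_IDS_FIELD => {
                blocked_by_user_ids = values.clone();
            }
            // Unknown ID or unexpected type: skip, as `proto.skip(..)` does.
            Field::I64List(..) | Field::Text(..) => {}
        }
    }
    blocked_by_user_ids
}
```

A stream containing an unrecognized text field and an unrecognized list field alongside the known one still decodes to the expected IDs, which is the forward-compatibility property the skip branch exists for.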

TopicFilteringExperiment

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TopicFilteringExperiment {
    Unfiltered,
    CuratedV0,
    CuratedV0V1,
    PostBased90Pct,
    PostBased75Pct,
    PostBased50Pct,
}

impl TopicFilteringExperiment {
    pub fn parse(s: &str) -> Self {
        match s {
            "Unfiltered" => Self::Unfiltered,
            "CuratedV0" => Self::CuratedV0,
            "CuratedV0V1" => Self::CuratedV0V1,
            "PostBased90Pct" => Self::PostBased90Pct,
            "PostBased75Pct" => Self::PostBased75Pct,
            "PostBased50Pct" => Self::PostBased50Pct,
            _ => Self::Unfiltered,
        }
    }

    pub fn as_proto_mode(&self) -> i32 {
        match self {
            Self::Unfiltered => 0,
            Self::CuratedV0 => 1,
            Self::CuratedV0V1 => 2,
            Self::PostBased90Pct => 3,
            Self::PostBased75Pct => 4,
            Self::PostBased50Pct => 5,
        }
    }
}

Topic filtering experiment selector. Six variants for different filtering rules:

  • Unfiltered — all topics.
  • CuratedV0, CuratedV0V1 — hand-curated topic lists.
  • PostBased{90,75,50}Pct: judging by the names, lists filtered algorithmically from post data at 90%, 75%, and 50% thresholds (the exact criterion isn't visible in this file).

The parse(s) function defaults to Unfiltered on unknown strings — graceful degradation if a feature-switch value gets corrupted.

as_proto_mode() converts to a proto-side integer enum for downstream consumers.

FilteredTopicsByExperiment

#[derive(Debug, Clone, Default)]
pub struct FilteredTopicsByExperiment {
    pub unfiltered_topic_ids: Option<Vec<i64>>,
    pub curated_v0_topic_ids: Option<Vec<i64>>,
    pub curated_v0_v1_topic_ids: Option<Vec<i64>>,
    pub post_based_90pct_topic_ids: Option<Vec<i64>>,
    pub post_based_75pct_topic_ids: Option<Vec<i64>>,
    pub post_based_50pct_topic_ids: Option<Vec<i64>>,
}

impl FilteredTopicsByExperiment {
    pub fn topic_ids_for_experiment(
        &self,
        experiment: TopicFilteringExperiment,
    ) -> Option<&Vec<i64>> {
        match experiment {
            TopicFilteringExperiment::Unfiltered => self.unfiltered_topic_ids.as_ref(),
            TopicFilteringExperiment::CuratedV0 => self.curated_v0_topic_ids.as_ref(),
            TopicFilteringExperiment::CuratedV0V1 => self.curated_v0_v1_topic_ids.as_ref(),
            TopicFilteringExperiment::PostBased90Pct => self.post_based_90pct_topic_ids.as_ref(),
            TopicFilteringExperiment::PostBased75Pct => self.post_based_75pct_topic_ids.as_ref(),
            TopicFilteringExperiment::PostBased50Pct => self.post_based_50pct_topic_ids.as_ref(),
        }
    }
}

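Up to six topic lists are pre-computed and stored together (each field is an Option, so any of them may be absent). Whether that happens per post or per hydration call isn't clear from this file. At query time, topic_ids_for_experiment pulls the right one based on which experiment arm the user is in.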

This is the "upstream produces all variants, downstream picks one" pattern. Expensive at write time (up to 6× the data), cheap at read time. Makes A/B testing tractable.
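Here is a reduced sketch of how the parse fallback and the per-arm lookup compose at read time. Arm and TopicsByArm are cut-down, hypothetically named copies of TopicFilteringExperiment and FilteredTopicsByExperiment with three of the six arms.

```rust
/// Reduced copy of `TopicFilteringExperiment` with three of the six arms.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Arm {
    Unfiltered,
    CuratedV0,
    PostBased50Pct,
}

impl Arm {
    /// Unknown strings fall back to `Unfiltered`, as in the original `parse`.
    fn parse(s: &str) -> Self {
        match s {
            "CuratedV0" => Arm::CuratedV0,
            "PostBased50Pct" => Arm::PostBased50Pct,
            _ => Arm::Unfiltered,
        }
    }
}

/// Reduced copy of `FilteredTopicsByExperiment`: one optional list per arm.
#[derive(Default)]
struct TopicsByArm {
    unfiltered: Option<Vec<i64>>,
    curated_v0: Option<Vec<i64>>,
    post_based_50pct: Option<Vec<i64>>,
}

impl TopicsByArm {
    /// Borrow the list for one arm; `None` means upstream never wrote it.
    fn for_arm(&self, arm: Arm) -> Option<&Vec<i64>> {
        match arm {
            Arm::Unfiltered => self.unfiltered.as_ref(),
            Arm::CuratedV0 => self.curated_v0.as_ref(),
            Arm::PostBased50Pct => self.post_based_50pct.as_ref(),
        }
    }
}
```

Two failure modes stay distinguishable: a misspelled arm name silently degrades to the unfiltered list, while a valid arm whose list was never written comes back as None and the caller has to decide what to do.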

impl MValCodec for FilteredTopicsByExperiment {
    fn thrift_type() -> TType {
        TType::Struct
    }

    fn from_thrift(proto: &mut dyn TInputProtocol) -> Self {
        proto.read_struct_begin().unwrap();
        let mut result = FilteredTopicsByExperiment::default();
        loop {
            let field = proto.read_field_begin().unwrap();
            if field.field_type == TType::Stop {
                break;
            }
            match field.id {
                Some(1) => result.unfiltered_topic_ids = Some(Vec::<i64>::from_thrift(proto)),
                Some(2) => result.curated_v0_topic_ids = Some(Vec::<i64>::from_thrift(proto)),
                Some(3) => result.curated_v0_v1_topic_ids = Some(Vec::<i64>::from_thrift(proto)),
                Some(4) => result.post_based_90pct_topic_ids = Some(Vec::<i64>::from_thrift(proto)),
                Some(5) => result.post_based_75pct_topic_ids = Some(Vec::<i64>::from_thrift(proto)),
                Some(6) => result.post_based_50pct_topic_ids = Some(Vec::<i64>::from_thrift(proto)),
                _ => {
                    proto.skip(field.field_type).unwrap();
                }
            }
            proto.read_field_end().unwrap();
        }
        proto.read_struct_end().unwrap();
        result
    }

    fn to_thrift(&self, _proto: &mut dyn TOutputProtocol) {
        panic!("Not implemented: to_thrift for FilteredTopicsByExperiment")
    }
}

Same pattern as BlockedByUserIds: a read-only Thrift codec, this time with small positive field IDs 1..=6, which suggests a hand-numbered schema rather than generated IDs.


models/user_features.rs (80 lines)

use serde::{Deserialize, Serialize};
use thrift::protocol::{TInputProtocol, TOutputProtocol, TType};
use xai_strato::MValCodec;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "camelCase")]
pub struct UserFeatures {
    pub muted_keywords: Vec<String>,
    pub blocked_user_ids: Vec<i64>,
    pub muted_user_ids: Vec<i64>,
    pub followed_user_ids: Vec<i64>,
    pub subscribed_user_ids: Vec<i64>,
    pub follower_count: Option<i64>,
}

const MUTED_KEYWORDS_FIELD: i16 = 23093;
const BLOCKED_USER_IDS_FIELD: i16 = -28831;
const MUTED_USER_IDS_FIELD: i16 = -7422;
const FOLLOWED_USER_IDS_FIELD: i16 = -8003;
const SUBSCRIBED_USER_IDS_FIELD: i16 = -21197;
const FOLLOWER_COUNT_FIELD: i16 = -23663;

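UserFeatures is the viewer's social-graph state, embedded as a sub-struct on ScoredPostsQuery. As we saw in the ScoredPostsQuery struct literal, muted_keywords and follower_count are set at construction; the other four fields (blocked, muted, followed, and subscribed user IDs) are filled in later by the user_features_query_hydrator.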

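A mix of positive and negative field IDs again: muted_keywords uses a positive ID (23093), the other five are negative. As with BlockedByUserIds, the values look generated rather than hand-assigned, and the release doesn't document where they come from.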

impl MValCodec for UserFeatures {
    fn thrift_type() -> TType {
        TType::Struct
    }

    fn from_thrift(proto: &mut dyn TInputProtocol) -> Self {
        proto.read_struct_begin().unwrap();
        let mut muted_keywords: Vec<String> = Vec::new();
        let mut blocked_user_ids: Vec<i64> = Vec::new();
        let mut muted_user_ids: Vec<i64> = Vec::new();
        let mut followed_user_ids: Vec<i64> = Vec::new();
        let mut subscribed_user_ids: Vec<i64> = Vec::new();
        let mut follower_count: Option<i64> = None;
        loop {
            let field = proto.read_field_begin().unwrap();
            if field.field_type == TType::Stop {
                break;
            }
            match field.id {
                Some(MUTED_KEYWORDS_FIELD) => {
                    muted_keywords = Vec::<String>::from_thrift(proto);
                }
                Some(BLOCKED_USER_IDS_FIELD) => {
                    blocked_user_ids = Vec::<i64>::from_thrift(proto);
                }
                Some(MUTED_USER_IDS_FIELD) => {
                    muted_user_ids = Vec::<i64>::from_thrift(proto);
                }
                Some(FOLLOWED_USER_IDS_FIELD) => {
                    followed_user_ids = Vec::<i64>::from_thrift(proto);
                }
                Some(SUBSCRIBED_USER_IDS_FIELD) => {
                    subscribed_user_ids = Vec::<i64>::from_thrift(proto);
                }
                Some(FOLLOWER_COUNT_FIELD) => {
                    follower_count = Option::<i64>::from_thrift(proto);
                }
                _ => {
                    proto.skip(field.field_type).unwrap();
                }
            }
            proto.read_field_end().unwrap();
        }
        proto.read_struct_end().unwrap();
        UserFeatures {
            muted_keywords,
            blocked_user_ids,
            muted_user_ids,
            followed_user_ids,
            subscribed_user_ids,
            follower_count,
        }
    }

    fn to_thrift(&self, _proto: &mut dyn TOutputProtocol) {
        panic!("Not implemented: to_thrift for UserFeatures")
    }
}

Same codec pattern. Read-only.

End of user_features.rs.


models/brand_safety.rs (91 lines)

The brand-safety verdict computation — used for ad placement decisions.

use std::collections::HashMap;
use xai_x_thrift::tweet_safety_label::{SafetyLabel, SafetyLabelSource, SafetyLabelType};

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[repr(i32)]
pub enum BrandSafetyVerdict {
    #[default]
    Unspecified = 0,
    Safe = 1,
    LowRisk = 2,
    MediumRisk = 3,
}

Four-level verdict. #[repr(i32)] fixes the discriminant type, so a variant can be cast to i32 with as for proto interop (going from i32 back to the enum needs an explicit conversion). #[default] makes Unspecified the default.

pub(crate) const MEDIUM_RISK_LABELS: &[SafetyLabelType] = &[
    SafetyLabelType::NSFW_HIGH_PRECISION,
    SafetyLabelType::NSFW_HIGH_RECALL,
    SafetyLabelType::NSFA_HIGH_PRECISION,
    SafetyLabelType::NSFA_KEYWORDS_HIGH_PRECISION,
    SafetyLabelType::GORE_AND_VIOLENCE_HIGH_PRECISION,
    SafetyLabelType::NSFW_REPORTED_HEURISTICS,
    SafetyLabelType::GORE_AND_VIOLENCE_REPORTED_HEURISTICS,
    SafetyLabelType::NSFW_CARD_IMAGE,
    SafetyLabelType::DO_NOT_AMPLIFY,
    SafetyLabelType::NSFA_COMMUNITY_NOTE,
    SafetyLabelType::PDNA,
    SafetyLabelType::EGREGIOUS_NSFW,
    SafetyLabelType::GROK_NSFA,
    SafetyLabelType::NSFW_TEXT,
];

pub(crate) const LOW_RISK_LABELS: &[SafetyLabelType] = &[
    SafetyLabelType::NSFA_LIMITED_INVENTORY,
    SafetyLabelType::GROK_NSFA_LIMITED,
    SafetyLabelType::NSFA_HIGH_RECALL,
];

const PTOS_CUTOFF_TWEET_ID: u64 = 2_054_275_414_225_846_272;

Two label sets. Any label in MEDIUM_RISK_LABELS forces a MediumRisk verdict. A label in LOW_RISK_LABELS yields LowRisk, but only if the post also clears the other MediumRisk checks in compute_verdict.

PTOS_CUTOFF_TWEET_ID is a snowflake ID — posts with IDs at-or-above this threshold are "new enough" to require a PTOS review. Older posts are grandfathered (presumed safe by virtue of not being flagged after this much time on the platform).
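If the constant follows the standard Twitter snowflake layout (an assumption; the source only calls it a snowflake ID), the bits above the low 22 hold milliseconds since the snowflake epoch, so the cutoff can be turned into a wall-clock time. snowflake_to_unix_ms is a hypothetical helper.

```rust
/// Twitter snowflake epoch (2010-11-04T01:42:54.657Z) in Unix milliseconds.
const SNOWFLAKE_EPOCH_MS: u64 = 1_288_834_974_657;

/// The cutoff constant from brand_safety.rs.
const PTOS_CUTOFF_TWEET_ID: u64 = 2_054_275_414_225_846_272;

/// Creation time of a snowflake ID in Unix milliseconds.
/// Assumes the standard layout: the low 22 bits are machine ID and sequence,
/// everything above them is milliseconds since the snowflake epoch.
fn snowflake_to_unix_ms(id: u64) -> u64 {
    (id >> 22) + SNOWFLAKE_EPOCH_MS
}
```

Under that layout the constant decodes to 1,778,612,400,000 ms, which is 12 May 2026 at 19:00:00 UTC. Its low 22 bits are all zero, which is what you'd expect from a cutoff built from a timestamp rather than copied from a real post.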

pub fn compute_verdict(
    labels: &HashMap<SafetyLabelType, SafetyLabel>,
    tweet_id: u64,
) -> BrandSafetyVerdict {
    if MEDIUM_RISK_LABELS.iter().any(|l| labels.contains_key(l)) {
        return BrandSafetyVerdict::MediumRisk;
    }

    let scored_by_grok = labels.contains_key(&SafetyLabelType::GROK_SFA)
        || labels.contains_key(&SafetyLabelType::GROK_NSFA_LIMITED);
    if !scored_by_grok {
        return BrandSafetyVerdict::MediumRisk;
    }

    if tweet_id >= PTOS_CUTOFF_TWEET_ID && !labels.contains_key(&SafetyLabelType::PTOS_REVIEWED) {
        return BrandSafetyVerdict::MediumRisk;
    }

    if LOW_RISK_LABELS.iter().any(|l| labels.contains_key(l)) {
        return BrandSafetyVerdict::LowRisk;
    }

    BrandSafetyVerdict::Safe
}

The verdict computation. Defaults to MediumRisk unless multiple positive signals confirm otherwise:

  1. Any medium-risk label → MediumRisk.
  2. Must be scored by Grok (either GROK_SFA = "Grok says Safe For Advertisers" or GROK_NSFA_LIMITED = "Grok says limited NSFA"). If not scored, MediumRisk by default (precautionary principle).
  3. Newer posts must have PTOS review. If not, MediumRisk.
  4. Any low-risk label (after passing the above) → LowRisk.
  5. Otherwise → Safe.

So Safe requires: no MediumRisk labels, scored by Grok, PTOS reviewed if recent, and no LowRisk labels. The bar is high.
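The ordering of the checks matters, so it helps to run a few label combinations through a reduced copy of the function. The sketch below keeps the five-step logic intact but swaps in stand-ins: Label is a hypothetical five-variant subset of SafetyLabelType, Verdict drops Unspecified, and a HashSet replaces the HashMap since only key presence is checked.

```rust
use std::collections::HashSet;

/// Hypothetical subset of `SafetyLabelType`, enough to hit every branch.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Label {
    NsfwText,
    GrokSfa,
    GrokNsfaLimited,
    NsfaHighRecall,
    PtosReviewed,
}

#[derive(Debug, PartialEq)]
enum Verdict {
    Safe,
    LowRisk,
    MediumRisk,
}

const MEDIUM: &[Label] = &[Label::NsfwText];
const LOW: &[Label] = &[Label::GrokNsfaLimited, Label::NsfaHighRecall];
const PTOS_CUTOFF_TWEET_ID: u64 = 2_054_275_414_225_846_272;

/// Same five steps as the original, over the reduced label set.
fn compute_verdict(labels: &HashSet<Label>, tweet_id: u64) -> Verdict {
    if MEDIUM.iter().any(|l| labels.contains(l)) {
        return Verdict::MediumRisk;
    }
    let scored_by_grok =
        labels.contains(&Label::GrokSfa) || labels.contains(&Label::GrokNsfaLimited);
    if !scored_by_grok {
        return Verdict::MediumRisk;
    }
    if tweet_id >= PTOS_CUTOFF_TWEET_ID && !labels.contains(&Label::PtosReviewed) {
        return Verdict::MediumRisk;
    }
    if LOW.iter().any(|l| labels.contains(l)) {
        return Verdict::LowRisk;
    }
    Verdict::Safe
}
```

Two cases are worth noticing. A post with no labels at all is MediumRisk, because it fails the scored-by-Grok check. And GROK_NSFA_LIMITED does double duty: on its own it satisfies the scored-by-Grok check and then, being in the low-risk set, caps the verdict at LowRisk.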

pub fn worst_verdict(a: &BrandSafetyVerdict, b: &BrandSafetyVerdict) -> BrandSafetyVerdict {
    if *a as i32 >= *b as i32 { *a } else { *b }
}

Combine two verdicts pessimistically. Useful for a post that's both a retweet and a quote — we take the worst of all the constituent posts' verdicts. The as i32 works because of #[repr(i32)] on the enum (higher number = worse).
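Combining more than two verdicts is a fold over worst_verdict. The enum and worst_verdict below are copied from the listing (minus the serde derives); worst_of is a hypothetical helper, not something the repo is shown to define.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
#[repr(i32)]
enum BrandSafetyVerdict {
    Unspecified = 0,
    Safe = 1,
    LowRisk = 2,
    MediumRisk = 3,
}

fn worst_verdict(a: &BrandSafetyVerdict, b: &BrandSafetyVerdict) -> BrandSafetyVerdict {
    if *a as i32 >= *b as i32 { *a } else { *b }
}

/// Worst verdict across any number of constituent posts (hypothetical helper).
/// Starts from `Unspecified`, the lowest discriminant, so any real verdict wins.
fn worst_of(verdicts: &[BrandSafetyVerdict]) -> BrandSafetyVerdict {
    verdicts
        .iter()
        .fold(BrandSafetyVerdict::Unspecified, |acc, v| worst_verdict(&acc, v))
}
```

One consequence of the numeric ordering: Unspecified ranks below Safe, so combining an unknown verdict with a Safe one yields Safe. Whether callers rely on that, or resolve Unspecified before combining, isn't visible in this file.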

pub(crate) fn botmaker_rule_id_from(label: &SafetyLabel) -> Option<i64> {
    label.safety_label_source.as_ref().and_then(|src| {
        if let SafetyLabelSource::BotMakerAction(action) = src {
            Some(action.rule_id)
        } else {
            None
        }
    })
}

pub(crate) fn botmaker_rule_category(rule_id: i64) -> &'static str {
    match rule_id {
        1000..=1099 => "Content",
        1100..=1199 => "ContentLimited",
        1200..=1399 => "Safety",
        1400..=1499 => "Grok",
        1500..=1600 => "Quote",
        _ => "Legacy",
    }
}

pub(crate) fn truncate_description(s: &str) -> String {
    s.chars().take(250).collect()
}

Three small helpers used by the brand-safety hydrator.

botmaker_rule_id_from extracts the BotMaker rule ID from a label's source. BotMaker is X's safety rule engine — each safety label can originate from a BotMaker rule.

botmaker_rule_category buckets rule IDs by integer range. Different categories of rules:

  • 1000-1099: Content (general content rules)
  • 1100-1199: ContentLimited (limited reach rules)
  • 1200-1399: Safety (safety violations)
  • 1400-1499: Grok (Grok-driven rules)
  • 1500-1600: Quote (quote-tweet-specific)
  • Else: Legacy (older rules)

truncate_description clips a string to 250 characters (Unicode scalar values, not bytes). .chars().take(250).collect() always cuts on a character boundary, unlike byte slicing with &s[..250], which panics if byte 250 falls inside a multi-byte character.
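The difference between counting characters and counting bytes shows up as soon as the text contains a multi-byte character. A minimal sketch, with truncate_chars as a hypothetical generalization of truncate_description that takes the limit as a parameter:

```rust
/// Keep at most `max_chars` Unicode scalar values; never splits a character.
fn truncate_chars(s: &str, max_chars: usize) -> String {
    s.chars().take(max_chars).collect()
}

/// Checked byte-based prefix: `str::get` returns None when `max_bytes` lands
/// inside a multi-byte character (indexing with `&s[..max_bytes]` would panic there).
fn byte_prefix(s: &str, max_bytes: usize) -> Option<&str> {
    s.get(..max_bytes)
}
```

Take "héllo", where é occupies bytes 1 and 2. truncate_chars with a limit of 2 returns "hé", while byte_prefix with a limit of 2 returns None because byte 2 is in the middle of é.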

End of brand_safety.rs.


models/in_network_reply.rs (23 lines)

The smallest model file. InNetworkReply is an (author_id, in_reply_to_tweet_id) pair, used to detect "this post is a reply by someone you follow."

use serde::Serialize;
use std::sync::OnceLock;

#[derive(Clone, Debug, Serialize)]
pub struct InNetworkReply {
    pub author_id: u64,
    pub in_reply_to_tweet_id: u64,
}

pub type InNetworkReplies = OnceLock<Vec<InNetworkReply>>;

The interesting bit: InNetworkReplies = OnceLock<Vec<InNetworkReply>>.

OnceLock<T> is std's primitive for "set once, read many times, thread-safe". The cell starts empty; the first writer calls .set(value); afterward, every reader calls .get() and gets Some(&value). Subsequent writes return an error.

Why use OnceLock instead of a plain Vec? Deferred / lazy hydration with shared computation. The replies list is expensive to compute (requires looking up the reply graph). We don't want to pay that cost upfront for every request. Instead:

  1. The query is built with an empty OnceLock.
  2. The first pipeline stage that needs the data calls .get_or_init(|| compute_replies()) to compute and stash.
  3. Subsequent stages call .get() to read.

This pattern works because OnceLock is Sync — multiple stages on different threads can race to be the first writer, and only one wins.
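The race is easy to demonstrate with std alone. In this sketch race_to_init is a hypothetical function, and the vec![1, 2, 3] stands in for the expensive reply-graph lookup; a counter records how many times the initializer actually ran.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;
use std::thread;

/// Spawn `n` threads that all race on one cell.
/// Returns the stored value and how many times the initializer ran.
fn race_to_init(n: usize) -> (Vec<u64>, usize) {
    let cell: OnceLock<Vec<u64>> = OnceLock::new();
    let computed = AtomicUsize::new(0);

    // Scoped threads may borrow `cell` and `computed`; all are joined at scope exit.
    thread::scope(|s| {
        for _ in 0..n {
            s.spawn(|| {
                cell.get_or_init(|| {
                    computed.fetch_add(1, Ordering::SeqCst);
                    vec![1, 2, 3] // stand-in for the expensive computation
                });
            });
        }
    });

    (
        cell.get().cloned().unwrap_or_default(),
        computed.load(Ordering::SeqCst),
    )
}
```

However many threads race, the initializer runs exactly once and every caller sees the same value; the losers block until the winner finishes instead of recomputing.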

The convention pub type InNetworkReplies = OnceLock<Vec<InNetworkReply>> lets users write InNetworkReplies instead of the full type expression at every reference site.

pub fn serialize_in_network_replies<S>(
    replies: &InNetworkReplies,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    match replies.get() {
        Some(v) => v.serialize(serializer),
        None => serializer.serialize_none(),
    }
}

Custom serializer (referenced from query.rs's #[serde(serialize_with = ...)]). If the OnceLock was populated, serialize the inner vec; otherwise serialize as null. Without this, serde would try to serialize the OnceLock itself, which has no Serialize impl.

End of file.


What we've learned

The shell of home-mixer is small. The bulk of the LOC is just the plumbing for:

  1. gRPC service shell, with four RPC methods in total: get_scored_posts, get_debug_scored_posts, get_for_you_feed, get_for_you_feed_urt.
  2. QueryBuilder — proto-to-struct conversion with viewer-data lookup, feature-switch evaluation, and the long ScoredPostsQuery::new(...) call.
  3. The XService framework: XServiceBuilder handles all the cross-cutting concerns (TLS, metrics, gRPC reflection, feature switches, deciders, profiling endpoints, dark traffic).

The architecture is layered:

  • ForYouFeedServer wraps a ForYouCandidatePipeline
  • ForYouCandidatePipeline (which we'll read in Session 06) uses ScoredPostsServer as one of its candidate sources
  • ScoredPostsServer wraps a PhoenixCandidatePipeline
  • PhoenixCandidatePipeline is where the real work happens — sources, hydrators, filters, scorers, side-effects

So the For You feed isn't a flat pipeline; it's a pipeline-of-pipelines. Scored Posts produces ranked candidates; For You adds URT decoration, blending with non-post items (prompts, who-to-follow, ads), and serializes to the wire format.

ScoredPostsQuery is the immutable request context — ~50 fields, populated by QueryBuilder and the query hydrators (which we'll read in Session 09). Every pipeline stage reads from it; some hydrators write to it via the update-pattern from Session 01.

PostCandidate is the per-post mutable state — ~30 fields, populated incrementally by candidate hydrators (Sessions 07–08) and scorers (Session 10). The Option<T> everywhere reflects "not yet hydrated."

Three score fields on PostCandidate (phoenix_scores, weighted_score, score) capture the three stages of scoring: raw probabilities → combined → final adjustments. We'll see the scorer chain in Session 10.

Brand safety is conservative by default: compute_verdict returns MediumRisk unless explicitly affirmed Safe by Grok and PTOS-reviewed-if-recent. Misclassification cost is asymmetric: missing a bad post (false negative) is worse than missing an ad opportunity (false positive).

OnceLock for deferred hydration: in_network_replies is computed lazily across threads. Useful pattern when a sub-feature is needed by some pipeline branches but not others.

Open-source artifacts noted:

  • clients and util modules are referenced but not in the release.
  • xai_x_service_builder, xai_dark_traffic, etc. are internal-only.
  • The parse_shard default_value_t = -1 for i16 suggests the shard system uses -1 as sentinel, not Option<i16> — older API design preserved.

Next session

Session 05 — Home-Mixer filters. All 24 filters in home-mixer/filters/, ~1,458 LOC. These implement the pre-scoring safety/duplication/staleness checks (DropDuplicatesFilter, AgeFilter, SelfTweetFilter, AuthorSocialgraphFilter, MutedKeywordFilter, PreviouslySeenPostsFilter, etc.) and the post-selection filters (VFFilter, DedupConversationFilter).

The big one in there is topic_ids_filter.rs at 571 LOC — handles topic-based exclusion across the six different filtering experiments we saw in candidate_features.rs.