X For You algorithm, line by line — Part 2: Thunder I (post store)

Part 2 of the deep dive into xai-org/x-algorithm. Thunder's binary startup, Kafka bootstrap, Thrift/proto deserializers, and the in-memory PostStore — DashMap layout, TinyPost vs LightPost, retention trimming, tombstones.

May 14, 2026 · 36 min read

Thunder is the in-network candidate source. It's a separate Rust binary that:

  1. Subscribes to Kafka topics that fire whenever any post is created or deleted anywhere on X.
  2. Maintains an in-memory index of all "recent" posts, partitioned by author.
  3. Exposes a gRPC endpoint that, given a follow list, returns recent posts authored by those users.

Latency target: sub-millisecond lookup. That's why it's an in-memory store — no database, no cache layer.

This session covers the "shell" of Thunder: how the binary starts up, how it deserializes Kafka payloads, and the central data structure (PostStore). The next session (03) covers the Kafka listener loops and the gRPC service implementation.

Files covered (779 LOC total):

thunder/
├── lib.rs              (11)   module declarations
├── main.rs             (100)  binary entry point — wires Kafka, gRPC, HTTP
├── deserializer.rs     (26)   thrift / protobuf decoders
├── kafka_utils.rs      (115)  Kafka producer/consumer config + bootstrap
└── posts/
    ├── mod.rs          (1)    module declaration
    └── post_store.rs   (526)  the central in-memory post store

Note: some files referenced by lib.rs (args, config, metrics, o2, schema, strato_client) are not in the open-source release. They're stubs / declarations only. We'll work with what's visible and call out the holes.


lib.rs (11 lines)

pub mod args;
pub mod config;
pub mod deserializer;
pub mod kafka;
pub mod kafka_utils;
pub mod metrics;
pub mod o2;
pub mod posts;
pub mod schema;
pub mod strato_client;
pub mod thunder_service;

Crate root, just a list of pub mod declarations. Eleven sibling modules. Of these, the open-source repo only includes the implementations for: deserializer, kafka (the listener loops, next session), kafka_utils, posts, thunder_service (next session). The other six (args, config, metrics, o2, schema, strato_client) are declared here but their implementations live in the closed-source xai_* crates referenced via imports.

That's a recurring pattern through this repo: the open-sourced part of Thunder is the business logic, while the deployment-specific glue (CLI args, prometheus metrics, observability, internal-protocol schemas, strato client) is left as opaque modules with names a reader can guess at.

For our purposes:

  • args::Args: clap-parsed CLI flags. Used widely in main.rs (args.post_retention_seconds, args.kafka_num_threads, etc.). We'll infer the shape from usage.
  • config — compile-time constants (e.g. MAX_ORIGINAL_POSTS_PER_AUTHOR).
  • metrics — Prometheus counter/gauge/histogram statics.
  • o2 — observability initialization.
  • schema — Thrift-generated structs (TweetEvent, Event).
  • strato_client — the social-graph client that fetches "who does user X follow."

main.rs (100 lines)

The Thunder binary entry point. It does five things in order:

  1. Build a PostStore.
  2. Build a StratoClient (for following-list lookups).
  3. Build a gRPC service backed by both.
  4. Start a Kafka consumer thread pool, wait for it to catch up to the head of the topic.
  5. Start the HTTP/gRPC server and the background tasks (stats logger, auto-trim).

Let's walk it.

use anyhow::{Context, Result};
use axum::Router;
use clap::Parser;
use log::info;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tonic::service::Routes;
use xai_http_server::{CancellationToken, GrpcConfig, HttpServer};

use thunder::{
    args, kafka_utils, posts::post_store::PostStore, strato_client::StratoClient,
    thunder_service::ThunderServiceImpl,
};

Imports:

  • anyhow for error handling — Thunder uses anyhow::Result<T> everywhere (vs the candidate-pipeline crate which uses Result<T, String>). anyhow::Context lets us tag errors with .context("msg").
  • axum::Router — the web framework. We use a Router::new() (empty) here; the gRPC config is attached separately.
  • clap::Parser — derive macro for CLI parsing.
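  • log::info: the log facade (older than, and distinct from, the tracing crate that candidate-pipeline used). Here its backend is env_logger, initialized at the top of main. If both crates end up in one process, a bridge such as tracing-log can forward records from one to the other.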
  • tonic::service::Routes — gRPC service registration.
  • xai_http_server — internal crate with HttpServer, GrpcConfig, CancellationToken.
  • Crate-internal imports: args, kafka_utils, posts::post_store::PostStore, strato_client::StratoClient, thunder_service::ThunderServiceImpl.

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    let args = args::Args::parse();

The #[tokio::main] macro turns async fn main() into a synchronous fn main() that builds a Tokio runtime and blocks on the async body. By default that is the multi-threaded, work-stealing runtime with one worker thread per CPU core (not a thread-per-core design); both can be changed via #[tokio::main(flavor = "multi_thread", worker_threads = N)] or flavor = "current_thread".

env_logger::init() reads RUST_LOG=info or similar from the environment and configures the log crate's output.

args::Args::parse() runs clap, exits on --help / parse errors, returns the parsed args struct.

    // Initialize PostStore
    let post_store = Arc::new(PostStore::new(
        args.post_retention_seconds,
        args.request_timeout_ms,
    ));
    info!(
        "Initialized PostStore for in-memory post storage (retention: {} seconds / {:.1} days, request_timeout: {}ms)",
        args.post_retention_seconds,
        args.post_retention_seconds as f64 / 86400.0,
        args.request_timeout_ms
    );

Build the post store, wrapped in Arc. Two parameters:

  • post_retention_seconds: how long to keep posts. The log line reformats this as days (/ 86400.0).
  • request_timeout_ms: how long a get_posts_by_users call can spend before bailing out. Important because the store may have to scan many user timelines, and we don't want a long-tail user with a huge follow list to drag down p99 latency.

Why Arc? Because we're about to share the store with the Kafka workers, the gRPC service, the stats-logger task, and the auto-trim task. Arc (Atomically Reference Counted) gives all of them shared ownership across threads; the DashMaps inside are what make concurrent mutation safe.

    // Initialize StratoClient for fetching following lists
    let strato_client = Arc::new(StratoClient::new());
    info!("Initialized StratoClient");

StratoClient is the social-graph fetcher. "Strato" is Twitter's old internal data-fetching service — the name carried over. It's also Arc-wrapped.

    // Create ThunderService with the PostStore, StratoClient, and concurrency limit
    let thunder_service = ThunderServiceImpl::new(
        Arc::clone(&post_store),
        Arc::clone(&strato_client),
        args.max_concurrent_requests,
    );
    info!(
        "Initialized with max_concurrent_requests={}",
        args.max_concurrent_requests
    );
    let routes = Routes::new(thunder_service.server());

ThunderServiceImpl is the gRPC service struct (defined in thunder_service.rs, covered in Session 03). Construct it by cloning the two Arcs and passing the concurrency limit.

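Arc::clone(&x) is the idiomatic spelling here. On an Arc<T>, x.clone() also just bumps the reference count, but Arc::clone makes that obvious to a reader, who might otherwise suspect a deep copy of the PostStore. (Cloning the PostStore itself would also be cheap: it derives Clone over Arc-wrapped maps, so its clone shares the same storage. It would just be a second struct rather than another handle to the same Arc.)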
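A minimal sketch of the difference, assuming a hypothetical StoreSketch that uses Mutex<HashMap> in place of DashMap so it runs with only the standard library. Arc::clone hands out more handles to the same value, while cloning the inner Arc-wrapped struct yields a new struct whose maps are still shared:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for PostStore: every field is Arc-wrapped,
/// so the derived Clone copies pointers, never map contents.
#[derive(Clone, Default)]
struct StoreSketch {
    posts: Arc<Mutex<HashMap<i64, String>>>,
}

/// Hand out `n` extra handles to one shared store.
fn share(store: &Arc<StoreSketch>, n: usize) -> Vec<Arc<StoreSketch>> {
    // Arc::clone makes it explicit that only the refcount is bumped.
    (0..n).map(|_| Arc::clone(store)).collect()
}
```

Either way no map data is copied; the only difference is whether callers hold the same struct or twins that point at the same maps.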

Routes::new(thunder_service.server()) builds a tonic Routes object from the service's protobuf-generated server type. The server() call (we'll see in session 03) returns a XaiThunderProtoServer wrapper that implements tower::Service.

    // Set up gRPC config
    let grpc_config = GrpcConfig::new(args.grpc_port, routes);

    // Create HTTP server with gRPC support
    let mut http_server = HttpServer::new(
        args.http_port,
        Router::new(),
        Some(grpc_config),
        CancellationToken::new(),
        Duration::from_secs(10),
    )
    .await
    .context("Failed to create HTTP server")?;

Build the gRPC config (port + routes), then build the HTTP server. The HTTP server takes:

  • http_port: a separate port for HTTP traffic (health checks, profiling endpoints, etc.).
  • Router::new(): an empty axum router — no HTTP endpoints, just gRPC. The router exists because HttpServer is generic over an axum router for HTTP traffic.
  • Some(grpc_config): the gRPC config to mount on the gRPC port.
  • CancellationToken::new(): graceful-shutdown signal. Used by background tasks to know when to stop.
  • Duration::from_secs(10): graceful-shutdown timeout.

.await.context(...): HttpServer::new is async (presumably because it binds the sockets); if it fails, the error is tagged with a context string and propagated with ?.

So Thunder listens on two ports: one HTTP (e.g. for liveness probes), one gRPC (for feed requests). Standard pattern.

    if args.enable_profiling {
        xai_profiling::spawn_server(3000, CancellationToken::new()).await;
    }

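Optional pprof-style profiling server on port 3000, enabled only by a flag, presumably for performance investigations. Note that it gets its own fresh CancellationToken rather than sharing the one handed to HttpServer, so shutting down the HTTP server doesn't signal it.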

    // Create channel for post events
    let (tx, mut rx) = tokio::sync::mpsc::channel::<i64>(args.kafka_num_threads);
    kafka_utils::start_kafka(&args, post_store.clone(), "", tx).await?;

Now the Kafka boot:

  • tokio::sync::mpsc::channel::<i64>(N) creates a multiple-producer, single-consumer channel with buffer size N (one per Kafka thread). Each i64 is "I'm done with initial catchup" sent by a Kafka thread once it's caught up to the head.
  • kafka_utils::start_kafka spawns the Kafka consumer threads. They start consuming from wherever auto_offset_reset points (e.g. the earliest offset, for a full catchup), feed events into post_store, and send a signal on tx once they reach the head of the topic.

The third argument "" is the user string used in the consumer group ID. Serving mode ignores it (the group ID gets a fresh UUID suffix instead); only the non-serving branch uses it, as we'll see in kafka_utils.rs.

    if args.is_serving {
        // Wait for Kafka catchup signal
        let start = Instant::now();
        for _ in 0..args.kafka_num_threads {
            rx.recv().await;
        }
        info!("Kafka init took {:?}", start.elapsed());

        post_store.finalize_init().await?;

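Catchup logic: before we start serving traffic, every Kafka thread needs to have read enough events to populate the in-memory store. We loop kafka_num_threads times, awaiting one signal per thread. Once all have signalled, we know all threads are caught up. One subtlety: recv() returns None once every sender has been dropped, and the loop ignores the return value, so if all the listener threads exited early, the wait would end without any real catchup signals.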

Then post_store.finalize_init() sorts every per-user post deque and trims any posts that fell out of retention while we were ingesting. (We'll see what that does inside the PostStore impl below.)
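The catchup barrier can be sketched with std::sync::mpsc standing in for tokio::sync::mpsc. The wait_for_catchup function and its simulated consumer threads are hypothetical; the point is the pattern of one signal per thread, plus an explicit check for a closed channel that the loop in main.rs omits:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Spawn `num_threads` simulated consumers; each "ingests" for a while, then
/// signals once that it reached the head of its partitions.
/// Returns how many catchup signals were actually received.
fn wait_for_catchup(num_threads: usize) -> usize {
    let (tx, rx) = mpsc::sync_channel::<i64>(num_threads);
    for id in 0..num_threads {
        let tx = tx.clone();
        thread::spawn(move || {
            // Stand-in for consuming the backlog.
            thread::sleep(Duration::from_millis(10 * id as u64));
            // Signal catchup; ignore the error if the receiver is gone.
            let _ = tx.send(id as i64);
            // A real consumer would keep polling Kafka here.
        });
    }
    // Drop the original sender so the channel closes if every thread exits.
    drop(tx);

    let mut received = 0;
    for _ in 0..num_threads {
        match rx.recv() {
            Ok(_) => received += 1,
            Err(_) => break, // all senders dropped without signalling
        }
    }
    received
}
```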

        // Start stats logger
        Arc::clone(&post_store).start_stats_logger();
        info!("Started PostStore stats logger",);

        // Start auto-trim task to remove posts older than retention period
        Arc::clone(&post_store).start_auto_trim(2); // Run every 2 minutes
        info!(
            "Started PostStore auto-trim task (interval: 2 minutes, retention: {:.1} days)",
            args.post_retention_seconds as f64 / 86400.0
        );
    }

Background tasks:

  • start_stats_logger() — every 5s, log + emit metrics about the store's size.
  • start_auto_trim(2) — every 2 minutes, sweep the store and remove posts older than retention.

Both take self: Arc<Self> (we'll see — it's a fluent way to spawn a task that owns the Arc). The Arc::clone(&post_store) clones the arc before consuming it in the self: Arc<Self> method.
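Here's a minimal sketch of the self: Arc<Self> pattern, using std::thread instead of Tokio tasks. Store, trims_run, and this start_auto_trim signature (with a rounds cap so it terminates) are hypothetical stand-ins, not the PostStore API:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical store with a counter the background task updates.
#[derive(Default)]
struct Store {
    trims_run: AtomicUsize,
}

impl Store {
    /// Taking `self: Arc<Self>` moves one handle into the spawned thread,
    /// which keeps the store alive for as long as the task runs.
    fn start_auto_trim(self: Arc<Self>, interval: Duration, rounds: usize) -> JoinHandle<()> {
        thread::spawn(move || {
            for _ in 0..rounds {
                thread::sleep(interval);
                // Stand-in for trim_old_posts().
                self.trims_run.fetch_add(1, Ordering::Relaxed);
            }
        })
    }
}
```

Because the method consumes an Arc, the caller chooses whether to give up its own handle or, as main.rs does, pass a fresh Arc::clone.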

    http_server.set_readiness(true);
    info!("HTTP/gRPC server is ready");

    // Wait for termination signal
    http_server.wait_for_termination().await;
    info!("Server terminated");

    Ok(())
}

Finally: flip readiness to true so readiness probes (e.g. a Kubernetes readinessProbe) start passing, log it, then block on wait_for_termination() (presumably until SIGTERM / SIGINT). On termination, main returns, its locals are dropped, and the Tokio runtime shuts down.

Order-of-events note: in serving mode, readiness is flipped to true only after Kafka catchup and finalize_init, so the load balancer won't route traffic to Thunder until the store has been initially populated. In non-serving mode there's no catchup wait, so readiness flips immediately.

That's main.rs. Standard startup choreography for a stateful Rust microservice.


deserializer.rs (26 lines)

Three free functions, all of the same shape: take a &[u8] payload and decode it into a Rust struct.

use crate::schema::{events::Event, tweet_events::TweetEvent};
use anyhow::{Context, Result};
use prost::Message;
use thrift::protocol::{TBinaryInputProtocol, TSerializable};
use xai_thunder_proto::InNetworkEvent;

Imports:

  • crate::schema::events::Event and crate::schema::tweet_events::TweetEvent: Thrift-generated structs (from schema/, not open-sourced). These are legacy formats.
  • prost::Message: the protobuf encode/decode trait. prost is a Rust Protocol Buffers implementation; the structs its code generator (prost-build) emits implement this trait.
  • thrift::protocol::{TBinaryInputProtocol, TSerializable} — Thrift binary-protocol decoder.
  • xai_thunder_proto::InNetworkEvent — protobuf-generated struct from an external crate. This is the new event format.

So three formats coexist: two legacy Thrift formats (TweetEvent, Event) and one new proto format (InNetworkEvent), which suggests a migration in progress.

/// Deserialize a Thrift binary message into TweetEvent
pub fn deserialize_tweet_event(payload: &[u8]) -> Result<TweetEvent> {
    let mut cursor = std::io::Cursor::new(payload);
    let mut protocol = TBinaryInputProtocol::new(&mut cursor, true);

    TweetEvent::read_from_in_protocol(&mut protocol).context("Failed to deserialize TweetEvent")
}

Decode a Thrift TweetEvent. Pattern:

  1. Wrap the byte slice in std::io::Cursor (which implements std::io::Read).
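  2. Wrap the cursor in TBinaryInputProtocol. The second arg true enables strict mode, which only affects message headers: read_message_begin then requires the versioned header. Since we decode a bare struct here, it has little practical effect; missing required fields are caught by the generated decoder, not by this flag.
  3. Call TweetEvent::read_from_in_protocol(...) (a method on the generated struct) to decode.
  4. Tag any error with .context("Failed to deserialize TweetEvent").
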
/// Deserialize a Thrift binary message into Event
pub fn deserialize_event(payload: &[u8]) -> Result<Event> {
    let mut cursor = std::io::Cursor::new(payload);
    let mut protocol = TBinaryInputProtocol::new(&mut cursor, true);

    Event::read_from_in_protocol(&mut protocol).context("Failed to deserialize Event")
}

Identical pattern for Event. Could be made generic (fn deserialize_thrift<T: TSerializable>(...) -> Result<T>) but the duplication is fine for two call sites.
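The generic version could look roughly like this. Since the thrift crate isn't assumed here, a hypothetical ReadFrom trait stands in for TSerializable, and ToyEvent uses a made-up encoding of two big-endian i64s rather than Thrift's binary protocol. The shape is the point: one function wraps the slice in a Cursor, decodes, and attaches the context message.

```rust
use std::io::{self, Cursor, Read};

/// Hypothetical stand-in for thrift's TSerializable: decode one Self from a reader.
trait ReadFrom: Sized {
    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self>;
}

/// Toy event with a made-up encoding: two big-endian i64s (not Thrift's wire format).
#[derive(Debug, PartialEq)]
struct ToyEvent {
    post_id: i64,
    created_at: i64,
}

fn read_i64_be<R: Read>(reader: &mut R) -> io::Result<i64> {
    let mut buf = [0u8; 8];
    reader.read_exact(&mut buf)?; // fails with UnexpectedEof on short input
    Ok(i64::from_be_bytes(buf))
}

impl ReadFrom for ToyEvent {
    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
        let post_id = read_i64_be(reader)?;
        let created_at = read_i64_be(reader)?;
        Ok(ToyEvent { post_id, created_at })
    }
}

/// One generic decoder instead of one copy per type: wrap the slice in a
/// Cursor, decode, and attach a context message on failure.
fn deserialize<T: ReadFrom>(payload: &[u8], type_name: &str) -> Result<T, String> {
    let mut cursor = Cursor::new(payload);
    T::read_from(&mut cursor).map_err(|e| format!("Failed to deserialize {type_name}: {e}"))
}
```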

/// Deserialize a proto binary message into InNetworkEvent
pub fn deserialize_tweet_event_v2(payload: &[u8]) -> Result<InNetworkEvent> {
    InNetworkEvent::decode(payload).context("Failed to deserialize InNetworkEvent")
}

The proto version. prost's Message::decode takes the byte slice directly — no cursor needed. One line of decode logic. The v2 naming is consistent with tweet_events_listener_v2.rs (next session).

End of file. Trivial wrapper utilities.


kafka_utils.rs (115 lines)

The Kafka consumer/producer config and bootstrap. Two modes: serving mode (consume from IN_NETWORK_EVENTS_TOPIC only) and non-serving mode (consume legacy events, transform, produce to IN_NETWORK_EVENTS_TOPIC).

use anyhow::{Context, Result};
use std::sync::Arc;
use xai_kafka::KafkaProducerConfig;
use xai_kafka::config::{KafkaConfig, KafkaConsumerConfig, SslConfig};
use xai_wily::WilyConfig;

use crate::{
    args,
    kafka::{
        tweet_events_listener::start_tweet_event_processing,
        tweet_events_listener_v2::start_tweet_event_processing_v2,
    },
};

Imports:

  • xai_kafka config types: KafkaProducerConfig, KafkaConfig (the base config common to producers and consumers), KafkaConsumerConfig, SslConfig.
  • xai_wily::WilyConfig — Wily is an internal service-discovery system; WilyConfig configures how Kafka connects through it.
  • The two listener entry points: start_tweet_event_processing (legacy/v1) and start_tweet_event_processing_v2 (proto/v2).

const TWEET_EVENT_TOPIC: &str = "";
const TWEET_EVENT_DEST: &str = "";

const IN_NETWORK_EVENTS_DEST: &str = "";
const IN_NETWORK_EVENTS_TOPIC: &str = "";

Four named-but-empty string constants. Notable: in the open-source release, the Kafka topic names and broker destinations have been redacted to empty strings. The internal X deployment populates these. They're left as named constants so the call sites read meaningfully.

pub async fn start_kafka(
    args: &args::Args,
    post_store: Arc<crate::posts::post_store::PostStore>,
    user: &str,
    tx: tokio::sync::mpsc::Sender<i64>,
) -> Result<()> {

The single public function: start_kafka. Takes:

  • args: the parsed CLI args.
  • post_store: Arc to share with the listener.
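  • user: a string used in the consumer group ID, but only by the non-serving branch (serving mode uses a UUID instead). main.rs always passes "", so a non-serving run launched from main.rs gets the group ID "{kafka_group_id}-"; presumably other callers such as tests or scripts pass a real value.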
  • tx: the catchup-signal channel sender.

Returns Result<()>.

    let sasl_password = std::env::var("")
        .ok()
        .or(args.sasl_password.clone())?;

    let producer_sasl_password = std::env::var("")
        .ok()
        .or(args.producer_sasl_password.clone());

SASL (Simple Authentication and Security Layer) password resolution:

  • std::env::var("") reads an env var (name redacted in source). Returns Result<String, VarError>.
  • .ok() converts to Option<String> (drops the error).
  • .or(args.sasl_password.clone()) — if the env var wasn't set, fall back to the CLI arg.
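  • ?: for sasl_password, the intent is to turn the None case into an error. As excerpted, though, ? is applied directly to an Option<String>, which does not compile in a function returning anyhow::Result; a conversion such as anyhow's .context("...") (anyhow implements Context for Option) or .ok_or_else(...) is needed.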

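For producer_sasl_password we don't use ?: it's allowed to be None. Note the asymmetry: the legacy consumer password is mandatory in both modes (the ? runs before the is_serving check, even though serving mode never uses it), while the producer password is optional even though it's used in both modes, by the v2 consumer in serving mode and by the producer in non-serving mode.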
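The resolution logic can be sketched in std-only Rust. resolve_required and resolve_optional are hypothetical names, and the required case uses ok_or_else to turn None into an error explicitly:

```rust
use std::env;

/// Required secret: env var first, then the CLI value; neither is an error.
fn resolve_required(env_name: &str, cli_value: Option<String>) -> Result<String, String> {
    env::var(env_name)
        .ok() // Result<String, VarError> -> Option<String>
        .or(cli_value) // fall back to the CLI flag
        .ok_or_else(|| format!("{env_name} is not set and no CLI value was given"))
}

/// Optional secret (like producer_sasl_password): None is acceptable.
fn resolve_optional(env_name: &str, cli_value: Option<String>) -> Option<String> {
    env::var(env_name).ok().or(cli_value)
}
```

Note that .or(...) evaluates its argument eagerly, which is fine for a cheap clone of a CLI string; .or_else(...) would defer it.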

    if args.is_serving {
        let unique_id = uuid::Uuid::new_v4().to_string();

        let v2_tweet_events_consumer_config = KafkaConsumerConfig {
            base_config: KafkaConfig {
                dest: args.in_network_events_consumer_dest.clone(),
                topic: IN_NETWORK_EVENTS_TOPIC.to_string(),
                wily_config: Some(WilyConfig::default()),
                ssl: Some(SslConfig {
                    security_protocol: args.security_protocol.clone(),
                    sasl_mechanism: Some(args.producer_sasl_mechanism.clone()),
                    sasl_username: Some(args.producer_sasl_username.clone()),
                    sasl_password: producer_sasl_password.clone(),
                }),
                ..Default::default()
            },
            group_id: format!("{}-{}", args.kafka_group_id, unique_id),
            auto_offset_reset: args.auto_offset_reset.clone(),
            fetch_timeout_ms: args.fetch_timeout_ms,
            max_partition_fetch_bytes: Some(1024 * 1024 * 100),
            skip_to_latest: args.skip_to_latest,
            ..Default::default()
        };

Serving-mode config. Build a KafkaConsumerConfig (struct literal):

  • base_config: KafkaConfig:
    • dest: the broker destination from CLI.
    • topic: the (redacted) IN_NETWORK_EVENTS_TOPIC.
    • wily_config: Some(WilyConfig::default()): enable service discovery via Wily.
    • ssl: Some(SslConfig { … }): SASL/SSL configuration. Note: it uses the producer credentials for the consumer in serving mode. Presumably that's because IN_NETWORK_EVENTS_TOPIC is produced to in non-serving mode and consumed from in serving mode, so both sides authenticate with the same credentials for that topic.
    • ..Default::default(): fill the rest with defaults.
  • group_id: "{kafka_group_id}-{unique_id}". Unique per process, so every Thunder instance is the only member of its own consumer group. Kafka therefore assigns it every partition, and each replica reads the full topic (which it presumably needs, since each replica holds the whole store) instead of sharing partitions with other replicas.
  • auto_offset_reset: where to start consuming when the group has no committed offset (e.g. "earliest" for catchup, "latest" to skip history). Because the group ID is brand new on every start, there is never a committed offset in serving mode, so this setting always decides the starting point.
  • fetch_timeout_ms: Kafka fetch timeout.
  • max_partition_fetch_bytes: Some(1024 * 1024 * 100): 100 MiB per partition fetch. Aggressive: high throughput at the cost of memory.
  • skip_to_latest: skip the backlog if true (useful, for instance, for an emergency restart).

        // Start Kafka background tasks
        start_tweet_event_processing_v2(
            v2_tweet_events_consumer_config,
            Arc::clone(&post_store),
            args,
            tx,
        )
        .await;
    }

Spawn the v2 listener with the config. It runs in the background: the function is async but doesn't loop forever; it returns after spawning the consumer threads. It has to, since main.rs awaits start_kafka before it starts waiting for the catchup signals.
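The two group-ID schemes and Kafka's start-position rule can be summarized in a small sketch. Both functions are hypothetical, and start_offset simplifies auto_offset_reset to "earliest" versus anything else (real clients support more options):

```rust
/// Hypothetical: the two group-ID schemes used by start_kafka.
fn group_id(base: &str, is_serving: bool, user: &str, unique_id: &str) -> String {
    if is_serving {
        format!("{base}-{unique_id}") // fresh per process: sole member of its group
    } else {
        format!("{base}-{user}") // shared by runs with the same user
    }
}

/// Simplified start-position rule: resume from the group's committed offset
/// if there is one; otherwise auto_offset_reset picks earliest or latest.
fn start_offset(committed: Option<i64>, auto_offset_reset: &str, earliest: i64, latest: i64) -> i64 {
    match (committed, auto_offset_reset) {
        (Some(offset), _) => offset,
        (None, "earliest") => earliest,
        (None, _) => latest,
    }
}
```

With a fresh UUID group, committed is always None in serving mode, so the second and third arms are the only ones that ever apply there.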

    // Only start Kafka processing and background tasks if not in serving mode
    if !args.is_serving {
        // Create Kafka consumer config
        let tweet_events_consumer_config = KafkaConsumerConfig {
            base_config: KafkaConfig {
                dest: TWEET_EVENT_DEST.to_string(),
                topic: TWEET_EVENT_TOPIC.to_string(),
                wily_config: Some(WilyConfig::default()),
                ssl: Some(SslConfig {
                    security_protocol: args.security_protocol.clone(),
                    sasl_mechanism: Some(args.sasl_mechanism.clone()),
                    sasl_username: Some(args.sasl_username.clone()),
                    sasl_password: Some(sasl_password.clone()),
                }),
                ..Default::default()
            },
            group_id: format!("{}-{}", args.kafka_group_id, user),
            auto_offset_reset: args.auto_offset_reset.clone(),
            enable_auto_commit: false,
            fetch_timeout_ms: args.fetch_timeout_ms,
            max_partition_fetch_bytes: Some(1024 * 1024 * 10),
            partitions: None,
            skip_to_latest: args.skip_to_latest,
            ..Default::default()
        };

Non-serving mode (sometimes called "ingest" or "transformer" mode). Build a consumer for the legacy topic TWEET_EVENT_TOPIC. Differences from the v2 config:

  • dest: TWEET_EVENT_DEST.to_string(): the legacy broker (not from CLI; constant).
  • ssl: uses consumer credentials (the sasl_* args, not producer_sasl_*).
  • group_id: "{kafka_group_id}-{user}". Uses the user parameter, not a UUID. Multiple non-serving runs with the same user share the group (and thus split partition assignment), which is what you want for a batch pipeline.
  • enable_auto_commit: false: don't auto-commit offsets. Manual commits allow at-least-once delivery; presumably the listener (next session) commits only after the transformed event has been produced.
  • max_partition_fetch_bytes: Some(1024 * 1024 * 10): 10 MiB (vs. 100 MiB in serving). Less greedy, since transformation work is per-event and bottlenecks earlier.
  • partitions: None: subscribe to all partitions (Kafka assigns within the group).

        let producer_config = KafkaProducerConfig {
            base_config: KafkaConfig {
                dest: IN_NETWORK_EVENTS_DEST.to_string(),
                topic: IN_NETWORK_EVENTS_TOPIC.to_string(),
                wily_config: Some(WilyConfig::default()),
                ssl: Some(SslConfig {
                    security_protocol: args.security_protocol.clone(),
                    sasl_mechanism: Some(args.producer_sasl_mechanism.clone()),
                    sasl_username: Some(args.producer_sasl_username.clone()),
                    sasl_password: producer_sasl_password.clone(),
                }),
                ..Default::default()
            },
            ..Default::default()
        };

The corresponding producer config. Writes to IN_NETWORK_EVENTS_TOPIC (the same topic the serving-mode consumer reads from). Uses producer credentials.

So non-serving mode = a transformer: consume legacy TweetEvents, transform them to the new InNetworkEvent proto, produce them to the new topic.

        start_tweet_event_processing(tweet_events_consumer_config, producer_config, args).await;
    }

    Ok(())
}

Spawn the legacy listener with both configs. Return Ok(()).

What this gives you: a single binary that can run in either of two roles. Serving Thunder = read v2 events, populate the in-memory store. Non-serving Thunder = read legacy events, write v2 events. Both go through the same start_kafka call; the args.is_serving checks decide which branch executes.
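Why enable_auto_commit: false matters for the non-serving transformer can be shown with a toy loop. run_transformer is hypothetical: a slice stands in for a partition, and a plain index stands in for the committed offset. The offset only advances after a successful produce, so a failed produce leaves the record to be retried:

```rust
/// Process records starting at `*committed`. The offset only advances after
/// `produce` succeeds, so a failure leaves the record to be re-read later.
fn run_transformer<F>(records: &[i64], committed: &mut usize, mut produce: F)
where
    F: FnMut(String) -> Result<(), String>,
{
    while let Some(&record) = records.get(*committed) {
        let transformed = format!("v2:{record}"); // stand-in for the proto conversion
        if produce(transformed).is_err() {
            return; // not committed: this record is retried on the next run
        }
        *committed += 1; // "commit" only after a successful produce
    }
}
```

The flip side of at-least-once: if a process crashed after producing but before committing, the record would be produced again on restart, so downstream consumers have to tolerate duplicates.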


posts/mod.rs (1 line)

pub mod post_store;

A plain public module declaration (not a re-export, which would be pub use). Nothing to discuss.


posts/post_store.rs (526 lines)

The big one. The in-memory post store. This is the core data structure of Thunder.

The design:

  • Posts are keyed by post_id (i64) in one big DashMap.
  • For lookup, we maintain three secondary indexes — each mapping user_id → VecDeque<TinyPost>:
    • original_posts_by_user: top-level original posts (not replies, not retweets).
    • secondary_posts_by_user: replies + retweets.
    • video_posts_by_user: posts containing video (a subset that may overlap with the above two).
  • A fifth map, deleted_posts: DashMap<i64, bool>, tombstones deleted posts so the store can drop them lazily.

DashMap is a concurrent hashmap from the dashmap crate. It isn't lock-free: it splits the map into shards, each guarded by its own RwLock, so operations on different shards don't contend and there is no global lock. That's critical for this workload: thousands of writes per second from Kafka, plus thousands of reads per second from the gRPC service.
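To make "sharded locking" concrete, here is a toy ShardedMap built from std types: a fixed number of RwLock<HashMap> shards, with the key's hash picking the shard. It's an illustration of the idea, not dashmap's implementation, which differs in its hasher, shard count, and internals:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::RwLock;

/// Toy sharded map: writers to different shards never block each other,
/// and readers of the same shard can proceed concurrently.
struct ShardedMap<K, V> {
    shards: Vec<RwLock<HashMap<K, V>>>,
}

impl<K: Hash + Eq, V: Clone> ShardedMap<K, V> {
    fn new(num_shards: usize) -> Self {
        assert!(num_shards > 0, "need at least one shard");
        Self {
            shards: (0..num_shards).map(|_| RwLock::new(HashMap::new())).collect(),
        }
    }

    /// The key's hash decides which shard (and therefore which lock) it uses.
    fn shard_for(&self, key: &K) -> usize {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) % self.shards.len()
    }

    fn insert(&self, key: K, value: V) -> Option<V> {
        let idx = self.shard_for(&key);
        self.shards[idx].write().unwrap().insert(key, value) // locks one shard only
    }

    fn get(&self, key: &K) -> Option<V> {
        self.shards[self.shard_for(key)].read().unwrap().get(key).cloned()
    }
}
```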

Walking the file top to bottom:

use anyhow::Result;
use dashmap::DashMap;
use log::info;
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use xai_thunder_proto::{LightPost, TweetDeleteEvent};

use crate::config::{
    DELETE_EVENT_KEY, MAX_ORIGINAL_POSTS_PER_AUTHOR, MAX_REPLY_POSTS_PER_AUTHOR,
    MAX_TINY_POSTS_PER_USER_SCAN, MAX_VIDEO_POSTS_PER_AUTHOR,
};
use crate::metrics::{
    POST_STORE_DELETED_POSTS, POST_STORE_DELETED_POSTS_FILTERED, POST_STORE_ENTITY_COUNT,
    POST_STORE_POSTS_RETURNED, POST_STORE_POSTS_RETURNED_RATIO, POST_STORE_REQUEST_TIMEOUTS,
    POST_STORE_REQUESTS, POST_STORE_TOTAL_POSTS, POST_STORE_USER_COUNT,
};

Imports:

  • anyhow::Result.
  • dashmap::DashMap — concurrent hashmap.
  • log::info for stats logging.
  • HashSet and VecDeque from std.
  • Arc, Duration, Instant.
  • xai_thunder_proto::{LightPost, TweetDeleteEvent} — proto-generated structs. LightPost is the lightweight post representation (we store millions of these in memory). TweetDeleteEvent is the delete-event proto.
  • Constants from crate::config: DELETE_EVENT_KEY, MAX_*_POSTS_PER_AUTHOR, MAX_TINY_POSTS_PER_USER_SCAN. Inferring from usage: DELETE_EVENT_KEY is a sentinel user-ID under which delete events are stored; the MAX_ constants are caps on how many entries to store per user / scan.
  • Prometheus metrics from crate::metrics.

/// Minimal post reference stored in user timelines (only ID and timestamp)
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TinyPost {
    pub post_id: i64,
    pub created_at: i64,
}

impl TinyPost {
    /// Create a new TinyPost from a post ID and creation timestamp
    pub fn new(post_id: i64, created_at: i64) -> Self {
        TinyPost {
            post_id,
            created_at,
        }
    }
}

TinyPost is exactly what it says — a 16-byte struct (two i64s) used in the per-user deques. The actual LightPost (the proto struct, which has author, text, media flags, etc.) is stored separately in the posts map.

Why separate? Because we need to maintain per-user sorted lists for fast "give me the N most recent posts by author X" queries. If we put full LightPosts in the deques, each post would be duplicated everywhere it appears (and posts can appear in multiple lists, e.g. original + video). With TinyPost references, each user's deque is just IDs + timestamps; the full post lives once in posts. Each duplicated entry costs 16 bytes instead of a whole LightPost, so the saving scales with LightPost's size (roughly 50x by this article's estimate).

TinyPost derives Hash and Eq (despite not being used as a hash key in this file — it's defensive, in case a caller needs to dedup).
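A small sketch of how a TinyPost deque serves "N most recent" queries, assuming the deque is kept in ascending created_at order. most_recent is a hypothetical helper; the compile-time assertion confirms the 16-byte size:

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TinyPost {
    pub post_id: i64,
    pub created_at: i64,
}

// Compile-time check: two i64 fields, no padding, 16 bytes.
const _: () = assert!(std::mem::size_of::<TinyPost>() == 16);

/// Hypothetical helper: IDs of the `n` newest posts, newest first, assuming
/// the deque is sorted by ascending created_at (newest at the back).
fn most_recent(timeline: &VecDeque<TinyPost>, n: usize) -> Vec<i64> {
    timeline.iter().rev().take(n).map(|p| p.post_id).collect()
}
```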

/// A thread-safe store for posts grouped by user ID
/// Note: LightPost is now defined in the protobuf schema (in-network.proto)
#[derive(Clone)]
pub struct PostStore {
    /// Full post data indexed by post_id
    posts: Arc<DashMap<i64, LightPost>>,
    /// Maps user_id to a deque of TinyPost references for original posts (non-reply, non-retweet)
    original_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
    /// Maps user_id to a deque of TinyPost references for replies and retweets
    secondary_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
    /// Maps user_id to a deque of TinyPost references for video posts
    video_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
    deleted_posts: Arc<DashMap<i64, bool>>,
    /// Retention period for posts in seconds
    retention_seconds: u64,
    /// Request timeout for get_posts_by_users iteration (0 = no timeout)
    request_timeout: Duration,
}

The struct. Five Arc<DashMap<…>> fields + two config fields. Notable:

  • Every map is wrapped in Arc so that the #[derive(Clone)] is cheap: cloning the PostStore clones the Arcs, not the maps. So all clones share the same underlying storage.
  • deleted_posts: Arc<DashMap<i64, bool>>: the value is bool, but only true is ever stored. It's effectively a concurrent HashSet<i64>. It could be a DashSet, but DashMap<K, bool> works too.
  • request_timeout: Duration: a zero duration (request_timeout_ms = 0) means "no timeout" (per the comment).

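One plausible shape for the timeout, assuming a hypothetical scan_with_budget rather than the real get_posts_by_users (which this part doesn't show): check the elapsed time before each user, return partial results once the budget is spent, and treat a zero Duration as "no deadline":

```rust
use std::time::{Duration, Instant};

/// Visit users in order, collecting their post IDs via `fetch`, but stop
/// early once `timeout` has elapsed. Returns (results, timed_out).
/// A zero timeout disables the deadline.
fn scan_with_budget<F>(user_ids: &[i64], timeout: Duration, mut fetch: F) -> (Vec<i64>, bool)
where
    F: FnMut(i64) -> Vec<i64>,
{
    let start = Instant::now();
    let mut out = Vec::new();
    for &user in user_ids {
        if !timeout.is_zero() && start.elapsed() >= timeout {
            return (out, true); // budget spent: return partial results
        }
        out.extend(fetch(user));
    }
    (out, false)
}
```

Returning partial results rather than an error keeps a slow, long-tail request from blowing up p99 latency, at the cost of a thinner candidate set for that one request.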
impl PostStore {
    /// Creates a new empty PostStore with the specified retention period and request timeout
    pub fn new(retention_seconds: u64, request_timeout_ms: u64) -> Self {
        PostStore {
            posts: Arc::new(DashMap::new()),
            original_posts_by_user: Arc::new(DashMap::new()),
            secondary_posts_by_user: Arc::new(DashMap::new()),
            video_posts_by_user: Arc::new(DashMap::new()),
            deleted_posts: Arc::new(DashMap::new()),
            retention_seconds,
            request_timeout: Duration::from_millis(request_timeout_ms),
        }
    }

Constructor. Initialize all five DashMaps empty, take retention + timeout. Duration::from_millis does the unit conversion.

    pub fn mark_as_deleted(&self, posts: Vec<TweetDeleteEvent>) {
        for post in posts.into_iter() {
            self.posts.remove(&post.post_id);
            self.deleted_posts.insert(post.post_id, true);

            let mut user_posts_entry = self
                .original_posts_by_user
                .entry(DELETE_EVENT_KEY)
                .or_default();
            user_posts_entry.push_back(TinyPost {
                post_id: post.post_id,
                created_at: post.deleted_at,
            });
        }
    }

mark_as_deleted processes a batch of delete events. For each:

  1. Remove the post from the posts map (frees the full LightPost).
  2. Insert into deleted_posts — a tombstone, so we don't re-insert the post if a re-ordered create event arrives later.
  3. Append a TinyPost to original_posts_by_user[DELETE_EVENT_KEY]. This is clever: the deque for the sentinel user DELETE_EVENT_KEY (its actual value lives in the closed-source config module) holds all deletions in arrival order. That's how trim_old_posts later prunes old tombstones: by walking this special deque from the oldest end.

The TinyPost's created_at field is set to post.deleted_at here — slight overloading of the field, but it's just a timestamp for the tombstone-trimming logic.

entry(...).or_default() is the standard "get-or-insert-empty" pattern. In dashmap it returns a RefMut guard (dashmap::mapref::one::RefMut) that derefs mutably to the inner VecDeque, which is where push_back is called.

Note: that guard holds the write lock on the key's shard for as long as user_posts_entry lives, i.e. until the end of each loop iteration. That's fine for a quick push_back, but since every delete targets the same DELETE_EVENT_KEY, all concurrent delete batches serialize on that one shard.
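The tombstone scheme can be sketched with std types. MiniStore, its Mutex-wrapped maps, the sentinel value -1, and trim_tombstones are all hypothetical (the real DELETE_EVENT_KEY value and the trim_old_posts body aren't shown here). The sketch shows a tombstone blocking a late create and tombstones being pruned from the oldest end of the sentinel deque:

```rust
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Mutex;

/// Hypothetical sentinel "user" ID; the real DELETE_EVENT_KEY value isn't shown.
const DELETE_EVENT_KEY: i64 = -1;

#[derive(Default)]
struct MiniStore {
    posts: Mutex<HashMap<i64, String>>,
    deleted: Mutex<HashSet<i64>>,
    /// user_id -> (post_id, timestamp), oldest first
    by_user: Mutex<HashMap<i64, VecDeque<(i64, i64)>>>,
}

impl MiniStore {
    fn insert(&self, post_id: i64, author_id: i64, created_at: i64, body: &str) {
        // A tombstone wins over a late, re-ordered create event.
        if self.deleted.lock().unwrap().contains(&post_id) {
            return;
        }
        self.posts.lock().unwrap().insert(post_id, body.to_string());
        self.by_user.lock().unwrap().entry(author_id).or_default().push_back((post_id, created_at));
    }

    fn mark_as_deleted(&self, post_id: i64, deleted_at: i64) {
        self.posts.lock().unwrap().remove(&post_id);
        self.deleted.lock().unwrap().insert(post_id);
        // Log the tombstone under the sentinel key, timestamped by deletion time.
        self.by_user.lock().unwrap().entry(DELETE_EVENT_KEY).or_default().push_back((post_id, deleted_at));
    }

    /// Hypothetical pruning: pop tombstones older than `cutoff` from the oldest end.
    fn trim_tombstones(&self, cutoff: i64) {
        let mut by_user = self.by_user.lock().unwrap();
        if let Some(log) = by_user.get_mut(&DELETE_EVENT_KEY) {
            let mut deleted = self.deleted.lock().unwrap();
            while let Some(&(post_id, ts)) = log.front() {
                if ts >= cutoff {
                    break; // everything behind this entry is newer
                }
                log.pop_front();
                deleted.remove(&post_id);
            }
        }
    }
}
```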

    /// Inserts posts into the post store
    pub fn insert_posts(&self, mut posts: Vec<LightPost>) {
        // Filter to keep only posts created in the last retention_seconds and not from the future
        let current_time = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs() as i64;
        posts.retain(|p| {
            p.created_at < current_time
                && current_time - p.created_at <= (self.retention_seconds as i64)
        });

        // Sort remaining posts by created_at timestamp
        posts.sort_unstable_by_key(|p| p.created_at);

        Self::insert_posts_internal(self, posts);
    }

insert_posts is the batch ingest path. Steps:

  1. Compute current_time as unix seconds. unwrap_or_default() defaults to Duration::ZERO if the clock somehow went before unix epoch (shouldn't happen but defensive). .as_secs() as i64 because created_at is i64 in the proto.
  2. posts.retain(...) drops posts that are in the future (p.created_at >= current_time) or older than retention. The first check defends against clock-skewed publishers; the second drops backlog. Because the first comparison is strict, a post stamped with the current second is dropped as well.
  3. posts.sort_unstable_by_key(|p| p.created_at) — sort ascending by timestamp. This matters because the per-user deques are append-only and we want them to stay in chronological order; if we insert out-of-order, lookups (which take from the back = newest) get wrong answers.
  4. Delegate to insert_posts_internal.

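Why sort_unstable over sort? sort_unstable sorts in place without allocating, while sort (being stable) typically allocates a scratch buffer to keep equal elements in their original order. We don't care about the relative order of posts with equal timestamps, so the cheaper sort wins.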
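
The filter-then-sort step of insert_posts can be sketched as a pure function. This is an illustration, not the Thunder code: Post is a hypothetical two-field stand-in for LightPost, and now is passed in instead of read from SystemTime so the boundary cases are easy to check.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
struct Post {
    post_id: i64,
    created_at: i64, // unix seconds
}

/// Keep posts strictly older than `now` and no older than `retention_seconds`,
/// then sort the survivors oldest-first (same predicate as insert_posts).
fn filter_and_sort(mut posts: Vec<Post>, now: i64, retention_seconds: u64) -> Vec<Post> {
    posts.retain(|p| p.created_at < now && now - p.created_at <= retention_seconds as i64);
    // Unstable sort: equal timestamps may end up in any order, which is fine here.
    posts.sort_unstable_by_key(|p| p.created_at);
    posts
}
```

With a one-hour retention, a post from 10 seconds ago and one from 500 seconds ago survive (returned oldest-first), while a future post, a post from the current second, and a day-old post are all dropped.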

    pub async fn finalize_init(&self) -> Result<()> {
        self.sort_all_user_posts().await;
        self.trim_old_posts().await;

        // This is needed because order of create_event/delete_event can be be lost in the feeder
        for entry in self.deleted_posts.iter() {
            self.posts.remove(entry.key());
        }

        Ok(())
    }

finalize_init is called once after Kafka catchup. Three steps:

  1. sort_all_user_posts() — re-sort every user's deque. We sorted incoming batches in insert_posts, but multiple batches arrive interleaved across threads, so we re-sort to be safe.
  2. trim_old_posts() — drop posts past retention.
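  3. For every tombstoned post, ensure it's not in the main posts map. The tombstone check in insert_posts_internal already handles a create that is processed after its delete. The remaining gap is presumably concurrency: if the create and the delete for the same post are handled at the same time on different consumer threads (Kafka gives no ordering guarantee across partitions), the insert can check deleted_posts just before the tombstone lands and then store the post just after mark_as_deleted has removed it. After catchup, we sweep and remove all tombstoned posts from posts.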

The comment "be be lost" is a typo in the source.
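
A sketch of the catch-up sweep, assuming std collections: a HashMap stands in for the posts map and a HashSet for deleted_posts (the real code uses a DashMap<i64, bool>). The function name sweep_tombstones is hypothetical.

```rust
use std::collections::{HashMap, HashSet};

/// Remove every tombstoned ID from the post map, as finalize_init does after
/// catchup. Returns how many stored posts were actually removed.
fn sweep_tombstones(posts: &mut HashMap<i64, &'static str>, deleted: &HashSet<i64>) -> usize {
    let before = posts.len();
    for id in deleted {
        // Removing an ID that was never stored is a harmless no-op.
        posts.remove(id);
    }
    before - posts.len()
}
```

If a racing insert stored post 2 after its tombstone was written, the sweep removes it and leaves live posts alone; tombstones for posts that were never stored cost one failed lookup each.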

    fn insert_posts_internal(&self, posts: Vec<LightPost>) {
        for post in posts {
            let post_id = post.post_id;
            let author_id = post.author_id;
            let created_at = post.created_at;
            let is_original = !post.is_reply && !post.is_retweet;

            if self.deleted_posts.contains_key(&post_id) {
                continue;
            }

The actual per-post insert logic. For each post:

  • Copy out the fields we'll need into locals (post_id, author_id, created_at). This is because we're about to move post into the map, and after the move we can't access fields.
  • is_original is !is_reply && !is_retweet. So "original" means "not a reply and not a retweet" — quote tweets and originals both count as original.
  • Skip if tombstoned.

            // Store the full post data
            let old = self.posts.insert(post_id, post);
            if old.is_some() {
                // if already stored - don't add it again
                continue;
            }

DashMap::insert returns the previous value (if any). If old.is_some(), we'd be duplicating, so skip the per-user deque updates. The post itself is now in the map (overwritten), which is fine — both versions should be the same proto.

This deduplication is important because Kafka delivers at-least-once: we may see the same LightPost twice.
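
The dedup check relies on insert returning the previous value, which std's HashMap::insert also does. A hypothetical sketch, with bare post IDs standing in for TinyPosts, showing that a redelivered post is stored once and indexed once:

```rust
use std::collections::{HashMap, VecDeque};

/// Store a post and index it under its author only the first time its ID is seen.
/// Returns true if the post was new.
fn insert_once(
    posts: &mut HashMap<i64, i64>,             // post_id -> author_id (stand-in for LightPost)
    by_user: &mut HashMap<i64, VecDeque<i64>>, // author_id -> post_ids, oldest-first
    post_id: i64,
    author_id: i64,
) -> bool {
    // insert returns Some(previous) if the key was already present.
    if posts.insert(post_id, author_id).is_some() {
        return false; // redelivered: already indexed, don't append again
    }
    by_user.entry(author_id).or_default().push_back(post_id);
    true
}
```

Calling it twice with the same (post_id, author_id), as an at-least-once redelivery would, returns true then false and leaves a single entry in the author's deque.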

            // Create a TinyPost reference for the timeline
            let tiny_post = TinyPost::new(post_id, created_at);

            // Use entry API to get mutable access to the appropriate user's posts timeline
            if is_original {
                let mut user_posts_entry =
                    self.original_posts_by_user.entry(author_id).or_default();
                user_posts_entry.push_back(tiny_post.clone());
            } else {
                let mut user_posts_entry =
                    self.secondary_posts_by_user.entry(author_id).or_default();
                user_posts_entry.push_back(tiny_post.clone());
            }

Make a TinyPost reference, then push it onto either original_posts_by_user or secondary_posts_by_user based on is_original. Standard entry-API pattern.

.clone() on TinyPost is just 16 bytes copied — trivially cheap. We clone because we might push it onto the video deque below.

            let mut video_eligible = post.has_video;

            // If this is a retweet and the retweeted post has video, mark has_video as true
            if !video_eligible
                && post.is_retweet
                && let Some(source_post_id) = post.source_post_id
                && let Some(source_post) = self.posts.get(&source_post_id)
            {
                video_eligible = !source_post.is_reply && source_post.has_video;
            }

            if post.is_reply {
                video_eligible = false;
            }

Wait: this reads post after it was moved by let old = self.posts.insert(post_id, post); a few lines up. How does that work?

Look carefully: let old = self.posts.insert(post_id, post); — yes, post was moved. Then the code reads post.has_video, post.is_retweet, etc. That should be a compile error… unless post.has_video etc. were destructured earlier. Let me re-check the code.

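Re-reading the source: there's no destructuring of post other than post_id, author_id, created_at. So post.has_video compiles only if LightPost implements Copy, in which case the insert copied post instead of moving it.

The open-source release doesn't include LightPost's definition, so we can't confirm which of these holds:

  • LightPost is Copy. Prost-generated structs don't derive Copy by default, but every LightPost field this file touches is an i64, a bool, or an Option<i64>; if the remaining fields are scalars too, the derive is possible (prost-build can add it via type_attribute). The *r.value() in get_posts_from_map below needs the same bound, which makes this the most plausible reading.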
  • Or let old = self.posts.insert(...) is actually let old = self.posts.insert(post_id, post.clone()).
  • Or the open-source extract has been edited and dropped the clone.

Either way, the intent of the code is clear:

  1. Start with video_eligible = post.has_video.
  2. Promote retweets: if the post itself has no video, but it's a retweet of a post that does have video (and isn't a reply), then this retweet is also video-eligible. This requires looking up the source post in the store.
  3. Disqualify replies: if the post is a reply, force video_eligible = false regardless. Replies are excluded from the video index entirely.

The let chains syntax (&& let Some(x) = ...) chains pattern matches inside an if condition. It was stabilized in Rust 1.88 and is only available in the 2024 edition.
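
Let chains need the 2024 edition, so this sketch restates the same eligibility rules with Option combinators that work on any edition. LightPost here is a hypothetical struct carrying only the fields the rule reads, and a std HashMap stands in for self.posts. The reply check is hoisted to the top, which gives the same result as forcing video_eligible = false at the end.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, Default)]
struct LightPost {
    is_reply: bool,
    is_retweet: bool,
    has_video: bool,
    source_post_id: Option<i64>,
}

/// Own video, or a retweet of a stored non-reply post with video.
/// Replies never qualify.
fn video_eligible(post: &LightPost, store: &HashMap<i64, LightPost>) -> bool {
    if post.is_reply {
        return false;
    }
    if post.has_video {
        return true;
    }
    post.is_retweet
        && post
            .source_post_id
            .and_then(|id| store.get(&id)) // source must be in the store
            .is_some_and(|src| !src.is_reply && src.has_video)
}
```

A retweet of a stored video post qualifies; a retweet whose source is a reply, or whose source isn't in the store, doesn't; a reply with its own video doesn't either.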

            // Also add to video posts timeline if post has video
            if video_eligible {
                let mut user_posts_entry = self.video_posts_by_user.entry(author_id).or_default();
                user_posts_entry.push_back(tiny_post);
            }
        }
    }

If video-eligible, push onto the video deque. Note this consumes tiny_post (no .clone() this time — last use).

End of insert_posts_internal. To summarise: every post ends up in posts keyed by ID, plus pointers in 1-2 per-user deques (original/secondary, optionally video).
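
The routing rule reduces to a small function. It's a hypothetical helper, not part of Thunder: it takes the flags (with video eligibility already computed) and names the timelines a post would be appended to.

```rust
/// Which per-author timelines a post lands in. Every post goes into exactly
/// one of "original" / "secondary", plus "video" if it is video-eligible.
fn timelines(is_reply: bool, is_retweet: bool, video_eligible: bool) -> Vec<&'static str> {
    let mut out = Vec::with_capacity(2);
    out.push(if !is_reply && !is_retweet { "original" } else { "secondary" });
    if video_eligible {
        out.push("video");
    }
    out
}
```

An original video post lands in two timelines, a video-eligible retweet in secondary plus video, and a reply only in secondary (replies are never video-eligible).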

    /// Retrieves video posts from multiple users
    pub fn get_videos_by_users(
        &self,
        user_ids: &[i64],
        exclude_tweet_ids: &HashSet<i64>,
        start_time: Instant,
        request_user_id: i64,
    ) -> Vec<LightPost> {
        let video_posts = self.get_posts_from_map(
            &self.video_posts_by_user,
            user_ids,
            MAX_VIDEO_POSTS_PER_AUTHOR,
            exclude_tweet_ids,
            &HashSet::new(),
            start_time,
            request_user_id,
        );

        POST_STORE_POSTS_RETURNED.observe(video_posts.len() as f64);
        video_posts
    }

get_videos_by_users is the gRPC-callable lookup for video. Args:

  • user_ids: the list of authors to fetch from (i.e. the requester's follow list).
  • exclude_tweet_ids: posts to skip (e.g. previously seen).
  • start_time: when this RPC started (for timeout).
  • request_user_id: the requester's user ID (for logging and self-retweet filtering).

It just delegates to get_posts_from_map with MAX_VIDEO_POSTS_PER_AUTHOR and the video map. The empty &HashSet::new() for following_users is the key: passing empty means "don't do reply chain analysis" (we'll see why below).

After the call, emit a metric for the returned count.

    /// Retrieves all posts from multiple users
    pub fn get_all_posts_by_users(
        &self,
        user_ids: &[i64],
        exclude_tweet_ids: &HashSet<i64>,
        start_time: Instant,
        request_user_id: i64,
    ) -> Vec<LightPost> {
        let following_users_set: HashSet<i64> = user_ids.iter().copied().collect();

        let mut all_posts = self.get_posts_from_map(
            &self.original_posts_by_user,
            user_ids,
            MAX_ORIGINAL_POSTS_PER_AUTHOR,
            exclude_tweet_ids,
            &HashSet::new(),
            start_time,
            request_user_id,
        );

        let secondary_posts = self.get_posts_from_map(
            &self.secondary_posts_by_user,
            user_ids,
            MAX_REPLY_POSTS_PER_AUTHOR,
            exclude_tweet_ids,
            &following_users_set,
            start_time,
            request_user_id,
        );

        all_posts.extend(secondary_posts);
        POST_STORE_POSTS_RETURNED.observe(all_posts.len() as f64);
        all_posts
    }

get_all_posts_by_users is the main feed lookup. Two passes:

  1. First pass: original posts, with &HashSet::new() as following_users (i.e. no reply-chain filtering). We pull MAX_ORIGINAL_POSTS_PER_AUTHOR per author.
  2. Second pass: secondary posts (replies + retweets). This time we pass the actual follow set (following_users_set) so get_posts_from_map applies reply-chain filtering — see below.

Then concatenate and return. Each category has its own MAX_*_POSTS_PER_AUTHOR cap, so the original and secondary budgets can be tuned independently; the actual values aren't shown here.

The following_users_set is built once from user_ids and passed as a reference. Building it via .iter().copied().collect() is the standard pattern: iterate by reference, copy each i64 (Copy is cheap), collect into the HashSet.
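
A simplified sketch of the two-pass lookup, assuming std collections and bare post IDs instead of TinyPosts. It keeps the shape (originals first, then secondary posts, each pass with its own per-author cap) but leaves out the exclude set, the timeout, and the reply-chain filter that consumes the follow set. The helper names are hypothetical.

```rust
use std::collections::{HashMap, VecDeque};

/// Newest-first, capped read of one author's oldest-to-newest deque.
fn newest(map: &HashMap<i64, VecDeque<i64>>, author: i64, cap: usize) -> Vec<i64> {
    map.get(&author)
        .map(|q| q.iter().rev().take(cap).copied().collect::<Vec<i64>>())
        .unwrap_or_default()
}

/// Pass 1: every followed author's originals. Pass 2: their secondary posts.
fn all_posts(
    original: &HashMap<i64, VecDeque<i64>>,
    secondary: &HashMap<i64, VecDeque<i64>>,
    user_ids: &[i64],
    max_original: usize,
    max_secondary: usize,
) -> Vec<i64> {
    let mut out: Vec<i64> = user_ids
        .iter()
        .flat_map(|&u| newest(original, u, max_original))
        .collect();
    out.extend(user_ids.iter().flat_map(|&u| newest(secondary, u, max_secondary)));
    out
}
```

With author 1 holding originals [10, 11, 12] and secondary posts [13, 14], author 2 holding original [20], and caps of 2 and 1, the result is [12, 11, 20, 14]: all originals (newest first per author), then the secondary posts.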

    #[allow(clippy::too_many_arguments)]
    pub fn get_posts_from_map(
        &self,
        posts_map: &Arc<DashMap<i64, VecDeque<TinyPost>>>,
        user_ids: &[i64],
        max_per_user: usize,
        exclude_tweet_ids: &HashSet<i64>,
        following_users: &HashSet<i64>,
        start_time: Instant,
        request_user_id: i64,
    ) -> Vec<LightPost> {
        POST_STORE_REQUESTS.inc();
        let mut light_posts = Vec::new();

        let mut total_eligible: usize = 0;

The heavy lifter. #[allow(clippy::too_many_arguments)] suppresses clippy's warning for functions with more than seven parameters (the default threshold); the author chose to keep them as separate arguments rather than bundle them into a request struct.

Increment a request counter, init the result vec and an eligibility counter.

        for (i, user_id) in user_ids.iter().enumerate() {
            if !self.request_timeout.is_zero() && start_time.elapsed() >= self.request_timeout {
                log::error!(
                    "Timed out fetching posts for user={}; Processed: {}/{}. Stage: {}",
                    request_user_id,
                    i,
                    user_ids.len(),
                    if following_users.is_empty() {
                        "original"
                    } else {
                        "secondary"
                    }
                );
                POST_STORE_REQUEST_TIMEOUTS.inc();
                break;
            }

For each user_id in the follow list:

  • Check the timeout. If request_timeout is non-zero and elapsed exceeds it, log an error (with how many users we got through and which stage we were in, inferred from following_users.is_empty()), increment a timeout counter, and break out of the loop, returning whatever was collected so far. The video lookup also passes an empty set, so its timeouts are logged as "original" too.
  • This is why we pass start_time rather than starting a timer inside the function: the outer gRPC handler sets the deadline once, and it applies across both calls (original, then secondary).

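
The shared-deadline check can be sketched on its own. process_until_deadline is a hypothetical helper; where the real loop logs and increments POST_STORE_REQUEST_TIMEOUTS, this one just breaks.

```rust
use std::time::{Duration, Instant};

/// Walk the follow list until the shared deadline passes. A zero timeout
/// disables the check, matching the `!request_timeout.is_zero()` guard.
/// Returns how many users were processed before stopping.
fn process_until_deadline(user_ids: &[i64], start_time: Instant, timeout: Duration) -> usize {
    let mut processed = 0;
    for _user_id in user_ids {
        if !timeout.is_zero() && start_time.elapsed() >= timeout {
            break; // partial results are kept, not discarded
        }
        processed += 1; // the per-user deque lookup would go here
    }
    processed
}
```

Because start_time comes from the caller, a second call made after the first has used up the budget stops immediately, which is how one deadline covers both the original and secondary passes.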
            if let Some(user_posts_ref) = posts_map.get(user_id) {
                let user_posts = user_posts_ref.value();
                total_eligible += user_posts.len();

                // Start from newest posts (reverse iterator)
                // Take a capped number to prevent from going all the way back to when user is inactive
                let tiny_posts_iter = user_posts
                    .iter()
                    .rev()
                    .filter(|post| !exclude_tweet_ids.contains(&post.post_id))
                    .take(MAX_TINY_POSTS_PER_USER_SCAN);

Look up this user's deque. posts_map.get(user_id) returns Option<Ref<…>>. If they have any posts:

  • Get the deque via .value() (the Ref derefs to &VecDeque<TinyPost>).
  • Add to total_eligible (for the ratio metric below).
  • Build a tiny_posts_iter that:
    1. Iterates posts.
    2. .rev() — reverse, so newest first (the deque is sorted oldest-to-newest, and push_back appends, so .rev() walks from newest).
    3. Filters out anything in exclude_tweet_ids (already-seen posts).
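    4. .take(MAX_TINY_POSTS_PER_USER_SCAN): cap how far back we go. This is critical: if user X follows a megaposter with 50k posts in retention, we don't want to walk all of them. The constant's value isn't shown here. One subtlety: take comes after the exclude filter, so the cap counts posts that survive the filter; excluded posts are skipped without counting toward it, so an author with many already-seen posts gets scanned further back.
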
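
The newest-first scan is a plain iterator chain. A sketch over a VecDeque of post IDs (a hypothetical stand-in for TinyPosts) that also shows excluded IDs not counting toward the cap:

```rust
use std::collections::{HashSet, VecDeque};

/// Walk an oldest-to-newest deque from its newest end, skip excluded IDs,
/// and stop after `cap` survivors.
fn newest_unseen(deque: &VecDeque<i64>, exclude: &HashSet<i64>, cap: usize) -> Vec<i64> {
    deque
        .iter()
        .rev()                                  // newest first
        .filter(|id| !exclude.contains(*id))    // drop already-seen posts
        .take(cap)                              // cap counts survivors only
        .copied()
        .collect()
}
```

For a deque holding 1 through 6 with 5 and 6 excluded and a cap of 2, the result is [4, 3]: the two newest IDs are skipped without using up the cap.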
                // Perform light doc lookup to get full LightPost data. This will also filter deleted posts
                // Note: We copy the value immediately to release the read lock and avoid potential
                // deadlock when acquiring nested read locks while a writer is waiting.
                let light_post_iter_1 = tiny_posts_iter
                    .filter_map(|tiny_post| self.posts.get(&tiny_post.post_id).map(|r| *r.value()));

Now hydrate each TinyPost into a full LightPost:

  • self.posts.get(&tiny_post.post_id) returns Option<Ref<i64, LightPost>>.
  • .map(|r| *r.value()) copies the LightPost out of the Ref, which only compiles if LightPost: Copy.

Prost-generated structs aren't Copy by default, so either LightPost derives Copy (plausible, since every field used in this file is an i64, a bool, or an Option<i64>), or the real code is r.value().clone() and the source has been simplified. The first reading would also explain how insert_posts_internal can read post.has_video after inserting post.

Either way, the comment is critical: "we copy the value immediately to release the read lock." DashMap's Ref holds a read lock on its shard. Holding it while the reply filter further down calls self.posts.get(&reply_to_post_id) would take a second read lock, which can deadlock if a writer is already waiting on that shard: a lock that makes new readers queue behind a waiting writer leaves the second reader stuck behind the writer, which is itself stuck behind the first reader. So we copy out and drop the first Ref before doing nested lookups.

filter_map drops any tiny_post whose ID doesn't have an entry in self.posts. That can happen if the post was deleted (removed from posts) but still in the deque (the deque isn't compacted on delete).
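
Hydration with filter_map, sketched with a std HashMap and a hypothetical Copy-able LightPost. Under the Copy assumption, *p copies the value out, so the returned Vec borrows nothing from the map; IDs with no entry are dropped, just like deleted posts still lingering in a deque.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq)]
struct LightPost {
    post_id: i64,
    created_at: i64,
}

/// Turn post IDs into owned LightPost copies, silently skipping missing IDs.
fn hydrate(ids: &[i64], posts: &HashMap<i64, LightPost>) -> Vec<LightPost> {
    // `*p` copies out of the borrowed entry (equivalent to `.copied()`);
    // with DashMap, the same copy lets the shard guard drop right away.
    ids.iter().filter_map(|id| posts.get(id).map(|p| *p)).collect()
}
```

Hydrating [3, 2, 1] against a map that only holds 1 and 3 returns the posts for 3 and 1, in the order requested.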

                let light_post_iter = light_post_iter_1.filter(|post| {
                    if self.deleted_posts.get(&post.post_id).is_some() {
                        POST_STORE_DELETED_POSTS_FILTERED.inc();
                        false
                    } else {
                        true
                    }
                });

Belt-and-suspenders filter: even if the post is in posts, also check deleted_posts and drop if tombstoned. The filter_map above should have already excluded it (since we delete from posts on tombstone), but if there's a race where the post was in posts and got tombstoned in between, this catches it. Increment a "deleted but slipped through" counter so we can monitor this code path.

                let light_post_iter = light_post_iter.filter(|post| {
                    !(post.is_retweet && post.source_user_id == Some(request_user_id))
                });

Filter out retweets where the original was authored by the requester. I.e. don't show user U a retweet of U's own post. Subtle UX rule.

post.source_user_id == Some(request_user_id) — comparing Option<i64> against Some(i64). Standard idiom.

                let filtered_post_iter = light_post_iter.filter(|post| {
                    if following_users.is_empty() {
                        return true;
                    }
                    post.in_reply_to_post_id.is_none_or(|reply_to_post_id| {
                        if let Some(replied_to_post) = self.posts.get(&reply_to_post_id) {
                            if !replied_to_post.is_retweet && !replied_to_post.is_reply {
                                return true;
                            }

                            return post.conversation_id.is_some_and(|convo_id| {
                                let reply_to_reply_to_original =
                                    replied_to_post.in_reply_to_post_id == Some(convo_id);
                                let reply_to_followed_user = post
                                    .in_reply_to_user_id
                                    .map(|uid| following_users.contains(&uid))
                                    .unwrap_or(false);

                                reply_to_reply_to_original && reply_to_followed_user
                            });
                        }

                        false
                    })
                });

The reply-chain-visibility filter — only applied when following_users is non-empty (i.e. only during the secondary-posts pass). The rule:

  • If following_users.is_empty(): keep everything (we're in the original-posts pass; no chain logic).
  • Otherwise, this is a reply or retweet. We need to decide whether to show it based on conversation context.

post.in_reply_to_post_id.is_none_or(|reply_to_post_id| { ... })

Option::is_none_or(f) returns true if self is None, or f returns true for the inner value. So:

  • If in_reply_to_post_id is None, the post is a retweet (not a reply) — keep it.
  • If it has a parent post, run the inner closure.

The inner closure: look up the post being replied to (replied_to_post). If we can't find it, return false (drop).

If we can find it:

  • Case A: the replied-to post is itself an original (not a retweet, not a reply). The candidate was written by someone we follow (we're iterating the follow list), and it replies to a top-level post by anyone. Keep.
  • Case B: the replied-to post is itself a reply (chain of depth 2+). Need extra checks:
    • reply_to_reply_to_original: the replied-to post's reply-to is the conversation root (conversation_id). I.e. the candidate is reply-to-reply-to-original.
    • reply_to_followed_user: the candidate's in_reply_to_user_id is someone we follow.
    • Keep only if both are true.

So we keep:

  • All retweets (no in_reply_to_post_id).
  • Replies to original posts by anyone. The check only requires the parent to be neither a reply nor a retweet; it doesn't look at who wrote the parent, so a followee's reply to anyone's top-level post passes, as long as the parent is in the store.
  • Replies to replies where: the grand-parent is the conversation root AND the immediate reply is to someone we follow.

In short: chains are pruned to depth 2, and only chains where the second-level reply is to a followed user. This is the "feed surfaces interesting reply threads from people you follow" heuristic.
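
The visibility rule above, restated as a standalone predicate. This is a sketch under stated assumptions: LightPost is a hypothetical struct with only the fields the rule reads, a std HashMap stands in for self.posts, and a match replaces is_none_or (same semantics).

```rust
use std::collections::{HashMap, HashSet};

#[derive(Clone, Copy, Debug, Default)]
struct LightPost {
    is_reply: bool,
    is_retweet: bool,
    in_reply_to_post_id: Option<i64>,
    in_reply_to_user_id: Option<i64>,
    conversation_id: Option<i64>,
}

/// Should a secondary post (reply or retweet) by a followed author be kept?
fn keep_secondary(post: &LightPost, store: &HashMap<i64, LightPost>, following: &HashSet<i64>) -> bool {
    if following.is_empty() {
        return true; // original-posts / video pass: no chain logic
    }
    let parent_id = match post.in_reply_to_post_id {
        None => return true, // not a reply (a retweet): keep
        Some(id) => id,
    };
    let parent = match store.get(&parent_id) {
        None => return false, // parent not in the store: drop
        Some(p) => p,
    };
    if !parent.is_retweet && !parent.is_reply {
        return true; // reply to a top-level post, whoever wrote it
    }
    // Depth 2: the parent must reply to the conversation root, and the
    // candidate must be replying to someone we follow.
    post.conversation_id.is_some_and(|convo_id| {
        let parent_replies_to_root = parent.in_reply_to_post_id == Some(convo_id);
        let to_followed = post
            .in_reply_to_user_id
            .is_some_and(|uid| following.contains(&uid));
        parent_replies_to_root && to_followed
    })
}
```

Given a root post, a first-level reply to it, and a second-level reply, a followee's reply to the first-level reply survives only if it is addressed to a followed user, and a reply to the second-level reply (depth 3) is always dropped.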

                light_posts.extend(filtered_post_iter.take(max_per_user));
            }
        }

Materialize: take(max_per_user) caps to MAX_*_POSTS_PER_AUTHOR, then extend appends the resulting iterator into light_posts.

This is where the iterator chain finally runs — all the filter calls above are lazy.
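
A small demonstration of that laziness, independent of Thunder: the filter closure below counts how many elements it inspects, and take(cap) stops pulling as soon as enough have passed. The function name is hypothetical.

```rust
/// Returns the kept items and how many items the filter closure inspected.
fn inspected_before_cap(items: &[i64], cap: usize) -> (Vec<i64>, usize) {
    let mut inspected = 0;
    let kept: Vec<i64> = items
        .iter()
        .rev() // newest first, as in get_posts_from_map
        .filter(|x| {
            inspected += 1; // runs only when take() asks for another item
            **x % 2 == 0 // stand-in filter: keep even IDs
        })
        .take(cap)
        .copied()
        .collect();
    (kept, inspected)
}
```

With items 1 through 10 and cap = 2, the result is [10, 8] after inspecting only three elements (10, 9, 8); the other seven are never touched.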

End of the per-user loop.

        // Track ratio of returned posts to eligible posts
        if total_eligible > 0 {
            let ratio = light_posts.len() as f64 / total_eligible as f64;
            POST_STORE_POSTS_RETURNED_RATIO.observe(ratio);
        }

        light_posts
    }

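After the loop, observe a "returned / eligible" ratio metric. total_eligible counts every entry in each visited author's deque, before the scan cap, the filters, and max_per_user apply, so the ratio shows what fraction of the indexed posts actually reach the response; a falling ratio means the caps and filters are discarding more of them.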

Return the vec.

    /// Start a background task that periodically logs PostStore statistics
    pub fn start_stats_logger(self: Arc<Self>) {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(5));

            loop {
                interval.tick().await;

start_stats_logger takes self: Arc<Self>, so the method consumes an Arc by value. It's a common shape for methods that spawn background tasks: a caller would do Arc::clone(&store).start_stats_logger();, and the Arc moves into the spawned task, which keeps the store alive as long as the task runs.

tokio::time::interval(Duration::from_secs(5)) creates an Interval that fires every 5s. The first tick() completes immediately; after that, tick().await resolves at the next interval boundary (by default, missed ticks fire back-to-back to catch up). Standard periodic-task pattern.

                let user_count = self.original_posts_by_user.len();
                let total_posts = self.posts.len();
                let deleted_posts = self.deleted_posts.len();

                // Sum up all VecDeque sizes for each map
                let original_posts_count: usize = self
                    .original_posts_by_user
                    .iter()
                    .map(|entry| entry.value().len())
                    .sum();
                let secondary_posts_count: usize = self
                    .secondary_posts_by_user
                    .iter()
                    .map(|entry| entry.value().len())
                    .sum();
                let video_posts_count: usize = self
                    .video_posts_by_user
                    .iter()
                    .map(|entry| entry.value().len())
                    .sum();

Compute statistics:

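  • user_count: number of unique authors in the original-posts map. This includes the DELETE_EVENT_KEY sentinel whenever any tombstones are pending.
  • total_posts: number of posts in the main posts map.
  • deleted_posts: number of tombstones.
  • The three per-type sums: walk each map and sum the lengths of every deque. O(authors) per scan. The original_posts sum also counts the tombstone entries in the sentinel deque.
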
                // Update Prometheus gauges
                POST_STORE_USER_COUNT.set(user_count as f64);
                POST_STORE_TOTAL_POSTS.set(total_posts as f64);
                POST_STORE_DELETED_POSTS.set(deleted_posts as f64);

                // Update entity count gauge with labels
                POST_STORE_ENTITY_COUNT
                    .with_label_values(&["users"])
                    .set(user_count as f64);
                POST_STORE_ENTITY_COUNT
                    .with_label_values(&["posts"])
                    .set(total_posts as f64);
                POST_STORE_ENTITY_COUNT
                    .with_label_values(&["original_posts"])
                    .set(original_posts_count as f64);
                POST_STORE_ENTITY_COUNT
                    .with_label_values(&["secondary_posts"])
                    .set(secondary_posts_count as f64);
                POST_STORE_ENTITY_COUNT
                    .with_label_values(&["video_posts"])
                    .set(video_posts_count as f64);
                POST_STORE_ENTITY_COUNT
                    .with_label_values(&["deleted_posts"])
                    .set(deleted_posts as f64);

Write to Prometheus, both as flat gauges (POST_STORE_USER_COUNT) and as a single labeled gauge (POST_STORE_ENTITY_COUNT with labels "users", "posts", etc.). The labeled form is more flexible for dashboards (one query templated over the label); the flat gauges are presumably kept for backwards compatibility or per-metric queries.

                info!(
                    "PostStore Stats: {} users, {} total posts, {} deleted posts",
                    user_count, total_posts, deleted_posts
                );
            }
        });
    }

Also log a human-readable line. Loop forever.

End of start_stats_logger.

    /// Start a background task that periodically trims old posts
    pub fn start_auto_trim(self: Arc<Self>, interval_minutes: u64) {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(interval_minutes * 60));

            loop {
                interval.tick().await;
                let trimmed = self.trim_old_posts().await;
                if trimmed > 0 {
                    info!("Auto-trim: removed {} old posts", trimmed);
                }
            }
        });
    }

start_auto_trim is analogous: every interval_minutes, call trim_old_posts(). Log only if anything was trimmed (avoid log spam during idle hours).

    /// Manually trim posts older than retention period from all users
    /// Returns the number of posts trimmed
    pub async fn trim_old_posts(&self) -> usize {
        let posts_map = Arc::clone(&self.posts);
        let original_posts_by_user = Arc::clone(&self.original_posts_by_user);
        let secondary_posts_by_user = Arc::clone(&self.secondary_posts_by_user);
        let video_posts_by_user = Arc::clone(&self.video_posts_by_user);
        let deleted_posts = Arc::clone(&self.deleted_posts);
        let retention_seconds = self.retention_seconds;

The actual trim logic. Clone the Arcs into local variables that can be moved into a spawn_blocking task.

        tokio::task::spawn_blocking(move || {
            let current_time = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let mut total_trimmed = 0;

Trimming touches a lot of map entries; it's CPU-bound work. tokio::task::spawn_blocking runs it on Tokio's blocking thread pool (up to 512 threads by default) instead of on an async worker thread. This keeps a long trim from starving feed RPCs.

Get current time as unix seconds. Note .unwrap() — assumes clock is sane. Could panic if the clock is before UNIX_EPOCH, but that's a "this machine is broken" scenario.

            // Helper closure to trim posts from a given map
            let trim_map = |posts_by_user: &DashMap<i64, VecDeque<TinyPost>>,
                            posts_map: &DashMap<i64, LightPost>,
                            deleted_posts: &DashMap<i64, bool>|
             -> usize {
                let mut trimmed = 0;
                let mut users_to_remove = Vec::new();

A local closure trim_map parameterized on which map to trim. Returns the number of posts removed.

We track users_to_remove separately so we can drop empty deques afterwards (we can't remove from a DashMap while iterating it — would deadlock).

                for mut entry in posts_by_user.iter_mut() {
                    let user_id = *entry.key();
                    let user_posts = entry.value_mut();

                    while let Some(oldest_post) = user_posts.front() {
                        if current_time - (oldest_post.created_at as u64) > retention_seconds {
                            let trimmed_post = user_posts.pop_front().unwrap();
                            posts_map.remove(&trimmed_post.post_id);

                            if user_id == DELETE_EVENT_KEY {
                                deleted_posts.remove(&trimmed_post.post_id);
                            }
                            trimmed += 1;
                        } else {
                            break;
                        }
                    }

For each (user_id, VecDeque) entry:

  • iter_mut() yields mutable refs, write-locking each shard while its entries are visited.
  • Inner while: peek front() (oldest end). If past retention, pop_front() it, then:
    • Remove the post from the main posts map.
    • If this is the special tombstone deque (user_id == DELETE_EVENT_KEY), also remove from deleted_posts — these are delete tombstones that have now aged out, no point keeping them.
  • Stop the inner loop as soon as we hit a post that's young enough. Because the deque is sorted by created_at (oldest-first), everything after is also young enough.

This per-deque trim is O(trimmed posts). Across all deques it's O(total trimmed). No useless scanning of young posts.

oldest_post.created_at as u64: created_at is i64, but the subtraction happens in u64. A negative created_at casts to a value near 2^64, so current_time - … underflows. In a release build (overflow checks are off by default) the subtraction wraps to current_time + |created_at|, which still exceeds retention, so the post gets trimmed: correct by accident. In a debug build the same subtraction panics. A created_at ahead of current_time (clock skew) underflows the same way and, in release, wraps to a huge value, so that entry is trimmed immediately. created_at == 0 is harmless: current_time - 0 = current_time, far greater than retention, so it gets trimmed.

                    if user_posts.capacity() > user_posts.len() * 2 {
                        let new_cap = user_posts.len() as f32 * 1.5_f32;
                        user_posts.shrink_to(new_cap as usize);
                    }

                    if user_posts.is_empty() {
                        users_to_remove.push(user_id);
                    }
                }

After trimming a single user's deque:

  • If the deque's capacity is more than 2x its length, shrink to 1.5 * len. This avoids memory bloat: VecDeque doubles capacity on growth but doesn't auto-shrink. Without this, a once-active user who now posts rarely would keep a huge backing buffer forever.
  • If empty after trim, remember to drop the entry.

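
The per-deque trim and shrink steps can be sketched together. This is an illustrative variant, not a copy: the sentinel value, the unit-valued posts map, and the function name are hypothetical, and the age check is done in i64 (now - created_at) rather than through the source's u64 cast, which sidesteps the underflow discussed above (a future timestamp is simply treated as young).

```rust
use std::collections::{HashMap, VecDeque};

const DELETE_EVENT_KEY: i64 = -1; // hypothetical sentinel value

#[derive(Clone, Copy, Debug)]
struct TinyPost {
    post_id: i64,
    created_at: i64,
}

/// Pop expired entries off the front of one author's oldest-first deque,
/// then release excess capacity. Returns how many entries were popped.
fn trim_front(
    user_id: i64,
    deque: &mut VecDeque<TinyPost>,
    posts: &mut HashMap<i64, ()>,
    deleted: &mut HashMap<i64, bool>,
    now: i64,
    retention_seconds: i64,
) -> usize {
    let mut trimmed = 0;
    while let Some(oldest) = deque.front() {
        if now - oldest.created_at <= retention_seconds {
            break; // sorted oldest-first, so everything after is younger
        }
        let post = deque.pop_front().unwrap();
        posts.remove(&post.post_id);
        if user_id == DELETE_EVENT_KEY {
            deleted.remove(&post.post_id); // tombstone has aged out
        }
        trimmed += 1;
    }
    // VecDeque never shrinks on its own; cap capacity at about 1.5x len.
    if deque.capacity() > deque.len() * 2 {
        deque.shrink_to(deque.len() * 3 / 2);
    }
    trimmed
}
```

Run on the sentinel deque with entries aged 500, 200 and 10 seconds and a 300-second retention, it pops only the first entry, removes it from both maps, and shrinks the backing buffer.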
                for user_id in users_to_remove {
                    posts_by_user.remove_if(&user_id, |_, posts| posts.is_empty());
                }

                trimmed
            };

After the iter loop, drop empty entries. remove_if is the conditional remove — re-check the entry is still empty (a concurrent writer might have inserted while we were trimming other users). This is the standard CAS-like dance for concurrent maps.

End of the closure.

            total_trimmed += trim_map(&original_posts_by_user, &posts_map, &deleted_posts);
            total_trimmed += trim_map(&secondary_posts_by_user, &posts_map, &deleted_posts);
            trim_map(&video_posts_by_user, &posts_map, &deleted_posts);

            total_trimmed
        })
        .await
        .expect("spawn_blocking failed")
    }

Run the closure on all three timeline maps. Sum the counts from original and secondary (the video map's trim count is dropped — its posts overlap with the other two, so we'd double-count if we added).

.await.expect("spawn_blocking failed") unwraps the JoinError. The JoinHandle returns an error if the closure panics (for example, via the .unwrap() on the clock above) or if the task is cancelled during runtime shutdown before it gets to run; either way, expect turns it into a loud panic.

Note: video_posts_by_user is trimmed too, even though its count is discarded. That pass drops stale TinyPost references from the video deques; its posts_map.remove(...) calls are usually no-ops, because the original or secondary pass has already removed the same post (both entries share one created_at), and removing a missing key from a DashMap just returns None.

    /// Sorts all user post lists by creation time (newest first)
    pub async fn sort_all_user_posts(&self) {
        let original_posts_by_user = Arc::clone(&self.original_posts_by_user);
        let secondary_posts_by_user = Arc::clone(&self.secondary_posts_by_user);
        let video_posts_by_user = Arc::clone(&self.video_posts_by_user);

        tokio::task::spawn_blocking(move || {
            // Sort original posts
            for mut entry in original_posts_by_user.iter_mut() {
                let user_posts = entry.value_mut();
                user_posts
                    .make_contiguous()
                    .sort_unstable_by_key(|a| a.created_at);
            }
            // Sort secondary posts
            for mut entry in secondary_posts_by_user.iter_mut() {
                let user_posts = entry.value_mut();
                user_posts
                    .make_contiguous()
                    .sort_unstable_by_key(|a| a.created_at);
            }
            // Sort video posts
            for mut entry in video_posts_by_user.iter_mut() {
                let user_posts = entry.value_mut();
                user_posts
                    .make_contiguous()
                    .sort_unstable_by_key(|a| a.created_at);
            }
        })
        .await
        .expect("spawn_blocking failed");
    }

sort_all_user_posts re-sorts every deque ascending by created_at. Called once during finalize_init.

The doc comment says "newest first" but the code is sort_unstable_by_key(|a| a.created_at) which sorts ascending (oldest first, newest last). The doc is misleading — but the callers read from the back (via .rev()) so the effective access order is newest-first.

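make_contiguous() is a VecDeque method that rearranges the ring buffer, if needed, into a single contiguous slice and returns it as &mut [T], so any slice sort applies. VecDeque has no sort method of its own; without make_contiguous you'd have to copy the elements into a Vec, sort that, and rebuild the deque (an extra allocation and two copies). as_mut_slices doesn't help either, since it returns two halves that can't be sorted as one.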
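A small std-only check of that behavior. The push_front after push_back may leave the contents wrapped around the physical buffer (that layout is an implementation detail, so the sketch doesn't rely on it); what the standard library does document is that once make_contiguous has run, as_slices returns everything in the first slice.

```rust
use std::collections::VecDeque;

/// Builds a deque that may wrap around its ring buffer, sorts it in place
/// via make_contiguous, and reports (sorted contents, second half empty?).
fn sort_wrapped() -> (Vec<i64>, bool) {
    let mut dq: VecDeque<i64> = VecDeque::with_capacity(4);
    dq.push_back(30);
    dq.push_back(10);
    dq.push_front(20); // may land at the physical end of the buffer

    // One &mut [i64] covering the whole deque, sorted with a plain slice sort.
    dq.make_contiguous().sort_unstable();

    // Documented guarantee: after make_contiguous, the first slice holds everything.
    let (front, back) = dq.as_slices();
    (front.to_vec(), back.is_empty())
}
```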

Again, spawn_blocking runs this CPU-bound pass on Tokio's blocking thread pool so it doesn't occupy the async worker threads.
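tokio::task::spawn_blocking needs the tokio crate, so here is a std-only analogue of the same shape: clone the Arc handles, move them into a closure that runs on another OS thread, and wait for it. std::thread::spawn plus join is a swapped-in stand-in for spawn_blocking(...).await, and RwLock<HashMap> stands in for DashMap; neither is what Thunder actually uses.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock};
use std::thread;

/// Stand-in for one per-author index: author_id -> deque of created_at values.
type Index = Arc<RwLock<HashMap<i64, VecDeque<i64>>>>;

/// Sorts every deque of every index on a worker thread, mirroring the
/// structure of sort_all_user_posts (clone Arcs, move, wait).
fn sort_all_off_thread(indexes: &[Index]) {
    // Arc::clone only bumps a refcount. thread::spawn requires a 'static
    // closure, so the worker must own its handles rather than borrow them.
    let handles: Vec<Index> = indexes.iter().map(Arc::clone).collect();

    let worker = thread::spawn(move || {
        for index in &handles {
            let mut map = index.write().expect("lock poisoned");
            for deque in map.values_mut() {
                deque.make_contiguous().sort_unstable();
            }
        }
    });

    // Analogue of `.await.expect("spawn_blocking failed")`.
    worker.join().expect("worker thread panicked");
}
```

In the Tokio version the calling task is suspended at .await while the blocking pool does the work, so the async worker thread is free to run other tasks in the meantime.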

    /// Clears all posts from the store
    pub fn clear(&self) {
        self.posts.clear();
        self.original_posts_by_user.clear();
        self.secondary_posts_by_user.clear();
        self.video_posts_by_user.clear();
        info!("PostStore cleared");
    }
}

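clear() wipes the post map and all three per-author indexes, but not deleted_posts: those tombstones survive a clear, presumably because they still matter across replays. One consequence, if retention trimming is the only path that removes tombstones: the DELETE_EVENT_KEY deque that drives that trimming is wiped along with original_posts_by_user, so tombstones recorded before the clear would no longer expire. Used by tests / admin tools.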

impl Default for PostStore {
    fn default() -> Self {
        // Default to 2 days retention, no timeout
        Self::new(2 * 24 * 60 * 60, 0)
    }
}

Default::default: 2 days of retention (2 * 24 * 60 * 60 = 172,800 seconds) and a timeout of 0, which the comment reads as "no timeout". Presumably a default for tests and fallback usage.

End of file.


What we've learned

Thunder's post store is a textbook example of an in-memory shard with concurrent access. Things to take away:

Memory layout: one big DashMap<i64, LightPost> indexed by post_id, plus three DashMap<i64, VecDeque<TinyPost>> indexed by author_id. The TinyPost-vs-LightPost separation keeps the per-user indexes ~50x smaller and lets a post appear in several lists without duplicating the full LightPost.
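A rough sketch of the two-level layout, with std HashMap in place of DashMap. The field lists of both structs are hypothetical (the real LightPost isn't shown in this excerpt); the point is that each per-author index holds a 16-byte key while the full post is stored exactly once, in the post_id map.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical per-author index entry: just enough to sort and look up.
#[derive(Clone, Copy, Debug)]
struct TinyPost {
    post_id: i64,
    created_at: i64,
}

/// Hypothetical full record; the real LightPost's fields aren't shown here.
#[derive(Clone, Debug)]
struct LightPost {
    post_id: i64,
    author_id: i64,
    created_at: i64,
    has_video: bool,
    text: String, // heap-owned payload we don't want copied into every index
}

#[derive(Default)]
struct Store {
    posts: HashMap<i64, LightPost>,                           // post_id -> full post
    original_posts_by_user: HashMap<i64, VecDeque<TinyPost>>, // author_id -> keys
    video_posts_by_user: HashMap<i64, VecDeque<TinyPost>>,    // author_id -> keys
}

impl Store {
    fn insert(&mut self, post: LightPost) {
        let tiny = TinyPost { post_id: post.post_id, created_at: post.created_at };
        let author = post.author_id;
        self.original_posts_by_user.entry(author).or_default().push_back(tiny);
        if post.has_video {
            // Second index, same post: one more 16-byte TinyPost, not a second LightPost.
            self.video_posts_by_user.entry(author).or_default().push_back(tiny);
        }
        self.posts.insert(post.post_id, post); // the only copy of the full post
    }
}
```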

Why three per-author maps: the query "what video posts did this author make?" is cheapest if you maintain a video-only index. The same goes for original vs. secondary. The cost is extra work (and memory) at write time; the benefit is that reads scan only the relevant posts.
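A sketch of that write-time fan-out: each incoming post is classified once and routed to every index it belongs to. The classification rule below (replies and retweets go to secondary, everything else to original, with video as an extra index on top) is an assumption for illustration; the excerpt only shows that the real code reads post.has_video and post.is_retweet. IndexKind and PostFlags are hypothetical names.

```rust
/// Which per-author index a post is written to (hypothetical names).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum IndexKind {
    Original,
    Secondary,
    Video,
}

/// Hypothetical subset of the post's flags.
struct PostFlags {
    is_reply: bool,
    is_retweet: bool,
    has_video: bool,
}

/// Write-time fan-out: returns every index that should receive this post.
/// Reads later touch exactly one of these lists.
fn route(flags: &PostFlags) -> Vec<IndexKind> {
    let mut targets = Vec::with_capacity(2);
    if flags.is_reply || flags.is_retweet {
        targets.push(IndexKind::Secondary);
    } else {
        targets.push(IndexKind::Original);
    }
    if flags.has_video {
        targets.push(IndexKind::Video); // assumed: video is indexed in addition
    }
    targets
}
```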

Tombstones via a sentinel user: deleted posts go into deleted_posts (a hashset-of-IDs) for fast lookup, and also into the DELETE_EVENT_KEY entry of original_posts_by_user so the retention-trim logic can drop old tombstones too. Clever piggybacking — no separate "tombstone trimming" path.
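A minimal sketch of the piggybacking idea: a delete records the ID in a tombstone set and also pushes a TinyPost under a reserved author key, so the ordinary retention trim expires tombstones along with posts. The sentinel value, the struct shapes, and the exact trim rule are assumptions; the excerpt only names DELETE_EVENT_KEY and deleted_posts.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Hypothetical sentinel; Thunder's actual DELETE_EVENT_KEY value isn't shown here.
const DELETE_EVENT_KEY: i64 = -1;

#[derive(Clone, Copy)]
struct TinyPost {
    post_id: i64,
    created_at: i64,
}

#[derive(Default)]
struct Store {
    deleted_posts: HashSet<i64>,
    original_posts_by_user: HashMap<i64, VecDeque<TinyPost>>,
}

impl Store {
    fn mark_deleted(&mut self, post_id: i64, now: i64) {
        self.deleted_posts.insert(post_id); // fast "is this deleted?" lookup
        // Piggyback: record the tombstone under the sentinel "author".
        self.original_posts_by_user
            .entry(DELETE_EVENT_KEY)
            .or_default()
            .push_back(TinyPost { post_id, created_at: now });
    }

    /// One trim loop for every author, sentinel included.
    fn trim_older_than(&mut self, cutoff: i64) {
        for (author, deque) in self.original_posts_by_user.iter_mut() {
            while deque.front().map_or(false, |p| p.created_at < cutoff) {
                let old = deque.pop_front().unwrap();
                if *author == DELETE_EVENT_KEY {
                    // Expired tombstone: stop remembering the deletion.
                    self.deleted_posts.remove(&old.post_id);
                } // else: a fuller sketch would also drop old.post_id from the post map
            }
        }
    }
}
```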

Reply-chain visibility logic: replies are kept only if they're either (a) replies to original posts, or (b) replies-to-replies where the chain root is an original post and the immediate parent is by a followed user. The check is depth-bounded and uses a who-do-you-follow predicate. This is a meaningful business rule baked into a leaf store: making the decision requires the full reply graph, which the rest of the system doesn't have.
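The rule reads more easily as a predicate. This is a hedged restatement of the prose above, not Thunder's code: ReplyInfo and its fields are hypothetical, and the depth bound is left out because the excerpt doesn't say what it is.

```rust
use std::collections::HashSet;

/// Hypothetical view of a reply's ancestry, as the store might derive it.
struct ReplyInfo {
    parent_is_original: bool, // immediate parent is a top-level (non-reply) post
    root_is_original: bool,   // conversation root is a top-level post
    parent_author_id: i64,
}

/// Keep (a) direct replies to original posts, or (b) replies-to-replies
/// whose root is original and whose parent's author the viewer follows.
fn keep_reply(reply: &ReplyInfo, following: &HashSet<i64>) -> bool {
    if reply.parent_is_original {
        return true; // case (a)
    }
    reply.root_is_original && following.contains(&reply.parent_author_id) // case (b)
}
```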

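Concurrency model: DashMap splits its entries across shards, each guarded by its own RwLock, so locking is per shard rather than global. To avoid deadlocks, the code follows a strict rule: never hold a Ref across another DashMap lookup, because a second lookup that lands on the same shard can block on the lock the Ref still holds. Hence the explicit "copy the value out" comment in get_posts_from_map.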
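A std-only illustration of the "copy the value out" rule. A single RwLock<HashMap> plays the role of one DashMap shard here; that is an analogy for self-containment, not DashMap itself. The read guard is released when get_copy returns, so the later write lock never meets a guard held by the same thread. Shard, get_copy, and increment are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Stand-in for one DashMap shard: post_id -> some counter.
struct Shard {
    inner: RwLock<HashMap<i64, u64>>,
}

impl Shard {
    /// Copies the value out; the read guard drops when this function returns.
    fn get_copy(&self, key: i64) -> Option<u64> {
        let guard = self.inner.read().expect("lock poisoned");
        guard.get(&key).copied()
    }

    /// Takes the write lock only after the read guard is gone. Holding a
    /// read guard across `self.inner.write()` on the same thread could
    /// deadlock or panic instead. (Not atomic across threads; illustration only.)
    fn increment(&self, key: i64) -> u64 {
        let next = self.get_copy(key).unwrap_or(0) + 1;
        self.inner.write().expect("lock poisoned").insert(key, next);
        next
    }
}
```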

Trim strategy: because the deques are sorted by timestamp, trimming is a simple pop_front loop whose cost is bounded by the number of posts that have actually expired. After large trims, a shrink_to(1.5 * len) call releases the surplus backing memory.
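A sketch of that loop for a single deque of timestamps. The strict `< cutoff` comparison, the "shrink only if something was removed" condition, and the integer form len * 3 / 2 of 1.5 * len are assumptions. VecDeque::shrink_to only promises that capacity stays at least max(len, requested), so the test checks that bound rather than an exact capacity.

```rust
use std::collections::VecDeque;

/// Pops expired timestamps from the front of an ascending deque.
/// Work is proportional to the number of expired entries, not the deque size.
/// Returns how many entries were removed.
fn trim_expired(deque: &mut VecDeque<i64>, cutoff: i64) -> usize {
    let mut removed = 0;
    while let Some(&oldest) = deque.front() {
        if oldest >= cutoff {
            break; // everything behind this entry is newer, so stop here
        }
        deque.pop_front();
        removed += 1;
    }
    if removed > 0 {
        // Give back spare backing memory but keep ~50% headroom for new posts.
        deque.shrink_to(deque.len() * 3 / 2);
    }
    removed
}
```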

spawn_blocking for heavy CPU work: sort and trim run on Tokio's blocking pool, not on the async worker threads. Trim is bounded per author but still iterates over millions of entries; sort costs O(n log n) per author. Either could starve the async runtime if run directly on it.

Open-source gap: many things are referenced but not provided in the open release: args::Args, config::*, metrics::*, o2, schema::*, strato_client. The shape is recoverable from usage; the implementations are X-internal.

Likely source-edit artifact: insert_posts_internal reads post.has_video, post.is_retweet, etc. after let old = self.posts.insert(post_id, post); has already moved post into the map. As shown, that code wouldn't compile if post is a non-Copy type such as a generated proto struct. Either LightPost is Copy (a small struct), or the published source dropped a .clone(). Worth noting if anyone tries to build this verbatim.
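The compile problem and one standard fix, in miniature. Post here is a hypothetical non-Copy struct (the String field rules out deriving Copy, as it would for a generated proto message). Reading the needed fields before the move is shown; cloning before the insert, as the paragraph suggests, is the other option, at the cost of a full copy.

```rust
use std::collections::HashMap;

/// Hypothetical non-Copy post: the String field rules out `#[derive(Copy)]`.
#[derive(Clone, Debug)]
struct Post {
    has_video: bool,
    is_retweet: bool,
    text: String,
}

/// Inserts the post and returns (has_video, is_retweet) for index routing.
fn insert_post(posts: &mut HashMap<i64, Post>, post_id: i64, post: Post) -> (bool, bool) {
    // Read what we need *before* the move...
    let (has_video, is_retweet) = (post.has_video, post.is_retweet);
    // ...because `post` is moved into the map here. Reading post.has_video
    // after this line would be rejected with E0382 (use of moved value).
    let _old = posts.insert(post_id, post);
    (has_video, is_retweet)
}
```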


Next session

Session 03 (Thunder II): the Kafka listener loops (kafka/tweet_events_listener.rs, tweet_events_listener_v2.rs, kafka/mod.rs, and kafka/utils.rs) plus the gRPC service implementation (thunder_service.rs), ~1,132 LOC in total. We'll see how Kafka events flow into the store and how the gRPC handler builds responses for home-mixer.