X For You algorithm, line by line · Part 22

X For You algorithm, line by line — Part 22 (finale): Grox publishing layer + series wrap-up

The final part. The publishing layer that writes everything to Manhattan and Kafka: V2/V3/V4/V5 embedder tasks, embedding Kafka publishers, the big task_pub.py kitchen sink, the five embedding sink variants, and the safety annotations sink with bool-metadata derivation and safemodel defense-in-depth. Plus a complete series wrap-up: 22 sessions, 24,914 LOC, Rust + Python, from candidate-pipeline to home-mixer to Phoenix to Grok.

May 15, 2026 · 30 min read

The final session of the 22-part walkthrough. Five files, 1,255 LOC — the publishing layer that takes everything the upstream tasks computed (classifications, embeddings, safety annotations) and writes them out to the rest of the X system: Kafka topics, Manhattan KV columns, and Strato moderation actions.

The two big architectural beats:

  1. Embedding pipeline writes: a post traverses the embedder (V2/V3/V4/V5) and lands in ctx.multimodal_post_embedding_dict. Sink tasks pick it up by version key and write to Strato + Kafka.
  2. Safety pipeline writes: PTOS category + policy detection produces a SafetyPostAnnotations object on the context; the sink task derives bool metadata from violations, merges with existing state, calls action-application Strato endpoints, and writes the merged result back.

Files:

| File | LOC | Role |
| --- | --- | --- |
| task_multimodal_post_embedding.py | 86 | Run V2/V3/V4/V5 embedders, stash in ctx.multimodal_post_embedding_dict |
| task_embedding_pub.py | 79 | Kafka publish helpers for V4 and V5 embeddings |
| task_pub.py | 555 | The big publisher kitchen sink: content-analysis Kafka, UPA Manhattan, bool-metadata upsert, reply-ranking sinks, reply-spam sinks |
| task_write_mm_embedding_sink.py | 193 | The five embedding-sink variants (Experiment/V3/V4/V5/V5SkipKafkaForReplies) |
| task_write_safety_post_annotations_result_sink.py | 342 | The complex safety sink: bool-metadata derivation, safemodel fallback, action labeling |

task_multimodal_post_embedding.py (86 lines)

Three tasks, one per embedding generation. The V3 and V4 tasks share infrastructure (both instantiate MultimodalPostEmbedderV2, just with different model parameters); V5 has its own embedder class.

TaskMultimodalPostEmbeddingWithSummary — the "V3" generation

class TaskMultimodalPostEmbeddingWithSummary(TaskWithPost):
    embedder = MultimodalPostEmbedderV2(
        model="qwen3", renderer_version="lite", use_post_context_summary=True
    )

    @override
    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        _, embedding = await cls.embedder.embed(post)
        ctx.multimodal_post_embedding_dict["v3"] = embedding
        logger.info(
            f"TaskMultimodalPostEmbeddingWithSummary Embedding Added, length: {len(embedding)}"
        )
        Metrics.counter("task.multimodal_post_embedding_with_summary.count").add(1)

Class-level MultimodalPostEmbedderV2 instance (Session 20). Configured for:

  • model="qwen3" — Qwen3 0.6B as the underlying embedding model.
  • renderer_version="lite" — minimal post rendering (no fancy formatting).
  • use_post_context_summary=True — append the post's pre-computed summary to the input.

This task is the V3 stage in the embedding pipeline. The plan order is: task_post_embedding_summarizer → this → task_write_post_embedding_sink_v3. The summarizer writes post.summary; this task reads it (via use_post_context_summary) and emits the embedding under the "v3" key.

ctx.multimodal_post_embedding_dict["v3"] = embedding stores the vector on the context under a version key. Keying by version lets several embedder tasks in the same plan write side by side, while each downstream sink picks the version it needs by key. (This particular task only writes "v3", but the dict approach generalizes.)
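A minimal sketch of the version-keyed pattern, assuming a stripped-down context object. SketchContext, embed_task and sink_task are hypothetical names for illustration, not Grox APIs:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class SketchContext:
    # Hypothetical stand-in for TaskContext: only the version-keyed map.
    multimodal_post_embedding_dict: dict[str, list[float]] = field(default_factory=dict)


def embed_task(ctx: SketchContext, version: str, embedding: list[float]) -> None:
    # Each embedder task writes under its own version key.
    ctx.multimodal_post_embedding_dict[version] = embedding


def sink_task(ctx: SketchContext, version: str) -> list[float] | None:
    # Each sink reads only the key it owns; a missing key means the upstream
    # embedder did not run (or failed and swallowed the error, like V4).
    return ctx.multimodal_post_embedding_dict.get(version)


ctx = SketchContext()
embed_task(ctx, "v3", [0.1, 0.2])
embed_task(ctx, "v4", [0.3, 0.4])
print(sink_task(ctx, "v4"))    # [0.3, 0.4]
print(sink_task(ctx, "v5_1"))  # None
```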

TaskMultimodalPostEmbeddingRecsysV4 — the production rec-sys version

class TaskMultimodalPostEmbeddingRecsysV4(TaskWithPost):
    embedder = MultimodalPostEmbedderV2(
        model="v4",
        renderer_version="lite",
        use_post_context_summary=True,
    )

    @override
    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        try:
            _, embedding = await cls.embedder.embed(post)
        except Exception as e:
            Metrics.counter("task.multimodal_post_embedding_recsys_v4.error").add(1)
            logger.warning(
                f"TaskMultimodalPostEmbeddingRecsysV4 failed for post {post.id}: {e}"
            )
            return
        ctx.multimodal_post_embedding_dict["v4"] = embedding
        logger.info(
            f"TaskMultimodalPostEmbeddingRecsysV4 Embedding Added, length: {len(embedding)}"
        )
        Metrics.counter("task.multimodal_post_embedding_recsys_v4.count").add(1)

Same shape but model="v4". The V4 model is the rec-sys-trained embedder we walked through in Session 16 — the very same model Phoenix's retrieval tower consumes.

Notable: the V4 task swallows exceptions — wraps the embedder call in try/except, logs and returns on failure. Why? V4 is in active development; we don't want a V4 model failure to break the whole plan (which also writes V3 embeddings). Plus, the V4 endpoint may be intermittently down during model rollouts.

V3 doesn't have this guard — V3 is the stable production fallback. If V3 fails, the plan should fail too.
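The two error policies can be sketched side by side. This is a hypothetical illustration (run_best_effort and run_required are invented names); in the source, V3 simply has no try block at all, and V5 counts the error before re-raising:

```python
import asyncio
import logging

logger = logging.getLogger("embedding_policy_sketch")


async def run_best_effort(name: str, coro) -> bool:
    """V4-style: log and swallow failures so the rest of the plan continues."""
    try:
        await coro
        return True
    except Exception as e:
        logger.warning("%s failed, continuing: %s", name, e)
        return False


async def run_required(name: str, coro) -> None:
    """V5-style: log, then re-raise so the plan fails visibly."""
    try:
        await coro
    except Exception as e:
        logger.warning("%s failed, aborting: %s", name, e)
        raise


async def flaky() -> None:
    raise RuntimeError("endpoint unavailable during rollout")


async def ok() -> None:
    return None


async def plan() -> str:
    await run_best_effort("v4", flaky())  # failure is swallowed
    await run_required("v3", ok())        # succeeds, so nothing is raised
    return "plan finished"


print(asyncio.run(plan()))  # plan finished
```

The trade-off is visibility versus availability: the best-effort wrapper keeps the plan alive during a V4 rollout, but the only trace of the failure is the log line and the error counter.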

TaskMultimodalPostEmbeddingV5 — the multimodal-with-ASR generation

class TaskMultimodalPostEmbeddingV5(TaskWithPost):
    embedder = MultimodalPostEmbedderV5()

    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        try:
            transcripts = []
            if post.media:
                for m in post.media:
                    if (
                        isinstance(m, Video)
                        and m.convo_video
                        and m.convo_video.asr_transcript
                    ):
                        transcripts.append(m.convo_video.asr_transcript)
            transcript = "\n".join(transcripts) if transcripts else None
            _, embedding = await cls.embedder.embed(post, transcript=transcript)
        except Exception as e:
            Metrics.counter("task.multimodal_post_embedding_v5.error").add(1)
            logger.warning(
                f"TaskMultimodalPostEmbeddingV5 failed for post {post.id}: {e}"
            )
            raise
        ctx.multimodal_post_embedding_dict["v5_1"] = embedding
        logger.info(
            f"TaskMultimodalPostEmbeddingV5 Embedding Added, length: {len(embedding)}, has_transcript={transcript is not None}"
        )
        Metrics.counter("task.multimodal_post_embedding_v5.count").add(1)
        if transcript:
            Metrics.counter(
                "task.multimodal_post_embedding_v5.with_transcript.count"
            ).add(1)

Walks the post's media list, collects every video ASR transcript (set earlier by TaskASRTranscription from Session 21), joins them with newlines (or uses None when there are none), and passes the result to the V5 embedder.
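A standalone sketch of the transcript collection, with hypothetical dataclass stand-ins for Video, ConvoVideo and Image (the real Grox types carry more fields than shown):

```python
from __future__ import annotations

from dataclasses import dataclass


# Hypothetical stand-ins for the Grox media types used in the walkthrough.
@dataclass
class ConvoVideo:
    asr_transcript: str | None = None


@dataclass
class Video:
    convo_video: ConvoVideo | None = None


@dataclass
class Image:
    url: str = ""


def collect_transcript(media: list | None) -> str | None:
    """Join every non-empty video ASR transcript with newlines, or return None."""
    transcripts = [
        m.convo_video.asr_transcript
        for m in (media or [])
        if isinstance(m, Video) and m.convo_video and m.convo_video.asr_transcript
    ]
    return "\n".join(transcripts) if transcripts else None


media = [Video(ConvoVideo("hello")), Image(), Video(ConvoVideo(None)), Video(ConvoVideo("world"))]
print(collect_transcript(media))  # prints "hello" and "world" on separate lines
print(collect_transcript(None))   # None
```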

ctx.multimodal_post_embedding_dict["v5_1"] — note the "v5_1" key, suggesting V5 has sub-versions (v5_1, v5_2, ...) for A/B-tested variants of the V5 model itself. Storage key is decoupled from the model parameter, which lets you swap V5 model variants without changing the storage schema.

The V5 task re-raises the exception after counting it (unlike V4, which swallows it). V5 is now the primary embedding pipeline; if it fails, the plan should know.

The has_transcript log line records whether ASR was usable for this post — observability into ASR coverage.


task_embedding_pub.py (79 lines)

Kafka producers for embedding-only topics. These are used both as standalone publish tasks and as internal helpers from the sink tasks.

class TaskPublishEmbeddingKafka(TaskWithPost):
    DISABLE_RULES = [DisableTaskForNonMmEmbProd]
    KAFKA_TOPIC_NAME: KafkaTopicName

    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        embedding = ctx.multimodal_post_embedding
        if embedding is None:
            Metrics.counter("task.publish_embedding_kafka.skipped.count").add(
                1, attributes={"reason": "no_embedding"}
            )
            logger.info(f"No embedding available for post {post.id}, skipping")
            return

        Metrics.counter("task.publish_embedding_kafka.intaken.count").add(1)
        try:
            await cls._publish_to_kafka(post, embedding)
            Metrics.counter("task.publish_embedding_kafka.success.count").add(1)
            if post.created_at:
                latency = time.time() - post.created_at.timestamp()
                Metrics.histogram("task.publish_embedding_kafka.e2e_latency").record(
                    latency
                )
        except Exception:
            Metrics.counter("task.publish_embedding_kafka.failed.count").add(1)
            logger.error(
                f"Failed to publish embedding to Kafka: {traceback.format_exc()}"
            )
            raise

Abstract base — subclasses set KAFKA_TOPIC_NAME. DISABLE_RULES = [DisableTaskForNonMmEmbProd] — only runs in the mm-emb production cluster.

Interesting: this reads ctx.multimodal_post_embedding (singular), not the dict. That field on the context (Session 18, TaskContext.multimodal_post_embedding) is the "selected primary" embedding, while multimodal_post_embedding_dict holds the version-keyed map. Some earlier step, not in these five files, must perform that selection.

e2e_latency is post creation → publish, computed from post.created_at. This is the most important Grox metric: it measures end-to-end pipeline latency from the moment a tweet is posted to the moment its embedding is published to Kafka.
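A sketch of the latency computation with the clock injected for testing (the source calls time.time() directly). The timezone check is an assumption added here: datetime.timestamp() treats a naive datetime as local time, so a naive created_at would skew the metric by the host's UTC offset. Whether Grox's post.created_at is always timezone-aware isn't visible in these files.

```python
from datetime import datetime, timezone


def e2e_latency_seconds(created_at: datetime, now: float) -> float:
    """Seconds from post creation to publish time (both as Unix timestamps)."""
    if created_at.tzinfo is None:
        # Naive datetimes would be interpreted as local time by .timestamp().
        raise ValueError("created_at must be timezone-aware")
    return now - created_at.timestamp()


created = datetime(2026, 5, 15, 12, 0, 0, tzinfo=timezone.utc)
publish_time = created.timestamp() + 4.5  # pretend the publish happened 4.5 s later
print(e2e_latency_seconds(created, publish_time))  # 4.5
```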

    @classmethod
    async def _publish_to_kafka(cls, post: Post, embedding: list[float]) -> None:
        tweet_embedding = SimpleTweetEmbedding(
            tweetId=int(post.id),
            embedding1=embedding,
        )
        serialized_bytes = Serializer.serialize(tweet_embedding)
        await cls._get_kafka_producer().send(id=post.id, value=serialized_bytes)
        logger.info(
            f"Published embedding for post {post.id} to {cls.KAFKA_TOPIC_NAME.value}"
        )

    @classmethod
    @cache
    def _get_kafka_producer(cls) -> ScramKafkaProducer:
        producer_config = grox_config.get_kafka_producer_topic(cls.KAFKA_TOPIC_NAME)
        logger.info(
            f"Creating embedding kafka producer with config: {producer_config.model_dump()}"
        )
        return ScramKafkaProducer(producer_config)


class TaskPublishEmbeddingV4Kafka(TaskPublishEmbeddingKafka):
    KAFKA_TOPIC_NAME = KafkaTopicName.GROX_MULTIMODAL_EMBEDDING_V4


class TaskPublishEmbeddingV5Kafka(TaskPublishEmbeddingKafka):
    KAFKA_TOPIC_NAME = KafkaTopicName.GROX_MULTIMODAL_EMBEDDING_V5

SimpleTweetEmbedding is the Thrift schema for an embedding event (just tweet ID + vector). Serialized via the project's Thrift serdes, sent to Kafka with id=post.id as the partition key (ensures all events for the same post go to the same partition for ordering).
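Why keying by post ID gives per-post ordering, as a sketch. It assumes send(id=...) maps to the Kafka message key, and it uses CRC32 as a stand-in hash: Kafka's default partitioner actually hashes the key bytes with murmur2, so the partition numbers here won't match a real cluster. Only the property carries over: equal keys always land on the same partition.

```python
import zlib


def partition_for_key(key: str, num_partitions: int) -> int:
    """Deterministically map a message key to a partition (CRC32 stand-in, not murmur2)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


post_id = "1790000000000000000"
first = partition_for_key(post_id, 12)
second = partition_for_key(post_id, 12)
print(first == second)  # True: same post, same partition, so its events stay ordered
```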

@cache on _get_kafka_producer — one producer per class (i.e., one per topic). Producers are expensive; reuse.
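Stacking @classmethod over @cache makes the cache key the cls argument, which is what yields one producer per subclass. A small sketch with a hypothetical FakeProducer standing in for ScramKafkaProducer:

```python
from functools import cache


class FakeProducer:
    # Hypothetical stand-in for ScramKafkaProducer; counts constructions.
    created = 0

    def __init__(self, topic: str) -> None:
        FakeProducer.created += 1
        self.topic = topic


class PublisherBase:
    TOPIC: str

    @classmethod
    @cache
    def _get_producer(cls) -> FakeProducer:
        # cache keys on cls, so each subclass builds its producer exactly once.
        return FakeProducer(cls.TOPIC)


class PublisherV4(PublisherBase):
    TOPIC = "embedding_v4"


class PublisherV5(PublisherBase):
    TOPIC = "embedding_v5"


a = PublisherV4._get_producer()
b = PublisherV4._get_producer()
c = PublisherV5._get_producer()
print(a is b, a is c, FakeProducer.created)  # True False 2
```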

ScramKafkaProducer: SCRAM is one of the SASL authentication mechanisms Kafka supports (a common choice for production clusters).

Two concrete subclasses: V4 → GROX_MULTIMODAL_EMBEDDING_V4, V5 → GROX_MULTIMODAL_EMBEDDING_V5. Used both as plan tasks AND as helpers — task_write_mm_embedding_sink.py reaches in and calls ._publish_to_kafka directly to do "Manhattan + Kafka" in a single task.


task_pub.py (555 lines) — the big publisher

The largest task file. Five public classes: TaskPublishKafka, TaskPublishUnifiedPostAnnotationsManhattan, TaskUpsertTweetBoolMetadataToUnifiedPostAnnotation, TaskWriteReplyRankingManhattan, TaskWriteReplySpamManhattan.

TaskPublishKafka — the unified content-analysis publisher

class TaskPublishKafka(Task):
    DISABLE_RULES = [DisableTaskForLocal, DisableTaskForDev]

    @classmethod
    async def _exec(cls, ctx: TaskContext) -> None:
        post = ctx.payload.post
        results = ctx.content_categories
        if not post:
            return

        if not results:
            Metrics.counter("task.publish_kafka.skipped.count").add(
                1, attributes={"reason": "no_results"}
            )
            return

Disabled only in local and dev. That is looser than DisableTaskForNonProd: this task still runs in non-production clusters. Why? Multi-cluster Grox has a "staging-on-prod-infra" cluster where you want Kafka writes for testing, even though it doesn't carry real production traffic.
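DISABLE_RULES lists can be read as predicates over the deployment environment: a task runs only if no rule fires. A hypothetical sketch (the Env values and rule functions are invented; the real rule classes aren't shown in these files) of why [DisableTaskForLocal, DisableTaskForDev] admits more clusters than [DisableTaskForNonProd]:

```python
from enum import Enum


class Env(Enum):
    LOCAL = "local"
    DEV = "dev"
    STAGING = "staging"
    PROD = "prod"


# Hypothetical rules: each returns True when the task should be DISABLED.
def disable_for_local(env: Env) -> bool:
    return env is Env.LOCAL


def disable_for_dev(env: Env) -> bool:
    return env is Env.DEV


def disable_for_non_prod(env: Env) -> bool:
    return env is not Env.PROD


def is_enabled(rules, env: Env) -> bool:
    # A task runs only if none of its disable rules fire.
    return not any(rule(env) for rule in rules)


kafka_rules = [disable_for_local, disable_for_dev]  # shaped like TaskPublishKafka
upa_rules = [disable_for_non_prod]                  # shaped like the Manhattan writers
print([e.value for e in Env if is_enabled(kafka_rules, e)])  # ['staging', 'prod']
print([e.value for e in Env if is_enabled(upa_rules, e)])    # ['prod']
```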

Skip if no post or no results in ctx.content_categories. Most plans append at least one result during execution; if we got here with none, something earlier skipped silently.

        Metrics.counter("task.publish_kafka.intaken.count").add(1)
        try:
            await cls._publish_to_kafka(
                post.id,
                post.user.id if post.user else None,
                results,
                summary=ctx.summary,
                embedding=ctx.multimodal_post_embedding,
            )
            for result in results:
                Metrics.counter("task.publish_kafka.success.count").add(
                    1, attributes={"category": result.category.value}
                )
            if post.created_at:
                latency = time.time() - post.created_at.timestamp()
                for res in results:
                    Metrics.histogram("task.classification_e2e_latency").record(
                        latency, attributes={"category": res.category.value}
                    )
            else:
                Metrics.counter("task.publish_kafka.post_no_created_at.count").add(1)
        except Exception:
            Metrics.counter("task.publish_kafka.failed.count").add(1)
            logger.error(
                f"Failed to publish classification record: {traceback.format_exc()}"
            )
            raise

Pass post ID, user ID (optional), results list, summary (might be empty), embedding (might be None). Record per-category success counters. Record per-category e2e latency histograms.

Note that the for result in results loop increments the success counter once per result: if one Kafka message carries 3 results, we count 3 successes. The counter measures "category-results published", not "Kafka messages sent."

    @classmethod
    async def _publish_to_kafka(
        cls,
        post_id: str,
        user_id: int | None,
        results: list[ContentCategoryResult],
        summary: str,
        embedding: list[float] | None,
    ):
        category_results = [
            t.CategoryResult(
                category=r.category.name,
                positive=r.positive,
                score=r.score,
                summary=r.summary,
                taxonomyCategories=[
                    t.TaxonomyCategoryScore(id=tc.id, name=tc.name, score=tc.score)
                    for tc in r.taxonomy_categories
                ]
                if r.taxonomy_categories
                else None,
                keywords=None,
            )
            for r in results
        ]
        grox_content_analysis = t.GroxContentAnalysis(
            postId=int(post_id),
            userId=user_id,
            categoryResults=category_results,
            summary=summary,
            createdAt=int(time.time()),
        )
        serialized_bytes = Serializer.serialize(grox_content_analysis)
        await cls._get_kafka_producer().send(id=post_id, value=serialized_bytes)

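Convert each in-memory Pydantic ContentCategoryResult to a Thrift CategoryResult, build a GroxContentAnalysis envelope (a single Thrift struct containing all the categories for this post), serialize, and send. Note that the embedding argument is accepted but never read: nothing in the envelope built here carries it.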

keywords=None: the Thrift schema has a keywords field that is currently unused, presumably reserved for future use and kept null for now.

    @classmethod
    @cache
    def _get_kafka_producer(cls):
        producer_config = grox_config.get_kafka_producer_topic(
            KafkaTopicName.GROX_CONTENT_ANALYSIS
        )
        logger.info(
            f"Creating kafka producer with config: {producer_config.model_dump()}"
        )
        return KafkaProducer(producer_config)

Single producer for GROX_CONTENT_ANALYSIS topic, cached. Note KafkaProducer (not ScramKafkaProducer like the embedding ones). This topic probably uses a different auth model (PLAINTEXT for internal-only?).

TaskPublishUnifiedPostAnnotationsManhattan — the rich Manhattan write

The largest single method in this file. Writes the UnifiedPostAnnotations row to Manhattan with extensive metric breakdown.

class TaskPublishUnifiedPostAnnotationsManhattan(Task):
    DISABLE_RULES = [DisableTaskForNonProd]

    @classmethod
    async def _exec(cls, ctx: TaskContext) -> None:
        Metrics.counter("task.publish_unified_post_annotations.count").add(1)
        results = ctx.content_categories
        if not results:
            logger.info("No unified post annotations to publish")
            return

        post = ctx.payload.post
        if not post:
            return

        grok_response = next(
            (
                r
                for r in results
                if r.category == ContentCategoryType.BANGER_INITIAL_SCREEN
            ),
            None,
        )
        if not grok_response:
            return

Disabled outside prod. Find the BANGER_INITIAL_SCREEN result — that's the source of UPA data. The banger classifier produces all the rich fields (description, tags, taxonomy, tweet_bool_metadata, etc.).

        if grok_response.slop_score is not None:
            if grok_response.slop_score == 1:
                Metrics.counter(
                    "task.publish_unified_post_annotations.slop_score_1.count"
                ).add(1)
            elif grok_response.slop_score == 2:
                Metrics.counter(
                    "task.publish_unified_post_annotations.slop_score_2.count"
                ).add(1)
            elif grok_response.slop_score == 3:
                Metrics.counter(
                    "task.publish_unified_post_annotations.slop_score_3.count"
                ).add(1)

Per-slop-score-value counters (1/2/3 are buckets — 1 = low slop, 3 = high slop). Using one counter name per bucket (rather than one counter with a level attribute) — same trade-off we saw in task_safety_ptos_policy.py: more metric names, smaller cardinality per metric.
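The if/elif ladder is equivalent to a table lookup. A sketch of both metric layouts; the attribute-style metric name in the second function is hypothetical and not something Grox emits:

```python
from __future__ import annotations

PREFIX = "task.publish_unified_post_annotations"

# Layout used by the source: one metric name per bucket.
SLOP_BUCKET_METRICS = {score: f"{PREFIX}.slop_score_{score}.count" for score in (1, 2, 3)}


def slop_metric_name(slop_score: int | None) -> str | None:
    # Same behavior as the ladder: None or out-of-range scores emit nothing.
    return SLOP_BUCKET_METRICS.get(slop_score)


def slop_metric_with_attribute(slop_score: int | None) -> tuple[str, dict[str, str]] | None:
    # Alternative layout (hypothetical): one metric name, bucket carried as an attribute.
    if slop_score not in (1, 2, 3):
        return None
    return f"{PREFIX}.slop_score.count", {"level": str(slop_score)}
```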

        if grok_response.tweet_bool_metadata:
            if grok_response.tweet_bool_metadata.isHighQuality:
                Metrics.counter(
                    "task.publish_unified_post_annotations.is_high_quality_true.count"
                ).add(1)
            if grok_response.tweet_bool_metadata.isNsfw:
                Metrics.counter(
                    "task.publish_unified_post_annotations.is_nsfw_true.count"
                ).add(1)
                record_nsfw_detection(post)
            if grok_response.tweet_bool_metadata.isGore:
                Metrics.counter(
                    "task.publish_unified_post_annotations.is_gore_true.count"
                ).add(1)
            if grok_response.tweet_bool_metadata.isViolent:
                Metrics.counter(
                    "task.publish_unified_post_annotations.is_violent_true.count"
                ).add(1)
            if grok_response.tweet_bool_metadata.isSpam:
                Metrics.counter(
                    "task.publish_unified_post_annotations.is_spam_true.count"
                ).add(1)
            if grok_response.tweet_bool_metadata.isSoftNsfw:
                Metrics.counter(
                    "task.publish_unified_post_annotations.is_soft_nsfw_true.count"
                ).add(1)
            if grok_response.tweet_bool_metadata.isAdult:
                Metrics.counter(
                    "task.publish_unified_post_annotations.is_adult_true.count"
                ).add(1)

Per-bool-flag counters. Note record_nsfw_detection(post) — function not visible in our scope; presumably emits to a separate stats system or fires a side-effect log.

The complete bool set: isHighQuality, isNsfw, isGore, isViolent, isSpam, isSoftNsfw, isAdult. The output schema of the banger classifier — these are exactly the booleans the LLM is prompted to produce.
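Because both UPA writers emit the same seven counters, the names can be derived from the flag names. A hypothetical helper sketch (not in the source) operating on a model_dump()-style dict; it covers only the counter names, not the record_nsfw_detection side effect:

```python
from __future__ import annotations

import re

BOOL_FLAGS = ("isHighQuality", "isNsfw", "isGore", "isViolent", "isSpam", "isSoftNsfw", "isAdult")


def to_snake(name: str) -> str:
    # Insert "_" before each interior capital, then lowercase: isSoftNsfw -> is_soft_nsfw.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def true_flag_metric_names(prefix: str, metadata: dict[str, bool | None]) -> list[str]:
    """One counter name per flag that is True, in BOOL_FLAGS order."""
    return [f"{prefix}.{to_snake(flag)}_true.count" for flag in BOOL_FLAGS if metadata.get(flag)]


names = true_flag_metric_names(
    "task.publish_unified_post_annotations",
    {"isNsfw": True, "isSpam": False, "isSoftNsfw": True},
)
print(names)
```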

        if grok_response.tags and len(grok_response.tags) > 0:
            Metrics.counter(
                "task.publish_unified_post_annotations.tags_non_empty.count"
            ).add(1)

        if grok_response.is_image_editable_by_grok:
            Metrics.counter(
                "task.publish_unified_post_annotations.is_image_editable_by_grok_true.count"
            ).add(1)

        if post.media:
            if any(isinstance(m, Video) for m in post.media):
                Metrics.counter(
                    "task.publish_unified_post_annotations.has_video_true.count"
                ).add(1)
            if any(isinstance(m, Image) for m in post.media):
                Metrics.counter(
                    "task.publish_unified_post_annotations.has_image_true.count"
                ).add(1)

More observability counters: tags-non-empty, image-editable-by-grok, has-video, has-image. The has-video / has-image counters let you cross-reference UPA categorization with media type ("how often does a video get tagged as NSFW vs an image?").

        resolved_grok_topics = []
        if grok_response.taxonomy_categories and ctx.available_topics:
            id_to_name = {}
            name_to_category_id = {}
            for category in ctx.available_topics:
                id_to_name[category.categoryEntityId] = category.categoryName
                name_to_category_id[category.categoryName] = category.categoryEntityId
                for sub in category.subtopics:
                    id_to_name[sub.topicEntityId] = sub.topicName
                    name_to_category_id[sub.topicName] = category.categoryEntityId

            topic_id_to_best_score = {}
            for grok_topic in grok_response.taxonomy_categories:
                topic_id = grok_topic.id
                if topic_id in id_to_name:
                    topic_name = id_to_name[topic_id]
                    category_id = name_to_category_id[topic_name]

                    resolved_grok_topic = ContentCategoryScore(
                        id=topic_id,
                        name=topic_name,
                        score=grok_topic.score,
                        category_id=category_id,
                    )
                    logger.info(
                        f"Validated grok_topic: ID {topic_id} -> '{topic_name}' (category_id: {category_id})"
                    )
                else:
                    logger.warning(
                        f"Invalid topic ID from Grok: {topic_id} not found in available topics"
                    )
                    Metrics.counter(
                        "task.publish_unified_post_annotations.invalid_grok_topic.count"
                    ).add(1)
                    continue

                if (
                    topic_id not in topic_id_to_best_score
                    or grok_topic.score > topic_id_to_best_score[topic_id].score
                ):
                    topic_id_to_best_score[topic_id] = resolved_grok_topic

            resolved_grok_topics = list(topic_id_to_best_score.values())
        elif grok_response.taxonomy_categories:
            logger.warning("No available topics to validate grok_topics")
            resolved_grok_topics = []

Topic ID validation + dedup:

  1. Build id_to_name (topic ID → name) and name_to_category_id (topic name → its parent category ID) maps from ctx.available_topics (the cached topic list from Session 21's TaskBangerScreen).
  2. For each topic the model output:
    • If the ID is in the map → resolve the name + parent category.
    • If not in the map → log warning, increment invalid_grok_topic.count. Skip it. (Model hallucinated a topic ID that doesn't exist.)
  3. Dedup by topic ID, keeping the highest score — the model might output the same topic twice with different scores; keep the best one.
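The three steps can be sketched as one function. This is a variation, not the source: the dataclasses are stand-ins that reuse the field names from ctx.available_topics, model output is simplified to (id, score) pairs, hallucinated IDs are skipped without the source's logging and metrics, and the lookup maps each ID directly to (name, parent category ID) instead of hopping through names.

```python
from dataclasses import dataclass


@dataclass
class Subtopic:
    topicEntityId: int
    topicName: str


@dataclass
class Category:
    categoryEntityId: int
    categoryName: str
    subtopics: list


@dataclass
class ResolvedTopic:
    id: int
    name: str
    score: float
    category_id: int


def resolve_topics(model_topics: list, available: list) -> list:
    # Step 1: direct id -> (name, parent category id) lookup.
    lookup = {}
    for cat in available:
        lookup[cat.categoryEntityId] = (cat.categoryName, cat.categoryEntityId)
        for sub in cat.subtopics:
            lookup[sub.topicEntityId] = (sub.topicName, cat.categoryEntityId)

    best = {}
    for topic_id, score in model_topics:
        # Step 2: drop IDs the model invented.
        if topic_id not in lookup:
            continue
        name, category_id = lookup[topic_id]
        # Step 3: dedup by ID, keeping the highest score.
        if topic_id not in best or score > best[topic_id].score:
            best[topic_id] = ResolvedTopic(topic_id, name, score, category_id)
    return list(best.values())


available = [Category(1, "Sports", [Subtopic(11, "Soccer"), Subtopic(12, "Tennis")])]
result = resolve_topics([(11, 0.4), (11, 0.9), (99, 0.8), (1, 0.5)], available)
print(result)
```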

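The two-step lookup (id_to_name[topic_id], then name_to_category_id[topic_name]) is unusual: a direct topic_id → category_id map could be filled in the same loop. The name hop also carries a subtle risk. name_to_category_id is keyed by display name, so if two subtopics in different categories share a name, the later entry overwrites the earlier one and both topic IDs resolve to the same parent category.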

If the model returned topics but there is no available_topics cache, we log a warning and write no topics (resolved_grok_topics = []); unlike the invalid-ID path, no metric is emitted. Probably because the topic cache can be empty during a startup race; better to write empty topics than to throw.

        for topic in resolved_grok_topics:
            sanitized_topic_name_for_metric = (
                topic.name.lower().replace(" ", "_").replace("&", "and")
            )
            Metrics.counter(
                f"task.publish_unified_post_annotations.topic_{sanitized_topic_name_for_metric}.count"
            ).add(1)

Per-topic counter: one counter name per topic, so you get metrics like task.publish_unified_post_annotations.topic_sports.count, ..._topic_politics.count, and so on. This multiplies the number of distinct metric names (one per taxonomy topic) but gives instant per-topic dashboards.

Sanitization: lowercase, spaces→underscores, "&"→"and". Necessary because metric names typically don't allow these characters.
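A sketch comparing the walkthrough's replace chain with a hypothetical stricter variant. The original handles only spaces and "&", so a topic name containing "/" or "-" passes through unchanged:

```python
import re


def sanitize_like_original(name: str) -> str:
    # The walkthrough's chain: lowercase, spaces -> "_", "&" -> "and".
    return name.lower().replace(" ", "_").replace("&", "and")


def sanitize_strict(name: str) -> str:
    # Hypothetical stricter variant: any run of characters outside [a-z0-9]
    # (spaces, "/", "-", apostrophes, ...) collapses to a single "_".
    s = name.lower().replace("&", "and")
    return re.sub(r"[^a-z0-9]+", "_", s).strip("_")


print(sanitize_like_original("Arts & Culture"))  # arts_and_culture
print(sanitize_like_original("Science/Tech"))    # science/tech
print(sanitize_strict("Science/Tech"))           # science_tech
```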

        entities = []
        if resolved_grok_topics and len(resolved_grok_topics) > 0:
            Metrics.counter(
                "task.publish_unified_post_annotations.with_grok_topics.count"
            ).add(1)
            entities = [
                EntityWithMetadata(
                    qualifiedId=QualifiedId(domainId=236, entityId=str(grok_topic.id)),
                    score=grok_topic.score,
                    categoryId=QualifiedId(
                        domainId=236, entityId=str(grok_topic.category_id)
                    )
                    if grok_topic.category_id
                    else None,
                )
                for grok_topic in resolved_grok_topics
            ]
        else:
            Metrics.counter(
                "task.publish_unified_post_annotations.with_empty_grok_topics.count"
            ).add(1)

Build the entities field of the UPA Thrift struct. QualifiedId(domainId=236, entityId=str(grok_topic.id)) — domain 236 is the "Grok topic" domain (X uses numeric domain IDs to namespace entity IDs). Score and parent category are attached.

        annotations = UnifiedPostAnnotations(
            tweetId=post.id,
            entities=entities,
            tags=[{"tag": tag, "score": 0.0} for tag in (grok_response.tags or [])],
            tweetBoolMetadata=grok_response.tweet_bool_metadata.model_dump()
            if grok_response.tweet_bool_metadata
            else None,
            description=grok_response.summary,
            isImageEditableByGrok=grok_response.is_image_editable_by_grok,
            slopScore=grok_response.slop_score,
            originalOcrText="",
            evergreenScore=None,
            hasVideo=post.media and any(isinstance(m, Video) for m in post.media),
            hasImage=post.media and any(isinstance(m, Image) for m in post.media),
            imageDescription=None,
            videoDescription=None,
            qualityScore=grok_response.score,
            hasMinorScore=grok_response.has_minor_score,
            hasCard=post.card is not None,
            foundMetadata=FoundMetadata(
                imageCount=sum(1 for m in post.media if isinstance(m, Image))
                if post.media
                else 0,
                videoCount=sum(1 for m in post.media if isinstance(m, Video))
                if post.media
                else 0,
                cardCount=1 if post.card else 0,
                cardV2Count=len(post.cardsV2) if post.cardsV2 else 0,
            ),
        )

        await StratoUnifiedPostAnnotations().put(int(post.id), annotations)
        Metrics.counter("task.publish_unified_post_annotations.success.count").add(1)

Build the final UPA record and put to Manhattan keyed by tweet ID.

Fields:

  • entities — the topic list.
  • tags — the free-form tags with score=0.0 (apparently no per-tag score yet; placeholder).
  • tweetBoolMetadata — dict-dumped Pydantic.
  • description — the banger's summary.
  • isImageEditableByGrok / slopScore / hasMinorScore / qualityScore — the rich banger fields.
  • originalOcrText / evergreenScore / imageDescription / videoDescription — set to defaults (probably populated by other pipelines).
  • hasVideo / hasImage / hasCard — media-presence booleans.
  • foundMetadata — per-media-type counts.

This is the primary content-analysis row: home-mixer's grok_* fields are read from it, and every Post in Thunder is hydrated with it on read.

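Some fields use Python's and short-circuit oddly: hasVideo=post.media and any(isinstance(m, Video) for m in post.media). If post.media is None, this evaluates to None, and if it is an empty list it evaluates to []; neither is False. The Thrift schema probably allows None for optional booleans, but an empty list in a boolean field may not serialize cleanly, and the pattern is stylistically off either way.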
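A sketch of the difference, with empty stand-in Video and Image classes. The helper name has_media_of_type is hypothetical; folding None into an empty tuple makes the expression always return a bool:

```python
from __future__ import annotations

from collections.abc import Iterable


class Video: ...


class Image: ...


def original_style(media, kind):
    # Mirrors the walkthrough's `post.media and any(...)` expression.
    return media and any(isinstance(m, kind) for m in media)


def has_media_of_type(media: Iterable | None, kind: type) -> bool:
    # `media or ()` turns None (and an empty list) into an empty tuple,
    # and any() over an empty iterable is False, so the result is always a bool.
    return any(isinstance(m, kind) for m in (media or ()))


print(original_style(None, Video), original_style([], Video))        # None []
print(has_media_of_type(None, Video), has_media_of_type([], Video))  # False False
```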

TaskUpsertTweetBoolMetadataToUnifiedPostAnnotation — the safety-only updater

class TaskUpsertTweetBoolMetadataToUnifiedPostAnnotation(Task):
    DISABLE_RULES = [DisableTaskForNonProd]

    @classmethod
    async def _exec(cls, ctx: TaskContext) -> None:
        Metrics.counter(
            "task.upsert_tweet_bool_metadata_to_unified_post_annotations.count"
        ).add(1)
        results = ctx.content_categories
        if not results:
            logger.info("No unified post annotations to publish")
            return

        post = ctx.payload.post
        if not post:
            return

        grok_response = next(
            (
                r
                for r in results
                if r.category == ContentCategoryType.POST_SAFETY_SCREEN
            ),
            None,
        )
        if not grok_response or not grok_response.tweet_bool_metadata:
            return

The POST_SAFETY_SCREEN flavor — runs only when the post-safety classifier produced metadata. (PlanPostSafety in Session 19 ends with this task.)

        if grok_response.tweet_bool_metadata.isHighQuality:
            ...
        if grok_response.tweet_bool_metadata.isNsfw:
            ...
        # ... 7 per-bool counters identical to TaskPublishUnifiedPostAnnotationsManhattan ...

        await StratoUpsertTweetBoolMetadataToUnifiedPostAnnotations().put(
            int(post.id), grok_response.tweet_bool_metadata.model_dump()
        )
        Metrics.counter(
            "task.upsert_tweet_bool_metadata_to_unified_post_annotations.success.count"
        ).add(1)

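Emit the same seven boolean counters, then write through StratoUpsertTweetBoolMetadataToUnifiedPostAnnotations. The call is still .put(), but, as the column name indicates, it has upsert semantics: it writes the bool-metadata field of an existing UPA row without overwriting the other fields.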
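A hypothetical in-memory sketch of the full-row put versus the field-level upsert. The store, the function names, and the create-on-missing behavior of the upsert are assumptions; the real Strato column's behavior for a missing row isn't shown here.

```python
from typing import Any

# Hypothetical in-memory stand-in for a Manhattan-backed UPA column.
store: dict = {}


def put_row(tweet_id: int, row: dict) -> None:
    # Full write: replaces whatever was stored for this tweet.
    store[tweet_id] = dict(row)


def upsert_bool_metadata(tweet_id: int, bool_metadata: dict) -> None:
    # Field-level write: touch only tweetBoolMetadata, keep every other field.
    row: dict[str, Any] = store.setdefault(tweet_id, {})
    row["tweetBoolMetadata"] = dict(bool_metadata)


put_row(1, {"tags": [{"tag": "sports", "score": 0.0}], "description": "match recap",
            "tweetBoolMetadata": {"isNsfw": False}})
upsert_bool_metadata(1, {"isNsfw": True})
print(store[1]["description"], store[1]["tweetBoolMetadata"])  # match recap {'isNsfw': True}
```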

Why a separate task? The PostSafety plan runs on popular posts where we want to refresh the safety bools but don't want to overwrite the entire UPA row (which has tags, topics, description from the original banger run). Upsert is the right tool.

TaskWriteReplyRankingManhattan — reply scoring + spam action

class TaskWriteReplyRankingManhattan(Task):
    DISABLE_RULES = [DisableTaskForNonProd]

    _strato_grok_reply_spam_action_with_labels = StratoGrokReplySpamActionWithLabels()

    @classmethod
    async def _exec(cls, ctx: TaskContext) -> None:
        post = ctx.payload.post
        results = ctx.reply_ranking_results
        if not post:
            return
        if not results:
            Metrics.counter("task.write_reply_ranking_manhattan.skipped.count").add(
                1, attributes={"reason": "no_results"}
            )
            return
        Metrics.counter("task.write_reply_ranking_manhattan.intaken.count").add(1)
        try:
            await cls._publish_to_reply_ranking_manhattan(post, results)
            logger.info(
                f"Published reply ranking post to manhattan: {post.id=} {post.user.id=}"
            )
        except Exception:
            Metrics.counter("task.write_reply_ranking_manhattan.failed.count").add(1)
            logger.error(
                f"Failed to write reply ranking score to manhattan: {traceback.format_exc()}"
            )
            raise

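Reads from ctx.reply_ranking_results (populated by Session 21's TaskRankReplies). Standard publish-with-metrics shape, with one wrinkle: the success log dereferences post.user.id unguarded, so a reply with no user would raise AttributeError after the Manhattan write had already succeeded, and the except block would count it as a failure and re-raise. The helper below checks post.user explicitly; this log line does not.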

    @classmethod
    async def _publish_to_reply_ranking_manhattan(
        cls, post: Post, results: list[ReplyScoreResult]
    ):
        logger.info(
            f"[_publish_to_reply_ranking_manhattan] checking results: {results}"
        )
        reasoning = ""

        try:
            reply_ranking_result = next(r for r in results)
        except:
            reply_ranking_result = None

        score = reply_ranking_result.score if reply_ranking_result else 3.0
        reasoning = reply_ranking_result.reason if reply_ranking_result else ""

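next(r for r in results) takes the first result. The bare except: is there to catch the StopIteration that an empty generator raises, but it would swallow any other exception too. And because _exec already returns early when results is empty, the fallback is effectively unreachable from this call site.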

Defaults if no result: score=3.0 (max), reasoning="". Score of 3 means "best" on the 0-3 scale, so the default is permissive — a missing classification doesn't penalize the reply.
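The first-result-or-default logic can be written without the try block. This sketch assumes only that each result exposes .score and .reason. The dataclass is a hypothetical stand-in for ReplyScoreResult.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ReplyScoreResult:  # hypothetical stand-in; only .score and .reason are used
    score: float
    reason: str


def first_score(results: list[ReplyScoreResult]) -> tuple[float, str]:
    # next() with a default returns None for an empty iterable instead of
    # raising StopIteration; any other exception still propagates.
    first = next(iter(results), None)
    score = first.score if first else 3.0  # permissive default: 3.0 is "best"
    reasoning = first.reason if first else ""
    return score, reasoning
```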

        if post.user:
            logger.info(
                f"[_publish_to_reply_ranking_manhattan] {reasoning=} {post.id=} {post.user.id=} {score=}"
            )
        else:
            logger.info(
                f"Missing user id [_publish_to_reply_ranking_manhattan] {reasoning=} {post.id=} {score=}"
            )

        if score == 0.0:
            action_result = (
                await cls._strato_grok_reply_spam_action_with_labels.execute(
                    int(post.id)
                )
            )
            if action_result and len(action_result.applied_labels) > 0:
                logger.info(
                    f"grokReplySpamActionWithLabels applied labels: debugString='{action_result.debug_string}', appliedLabels={action_result.applied_labels} for post {post.id}"
                )
                Metrics.counter(
                    "task.grok_reply_spam_action_with_labels.applied.count"
                ).add(1)
            elif action_result:
                logger.info(
                    f"grokReplySpamActionWithLabels no labels applied: debugString='{action_result.debug_string}' for post {post.id}"
                )
                Metrics.counter(
                    "task.grok_reply_spam_action_with_labels.empty.count"
                ).add(1)
            else:
                logger.info(f"grokReplySpamActionWithLabels failed for post {post.id}")
                Metrics.counter(
                    "task.grok_reply_spam_action_with_labels.failed.count"
                ).add(1)

Score-zero triggers a moderation action — replies the LLM scored exactly 0.0 are treated as spam. Calls the Strato grokReplySpamActionWithLabels to apply moderation labels.

Three log/metric paths (applied / empty / failed) for the action result. Same pattern we saw in TaskGrokUpaActionWithLabels (Session 21).
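The applied / empty / failed split is identical everywhere it appears, so it could live in one helper. This sketch uses a hypothetical ActionResult stand-in with only the two attributes the logging code reads (applied_labels and debug_string). The helper names are also hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ActionResult:  # hypothetical stand-in for the Strato action response
    debug_string: str = ""
    applied_labels: list[str] = field(default_factory=list)


def action_outcome(result: ActionResult | None) -> str:
    """Map an action response onto the three logging arms."""
    if result and len(result.applied_labels) > 0:
        return "applied"
    if result:
        return "empty"  # the call succeeded but no label was applied
    return "failed"  # no response at all


def outcome_metric(prefix: str, result: ActionResult | None) -> str:
    return f"{prefix}.{action_outcome(result)}.count"
```

outcome_metric reproduces counter names such as task.grok_reply_spam_action_with_labels.failed.count.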

        await ReplyRankingScoreStratoLoader.save_reply_ranking_score(
            post_id=post.id,
            reply_ranking_score=ReplyRankingScore(
                score=score, reasoning=reasoning[-500:]
            ),
        )

        await ReplyRankingScoreStratoLoader.save_reply_ranking_kafka_v2(
            post_id=post.id,
            reply_ranking_score_kafka=ReplyRankingScoreKafka(
                postId=int(post.id), score=score, reasoning=reasoning[-500:]
            ),
        )

        Metrics.counter("task.write_reply_ranking_manhattan.success.count").add(
            1, attributes={"column": "reply_ranking"}
        )

Write the score to two destinations:

  1. Manhattan KV: ReplyRankingScore(score, reasoning).
  2. Kafka topic (save_reply_ranking_kafka_v2): ReplyRankingScoreKafka(postId, score, reasoning).

Reasoning is sliced with [-500:], which keeps only the last 500 characters. The slice drops the beginning of the string, not the end. Keeping the tail preserves the conclusion of the reasoning (presumably where the model puts its verdict) while bounding storage cost.
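A quick check of what the negative slice does. truncate_reasoning is a hypothetical helper name; the 500-character limit comes from the code above.

```python
def truncate_reasoning(reasoning: str, limit: int = 500) -> str:
    # s[-limit:] keeps the last `limit` characters (the whole string if it is
    # shorter), so the beginning is what gets dropped.
    return reasoning[-limit:]
```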

TaskWriteReplySpamManhattan — spam detection action + write

class TaskWriteReplySpamManhattan(Task):
    DISABLE_RULES = [DisableTaskForNonProd]

    _strato_grok_reply_spam_action_with_labels = StratoGrokReplySpamActionWithLabels()

    @classmethod
    async def _exec(cls, ctx: TaskContext) -> None:
        post = ctx.payload.post
        if not post:
            return

        results = ctx.content_categories
        for result in results:
            if result.category == ContentCategoryType.SPAM_COMMENT:
                if result.positive:
                    action_result = (
                        await cls._strato_grok_reply_spam_action_with_labels.execute(
                            int(post.id)
                        )
                    )
                    if action_result and len(action_result.applied_labels) > 0:
                        ...  # standard 3-arm logging
                    elif action_result:
                        ...
                    else:
                        ...

                await ReplySpamStratoLoader.save_spam_reply_annotation(
                    post.id, result.score, result.positive, ""
                )
                logger.info(
                    f"Published reply spam annotation to manhattan: {post.id=} {post.user.id=}"
                )

Sibling of TaskWriteReplyRankingManhattan but for the spam path. The spam classifier writes a SPAM_COMMENT category result; this task picks it out, applies the spam-action if positive, then writes the annotation regardless.

Always writes, action only fires on positive: this means the annotation row is created for every spam classification (both spam and not-spam), but the moderation labels only get applied on positive verdicts. The annotation row is the auditable record of what was classified; the labels are the operational consequence.

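In save_spam_reply_annotation(post.id, score, positive, ""), the empty string is the "reason" parameter. The Pydantic spam classifier (Session 20) produces a reason, but it isn't threaded through here. This looks like a small simplification: moderation records the score and verdict, not the LLM's reasoning. Note also that the log line dereferences post.user.id without the if post.user: guard that TaskWriteReplyRankingManhattan uses, so a post with no user would raise after the annotation had already been written.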


task_write_mm_embedding_sink.py (193 lines)

Five embedding-sink classes share one base. They are named by the model version they store, not by embedder class, and several sink versions can map to one embedder: the same MultimodalPostEmbedderV2 (Session 20) might be invoked with different model configs to produce different sink versions. V5 and V5SkipKafkaForReplies both store model_version="v5_1", so the five classes cover four versions.

Base class with retry

class TaskWriteMMEmbeddingSinkBase(TaskWithPost):
    model_version: str

    DISABLE_RULES = [DisableTaskForNonMmEmbProd]

    @classmethod
    @retry(stop=stop_after_attempt(3), wait=wait_chain(wait_fixed(1), wait_fixed(2)))
    async def exec(cls, ctx: TaskContext) -> TaskResultCategory:
        return await Task.exec.__wrapped__(cls, ctx)

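DisableTaskForNonMmEmbProd restricts these sinks to the mm_emb_prod cluster. The exec override allows 3 attempts with wait_chain(wait_fixed(1), wait_fixed(2)): it waits 1s after the first failure and 2s after the second. (Less aggressive than the default base retry, but more attempts.)

Calling Task.exec.__wrapped__ reaches past whatever decorator the base class puts on exec and runs the undecorated body. Presumably this keeps the two retry policies from nesting and multiplying the attempt count. Sinks need retries because they touch external Strato/Kafka endpoints, and transient failures there shouldn't fail the whole plan. Because a retry re-runs the entire _exec_with_post, every write inside it has to tolerate being repeated.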
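Why bypass the base decorator? The following self-contained sketch assumes the base Task.exec carries its own retry decorator; its definition isn't shown in this part. The tiny retry is a stdlib stand-in for tenacity's @retry, with no waits. Like tenacity, it uses functools.wraps, which is what sets __wrapped__.

```python
import asyncio
import functools


def retry(attempts: int):
    """Stdlib stand-in for tenacity's @retry: re-run the coroutine on any exception."""
    def decorator(fn):
        @functools.wraps(fn)  # copies metadata and sets wrapper.__wrapped__ = fn
        async def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return await fn(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise
        return wrapper
    return decorator


class Task:
    calls = 0

    @classmethod
    @retry(attempts=2)  # hypothetical base-class retry policy
    async def exec(cls, ctx):
        cls.calls += 1  # count every run of the real body
        raise RuntimeError("transient sink failure")


class NestedSink(Task):
    @classmethod
    @retry(attempts=3)
    async def exec(cls, ctx):
        # The base retry runs inside each outer attempt: 3 x 2 = 6 body runs.
        return await super().exec(ctx)


class FlatSink(Task):
    @classmethod
    @retry(attempts=3)
    async def exec(cls, ctx):
        # Call the undecorated base body, so only the outer retry applies: 3 runs.
        return await Task.exec.__wrapped__(cls, ctx)


def count_calls(sink: type) -> int:
    """Drive sink.exec until the retries give up; return how often the body ran."""
    async def go() -> None:
        try:
            await sink.exec(None)
        except RuntimeError:
            pass
    asyncio.run(go())
    return sink.calls
```

With the base retry at 2 attempts, calling super().exec inside a 3-attempt retry runs the body 6 times. Going through __wrapped__ keeps it at 3.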

TaskWriteMMEmbeddingSinkExperiment — the "v2" experimental sink

class TaskWriteMMEmbeddingSinkExperiment(TaskWriteMMEmbeddingSinkBase):
    model_version = "v2"

    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        start_time = time.perf_counter_ns()
        embedding = ctx.multimodal_post_embedding_dict[cls.model_version]
        assert embedding is not None
        query = StratoPostMultimodalEmbeddingMhSearchAi()
        await query.put(
            int(post.id),
            cls.model_version,
            TweetEmbedding(tweetId=int(post.id), embedding1=embedding),
        )
        logger.info(
            f"wrote post embedding to strato sink for post {post.id} (model: {cls.model_version})"
        )
        duration_ms = (time.perf_counter_ns() - start_time) / 1_000
        Metrics.histogram(
            "task.write_post_embedding_sink_experiment.duration_ms"
        ).record(duration_ms)
        Metrics.counter("task.write_post_embedding_sink_experiment.count").add(1)

Reads ctx.multimodal_post_embedding_dict["v2"] (the experimenter's embedder must've written to this key — not visible in the files we've seen, so this might be wired only in an experiment plan). Writes to StratoPostMultimodalEmbeddingMhSearchAi Manhattan column with the model_version="v2" tag.

The assert embedding is not None — programming-error guard. The dict access would KeyError before this if missing.

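The duration metric repeats the unit bug from TaskLoadPostWithNotFoundRetry. time.perf_counter_ns() returns nanoseconds, so dividing by 1_000 yields microseconds, not milliseconds. The histogram named duration_ms therefore records values 1,000× larger than its name implies; dividing by 1_000_000 would give milliseconds.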
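Here is the conversion spelled out as a small sketch (the helper names are hypothetical). time.perf_counter_ns() returns an integer count of nanoseconds.

```python
import time

NS_PER_US = 1_000
NS_PER_MS = 1_000_000


def elapsed_us(start_ns: int, end_ns: int) -> float:
    return (end_ns - start_ns) / NS_PER_US  # what the sinks actually record


def elapsed_ms(start_ns: int, end_ns: int) -> float:
    return (end_ns - start_ns) / NS_PER_MS  # what a *_ms metric name promises


def timed_ms(fn, *args):
    """Call fn(*args) and return (result, elapsed milliseconds)."""
    start = time.perf_counter_ns()
    result = fn(*args)
    return result, elapsed_ms(start, time.perf_counter_ns())
```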

TaskWriteMMEmbeddingSinkV3 — full V3 sink (summary + embedding)

class TaskWriteMMEmbeddingSinkV3(TaskWriteMMEmbeddingSinkBase):
    model_version = "v3"

    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        start_time = time.perf_counter_ns()

        summary = post.summary
        assert summary is not None
        stratoPostMultimodalEmbeddingGrokSummaryMh = (
            StratoPostMultimodalEmbeddingGrokSummaryMh()
        )
        await stratoPostMultimodalEmbeddingGrokSummaryMh.put(
            int(post.id), cls.model_version, summary
        )

        embedding = ctx.multimodal_post_embedding_dict[cls.model_version]
        assert embedding is not None
        query = StratoPostMultimodalEmbeddingMhSearchAi()
        await query.put(
            int(post.id),
            cls.model_version,
            TweetEmbedding(tweetId=int(post.id), embedding1=embedding),
        )

        stratoMultiModalEmbeddingTopic = StratoMultiModalEmbeddingTopic()
        await stratoMultiModalEmbeddingTopic.insert(
            TweetEmbedding(tweetId=int(post.id), embedding1=embedding)
        )
        ...

V3 writes three destinations:

  1. Summary KV: post.summary to StratoPostMultimodalEmbeddingGrokSummaryMh (so others can fetch the summary by post ID; TaskLoadPostWithSummary from Session 21 reads it).
  2. Embedding KV: the vector to StratoPostMultimodalEmbeddingMhSearchAi.
  3. Kafka: insert to StratoMultiModalEmbeddingTopic (a Strato wrapper around a Kafka topic).

V3 fans out to more destinations than any other sink. Besides the embedding KV write and the Kafka publish, it stores the source summary in its own column, under the same (post ID, model version) key.

TaskWriteMMEmbeddingSinkV4 — Kafka via helper class

class TaskWriteMMEmbeddingSinkV4(TaskWriteMMEmbeddingSinkBase):
    model_version = "v4"

    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        start_time = time.perf_counter_ns()

        embedding = ctx.multimodal_post_embedding_dict[cls.model_version]
        assert embedding is not None
        query = StratoPostMultimodalEmbeddingMhSearchAiNoCache()
        await query.put(
            int(post.id),
            cls.model_version,
            TweetEmbedding(tweetId=int(post.id), embedding1=embedding),
        )

        await TaskPublishEmbeddingV4Kafka._publish_to_kafka(post, embedding)
        ...

V4 differences:

  • NoCache variant of the Strato query (StratoPostMultimodalEmbeddingMhSearchAiNoCache), which bypasses the cache layer. Presumably the cache is refreshed lazily, so bypassing it lets downstream readers see the new vector immediately, at the cost of cache-miss overhead. The likely motivation is that V4 feeds recommendations, where a stale embedding means worse recommendations.
  • No separate Kafka task: TaskPublishEmbeddingV4Kafka._publish_to_kafka(post, embedding) is called directly as a helper. This bypasses TaskPublishEmbeddingV4Kafka's _exec_with_post, which has its own metrics, disable rules, and error handling. Only the raw send runs; none of that task's bookkeeping fires.

TaskWriteMMEmbeddingSinkV5 — same shape as V4

class TaskWriteMMEmbeddingSinkV5(TaskWriteMMEmbeddingSinkBase):
    model_version = "v5_1"

    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        start_time = time.perf_counter_ns()

        embedding = ctx.multimodal_post_embedding_dict[cls.model_version]
        assert embedding is not None
        query = StratoPostMultimodalEmbeddingMhSearchAiNoCache()
        await query.put(
            int(post.id),
            cls.model_version,
            TweetEmbedding(tweetId=int(post.id), embedding1=embedding),
        )

        await TaskPublishEmbeddingV5Kafka._publish_to_kafka(post, embedding)
        ...

Identical to V4 but with model_version="v5_1" and uses TaskPublishEmbeddingV5Kafka._publish_to_kafka. Almost a copy-paste.

TaskWriteMMEmbeddingSinkV5SkipKafkaForReplies — reply-aware variant

class TaskWriteMMEmbeddingSinkV5SkipKafkaForReplies(TaskWriteMMEmbeddingSinkBase):
    model_version = "v5_1"

    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        start_time = time.perf_counter_ns()

        embedding = ctx.multimodal_post_embedding_dict[cls.model_version]
        assert embedding is not None
        query = StratoPostMultimodalEmbeddingMhSearchAiNoCache()
        await query.put(
            int(post.id),
            cls.model_version,
            TweetEmbedding(tweetId=int(post.id), embedding1=embedding),
        )

        is_reply = bool(post.ancestors)
        if not is_reply:
            await TaskPublishEmbeddingV5Kafka._publish_to_kafka(post, embedding)
        else:
            Metrics.counter(
                "task.write_post_embedding_sink_v5.kafka_skipped_reply.count"
            ).add(1)
            logger.info(
                f"Skipping Kafka publish for reply post {post.id} (written to Manhattan only)"
            )
        ...

Same as V5 except the Kafka publish is conditional. A post with non-empty post.ancestors counts as a reply, and replies are written to Manhattan only.

The likely reason is that the downstream consumers of GROX_MULTIMODAL_EMBEDDING_V5 appear to be search and recommendation systems that don't need reply embeddings; reply-specific code can read those directly from Manhattan. Skipping the Kafka publish saves cost and partition pressure.

The metric kafka_skipped_reply.count lets you see how much traffic was saved.
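The V4, V5, and V5SkipKafkaForReplies sinks differ only in the model version, the publish helper, and whether replies skip Kafka. One parameterized class could therefore replace the copies. This sketch rests on stated assumptions: Post, EmbeddingSink, and the in-memory manhattan dict are hypothetical stand-ins, and publish stands in for a helper like TaskPublishEmbeddingV5Kafka._publish_to_kafka.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable


@dataclass
class Post:  # hypothetical minimal stand-in for the Grox Post model
    id: str
    ancestors: list[int] = field(default_factory=list)


Publisher = Callable[[Post, list[float]], Awaitable[None]]


@dataclass
class EmbeddingSink:
    """One parameterized sink instead of near-identical V4/V5 classes."""
    model_version: str
    publish: Publisher
    skip_kafka_for_replies: bool = False
    manhattan: dict[tuple[int, str], list[float]] = field(default_factory=dict)
    kafka_skipped_replies: int = 0

    async def write(self, post: Post, embeddings: dict[str, list[float]]) -> None:
        embedding = embeddings[self.model_version]  # KeyError if the embedder never ran
        # The KV write always happens; the key is (post id, model version).
        self.manhattan[(int(post.id), self.model_version)] = embedding
        is_reply = bool(post.ancestors)
        if self.skip_kafka_for_replies and is_reply:
            self.kafka_skipped_replies += 1  # plays the role of kafka_skipped_reply.count
            return
        await self.publish(post, embedding)


def demo() -> tuple[list[str], EmbeddingSink]:
    sent: list[str] = []

    async def fake_publish(post: Post, embedding: list[float]) -> None:
        sent.append(post.id)

    sink = EmbeddingSink("v5_1", fake_publish, skip_kafka_for_replies=True)

    async def run() -> None:
        await sink.write(Post("1"), {"v5_1": [0.1, 0.2]})  # top-level post
        await sink.write(Post("2", ancestors=[1]), {"v5_1": [0.3, 0.4]})  # reply

    asyncio.run(run())
    return sent, sink
```

V4 would be EmbeddingSink("v4", publish_v4) and plain V5 the same as the demo without skip_kafka_for_replies, where publish_v4 is a hypothetical stand-in for TaskPublishEmbeddingV4Kafka._publish_to_kafka.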


task_write_safety_post_annotations_result_sink.py (342 lines)

The most elaborate sink. Combines: bool-metadata derivation from violations, merge with existing state, safemodel fallback injection (when the PTOS classifier missed adult content), action labeling, write to two destinations.

Setup

class TaskWriteSafetyPostAnnotationsResultSink(Task):
    DISABLE_RULES = [DisableTaskForNonPtosProd]

    _strato_mh = StratoSafetyPostAnnotationsResultMh()
    _strato_direct_mh = StratoSafetyPostAnnotationsResultDirectMh()
    _strato_kafka = StratoSafetyPostAnnotationsResultKafka()
    _strato_grok_ptos_action_with_labels = StratoGrokPtosActionWithLabels()
    _strato_grok_ptos_delete_labels = StratoGrokPtosDeleteLabels()

PTOS-prod-only. Five Strato endpoints:

  • mh: the Manhattan write, to the column whose reads go through the cache.
  • direct_mh: the uncached Manhattan endpoint, used to fetch existing state.
  • kafka — the Kafka-style insert.
  • grok_ptos_action_with_labels — apply moderation labels based on the new annotation.
  • grok_ptos_delete_labels — remove labels (in case re-classification overrides earlier decisions).

_build_found_metadata — media count helper

    @staticmethod
    def _build_found_metadata(post) -> FoundMetadata:
        return FoundMetadata(
            imageCount=sum(1 for m in post.media if isinstance(m, Image))
            if post.media
            else 0,
            videoCount=sum(1 for m in post.media if isinstance(m, Video))
            if post.media
            else 0,
            cardV2Count=len(post.cardsV2) if post.cardsV2 else 0,
        )

Same shape as TaskPublishUnifiedPostAnnotationsManhattan's foundMetadata construction. Could be shared; isn't.

_compute_bool_metadata_from_violations — derive bool flags

    @classmethod
    def _compute_bool_metadata_from_violations(
        cls, safety_annotations
    ) -> SafetyBoolMetadata:
        is_gore = False
        is_nsfw = False
        is_soft_nsfw = False
        is_spam = False

        if safety_annotations and safety_annotations.violatedPolicies:
            for violation in safety_annotations.violatedPolicies:
                if (
                    violation.category == SafetyPolicyCategory.ViolentMedia
                    and violation.safetyPolicy
                    and violation.safetyPolicy.policyType
                    != SafetyPolicyType.NoViolation
                ):
                    is_gore = True
                    Metrics.counter(
                        "task.write_safety_post_annotations_result_sink.detected_gore.count"
                    ).add(1)

                if (
                    violation.category == SafetyPolicyCategory.AdultContent
                    and violation.safetyPolicy
                    and violation.safetyPolicy.policyType
                    == SafetyPolicyType.AdultContentSexualHard
                ):
                    is_nsfw = True
                    Metrics.counter(
                        "task.write_safety_post_annotations_result_sink.detected_nsfw.count"
                    ).add(1)

                if (
                    violation.category == SafetyPolicyCategory.AdultContent
                    and violation.safetyPolicy
                    and violation.safetyPolicy.policyType
                    == SafetyPolicyType.AdultContentSexualSoft
                ):
                    is_soft_nsfw = True
                    Metrics.counter(
                        "task.write_safety_post_annotations_result_sink.detected_soft_nsfw.count"
                    ).add(1)

                if (
                    violation.category == SafetyPolicyCategory.Spam
                    and violation.safetyPolicy
                    and violation.safetyPolicy.policyType
                    != SafetyPolicyType.NoViolation
                ):
                    is_spam = True
                    Metrics.counter(
                        "task.write_safety_post_annotations_result_sink.detected_spam.count"
                    ).add(1)

                if (
                    violation.category
                    == SafetyPolicyCategory.IllegalAndRegulatedBehaviors
                    and violation.safetyPolicy
                    and violation.safetyPolicy.policyType
                    != SafetyPolicyType.NoViolation
                ):
                    is_spam = True
                    Metrics.counter(
                        "task.write_safety_post_annotations_result_sink.detected_spam_illegal.count"
                    ).add(1)

        return SafetyBoolMetadata(
            isGore=is_gore, isNsfw=is_nsfw, isSoftNsfw=is_soft_nsfw, isSpam=is_spam
        )

Translate from category-based annotations (Session 20) to the 4-bool summary consumed by downstream simple readers:

  • ViolentMedia (any non-NoViolation policy) → isGore=True.
  • AdultContent with AdultContentSexualHard → isNsfw=True.
  • AdultContent with AdultContentSexualSoft → isSoftNsfw=True.
  • Spam (non-NoViolation) → isSpam=True.
  • IllegalAndRegulatedBehaviors (non-NoViolation) → isSpam=True. Yes, the same bool: "spam" apparently doubles as the catch-all for unwanted content.

Note the subtlety: AdultContent requires an exact policyType match (Hard and Soft are distinct), while ViolentMedia, Spam, and IllegalAndRegulatedBehaviors accept any policy other than NoViolation. A plausible reading is that every violent-media sub-policy warrants isGore=True, whereas adult content has a meaningful split between hard (likely restricted more strongly) and soft (labeled but still shown). Any other AdultContent sub-policy sets neither flag.
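The five if blocks can be read as a rule table. This sketch uses hypothetical enum subsets. OtherViolation is a placeholder for any sub-policy not named in the code above, and the per-flag metrics are left out.

```python
from __future__ import annotations

from enum import Enum


class Category(Enum):  # hypothetical subset of SafetyPolicyCategory
    ViolentMedia = "ViolentMedia"
    AdultContent = "AdultContent"
    Spam = "Spam"
    IllegalAndRegulatedBehaviors = "IllegalAndRegulatedBehaviors"


class Policy(Enum):  # hypothetical subset of SafetyPolicyType
    NoViolation = "NoViolation"
    AdultContentSexualHard = "AdultContentSexualHard"
    AdultContentSexualSoft = "AdultContentSexualSoft"
    OtherViolation = "OtherViolation"  # placeholder for any other sub-policy


ANY_VIOLATION = object()  # rule wildcard: any policy except NoViolation

RULES = [
    (Category.ViolentMedia, ANY_VIOLATION, "isGore"),
    (Category.AdultContent, Policy.AdultContentSexualHard, "isNsfw"),
    (Category.AdultContent, Policy.AdultContentSexualSoft, "isSoftNsfw"),
    (Category.Spam, ANY_VIOLATION, "isSpam"),
    (Category.IllegalAndRegulatedBehaviors, ANY_VIOLATION, "isSpam"),
]


def derive_bools(violations: list[tuple[Category, Policy | None]]) -> dict[str, bool]:
    flags = dict.fromkeys(("isGore", "isNsfw", "isSoftNsfw", "isSpam"), False)
    for category, policy in violations:
        if policy is None:  # mirrors the `violation.safetyPolicy and ...` guard
            continue
        for rule_category, rule_policy, flag in RULES:
            if category is not rule_category:
                continue
            if rule_policy is ANY_VIOLATION:
                matched = policy is not Policy.NoViolation
            else:
                matched = policy is rule_policy  # exact match (AdultContent rules)
            if matched:
                flags[flag] = True
    return flags
```

The AdultContent plus OtherViolation case sets nothing, which is the exact-match subtlety in action.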

_merge_bool_metadata — OR-merge with existing

    @classmethod
    def _merge_bool_metadata(
        cls, existing: SafetyBoolMetadata | None, new: SafetyBoolMetadata
    ) -> SafetyBoolMetadata:
        if existing is None:
            return new
        return SafetyBoolMetadata(
            isGore=True if (existing.isGore or new.isGore) else False,
            isNsfw=True if (existing.isNsfw or new.isNsfw) else False,
            isSoftNsfw=True if (existing.isSoftNsfw or new.isSoftNsfw) else False,
            isSpam=True if (existing.isSpam or new.isSpam) else False,
        )

Latched-OR semantics: once a flag is set True, it stays True even if a later classification disagrees. This is the conservative policy — once classified as NSFW, always treated as NSFW. The classifier can't un-classify.

The True if X else False is verbose for just bool(X) but is clear about the boolean nature. Functionally equivalent.
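Here is the latched OR-merge in a few lines. A hypothetical frozen dataclass stands in for SafetyBoolMetadata.

```python
from __future__ import annotations

from dataclasses import dataclass, fields


@dataclass(frozen=True)
class SafetyBools:  # hypothetical stand-in for SafetyBoolMetadata
    isGore: bool = False
    isNsfw: bool = False
    isSoftNsfw: bool = False
    isSpam: bool = False


def merge(existing: SafetyBools | None, new: SafetyBools) -> SafetyBools:
    if existing is None:
        return new
    # bool(a or b) has the same value as `True if (a or b) else False`.
    return SafetyBools(**{
        f.name: bool(getattr(existing, f.name) or getattr(new, f.name))
        for f in fields(SafetyBools)
    })
```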

Main _exec

    @classmethod
    async def _exec(cls, ctx: TaskContext) -> None:
        Metrics.counter("task.write_safety_post_annotations_result_sink.count").add(1)

        post = ctx.payload.post
        if not post:
            return

        safety_annotations = ctx.safety_annotations
        if not safety_annotations:
            return

        post_id = int(post.id)

        existing_result = await cls._strato_direct_mh.fetch(post_id)
        if existing_result:
            Metrics.counter(
                "task.write_safety_post_annotations_result_sink.existing_found.count"
            ).add(1)
        else:
            Metrics.counter(
                "task.write_safety_post_annotations_result_sink.existing_not_found.count"
            ).add(1)

Fetch existing state first — using the direct (no-cache) variant so we don't read stale data from the cache and overwrite recent updates.

        if safety_annotations.violatedPolicies:
            safety_annotations.violatedPolicies.sort(
                key=lambda x: x.score or 0, reverse=True
            )

        task_type_suffix = (
            ctx.payload.task_type.value if ctx.payload.task_type else "unknown"
        )
        identifier = f"{task_type_suffix}"
        timestamp_ms = int(time.time() * 1000)
        found_metadata = cls._build_found_metadata(post)

        new_annotation = SafetyPostAnnotations(
            tweetId=post_id,
            violatedPolicies=[
                policy.model_dump()
                for policy in (safety_annotations.violatedPolicies or [])
            ],
            foundMetadata=found_metadata,
            identifier=identifier,
            timestamp=timestamp_ms,
        )

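Sort violations by score, descending, so the highest-scoring violation comes first; note that .sort() mutates ctx.safety_annotations in place. Then build a new annotation record with the task type as identifier (e.g., "safety_ptos_deluxe"; the f"{task_type_suffix}" wrapper adds nothing), the current timestamp in ms, and the violations.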

        annotations_list = (
            list(existing_result.safetyPostAnnotations)
            if existing_result and existing_result.safetyPostAnnotations
            else []
        )
        annotations_list.append(new_annotation)

Append-only: prior annotations are kept; the new one is added. Full classification history per post lives in this column. This is the audit trail: who classified what, when, with what result.
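Here is the sort-then-append step as a sketch. Violation and Annotation are hypothetical stand-ins. Unlike the in-place .sort() above, sorted() leaves the caller's list untouched.

```python
from __future__ import annotations

import time
from dataclasses import dataclass


@dataclass
class Violation:  # hypothetical stand-in for a violated-policy entry
    category: str
    score: int | None = None


@dataclass
class Annotation:  # hypothetical stand-in for SafetyPostAnnotations
    identifier: str
    timestamp_ms: int
    violations: list[Violation]


def append_annotation(
    history: list[Annotation],
    violations: list[Violation] | None,
    identifier: str,
    now_ms: int | None = None,
) -> list[Annotation]:
    # Highest score first; a missing score sorts as 0, like `x.score or 0`.
    ranked = sorted(violations or [], key=lambda v: v.score or 0, reverse=True)
    timestamp = now_ms if now_ms is not None else int(time.time() * 1000)
    # Return a new list: earlier records are kept, never replaced.
    return [*history, Annotation(identifier, timestamp, ranked)]
```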

        new_bool_metadata = cls._compute_bool_metadata_from_violations(
            safety_annotations
        )
        existing_bool_metadata = (
            existing_result.safetyBoolMetadata if existing_result else None
        )
        merged_bool_metadata = cls._merge_bool_metadata(
            existing_bool_metadata, new_bool_metadata
        )

Derive new bools from this run's violations, then OR-merge with existing bools.

        violation_details = []
        for v in safety_annotations.violatedPolicies:
            policy_type = v.safetyPolicy.policyType.name if v.safetyPolicy else "none"
            violation_details.append(f"{v.category.value}:{policy_type}")
        violations_summary = ", ".join(violation_details)

        action_result = await cls._strato_grok_ptos_action_with_labels.execute(
            new_annotation
        )
        if action_result and len(action_result.applied_labels) > 0:
            logger.info(
                f"grokPtosActionWithLabels applied labels: debugString='{action_result.debug_string}', appliedLabels={action_result.applied_labels} for post {post_id} (result_sink), violations=[{violations_summary}]"
            )
            ...
        elif action_result:
            ...
        else:
            ...

Apply moderation labels via Strato grokPtosActionWithLabels. Same 3-arm logging pattern (applied / empty / failed).

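The violations_summary string is built ahead of time and included in the log lines, so when an SRE greps for a specific category, they see exactly which violations triggered the action. Unlike the comprehension that builds new_annotation, this loop has no "or []" guard. If violatedPolicies can be None (the other call sites suggest it can), the loop raises TypeError before any label is applied.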
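Here is the summary loop with the guard added, as a sketch over hypothetical stand-in types.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class PolicyRef:  # hypothetical stand-in for violation.safetyPolicy
    policy_type_name: str


@dataclass
class Violation:  # hypothetical stand-in for a violated-policy entry
    category_value: str
    safety_policy: PolicyRef | None = None


def violations_summary(violations: list[Violation] | None) -> str:
    parts: list[str] = []
    for v in violations or []:  # tolerate None, like the other call sites
        policy_type = v.safety_policy.policy_type_name if v.safety_policy else "none"
        parts.append(f"{v.category_value}:{policy_type}")
    return ", ".join(parts)
```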

Safemodel fallback injection

        ptos_already_nsfw = new_bool_metadata.isNsfw
        if ctx.safemodel_sex_nudity.positive and not ptos_already_nsfw:
            safemodel_confidence_int = round(ctx.safemodel_sex_nudity.confidence * 100)
            safemodel_annotation = SafetyPostAnnotations(
                tweetId=post_id,
                violatedPolicies=[
                    SafetyPtosViolatedPolicy(
                        category=SafetyPolicyCategory.AdultContent,
                        score=safemodel_confidence_int,
                        reason="safemodel sex-and-nudity classifier detected adult content",
                        safetyPolicy=SafetyPolicy(
                            policyType=SafetyPolicyType.AdultContentSexualHard,
                            confidenceScore=safemodel_confidence_int,
                            reason="safemodel sex-and-nudity classifier detected adult content",
                        ),
                    ).model_dump(),
                ],
                foundMetadata=found_metadata,
                identifier="safemodel-sex-nudity",
                timestamp=timestamp_ms,
            )
            annotations_list.append(safemodel_annotation)
            merged_bool_metadata = cls._merge_bool_metadata(
                merged_bool_metadata,
                SafetyBoolMetadata(
                    isGore=False, isNsfw=True, isSoftNsfw=False, isSpam=False
                ),
            )

ctx.safemodel_sex_nudity is a context field set by a task we haven't seen (probably a separate TaskSafeModelClassifier outside our file inventory). The "safemodel" appears to be a conventional, fast computer-vision NSFW classifier (plausibly CLIP-style) that runs alongside the LLM-based PTOS classifier.

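Defense-in-depth: if the safemodel says "this is sex/nudity" but this run's PTOS result did not flag AdultContentSexualHard, the sink injects a synthetic violation with identifier="safemodel-sex-nudity" (so a later audit can tell it came from safemodel, not the LLM) and sets isNsfw=True in the merged bool metadata. The gate checks only this run's hard-NSFW flag, not the merged history. As a result, a post PTOS labeled soft NSFW gets escalated to hard when safemodel fires.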

The confidence is scaled from 0.0-1.0 to 0-100 integer for the Thrift schema.

This is the safety-belt mechanism: two independent classifiers, OR'd together for the most consequential class (NSFW). False positives are tolerable; missing nudity is not.
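Here is the injection gate and score scaling as a sketch. SafemodelVerdict and safemodel_override are hypothetical names, and ptos_is_hard_nsfw corresponds to new_bool_metadata.isNsfw.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class SafemodelVerdict:  # hypothetical stand-in for ctx.safemodel_sex_nudity
    positive: bool
    confidence: float  # 0.0 to 1.0


def safemodel_override(verdict: SafemodelVerdict, ptos_is_hard_nsfw: bool) -> int | None:
    """Return the 0-100 score for an injected AdultContentSexualHard violation,
    or None when no injection is needed."""
    if verdict.positive and not ptos_is_hard_nsfw:
        # round() maps the 0.0-1.0 confidence onto an integer 0-100 score.
        return round(verdict.confidence * 100)
    return None
```

Python's round() rounds halves to even, so a confidence of 0.125 becomes 12, not 13.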

            Metrics.counter(
                "task.write_safety_post_annotations_result_sink.safemodel_enforced.count"
            ).add(1)
            safemodel_action_result = (
                await cls._strato_grok_ptos_action_with_labels.execute(
                    safemodel_annotation
                )
            )
            if (
                safemodel_action_result
                and len(safemodel_action_result.applied_labels) > 0
            ):
                logger.info(
                    f"safemodel enforce: grokPtosActionWithLabels applied labels: debugString='{safemodel_action_result.debug_string}', "
                    f"appliedLabels={safemodel_action_result.applied_labels} for post {post_id}"
                )
                ...
            elif safemodel_action_result:
                ...
            else:
                ...

Same action-applying logic as above, but for the safemodel annotation. Separate metric namespace (safemodel_action.*) so you can monitor safemodel-only enforcement separately from LLM-only enforcement.

Final write

        final_result = SafetyPostAnnotationsResult(
            tweetId=post_id,
            safetyPostAnnotations=annotations_list,
            safetyBoolMetadata=merged_bool_metadata,
        )

        delete_labels_result = await cls._strato_grok_ptos_delete_labels.execute(
            final_result
        )
        if delete_labels_result:
            logger.info(
                f"grokPtosDeleteLabels returned '{delete_labels_result}' for post {post_id} (result_sink)"
            )
            Metrics.counter(
                "task.write_safety_post_annotations_result_sink.grok_ptos_delete_labels.count"
            ).add(1)
        else:
            logger.info(
                f"grokPtosDeleteLabels returned no result for post {post_id} (result_sink)"
            )
            Metrics.counter(
                "task.write_safety_post_annotations_result_sink.grok_ptos_delete_labels.empty.count"
            ).add(1)

        await cls._strato_mh.put(post_id, final_result)
        Metrics.counter(
            "task.write_safety_post_annotations_result_sink.mh.success.count"
        ).add(1)

        await cls._strato_kafka.insert(post_id, final_result)
        Metrics.counter(
            "task.write_safety_post_annotations_result_sink.kafka.success.count"
        ).add(1)

        Metrics.counter(
            "task.write_safety_post_annotations_result_sink.success.count"
        ).add(1)

Three final steps:

  1. Delete-labels check: given the final merged result, ask Strato whether any previously applied labels should be removed. For example, a post flagged NSFW yesterday might get a no-violation verdict from today's deluxe rerun. We don't tell the endpoint what to delete; it works out the diff itself. Since the merged bools are latched (a True never reverts), the endpoint presumably bases that decision on the annotation history, such as the newest record, rather than on the bools.

  2. Manhattan put — write the merged result (with full annotation history + merged bools).

  3. Kafka insert — also publish to a Kafka stream so downstream consumers (other moderation systems, dashboards) can react in near-real-time.

That's three writes for one classification result: KV (for direct reads), Kafka (for stream consumers), and label adjustments via the delete-labels endpoint. Belt and suspenders for the most safety-critical pipeline.


Summary of Session 22

The publishing layer is fan-out: each task takes the results computed by upstream tasks and writes them to multiple destinations — Strato Manhattan KV, Kafka topics, Strato action-applying endpoints.

Key patterns:

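  • Append-only annotation history: safety annotations accumulate and are never overwritten. Every classification contributes a record, and bool metadata is OR-merged, so a stored "yes" never reverts to "no". Only the applied labels can be removed, via the delete-labels endpoint.
  • Multi-destination writes: most sinks write to both Manhattan (for reads) and Kafka (for streams). The Manhattan puts are keyed by post ID (plus model version for embeddings), so a retried put overwrites rather than corrupts. A retried Kafka insert can still produce a duplicate message, and replaying the append-only safety sink would add a duplicate annotation record.
  • Uncached reads/writes for staleness-sensitive paths: the V4/V5 embedding sinks use NoCache variants, and the safety sink fetches existing state through its direct (uncached) endpoint.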
  • Safemodel defense-in-depth: NSFW gets a second-opinion classifier; positive from either is enough.
  • Disable rules per cluster: mm_emb_prod for embedding sinks, ptos_prod for safety sinks. The Grox service runs as separate per-workload clusters; tasks gate on which cluster they're in.
  • KafkaTopicName.value strings as the source of truth: every Kafka producer is constructed lazily via @cache keyed on topic.
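The lazily built, topic-keyed producer pattern from the last bullet can be sketched with functools.cache. FakeProducer and the topic strings are hypothetical stand-ins for a real Kafka client and the real KafkaTopicName values.

```python
from enum import Enum
from functools import cache


class KafkaTopicName(Enum):  # hypothetical topic names
    EMBEDDING_V4 = "example_embedding_v4"
    EMBEDDING_V5 = "example_embedding_v5"


class FakeProducer:  # stand-in for a real Kafka producer client
    created = 0

    def __init__(self, topic: str) -> None:
        FakeProducer.created += 1
        self.topic = topic


@cache
def producer_for(topic: str) -> FakeProducer:
    # Built on first use, then reused for every later call with the same topic.
    return FakeProducer(topic)
```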

Series wrap-up

That's 22 sessions, ~24,914 LOC.

Sessions Module LOC
01 candidate-pipeline/ (Rust) 1,031
02–03 thunder/ (Rust) 1,808
04–14 home-mixer/ (Rust) 11,695
15–17 phoenix/ (Python ML) 3,880
18–22 grox/ (Python LLM pipeline) 6,500

What we've walked through:

Rust core (1–14)

  • candidate-pipeline is the generic framework: a typed pipeline runtime with hydrators (fetch data), filters (drop candidates), scorers (assign value), selectors (pick top-K), and side-effects (write back). The whole home-mixer is just an instantiation of this framework with a specific composition of stages.

  • thunder is the in-memory post store: DashMap-backed fast hot-path reads, Kafka ingest from the unified-posts topic, Thrift deserialization, a gRPC server. Posts live here for a configurable retention window before being evicted.

  • home-mixer is the For You orchestrator (11 sessions and ~12k LOC). Receives a "give me a timeline for user X" request, walks pipelines that hydrate candidates from multiple sources (in-network, popular, ads), applies filters (language, blocked-by, has-media, mod, safety, etc.), runs scorers (Phoenix ranking models, ads pacing), selects top-K, applies side-effects (cache writes, served-history, kafka logs). The complexity is in the composition — different pipelines call different filter combinations to produce a slate.

Python ML (15–17)

  • Phoenix models (recsys_model.py, recsys_retrieval_model.py) implement the two-tower architecture: a user-tower transformer that consumes the user's recent history (via right-anchored RoPE positions) and a candidate-tower MLP. Trained with a sampled-softmax retrieval loss + per-task ranking heads.

  • Phoenix runners wrap the models in a serving-style API: ModelRunner for ranking, RetrievalModelRunner for two-tower retrieval. Both use Haiku transform + numpy checkpoint loading. run_pipeline.py is the headline open-source addition: it runs retrieval → ranking end-to-end from exported checkpoints.

  • Grok transformer (grok.py) is the underlying neural net: GQA attention with tanh-clamping, GeGLU FFN, double layer norm, candidate-isolation attention mask. Plus three test files that pin down the most subtle pieces.
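To make the retrieval objective concrete: given a batch of user-tower and candidate-tower embeddings, a softmax retrieval loss scores every user against every candidate and asks the matching pair to win. This sketch uses in-batch negatives (the other candidates in the batch serve as the sampled negatives), which is one common form of sampled softmax. Whether Phoenix samples negatives this way, or applies a sampling-probability correction, is an assumption not confirmed by the walkthrough. It is written in pure Python to stay dependency-free; the Phoenix code itself is JAX/Haiku.

```python
import math


def dot(u: list[float], v: list[float]) -> float:
    return sum(a * b for a, b in zip(u, v))


def in_batch_softmax_loss(users: list[list[float]], cands: list[list[float]]) -> float:
    """Mean cross-entropy where user i's positive is candidate i and the other
    candidates in the batch act as negatives."""
    assert len(users) == len(cands)
    total = 0.0
    for i, u in enumerate(users):
        logits = [dot(u, c) for c in cands]  # user i scored against every candidate
        m = max(logits)  # subtract the max for a numerically stable log-sum-exp
        log_z = m + math.log(sum(math.exp(x - m) for x in logits))
        total += log_z - logits[i]  # equals -log softmax(logits)[i]
    return total / len(users)
```

With all-zero embeddings every candidate ties, so the loss is exactly log(batch size). The loss drops when each user embedding points at its own candidate and rises when the pairs are swapped.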

Python content understanding (18–22)

  • Grox core is three Python processes (main/dispatcher/engine) coordinating through a multiprocessing Manager. 16 Kafka task generators feed a priority-multiplexed queue; tasks are routed by eligibility set.

  • Plans are dependency-DAG configurations: a plan declares a TASKS map + TASK_DEPENDENCIES map; the executor wires asyncio futures, and SKIP propagates downstream automatically.

  • Embedders + summarizer + classifiers are the ML calls: VLM-based content classification with model-tier escalation (mini → primary → primary-critical → EAPI 4.2 reasoning), JSON-output parsing with multi-tier fallback, two-stage safety classification (category → per-category policy), the V2/V3/V4/V5 embedder family.

  • Tasks are imperative units: filters (eligibility), rate limiters (TTL-cache dedup), media hydration, ASR transcription, classifier wrappers, and finally publishers/sinks that fan results out to Manhattan and Kafka.
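The plan-as-DAG idea (a TASKS map plus a TASK_DEPENDENCIES map, executed by wiring one asyncio future per task) can be sketched as follows. The `SKIP` sentinel, the task signature, and the `execute` function are hypothetical simplifications, not Grox's actual Plan API. Error handling and cycle detection are omitted; a dependency cycle would simply hang.

```python
import asyncio
from typing import Any, Awaitable, Callable

SKIP = object()  # hypothetical sentinel meaning "this task did not run"

TaskFn = Callable[[dict[str, Any]], Awaitable[Any]]


async def execute(tasks: dict[str, TaskFn], deps: dict[str, list[str]]) -> dict[str, Any]:
    loop = asyncio.get_running_loop()
    futures = {name: loop.create_future() for name in tasks}

    async def run(name: str) -> None:
        # Wait for every predecessor; topological order falls out of these awaits.
        upstream = {d: await futures[d] for d in deps.get(name, [])}
        if any(v is SKIP for v in upstream.values()):
            futures[name].set_result(SKIP)  # propagate SKIP downstream without running
            return
        futures[name].set_result(await tasks[name](upstream))

    # Start every task at once; tasks with no unmet dependencies proceed immediately.
    await asyncio.gather(*(run(name) for name in tasks))
    return {name: f.result() for name, f in futures.items()}
```

A filter task that returns `SKIP` causes every task downstream of it to resolve to `SKIP` without executing, while unrelated branches of the plan still run.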

The full data flow

A Tweet is posted. The X core system writes it. Thunder consumes the Kafka event and stores it in-memory. Grox consumes parallel Kafka events:

  • task_initial_banger_filter → task_media_hydration → task_banger_screen_initial → write to UnifiedPostAnnotations (description, tags, topics, bool metadata, quality score).
  • task_safety_ptos_filter → task_safety_ptos_category_detection → task_safety_ptos_policy_detection → write to SafetyPostAnnotationsResult (violations, severity, applied labels via grokPtosActionWithLabels).
  • task_post_embedding_with_summary_filter → task_post_embedding_summarizer → task_multimodal_post_embedding_with_summary → write to StratoPostMultimodalEmbedding* (the rec-sys feature vector).
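The bool-metadata step on the safety path combines three ORs:

  1. Violations map to flags.
  2. The NSFW flag is also set if the separate safemodel classifier fires.
  3. The result is OR-merged into whatever is already stored, so a flag, once set, stays set.

A minimal sketch follows. The flag names, the violation-to-flag mapping, and the function signatures are all hypothetical.

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class BoolMetadata:
    # Hypothetical flag set; the real schema is not reproduced here.
    is_nsfw: bool = False
    is_violent: bool = False
    is_spam: bool = False


# Hypothetical mapping from violation category to the flag it sets.
VIOLATION_TO_FLAG = {"adult": "is_nsfw", "violence": "is_violent", "spam": "is_spam"}


def derive(violations: set[str], safemodel_nsfw: bool) -> BoolMetadata:
    flags = {VIOLATION_TO_FLAG[v] for v in violations if v in VIOLATION_TO_FLAG}
    if safemodel_nsfw:
        flags.add("is_nsfw")  # defense in depth: either classifier is enough
    return BoolMetadata(**{name: True for name in flags})


def latch_merge(existing: BoolMetadata, new: BoolMetadata) -> BoolMetadata:
    # OR-merge field by field: a flag that is already True is never cleared here.
    return BoolMetadata(
        **{f.name: getattr(existing, f.name) or getattr(new, f.name) for f in fields(BoolMetadata)}
    )
```

Because OR is idempotent and commutative, replaying the same write, or applying two writes in either order, lands on the same stored value. That property is what lets a sink like this tolerate duplicates.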

The user opens For You. home-mixer receives the request, runs many candidate pipelines in parallel (in-network, popular, etc.), each:

  • Hydrates candidate post IDs from a source.
  • Reads each post from Thunder + the various Strato KVs (including the Grox-populated UPA + safety annotations + multimodal embeddings).
  • Filters on safety, language, has-media, ads-brand-safety (which reads Grox's safety bools), etc.
  • Scores via Phoenix ranking model (which reads Grox's multimodal embeddings as features).
  • Selects top-K per source.
  • Merges sources, applies global ads injection and rerank.
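The fan-out and merge can be sketched with `asyncio.gather` and `heapq.nlargest`. The source functions, the scores, and the keep-the-highest-score dedup rule are illustrative assumptions. Ads injection and reranking are left out.

```python
import asyncio
import heapq
from typing import Awaitable, Callable

Scored = tuple[int, float]  # (post_id, score)
Source = Callable[[], Awaitable[list[Scored]]]


async def top_k_from(source: Source, k: int) -> list[Scored]:
    # The source is assumed to return already-scored candidates; keep only its own top-K.
    return heapq.nlargest(k, await source(), key=lambda pair: pair[1])


async def build_slate(sources: list[Source], k_per_source: int, slate_size: int) -> list[int]:
    # All sources run concurrently.
    per_source = await asyncio.gather(*(top_k_from(s, k_per_source) for s in sources))
    best: dict[int, float] = {}
    for results in per_source:
        for post_id, score in results:
            # A post found by several sources keeps its highest score (assumed dedup rule).
            best[post_id] = max(score, best.get(post_id, float("-inf")))
    ranked = sorted(best.items(), key=lambda pair: pair[1], reverse=True)
    return [post_id for post_id, _ in ranked[:slate_size]]
```

With an in-network source returning posts 1, 2, 3 and a popular source returning posts 2, 4, 5, a per-source K of 2 already drops posts 3 and 5 before the merge. Post 2 then appears once, carrying the better of its two scores.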

The final ranked slate goes back to the user. Side-effects: writes to served-history, cache, and Kafka loggers.

When the user engages with a post (like, click, retweet), more Kafka events fire. Phoenix's training data pipelines pick them up, Phoenix retrains and exports a new checkpoint, and home-mixer loads it. The cycle continues.

What's missing (deliberately or accidentally)

The open-source dump excludes a few things:

  • The actual prompt templates under grox/prompts/template/. The classifier prompts are the IP; the surrounding plumbing is shared.
  • Training pipelines for Phoenix. We have the model code and the inference path, but not the training loop, data loaders, or eval scripts. (Phoenix's run_pipeline.py does inference only.)
  • A handful of files referenced but not present: media_processor.py, data_types.py, data_loaders/mappers/post_mapper.py. The interfaces are inferable from callers.
  • Some redacted thresholds: FOLLOWER_COUNT_THRESHOLD_FOR_SPAM_DETECTION, RECENT_POSTS_LIMIT, and a few _THINKING_RESTRICTION_LINES strings.

What's there is enough to understand the architecture and flow control: how a tweet becomes a ranked candidate, how moderation acts, how features are computed and consumed.

Architecture takeaways

  1. Composability via pipeline framework. Rust home-mixer stays at roughly 12k LOC because the framework is generic: each pipeline is small and composes filters, hydrators, scorers, and selectors. New surfaces (subscribed-only, banger-initial-screen) reuse the same primitives.

  2. Per-cluster service sharding. Grox runs as separate prod clusters per workload (mm-emb-prod, ptos-prod, etc.). Tasks gate on which cluster they're in via DisableRules. Each cluster can be scaled and rolled out independently.

  3. In-process state for speed, idempotent sinks for correctness. Rate-limit caches and topic caches live in-process; cross-process consistency isn't guaranteed. Downstream sinks are idempotent (keyed by tweet ID), so duplicates don't corrupt state.

  4. DAG execution via asyncio futures. Both Phoenix's runners.py and Grox's Plan.execute use the same pattern: create a future per dependency and have every task await its predecessors. The awaits enforce topological order, and asyncio starts independent tasks concurrently. Beautiful pattern, no custom DAG engine needed.

  5. Constrained JSON output + multi-tier fallback. Every LLM call uses json_schema constrained decoding. If parsing fails, fall back to regex + json_repair + scalar extraction. Then give up.

  6. Defense-in-depth on safety. PTOS uses two-stage classification (category, then per-category policy). Adult content gets an OR with a separate safemodel CV classifier. Two-cache (standard + deluxe). Deluxe routes to Grok 4.2 reasoning for the highest-stakes categories.

  7. Audit log first. Annotations are append-only. Bool metadata is OR-merged, so a flag stays latched once set. Action results carry debug strings. Every classification logs the violations summary inline.

  8. Open-source as documentation. This codebase is, in many places, the only public documentation of how a major social-recommendation system works in 2026. Worth knowing how it fits together.
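Takeaway 5's fallback chain can be sketched with the standard library alone. This version tries `json.loads` first. If that fails, it extracts the outermost `{...}` span with a regex. If that fails too, it pulls a single scalar field by regex. Otherwise it gives up and returns `None`. The json_repair tier is omitted so the sketch stays dependency-free. The function name and the single-scalar fallback shape are assumptions, not the Grox implementation.

```python
import json
import re
from typing import Any, Optional


def parse_llm_json(text: str, scalar_key: str) -> Optional[dict[str, Any]]:
    # Tier 1: the happy path, which constrained decoding should make the common case.
    try:
        parsed = json.loads(text)
        if isinstance(parsed, dict):
            return parsed
    except json.JSONDecodeError:
        pass
    # Tier 2: the object is wrapped in prose or a code fence; grab the outermost braces.
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if match:
        try:
            parsed = json.loads(match.group(0))
            if isinstance(parsed, dict):
                return parsed
        except json.JSONDecodeError:
            pass
    # (A json_repair tier would sit here in a production chain; omitted in this sketch.)
    # Tier 3: salvage one scalar field: a JSON string, number, true, false, or null.
    pattern = (
        rf'"{re.escape(scalar_key)}"\s*:\s*'
        r'("(?:[^"\\]|\\.)*"|-?\d+(?:\.\d+)?|true|false|null)'
    )
    scalar = re.search(pattern, text)
    if scalar:
        return {scalar_key: json.loads(scalar.group(1))}
    # Tier 4: give up.
    return None
```

A truncated response such as `{"label": "adult", "score": 0.7, "reason": "trunc` fails the first two tiers but still yields `{"score": 0.7}` when the caller asks for `score`.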

Thanks for following along. That's the X For You algorithm, line by line.