
X For You algorithm, line by line — Part 20: Grox embedder, summarizer, classifiers

Part 20: the ML layer of Grox. Two multimodal embedders (V2 with five client choices, V5 with a single HTTP path), the post summarizer, and six LLM-call classifiers (spam, banger, post-safety, two-stage PTOS, reply ranker). The LLM-backed pieces (summarizer and classifiers) share the same system-prompt + User+Post + assistant-slot + parse-the-output pattern, with model-tier escalation (mini → primary → primary-critical → EAPI 4.2) for the highest-stakes calls.

May 15, 2026·34 min read

The ML layer of Grox. This is what plans (Session 19) actually call into when they get to the "model inference" step.

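Three subsystems. The LLM-backed ones (summarizer and classifiers) share one pattern: system prompt → user message with User+Post rendered → assistant slot → sample → parse the tagged output (JSON in <json>...</json> tags for the classifiers, <description>...</description> for the summarizer). The embedders also render post content, but they send it to an embedding endpoint instead of sampling: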

  1. Embedder (grox/embedder/, 407 LOC) — vector embedding of posts using multiple model families (Qwen3, RecsysV4, RecsysV5).
  2. Summarizer (grox/summarizer/, 143 LOC) — LLM-based text summarization, used as input for the V3 embedder.
  3. Classifiers (grox/classifiers/content/, 900 LOC) — six concrete LLM-call wrappers: spam, banger, post-safety, safety-PTOS (two stages), reply-ranking.

Total: 1,450 LOC.

Common dependencies referenced throughout but outside our scope:

  • grok_sampler.vision_sampler.VisionSampler and grok_sampler.eapi_sampler.EapiSampler — the raw sampling clients for VLM and EAPI (Edge API / external) models respectively.
  • grox.lm.convo: Conversation/Message/Role types (with an interleave() method that emits the model-ready sequence).
  • grox.lm.post.PostRenderer, grox.lm.user.UserRenderer, grox.lm.thread.ThreadRenderer — convert a Post/User into a list of (text, image, video) content parts.
  • grox.prompts.template.* — system-prompt templates with .render(params={...}).
  • grox.data_loaders.data_types.* — the Post, User, ContentCategoryType, ContentCategoryResult, etc. data classes.

embedder/multimodal_post_embedder_v2.py (287 lines)

Despite the "V2" name, this class also handles V3/V4 embeddings — it's the general-purpose embedder that talks to multiple XaiEmbeddingClient backends.

Constructor — pick from a zoo of embedding models

import os
import time
import logging
import json
import numpy as np
from grox.lm.post import LitePostRenderer, MMEmbedPostRenderer, EvalPostRenderer
from grox.lm.convo import Image as ConvoImage, Video as ConvoVideo, Content
from embed.embed_cli import XaiEmbeddingClient
from monitor.metrics import Metrics
from grox.config.config import ModelName, grox_config
from grox.data_loaders.data_types import Post
from grox.data_loaders.media_loader import MediaLoader
from grox.data_loaders.strato_loader import TweetStratoLoader
from grox.data_loaders.media_description_loader import MediaDescriptionLoader
from strato_http.queries.post_multimodal_embedding_mh_searchai import (
    StratoContentUnderstandingUnifiedPostAnnotations,
)
from grox.config.config import EmbeddingModelConfig

logger = logging.getLogger(__name__)


class MultimodalPostEmbedderV2:
    def __init__(
        self,
        model: str = "qwen3",
        use_grok_summary: bool = False,
        renderer_version="mmembed_summary",
        use_media_descriptions: bool = False,
        use_post_context_summary: bool = False,
        use_grok_summary_path: str = "",
        custom_endpoint: str = "",
        instruction: str = "",
    ):
        embed_config = grox_config.get_embedding_model(ModelName.EMBED_PRIMARY)
        video_embed_config = grox_config.get_embedding_model(ModelName.EMBED_PRIMARY_VIDEO)
        embed_config.text_max_len = 8192
        video_embed_config.text_max_len = 8192
        qwen_3_embed_06b_config = grox_config.get_embedding_model(
            ModelName.EMBED_SMALL
        )
        qwen_3_embed_8b_config = grox_config.get_embedding_model(
            ModelName.EMBED_LARGE
        )
        recsys_v4_embed_config = grox_config.get_embedding_model(
            ModelName.RECSYS_EMBED_V4
        )
        qwen_3_embed_06b_config.text_max_len = 4096
        qwen_3_embed_8b_config.text_max_len = 4096
        recsys_v4_embed_config.text_max_len = 4096

Five embedding models pre-configured:

  • EMBED_PRIMARY — general-purpose, 8192-token max.
  • EMBED_PRIMARY_VIDEO — variant tuned for posts with video, 8192-token max.
  • EMBED_SMALL (Qwen3 0.6B) — small text-only encoder, 4096-token max.
  • EMBED_LARGE (Qwen3 8B) — large text-only encoder, 4096-token max.
  • RECSYS_EMBED_V4 — the production rec-sys embedding, 4096-token max.

All five clients are created up-front. The _get_client method below selects one at embed time.

        if custom_endpoint:
            custom_embed_config = EmbeddingModelConfig(
                model_name="custom", endpoint=custom_endpoint, text_max_len=4096
            )
            self._custom_embed_client = XaiEmbeddingClient(config=custom_embed_config)
        self.use_custom_embed = True if custom_endpoint else False

If a custom endpoint URL is passed (used for A/B testing new endpoints), create a sixth client pointing there.

        self.renderer_version = renderer_version
        self._client = XaiEmbeddingClient(config=embed_config)
        self._video_client = XaiEmbeddingClient(config=video_embed_config)
        self._qwen_3_embed_06b_client = XaiEmbeddingClient(
            config=qwen_3_embed_06b_config
        )
        self._qwen_3_embed_8b_client = XaiEmbeddingClient(config=qwen_3_embed_8b_config)
        self._recsys_v4_embed_client = XaiEmbeddingClient(config=recsys_v4_embed_config)
        self.model = model
        self.use_grok_summary = use_grok_summary
        self.use_media_descriptions = use_media_descriptions
        self.use_grok_summary_versioned = False
        self.instruction = instruction
        self.use_post_context_summary = use_post_context_summary

        if use_grok_summary_path:
            assert os.path.exists(use_grok_summary_path), (
                f"Grok summary path {use_grok_summary_path} does not exist"
            )
            assert use_grok_summary_path.endswith(".jsonl"), (
                f"Grok summary path {use_grok_summary_path} is not a jsonl file"
            )
            self.grok_summary_versioned: dict[str, str] = {}
            self.use_grok_summary_versioned = True
            with open(use_grok_summary_path, "r") as f:
                for line in f:
                    json_line = json.loads(line)
                    self.grok_summary_versioned[str(json_line["post_id"]).strip()] = (
                        json_line["summary"]
                    )

use_grok_summary_path loads an offline-precomputed .jsonl of {post_id, summary} pairs into a dict. Used for eval and backfill runs: instead of generating summaries live via the slower VLM path, you precompute them once and pin them. Critical for reproducible A/B comparisons.

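The eager validation means this class fails fast at construction if you typo the path. That beats discovering the mistake during the first embed call.

One caveat: assert statements are stripped when Python runs with -O, so under an optimized interpreter both checks silently disappear. Raising FileNotFoundError or ValueError explicitly (as PostEmbeddingSummarizer does for its prompt file) survives -O.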
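Here is a minimal sketch of the same loader with explicit exceptions instead of assert, so the checks hold under python -O. load_versioned_summaries is a hypothetical name. Skipping blank lines is an addition the original doesn't make; its json.loads would fail on one.

```python
import json
import os


def load_versioned_summaries(path: str) -> dict[str, str]:
    """Hypothetical helper: read {post_id, summary} records from a .jsonl file.

    Mirrors the constructor logic above, but raises real exceptions, which
    (unlike assert) are not stripped when Python runs with -O.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Grok summary path {path} does not exist")
    if not path.endswith(".jsonl"):
        raise ValueError(f"Grok summary path {path} is not a jsonl file")
    summaries: dict[str, str] = {}
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # skip blank lines (the original would fail on these)
            record = json.loads(line)
            # Key by the stripped string form so int and str post ids match.
            summaries[str(record["post_id"]).strip()] = record["summary"]
    return summaries
```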

Client selection

    def _get_client(
        self, num_text: int, num_image: int, num_video: int
    ) -> XaiEmbeddingClient:
        if self.use_custom_embed:
            return self._custom_embed_client
        if self.model == "qwen3":
            return self._qwen_3_embed_06b_client
        if self.model == "qwen3_8b":
            return self._qwen_3_embed_8b_client
        if self.model == "v4":
            return self._recsys_v4_embed_client

        if num_video > 0:
            logger.info(
                f"Using video client for post with {num_text} text, {num_image} images, and {num_video} videos"
            )
            return self._video_client
        return self._client

Resolution priority:

  1. Custom endpoint beats everything (the A/B-test path).
  2. Explicit model beats heuristics — if the constructor was given model="qwen3", always use that.
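  3. Heuristic fallback: only when model is none of "qwen3", "qwen3_8b", or "v4", use the video-tuned model if there's video, otherwise the primary. The constructor defaults to model="qwen3", so omitting model does not reach this branch. You have to pass some other string (e.g. model="") to get the EMBED_PRIMARY / EMBED_PRIMARY_VIDEO clients.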

Used in production: model="v4" is the rec-sys feature path; the others are research / benchmarking paths.
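The priority order can be restated as a pure function, which makes the effect of the constructor default easy to see. This is a sketch: resolve_embedding_client and the returned labels are hypothetical, while the real method returns XaiEmbeddingClient instances.

```python
def resolve_embedding_client(model: str, use_custom_embed: bool, num_video: int) -> str:
    """Hypothetical restatement of _get_client's priority order.

    Returns an illustrative label instead of an XaiEmbeddingClient.
    """
    if use_custom_embed:
        return "custom"  # 1. the custom endpoint beats everything
    explicit = {"qwen3": "qwen3_0.6b", "qwen3_8b": "qwen3_8b", "v4": "recsys_v4"}
    if model in explicit:
        return explicit[model]  # 2. an explicitly named model
    # 3. heuristic fallback, reached only for unrecognized model strings
    return "primary_video" if num_video > 0 else "primary"


# With the constructor default model="qwen3", a video post still gets Qwen3 0.6B:
print(resolve_embedding_client("qwen3", use_custom_embed=False, num_video=2))  # qwen3_0.6b
print(resolve_embedding_client("", use_custom_embed=False, num_video=2))       # primary_video
```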

Document construction — V0 (original)

    def document_original(
        self, content: list[Content]
    ) -> tuple[list[tuple[str, str | bytes]], int, int, int]:
        def get_convo_video_instruction(video: ConvoVideo) -> str:
            res = [f"The video lasts for {video.total_duration:.2f} seconds."]
            bucket_times = [i * video.duration for i in range(len(video.frames))]
            res.append(
                f"The following frames are sampled at every {video.duration:.2f} second interval."
            )
            for i, frame in enumerate(video.frames):
                subtitle = (
                    video.subtitles[i]
                    if video.subtitles and i < len(video.subtitles)
                    else None
                )
                subtitle_str = (
                    f"with subtitle: {subtitle}" if subtitle else "(no subtitles)"
                )
                res.append(f"At {bucket_times[i]:.2f} seconds, {subtitle_str}.")
            res.append("The frames are listed below:")
            return " ".join(res)

A document is a list[("text"|"image"|"video", str|bytes)]. The original layout builds one combined text section that describes the video metadata and per-frame subtitles, then attaches the raw video bytes.

This is the "narrate the video in text, then attach the video for the visual encoder" approach. It gives the embedding model both a textual summary of timing/subtitles and the raw video to attend to.

        document = []
        num_text = 0
        num_image = 0
        num_video = 0
        new_text_part = ""
        for c in content:
            if isinstance(c, ConvoImage):
                document.append(("text", f"Image: \n"))
                document.append(("image", c.content))
                num_image += 1
            elif isinstance(c, ConvoVideo):
                if c.combined_video_bytes:
                    new_text_part += get_convo_video_instruction(c)
                    document.append(("video", c.combined_video_bytes))
                    num_video += 1
            elif isinstance(c, str):
                new_text_part += c
                num_text += 1
        new_text_part = new_text_part.strip()
        document.append(
            ("text", "")
        )
        document.append(("text", new_text_part))
        return document, num_text, num_image, num_video

For each content piece:

  • Image → emit "Image: \n" + the image bytes (so the model knows what follows is an image).
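  • Video → if combined_video_bytes is non-empty, append the video-instruction text to a running buffer and emit the raw bytes; otherwise the video is skipped entirely.
  • Text → append to the running buffer.

At the end, emit an empty ("text", "") entry, then the accumulated text. The empty entry is unusual and looks like a stray (maybe a separator); it probably has no functional effect.

Two ordering quirks follow from the buffering:

  • All text lands after the media it describes, including a narration that ends with "The frames are listed below:".
  • Consecutive text pieces are concatenated with no separator between them.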

Document construction — V1 (current)

    def document_v1(
        self, content: list[Content]
    ) -> tuple[list[tuple[str, str | bytes]], int, int, int]:
        def video_frames(
            video: ConvoVideo, index: int
        ) -> list[tuple[str, str | bytes]]:
            res: list[tuple[str, str | bytes]] = []
            for i, frame in enumerate(video.frames):
                res.append(("image", frame))
                if (
                    video.subtitles
                    and i < len(video.subtitles)
                    and video.subtitles[i] is not None
                    and video.subtitles[i].strip() != ""
                ):
                    res.append(("text", "subtitle: " + video.subtitles[i] + " "))
            return res

        document = []
        num_text = 0
        num_image = 0
        num_video = 0

        for c in content:
            if isinstance(c, str):
                document.append(("text", c.strip()))
                num_text += 1
            else:
                if isinstance(c, ConvoImage):
                    document.append(("image", c.content))
                    num_image += 1
                elif isinstance(c, ConvoVideo):
                    document.extend(video_frames(c, num_video))
                    num_video += 1

        return document, num_text, num_image, num_video

V1 is flatter: video → emit each frame as a separate image, with the subtitle text inlined right after each frame. No combined narration text, no raw video bytes.

This suits image-only multimodal encoders (which can't consume raw video bytes), but it loses the duration and per-frame timing context. V5 (next file) handles video differently: it appends an ASR transcript as text.

Embedding API

    async def _create_embeddings_for_post(
        self,
        content: list[Content],
        is_query: bool = False,
        document_version: str = "v1",
    ) -> tuple[list[tuple[str, str | bytes]], np.ndarray]:
        if document_version == "default":
            document_fn = self.document_original
        elif document_version == "v1":
            document_fn = self.document_v1
        else:
            raise ValueError(f"document_version not found: {document_version}")

        document, num_text, num_image, num_video = document_fn(content)
        logger.info(
            f"creating embeddings for post with {num_text} text, {num_image} images, and {num_video} videos"
        )
        client = self._get_client(num_text, num_image, num_video)
        return document, await client.create_embeddings_async(
            [document], is_query=is_query
        )

Pick a document layout, build it, log counts, pick a client (passing counts so the heuristic can fire), call the client. is_query toggles asymmetric query vs document embedding (some models embed queries differently from documents — see Qwen3-embed).

    async def hydrate_grok_post_summary(self, post: Post):
        query = StratoContentUnderstandingUnifiedPostAnnotations()
        res = await query.fetch(int(post.id))
        if res:
            description = res["annotations"]["description"]
            post.summary = description

Fetch the pre-computed Grok summary from Strato (Manhattan KV). Strato shape: {annotations: {description: "..."}}. The description is set on the post in-place.

    def get_detailed_instruct(self, instruction: str) -> str:
        return f"Instruct: {instruction}\nQuery: Please embed the following post:"

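This is the Qwen3-Embedding-style instruction template: prefix the input with Instruct: ...\nQuery: ... so the model knows which notion of similarity the embedding should capture. (Some embedding models support task-conditioned embeddings; Qwen3-Embedding does.)

Note that here the Query slot holds the fixed phrase "Please embed the following post:" rather than the text itself. The post content is meant to follow as separate content parts.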
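For comparison, consider the asymmetric convention usually described for instruction-tuned embedders like Qwen3-Embedding. It puts the instruction only on the query side, with the query text itself in the Query slot, while documents are embedded as-is. The sketch below uses build_embedding_input, a hypothetical helper; the spacing after "Query:" follows get_detailed_instruct above.

```python
def build_embedding_input(text: str, is_query: bool, instruction: str = "") -> str:
    """Hypothetical helper: instruction-prefix queries, leave documents untouched."""
    if is_query and instruction:
        return f"Instruct: {instruction}\nQuery: {text}"
    return text


task = "Given a post, retrieve posts on the same topic"
print(repr(build_embedding_input("rust async runtimes", is_query=True, instruction=task)))
print(repr(build_embedding_input("rust async runtimes", is_query=False, instruction=task)))
```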

    def _get_document_fn(self, document_version: str):
        if document_version == "default":
            return self.document_original
        if document_version == "v1":
            return self.document_v1
        raise ValueError(f"document_version not found: {document_version}")

    async def embed_texts_batch(
        self, texts: list[str], is_query: bool = True, document_version: str = "v1"
    ) -> list[list[float]]:
        if not texts:
            return []
        document_fn = self._get_document_fn(document_version)
        documents: list[list[tuple[str, str | bytes]]] = []
        for text in texts:
            document, _, _, _ = document_fn([text])
            documents.append(document)
        client = self._get_client(num_text=1, num_image=0, num_video=0)
        embeddings = await client.create_embeddings_async(documents, is_query=is_query)
        return [embedding.flatten().tolist() for embedding in embeddings]

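embed_texts_batch is a separate fast path: a batch of plain strings, no media, sent to the client in a single call. It is used in eval scripts that embed query strings against a corpus; note that it defaults to is_query=True, whereas embed defaults to False.

_get_document_fn is the same version dispatch that _create_embeddings_for_post inlines; the latter could simply call it.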

The main embed method

    async def embed(
        self, post: Post, is_query: bool = False, document_version: str = "v1"
    ) -> tuple[list[tuple[str, str | bytes]], list[float]]:
        if self.instruction:
            content: list[Content] = [self.get_detailed_instruct(self.instruction)]

        if self.renderer_version == "lite":
            content = LitePostRenderer.render_for_embedding(post)
        elif self.renderer_version == "eval":
            content = EvalPostRenderer.render_for_embedding(post)
        elif self.renderer_version == "mmembed_summary":
            content = await MMEmbedPostRenderer.render_for_embedding(
                post, use_grok_summary=self.use_grok_summary
            )

Three renderer choices:

  • lite — minimal: just the post text, no fancy formatting.
  • eval — eval mode (specific format for benchmarking).
  • mmembed_summary (default) — full multimodal rendering with optional Grok summary inclusion.

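Bug alert: if self.instruction is truthy, content is first set to [get_detailed_instruct(...)]. Whichever renderer branch runs next then reassigns content, so the instruction prefix is lost. The renderers are never passed self.instruction, so this looks like a refactoring leftover rather than a deliberate hand-off.

The if/elif chain also has no else. With an unrecognized renderer_version and no instruction, content is never assigned, and its first later use raises UnboundLocalError.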
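One way to keep the instruction and fail loudly on an unknown renderer_version is to dispatch through a table that rejects unknown versions, then prepend the instruction after rendering. The sketch below uses synchronous stand-ins: build_content and the renderer table are hypothetical, and the real mmembed_summary renderer is async.

```python
from typing import Callable

# Hypothetical renderer table: version name -> function producing content parts.
Renderer = Callable[[str], list[str]]


def build_content(
    post_text: str,
    renderer_version: str,
    renderers: dict[str, Renderer],
    instruction_prefix: str = "",
) -> list[str]:
    renderer = renderers.get(renderer_version)
    if renderer is None:
        # Fail loudly instead of leaving `content` unassigned.
        raise ValueError(f"unknown renderer_version: {renderer_version}")
    content = renderer(post_text)
    if instruction_prefix:
        # Prepend after rendering, so the renderer output cannot overwrite it.
        content = [instruction_prefix, *content]
    return content


renderers: dict[str, Renderer] = {"lite": lambda text: [text]}
print(build_content("hello", "lite", renderers, "Instruct: find similar posts"))
# ['Instruct: find similar posts', 'hello']
```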

        if self.use_grok_summary_versioned:
            if str(post.id) in self.grok_summary_versioned:
                content.append(
                    f"\nPost summary and description: {self.grok_summary_versioned[str(post.id)]}"
                )

        if self.use_grok_summary and not self.use_grok_summary_versioned:
            await self.hydrate_grok_post_summary(post)
            content.append(f"\nPost summary and description: {post.summary}")

        if self.use_post_context_summary:
            content.append(f"\nPost summary and description: {post.summary}")

        if self.use_media_descriptions:
            await MediaDescriptionLoader.hydrate_media_descriptions(post)
            content.append(
                f"\nThe post has these associated media descriptions: \n{post.media_descriptions}"
            )

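Optional context injection. Each flag adds another text chunk to the document:

  • Versioned summary (from the jsonl file).
  • Live Grok summary fetch from Strato (only if not versioned, to avoid duplication).
  • Pre-loaded post-context summary (already on the Post object).
  • Media descriptions (alt-text or image-captioning results).

The guards are not airtight:

  • use_post_context_summary is independent of the other two flags, so combining it with either summary source appends a second "Post summary and description:" chunk.
  • If the Strato lookup returns nothing, post.summary keeps whatever value it had and is interpolated as-is (e.g. the literal text "None" if the field defaults to None).
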
        start_time = time.perf_counter_ns()

        document, embedding = await self._create_embeddings_for_post(
            content, is_query, document_version
        )

        duration_ms = (time.perf_counter_ns() - start_time) / 1_000_000
        logger.info(f"Embedding finished in {duration_ms:.2f} ms")
        Metrics.histogram("post_embedding_duration_ms").record(duration_ms)
        return document, embedding.flatten().tolist()

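Time the embedding step, record the duration, and return both the document (for debugging) and the flat float list (for downstream consumption). start_time is taken after rendering and summary hydration, so post_embedding_duration_ms covers only document construction and the client call. It excludes the Strato and media-description fetches.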


embedder/multimodal_post_embedder_v5.py (120 lines)

The newer V5 embedder. Much simpler — different client (XaiEmbeddingClientHttp instead of XaiEmbeddingClient), single endpoint, no document-version choices.

import logging
import time

import numpy as np
from embed.embed_http import ChatTemplate, XaiEmbeddingClientHttp
from embed.embed_http import EmbeddingModelConfig as HttpModelConfig
from grox.config.config import ModelName, grox_config
from grox.data_loaders.data_types import Post, Video
from grox.lm.post_v5 import V5EmbedPostRenderer
from monitor.metrics import Metrics

logger = logging.getLogger(__name__)

DEFAULT_SYSTEM_PROMPT = ""
TRUNCATE_DIM = 1024

TRUNCATE_DIM = 1024: the V5 model presumably produces longer embeddings (probably 2048- or 4096-d). These are truncated and re-normalized to 1024 dims, likely to match the downstream rec-sys schema.

class MultimodalPostEmbedderV5:
    @staticmethod
    def has_video(post: Post) -> bool:
        if post.media:
            for m in post.media:
                if isinstance(m, Video):
                    return True
        return False

    def __init__(
        self,
        system_prompt: str = DEFAULT_SYSTEM_PROMPT,
        max_images: int | None = None,
    ):
        self.truncate_dim = TRUNCATE_DIM
        self.max_images = max_images
        embed_config = grox_config.get_embedding_model(ModelName.RECSYS_EMBED_V5)
        http_config = HttpModelConfig(
            model_name=embed_config.model_name,
            endpoint=embed_config.endpoint,
            text_max_len=4096,
            timeout_seconds=60.0,
        )
        chat_template = ChatTemplate(system_prompt=system_prompt)
        self._client = XaiEmbeddingClientHttp(
            config=http_config, chat_template=chat_template
        )

One client, one config. RECSYS_EMBED_V5 is the production V5 endpoint. It uses a ChatTemplate: V5 takes input as a chat conversation (not just raw text/images), which suggests an LLM trained as an embedder rather than a dedicated encoder.

    def _maybe_truncate(self, embedding: np.ndarray) -> list[float]:
        if self.truncate_dim > 0 and len(embedding) > self.truncate_dim:
            emb = embedding[: self.truncate_dim]
            norm = np.linalg.norm(emb)
            if norm > 0:
                emb = emb / norm
            return emb.tolist()
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding = embedding / norm
        return embedding.tolist()

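Truncate to the first 1024 dims, then re-normalize to unit length; embeddings that are already short enough get normalized too. Truncation breaks unit norm, and re-normalizing lets downstream code use a plain dot product as cosine similarity (cosine itself is scale-invariant).

Keeping a prefix this way only preserves quality if the model was trained so that the leading dimensions carry most of the signal (Matryoshka-style). The code implicitly assumes that.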

The if norm > 0 guard is paranoia — if you ever got a zero vector, dividing by zero would NaN out. Probably never happens in practice but cheap insurance.
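Here is a standalone NumPy sketch of the same truncate-then-normalize step. truncate_and_normalize is a hypothetical name, and unlike _maybe_truncate it returns an array rather than a list. The usage lines check the property that motivates re-normalizing: afterwards, a dot product equals cosine similarity.

```python
import numpy as np


def truncate_and_normalize(embedding: np.ndarray, dim: int) -> np.ndarray:
    """Keep the first `dim` components when 0 < dim < len, then L2-normalize."""
    emb = embedding[:dim] if 0 < dim < len(embedding) else embedding
    norm = np.linalg.norm(emb)
    # A zero vector is returned unchanged instead of dividing 0 by 0 (NaN).
    return emb / norm if norm > 0 else emb


v = np.array([3.0, 4.0, 12.0])
print(truncate_and_normalize(v, 2))  # [0.6 0.8]

rng = np.random.default_rng(0)
a, b = rng.normal(size=8), rng.normal(size=8)
cosine = a @ b / (np.linalg.norm(a) * np.linalg.norm(b))
dot_of_unit = truncate_and_normalize(a, 0) @ truncate_and_normalize(b, 0)
print(np.isclose(cosine, dot_of_unit))  # True
```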

    async def embed(
        self,
        post: Post,
        transcript: str | None = None,
        is_query: bool = False,
        **kwargs,
    ) -> tuple[list[tuple[str, str | bytes]], list[float]]:
        total_start = time.perf_counter()

        render_start = time.perf_counter()
        text_with_pads, images = V5EmbedPostRenderer.render_for_embedding(
            post, max_images=self.max_images
        )
        render_duration_ms = (time.perf_counter() - render_start) * 1000
        Metrics.histogram("post_embedding_v5.render_duration_ms").record(
            render_duration_ms
        )

V5EmbedPostRenderer.render_for_embedding returns (text_with_pads, images). text_with_pads is a string with image-placeholder tokens at positions where images should be inserted — the embedding client's encode_with_embedded_pads_async interleaves them.

        if transcript:
            text_with_pads += f"\nTranscript: {transcript}"

        document: list[tuple[str, str | bytes]] = [("text", text_with_pads)]
        for img in images:
            document.append(("image", img))

If we have an ASR transcript (from Session 19's ASRProcessor), append it to the text. This is the V5 design point: instead of summarizing first, just dump the transcript at the end and let the model attend to it.

The encode call doesn't use the document list; it takes text_with_pads and images directly. The list is returned to the caller so V5 keeps V2's (document, embedding) return shape, which is handy for debugging.

        if not text_with_pads and not images:
            logger.warning(f"Post {post.id} has no text or media content")
            return document, []

        encode_start = time.perf_counter()
        embedding = await self._client.encode_with_embedded_pads_async(
            text_with_pads, images if images else None
        )
        encode_duration_ms = (time.perf_counter() - encode_start) * 1000
        Metrics.histogram("post_embedding_v5.encode_duration_ms").record(
            encode_duration_ms
        )

        truncate_start = time.perf_counter()
        result = self._maybe_truncate(embedding)
        truncate_duration_ms = (time.perf_counter() - truncate_start) * 1000
        Metrics.histogram("post_embedding_v5.truncate_duration_ms").record(
            truncate_duration_ms
        )

        total_duration_ms = (time.perf_counter() - total_start) * 1000
        Metrics.histogram("post_embedding_v5.total_duration_ms").record(
            total_duration_ms
        )
        Metrics.counter("post_embedding_v5.image_count").add(len(images))

        total_image_bytes = sum(len(img) for img in images) if images else 0
        Metrics.histogram("post_embedding_v5.image_payload_bytes").record(
            total_image_bytes
        )

        logger.info(
            f"Embedding V5 post={post.id}: total={total_duration_ms:.1f}ms "
            f"(render={render_duration_ms:.1f}ms, encode={encode_duration_ms:.1f}ms, truncate={truncate_duration_ms:.1f}ms), "
            f"images={len(images)}, image_bytes={total_image_bytes:,}, text_len={len(text_with_pads)}, has_transcript={transcript is not None}"
        )

        return document, result

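Three timed stages (render / encode / truncate) plus a total, each recorded as a histogram. There is also an image_count counter and an image_payload_bytes histogram for bandwidth tracking. A single info log at the end carries the full latency breakdown, which makes it easy to grep when debugging.

One small quirk: has_transcript is logged as transcript is not None, so an empty-string transcript logs True even though nothing was appended.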

Empty-post check returns (document, []) — the empty list signals downstream "no embedding produced." Plans treat this as a failure.

The contrast between V2 (287 lines, 5 clients, multiple renderers, optional summary hydration) and V5 (120 lines, 1 client, 1 renderer, transcript only) is striking. V5 is the result of architectural consolidation — once you have a strong enough multimodal LLM you don't need all the per-modality preprocessing.


summarizer/summarizer.py (44 lines) and eapi_summarizer.py (44 lines)

Twin abstract bases — one for VLM-backed summarizers, one for EAPI-backed.

import logging
import time
import traceback
from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar

from grok_sampler.vision_sampler import VisionSampler
from grox.config.config import ModelConfig
from monitor.metrics import Metrics

logger = logging.getLogger(__name__)

T = TypeVar("T")


class Summarizer(ABC, Generic[T]):
    def __init__(self, model_config: ModelConfig, vlm: VisionSampler):
        self.model_config = model_config
        self.vlm = vlm

    async def summarize(self, input: T) -> Any:
        logger.info(f"[{self.__class__.__name__}] started processing summarize request")
        Metrics.counter("summarize.request.count").add(1)

        start = time.perf_counter()
        try:
            res = await self._summarize(input)
        except Exception:
            Metrics.counter("summarize.error.count").add(1)
            logger.error(
                f"[{self.__class__.__name__}] error processing summarize request: {traceback.format_exc()}"
            )
            raise
        Metrics.counter("summarize.success.count").add(1)
        end = time.perf_counter()
        logger.info(
            f"[{self.__class__.__name__}] finished processing summarize request in {end - start:.2f} seconds"
        )
        Metrics.histogram("summarize.latency.seconds").record(end - start)
        return res

    @abstractmethod
    async def _summarize(self, input: T) -> Any:
        pass

A generic-typed (T = input type) abstract base. The summarize() method wraps _summarize() with metrics and logging — request/success/error counters, latency histogram.

EapiSummarizer has exactly the same structure but takes EapiSampler/EapiModelConfig instead of VisionSampler/ModelConfig. The two classes don't share a base for the metric path, which is a slight DRY violation.

The model client types do differ (VLM takes image input, EAPI is text-only), so a common base may not add much. Still, summarize() itself never touches the client, so a base that is also generic over the sampler type would be feasible.
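Here is a sketch of what a single shared base could look like, generic over both the input type and the sampler type. MeteredSummarizer and EchoSummarizer are hypothetical, and plain attributes stand in for the Metrics counters and histogram.

```python
import asyncio
import time
from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar

InputT = TypeVar("InputT")
SamplerT = TypeVar("SamplerT")


class MeteredSummarizer(ABC, Generic[InputT, SamplerT]):
    """Hypothetical base: one metrics wrapper for any sampler type (VLM or EAPI)."""

    def __init__(self, sampler: SamplerT):
        self.sampler = sampler
        # In-memory stand-ins for the summarize.* counters and latency histogram.
        self.requests = 0
        self.successes = 0
        self.errors = 0
        self.latencies_s: list[float] = []

    async def summarize(self, input: InputT) -> Any:
        self.requests += 1
        start = time.perf_counter()
        try:
            res = await self._summarize(input)
        except Exception:
            self.errors += 1
            raise
        self.successes += 1
        self.latencies_s.append(time.perf_counter() - start)
        return res

    @abstractmethod
    async def _summarize(self, input: InputT) -> Any: ...


class EchoSummarizer(MeteredSummarizer[str, object]):
    """Toy subclass; a real one would call self.sampler."""

    async def _summarize(self, input: str) -> str:
        return input.upper()


s = EchoSummarizer(sampler=object())
print(asyncio.run(s.summarize("hi")), s.successes)  # HI 1
```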


summarizer/post_embedding_summarizer.py (55 lines)

The concrete summarizer used by the V3 embedder pipeline (Session 19's task_post_embedding_summarizer).

import uuid
import logging

from grox.lm.post import PostRenderer
from grox.lm.user import UserRenderer
from grox.config.config import grox_config, ModelName
from grox.summarizer.summarizer import Summarizer
from grox.data_loaders.data_types import Post
from grox.lm.convo import Conversation, Message, Role
from grok_sampler.config import GrokModelConfig
from grok_sampler.vision_sampler import VisionSampler
import os

logger = logging.getLogger(__name__)


class PostEmbeddingSummarizer(Summarizer):
    def __init__(self, prompt_file: str):
        vlm_config = grox_config.get_model(ModelName.VLM_MINI_CRITICAL)
        vlm = VisionSampler(GrokModelConfig(**vlm_config.model_dump()))
        self.prompt_file: str = prompt_file
        if not os.path.exists(self.prompt_file):
            raise FileNotFoundError(f"Prompt file {self.prompt_file} not found")
        super().__init__(vlm_config, vlm)

Uses VLM_MINI_CRITICAL — the "mini" tier (smaller, cheaper VLM) but the "critical" deployment (higher availability, lower latency variance). Good fit for summarization which is high-volume + simple.

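The prompt path is passed and checked at construction, which is validate-at-construction again: it fails fast if the operator points at a non-existent prompt file. The file contents, though, are re-read on every _render_vlm_conversation call. Edits therefore take effect without a restart, at the cost of one file read per request, and a file deleted after startup fails at request time.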

    async def _summarize(self, post: Post) -> str:
        convo = await self._render_vlm_conversation(post)
        result = await self.vlm.sample(
            convo.interleave(), conversation_id=convo.conversation_id
        )
        result_section = result.split("<description>")[1].split("</description>")[0]
        return result_section

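The system prompt instructs the VLM to wrap the summary in <description>...</description> tags, and the parser splits twice to extract the contents. There is no fallback:

  • If the opening tag is missing, split(...)[1] raises IndexError.
  • If only the closing tag is missing, the second split finds nothing and the whole tail after <description> is returned silently.

The surrounding Summarizer.summarize counts and logs the error, then re-raises it.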
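Here is a more forgiving extractor, sketched with a regex. It tolerates surrounding text and spans newlines. When the block is missing or unterminated, it raises a descriptive ValueError instead of an IndexError or a silently truncated result. extract_description is a hypothetical helper, and stripping whitespace is a choice the original doesn't make.

```python
import re

# Non-greedy so only the first block matches; DOTALL lets "." cross newlines.
_DESCRIPTION_RE = re.compile(r"<description>(.*?)</description>", re.DOTALL)


def extract_description(text: str) -> str:
    """Hypothetical helper: return the first <description> block's contents."""
    match = _DESCRIPTION_RE.search(text)
    if match is None:
        raise ValueError("no complete <description>...</description> block in output")
    return match.group(1).strip()


print(extract_description("Sure!\n<description>\nA cat on a sofa.\n</description>"))
# A cat on a sofa.
```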

    async def _render_vlm_conversation(
        self, post: Post, disable_thinking: bool = True
    ) -> Conversation:
        convo = Conversation(conversation_id=uuid.uuid4().hex)
        prompt = ""
        with open(self.prompt_file, "r") as f:
            prompt = f.read()
        convo.messages.append(Message(role=Role.SYSTEM, content=[prompt]))
        convo.messages.append(await self._build_task_message(post))
        if disable_thinking:
            convo.messages.append(
                Message(role=Role.ASSISTANT, content=[""], separator="")
            )
        else:
            convo.messages.append(Message(role=Role.ASSISTANT))
        return convo

    async def _build_task_message(self, post: Post) -> Message:
        msg: Message = Message(role=Role.USER, content=[])
        msg.content.extend(UserRenderer.render(post.user))
        msg.content.extend(PostRenderer.render(post))
        return msg

Three-message conversation: SYSTEM (file prompt), USER (rendered post), ASSISTANT (empty prefix).

The empty assistant message with separator="" is the "start generating right at this position" trick. The assistant role is opened with no content and no separator, so presumably the rendered prompt ends inside an open assistant turn with no end-of-turn marker, and sampling continues from there as the response. The disable_thinking=True default means we skip Grok's chain-of-thought mode (faster, cheaper, fine for short summaries).

_build_task_message — render User+Post into the user message. UserRenderer.render returns a list of content parts (text describing author info); PostRenderer.render returns the post body, media, etc.
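A toy renderer can illustrate the open-assistant-turn idea. The actual grox.lm.convo format isn't shown in this file, so Role, Message, render_prompt, and the <|...|> markers here are all made up. The point is only that a final assistant message with an empty separator leaves the prompt ending inside the assistant turn.

```python
from dataclasses import dataclass, field
from enum import Enum


class Role(Enum):  # hypothetical stand-in for grox.lm.convo.Role
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"


@dataclass
class Message:  # hypothetical stand-in; "<|end|>" is a made-up end-of-turn marker
    role: Role
    content: list[str] = field(default_factory=list)
    separator: str = "<|end|>"


def render_prompt(messages: list[Message]) -> str:
    """Toy format: role header, then content, then the message's separator."""
    return "".join(f"<|{m.role.value}|>{''.join(m.content)}{m.separator}" for m in messages)


prompt = render_prompt([
    Message(Role.SYSTEM, ["Summarize the post."]),
    Message(Role.USER, ["@alice: new release is out"]),
    Message(Role.ASSISTANT, [""], separator=""),  # open turn: no end marker
])
print(prompt)
# <|system|>Summarize the post.<|end|><|user|>@alice: new release is out<|end|><|assistant|>
```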


classifiers/content/classifier.py (98 lines)

The abstract ContentClassifier base; every concrete classifier listed above inherits from it.

import logging
import time
import traceback

from abc import ABC, abstractmethod
from grok_sampler.llm import LiteLLM
from grox.data_loaders.data_types import (
    ContentCategoryResult,
    ContentCategoryType,
    Post,
)
from grox.lm.convo import Conversation
from monitor.metrics import Metrics
from pydantic import BaseModel

logger = logging.getLogger(__name__)


class ContentClassifier(ABC):
    def __init__(self, categories: list[ContentCategoryType], llm: LiteLLM):
        self.categories = categories
        self.llm = llm

    @property
    def model_name(self) -> str:
        return self.llm.model_config.model_name

Each classifier declares which ContentCategoryTypes it produces (some produce one, some two). The llm parameter is type-hinted as LiteLLM. Concrete implementations pass a VisionSampler, which presumably duck-types as one; at minimum it must expose model_config.model_name, which the model_name property reads.

    async def classify(self, post: Post) -> list[ContentCategoryResult]:
        logger.info(
            f"[{self.__class__.__name__}] started processing content classify request: {post.id}"
        )
        for category in self.categories:
            Metrics.counter(f"content.classification.request.count").add(
                1, attributes={"category": category.value.lower()}
            )
        for category in self.categories:
            Metrics.counter("content.classification.intake.count").add(
                1, attributes={"category": category.value.lower()}
            )
        start = time.perf_counter()
        try:
            res = await self._classify(post)
        except Exception:
            for category in self.categories:
                Metrics.counter(f"content.classification.error.count").add(
                    1, attributes={"category": category.value.lower()}
                )
            logger.error(
                f"[{self.__class__.__name__}] error processing content classify request: {post.id} {traceback.format_exc()}"
            )
            raise
        for category in self.categories:
            Metrics.counter(f"content.classification.success.count").add(
                1, attributes={"category": category.value.lower()}
            )
        end = time.perf_counter()
        logger.info(
            f"[{self.__class__.__name__}] finished processing content classify request: {post.id} in {end - start:.2f} seconds"
        )
        Metrics.histogram(f"content.classification.latency").record(
            end - start, attributes={"class": self.__class__.__name__}
        )
        self._post_process_for_logging(res, start, end)
        return res

Public entrypoint. Counters are emitted per category so dashboards can break down by category. The two intake counters (request.count and intake.count) seem redundant — different downstream consumers may rely on one or the other.

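The latency histogram is recorded twice per request: once here with a class attribute, and again once per category with a category attribute in _post_process_for_logging. On the error path the method re-raises before either record, so failed calls show up only in error.count, not in the latency histogram.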

    def _post_process_for_logging(
        self, res: list[BaseModel], start_time, end_time
    ) -> None:
        for category in self.categories:
            Metrics.histogram(f"content.classification.latency").record(
                end_time - start_time, attributes={"category": category.value.lower()}
            )

        for result in res:
            Metrics.counter(f"content.classification.result.count").add(
                1,
                attributes={
                    "category": result.category.value.lower(),
                    "positive": str(result.positive),
                },
            )

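Per-category latency records: the same duration is recorded again for each category, on top of the class-attributed record above, so summing content.classification.latency across all attribute sets over-counts requests. Then a per-result result.count counter carries positive as a string attribute ("True"/"False"); that counter is presumably what feeds the positivity-rate dashboards.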

    @abstractmethod
    async def _to_convo(self, post: Post) -> Conversation:
        pass

    @abstractmethod
    async def _sample(self, convo: Conversation) -> str:
        pass

    @abstractmethod
    async def _parse(self, post: Post, output: str) -> list[ContentCategoryResult]:
        pass

    async def _classify(self, post: Post) -> list[ContentCategoryResult]:
        convo = await self._to_convo(post)
        output = await self._sample(convo)
        return await self._parse(post, output)

Three template-method hooks: convo-build → sample → parse. The default _classify chains them; concrete classes can override _classify if they need richer logic (like the spam classifier filtering categories).

This is a clean template-method pattern — every classifier follows the same shape, so the framework can do common things (metrics, logging, error counting) and the subclass only fills in the model-specific bits.
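A stripped-down sketch of the same template-method shape, for readers who want to poke at it. Everything here is illustrative rather than Grox code: a Counter stands in for Metrics, a string prompt stands in for Conversation, and a keyword check stands in for the LLM call. The base class owns the bookkeeping; the subclass only fills in the three hooks.

```python
import asyncio
from abc import ABC, abstractmethod
from collections import Counter
from dataclasses import dataclass


@dataclass
class Result:
    category: str
    positive: bool
    score: float


# Stand-in for Metrics: counts keyed by (metric name, category).
COUNTS: Counter = Counter()


class Classifier(ABC):
    def __init__(self, categories: list[str]):
        self.categories = categories

    async def classify(self, text: str) -> list[Result]:
        # Framework-owned bookkeeping, identical for every subclass.
        for cat in self.categories:
            COUNTS[("request", cat)] += 1
        try:
            res = await self._classify(text)
        except Exception:
            for cat in self.categories:
                COUNTS[("error", cat)] += 1
            raise
        for cat in self.categories:
            COUNTS[("success", cat)] += 1
        return res

    async def _classify(self, text: str) -> list[Result]:
        # Default pipeline: build prompt -> sample -> parse.
        prompt = await self._to_convo(text)
        output = await self._sample(prompt)
        return await self._parse(output)

    @abstractmethod
    async def _to_convo(self, text: str) -> str: ...

    @abstractmethod
    async def _sample(self, prompt: str) -> str: ...

    @abstractmethod
    async def _parse(self, output: str) -> list[Result]: ...


class KeywordSpam(Classifier):
    """Toy subclass: a keyword check stands in for the LLM call."""

    def __init__(self):
        super().__init__(categories=["spam"])

    async def _to_convo(self, text: str) -> str:
        return f"Is this spam?\n{text}"

    async def _sample(self, prompt: str) -> str:
        return "spam" if "free money" in prompt else "not_spam"

    async def _parse(self, output: str) -> list[Result]:
        if output not in ("spam", "not_spam"):
            raise ValueError(f"Invalid output: {output}")
        is_spam = output == "spam"
        return [Result("spam", is_spam, 1.0 if is_spam else 0.0)]


res = asyncio.run(KeywordSpam().classify("get free money now"))
print(res)  # [Result(category='spam', positive=True, score=1.0)]
```

Adding a new classifier means writing only the three hooks; the request/success/error accounting comes for free.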


classifiers/content/spam.py (104 lines) — SpamEapiLowFollowerClassifier

import logging
import re
import uuid
from pydantic import ValidationError

from grox.lm.convo import Role, Message, Conversation
from grox.lm.thread import ThreadRenderer
from grox.config.config import ModelName, grox_config
from grok_sampler.config import GrokModelConfig
from grox.prompts.template import SpamSystemLowFollower
from grok_sampler.vision_sampler import VisionSampler
from grox.data_loaders.data_types import (
    Post,
    ContentCategoryType,
    ContentCategoryResult,
    SpamSampleResult,
)
from grox.classifiers.content.classifier import ContentClassifier
from grox.data_loaders.strato_loader import TweetStratoLoader
from grox.data_loaders.media_loader import MediaLoader

logger = logging.getLogger(__name__)


class SpamEapiLowFollowerClassifier(ContentClassifier):
    def __init__(self, model_name: ModelName = ModelName.VLM_PRIMARY):
        vlm_config = grox_config.get_model(model_name)
        vlm_config.temperature = 0.000001
        vlm = VisionSampler(GrokModelConfig(**vlm_config.model_dump()))
        super().__init__(categories=[ContentCategoryType.SPAM_COMMENT], llm=vlm)

Despite the "Eapi" in the class name, it uses a VisionSampler (the LLM API path), not the EapiSampler path. The name is presumably historical.

temperature = 0.000001: effectively deterministic. Some samplers reject or special-case exactly 0, so a tiny positive epsilon is a common way to get near-greedy decoding through the normal sampling path. Every classifier in this part uses the same setting.
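To see why a tiny positive temperature behaves like greedy decoding, here is the standard temperature-scaled softmax. It is a generic illustration and says nothing about how VisionSampler samples internally. Dividing by 1e-6 turns a 0.1 logit gap into a gap of 100,000, which exp underflows to exactly zero:

```python
import math


def softmax_with_temperature(logits: list[float], temperature: float) -> list[float]:
    """Softmax over logits / temperature, shifted by the max for numerical stability."""
    scaled = [x / temperature for x in logits]
    m = max(scaled)
    exps = [math.exp(s - m) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]


logits = [2.0, 1.9, 0.5]  # two close candidates, one clear loser

print(softmax_with_temperature(logits, 1.0))       # a real distribution over all three
print(softmax_with_temperature(logits, 0.000001))  # [1.0, 0.0, 0.0]
```

With temperature=0 the division itself would fail, which is one reason samplers either reject 0 or route it to a separate argmax path.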

Categories: just SPAM_COMMENT. One in, one out.

    @property
    def model_name(self) -> str:
        return "grox"

Override the base — for some reason the spam classifier reports its model_name as the constant string "grox" rather than the actual sampler model. Probably for backwards-compat with a downstream dashboard.

    async def _classify(self, post: Post) -> list[ContentCategoryResult]:
        convo = await self._to_convo(post)
        result = await self._sample(convo)
        parsed = await self._parse(post, result)
        filtered_parsed = [
            res for res in parsed if res.category == ContentCategoryType.SPAM_COMMENT
        ]
        assert len(filtered_parsed) == 1
        return filtered_parsed

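Custom _classify override that asserts exactly one SPAM_COMMENT result survives the filter. Since _parse as written always returns exactly one SPAM_COMMENT result, the assert only guards against future parser changes. If it ever fires, the AssertionError propagates through classify's except block and is counted in error.count like any other failure. (Asserts are also stripped entirely under python -O.)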

    async def _to_convo(self, post: Post) -> Conversation:
        convo = Conversation(conversation_id=uuid.uuid4().hex)
        convo.messages.append(
            Message(role=Role.SYSTEM, content=[SpamSystemLowFollower().render()])
        )
        convo.messages.append(
            ThreadRenderer.render(post, role=Role.HUMAN, separator="\n\n")
        )
        return convo

Two-message convo: SYSTEM with the spam-low-follower prompt template, HUMAN with the rendered thread (post plus its parent context). Unlike the other classifiers here, no assistant slot is appended.

Note Role.HUMAN here vs Role.USER elsewhere. They are different enum values, but the excerpt doesn't show how interleave() treats them; they may well map to the same token sequence in practice.

    async def _sample(self, convo: Conversation) -> str:
        return await self.llm.sample(
            convo.interleave(), conversation_id=convo.conversation_id
        )

Simple sample.

    async def _clean_output(self, output: str) -> str:
        if output.endswith("<|eos|>"):
            output = output.removesuffix("<|eos|>")
        output = output.strip()
        if output.startswith("```json"):
            output = output[7:]
        elif output.startswith("```"):
            output = output[3:]
        if output.endswith("```"):
            output = output[:-3]
        output = output.strip()
        return output

Generic LLM-output cleanup: strip end-of-sequence marker, strip code-fence ```json ... ``` wrappers. Used in spam and reply-ranking.

    async def _parse(self, post: Post, output: str) -> list[ContentCategoryResult]:
        decision = None
        summary = ""

        cleaned_result = await self._clean_output(output)
        try:
            result = SpamSampleResult.model_validate_json(cleaned_result)
            decision = result.decision
            summary = result.reason
        except ValidationError:
            match = re.search(r'"decision":\s*"(.*?)"', cleaned_result)
            if match:
                decision = match.group(1).strip()

        if not decision:
            raise ValueError(f"Invalid output format: {output}")

        is_spam = decision == "spam"
        score = 1.0 if is_spam else 0.0

        if is_spam:
            logger.info(f"Spam found for low follower user: {post.id}")

        return [
            ContentCategoryResult(
                category=ContentCategoryType.SPAM_COMMENT,
                positive=is_spam,
                score=score,
                summary=summary,
            )
        ]

Two-tier parsing, then a hard failure:

  1. Happy path: validate against the SpamSampleResult Pydantic schema, which yields a structured object with decision and reason fields.
  2. Fallback: if Pydantic validation fails, regex out just the "decision": "..." string. The reason is dropped, so summary stays empty.
  3. If there is still no decision, raise: there's no salvaging this output.
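A stdlib-only sketch of the same two-tier idea, assuming the output has already been through _clean_output. parse_spam_decision is a hypothetical name, and json.loads plus a key lookup stands in for SpamSampleResult.model_validate_json, so it is looser than the real schema check:

```python
import json
import re


def parse_spam_decision(output: str) -> tuple[bool, str]:
    """Return (is_spam, reason): strict JSON first, regex fallback second."""
    decision, reason = None, ""
    try:
        data = json.loads(output)
        decision = data["decision"]
        reason = data.get("reason", "")
    except (json.JSONDecodeError, KeyError, TypeError, AttributeError):
        # Fallback: salvage just the decision string; the reason is dropped.
        match = re.search(r'"decision":\s*"(.*?)"', output)
        if match:
            decision = match.group(1).strip()
    if not decision:
        raise ValueError(f"Invalid output format: {output}")
    return decision == "spam", reason


print(parse_spam_decision('{"decision": "spam", "reason": "link farm"}'))  # (True, 'link farm')
print(parse_spam_decision('{"decision": "not_spam", "reason": "ok",}'))    # (False, '')
```

The second call has a trailing comma, so strict parsing fails and only the regex tier recovers the decision.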

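Score is binary: 1.0 for spam, 0.0 otherwise. The spam classifier produces a discrete decision, not a graded score, so downstream consumers read positive directly. The comparison is an exact string match, so any decision other than lowercase "spam" (for example "Spam") counts as not spam.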

if is_spam: logger.info(f"Spam found...") — an info log for every spam detection. Useful in production for sampling.


classifiers/content/banger_initial_screen.py (160 lines)

The most feature-rich classifier — produces 7 fields beyond the basic positive/score.

class BangerInitialScreenResult(BaseModel):
    quality_score: float
    description: str
    tags: list[str]
    taxonomy_categories: list[dict] | None = None
    tweet_bool_metadata: TweetBoolMetadata | None = None
    is_image_editable_by_grok: bool | None = None
    slop_score: int | None = None
    has_minor_score: float | None = None

Each field:

  • quality_score — main banger score (0.0 to 1.0).
  • description — a natural-language description of the post; it becomes the summary field of each emitted result.
  • tags — list of topic tags (e.g., ["sports", "viral"]).
  • taxonomy_categories — fine-grained categories from a topic taxonomy.
  • tweet_bool_metadata — booleans like contains_political_content, contains_adult_content, etc.
  • is_image_editable_by_grok — whether Grok could edit/modify the image (a UI feature flag).
  • slop_score — AI-slop score (is this post likely AI-generated low-quality content?).
  • has_minor_score — does the image/video appear to contain minors (children).

The "minor score" alone shows the safety-aware design — even the engagement-prediction classifier surfaces a child-safety signal as a byproduct.

class BangerInitialScreenClassifier(ContentClassifier):
    result_pattern = re.compile(r"(.*)<json>(.*)</json>", re.DOTALL)

    def __init__(self):
        vlm_config = grox_config.get_model(ModelName.VLM_PRIMARY)
        vlm_config.temperature = 0.000001
        vlm = VisionSampler(GrokModelConfig(**vlm_config.model_dump()))
        super().__init__(
            categories=[
                ContentCategoryType.BANGER_INITIAL_SCREEN,
                ContentCategoryType.GROK_RANKER,
            ],
            llm=vlm,
        )
        self._topics = None

Uses VLM_PRIMARY (the full-size VLM — bangers are high-value, can afford the bigger model). Two categories returned: BANGER_INITIAL_SCREEN (this banger-screen classification) and GROK_RANKER (the same output is also published as a Grok-ranker annotation).

The result_pattern = re.compile(r"(.*)<json>(.*)</json>", re.DOTALL) is the canonical Grox JSON-output regex — model is expected to emit <reasoning text> <json>{...}</json>, and we split on the json tags.
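A quick demonstration of how result_pattern splits an output (the sample text is made up). DOTALL lets the reasoning span multiple lines, and because both groups are greedy, a response containing two <json> blocks is split at the last one:

```python
import json
import re

result_pattern = re.compile(r"(.*)<json>(.*)</json>", re.DOTALL)

output = (
    "The post is a well-shot timelapse with broad appeal.\n"
    '<json>{"quality_score": 0.72, "tags": ["photography"]}</json>'
)

match = result_pattern.search(output)
reasoning = match.group(1).strip()
payload = json.loads(match.group(2).strip())
print(reasoning)                 # The post is a well-shot timelapse with broad appeal.
print(payload["quality_score"])  # 0.72

# Greedy groups: group 1 swallows the first block, group 2 starts at the last <json>.
twice = '<json>{"a": 1}</json> revised: <json>{"a": 2}</json>'
m2 = result_pattern.search(twice)
print(m2.group(2))  # {"a": 2}
```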

    @staticmethod
    def build_convo(post: Post, topics: list | None = None) -> Conversation:
        convo = Conversation(conversation_id=uuid.uuid4().hex)
        convo.messages.append(
            Message(
                role=Role.SYSTEM,
                content=[BangerMiniVlmScreenScore().render(params={"topics": topics})],
            )
        )

        user_msg = Message(role=Role.USER, content=[])
        user_msg.content.extend(UserRenderer.render(post.user))
        user_msg.content.extend(PostRenderer.render(post))
        user_msg.content.append(
            f"\n\nAnalyze the post {post.id} and provide the requested JSON object for the post."
        )
        convo.messages.append(user_msg)

        convo.messages.append(Message(role=Role.ASSISTANT, content=[""], separator=""))
        return convo

Standard three-message build. System prompt templated with topics — a list of canonical topic names so the model can match tags to them. Without this, the model would invent its own tags and joining downstream would be a nightmare.

The user message ends with an explicit instruction: "Analyze the post {post.id} and provide the requested JSON object". Disambiguates from any references to other posts in the thread context.

    async def classify(
        self, post: Post, topics: list | None = None
    ) -> list[ContentCategoryResult]:
        self._topics = topics
        return await super().classify(post)

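Override classify to thread the topics list through: it stashes topics on self._topics for _to_convo to pick up. That is shared mutable state. Under asyncio it happens to be safe as far as the excerpt shows, because nothing between the write and the read in build_convo actually suspends (the intervening awaits call coroutines that return without yielding to the event loop). It would break if a suspending await were ever added in between, or if one instance were shared across threads.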
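A toy demonstration of the hazard, assuming a suspension point were introduced between the write and the read. The asyncio.sleep(0) below is hypothetical; the real method has no such await today. With it in place, the second coroutine's topics leak into the first call, while passing topics as an argument keeps the calls independent:

```python
import asyncio


class StashingClassifier:
    """Mimics the stash-on-self pattern: classify() writes, a later step reads."""

    def __init__(self):
        self._topics = None

    async def classify(self, post_id: str, topics: list[str]) -> tuple[str, list[str]]:
        self._topics = topics
        await asyncio.sleep(0)  # hypothetical suspension point, e.g. an awaited I/O call
        return post_id, self._topics  # reads whatever the latest caller stashed


class PassingClassifier:
    """Alternative: thread topics through as an argument, so no shared state."""

    async def classify(self, post_id: str, topics: list[str]) -> tuple[str, list[str]]:
        await asyncio.sleep(0)
        return post_id, topics


async def main():
    stash, passing = StashingClassifier(), PassingClassifier()
    leaked = await asyncio.gather(
        stash.classify("a", ["sports"]), stash.classify("b", ["music"])
    )
    clean = await asyncio.gather(
        passing.classify("a", ["sports"]), passing.classify("b", ["music"])
    )
    return leaked, clean


leaked, clean = asyncio.run(main())
print(leaked)  # [('a', ['music']), ('b', ['music'])]
print(clean)   # [('a', ['sports']), ('b', ['music'])]
```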

    async def _classify_for_eval(self, post: Post) -> str:
        self._topics = None
        convo = await self._to_convo(post)
        logger.info(f"Banger initial screen conversation for post {post.id}")
        result = await self.llm.sample(
            convo.interleave(), conversation_id=convo.conversation_id
        )
        logger.info(f"Banger initial screen result for post {post.id}: {result}")
        return result

Eval-only entry point: it clears topics, skips the parse and metrics path, and returns the raw LLM output. Presumably this is what offline eval pipelines use to compare raw outputs across model versions.

    async def _to_convo(self, post: Post) -> Conversation:
        return self.build_convo(post, topics=self._topics)

    async def _sample(self, convo: Conversation) -> str:
        return await self.llm.sample(
            convo.interleave(), conversation_id=convo.conversation_id
        )

Forwards.

    async def _parse(self, post: Post, output: str) -> list[ContentCategoryResult]:
        match = self.result_pattern.search(output)
        if match:
            reasoning = match.group(1).strip()
            logger.info(
                f"Banger initial screen result reasoning for post {post.id}: {reasoning}"
            )
            result = BangerInitialScreenResult.model_validate_json(
                match.group(2).strip()
            )
            score = result.quality_score
            Metrics.histogram(
                "banger_initial_screen_score",
                explicit_bucket_boundaries_advisory=[
                    0,
                    0.1,
                    0.2,
                    0.3,
                    0.4,
                    0.5,
                    0.6,
                    0.7,
                    0.8,
                    0.9,
                    1,
                ],
            ).record(score)
            banger_initial_positive = score >= 0.4

Match the regex, log the reasoning part (the freeform text before <json>), validate the JSON.

explicit_bucket_boundaries_advisory — fine-grained histogram bins (0.0, 0.1, ..., 1.0). Lets the SRE team see the actual score distribution rather than the default coarse bucketing.

banger_initial_positive = score >= 0.4 — the production threshold for "yes, this is a banger." 0.4 is a fairly liberal bar; this gates a recommendation surface, not a moderation action, so being inclusive is fine.
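The parameter name matches OpenTelemetry's explicit_bucket_boundaries_advisory histogram hint (as an advisory, an SDK View can still override it). Assuming the boundaries reach the exporter unchanged, OpenTelemetry explicit buckets are upper-inclusive: each covers (previous boundary, boundary]. One consequence when reading the dashboard: a score of exactly 0.4 is positive but lands in the same (0.3, 0.4] bucket as a negative 0.35, so bucket counts don't split cleanly at the threshold. A small sketch:

```python
from bisect import bisect_left

# Same boundaries as the banger_initial_screen_score histogram.
BOUNDARIES = [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]
THRESHOLD = 0.4  # banger_initial_positive = score >= 0.4


def bucket(score: float) -> tuple[float, float]:
    """Return (lower, upper) bounds of the upper-inclusive bucket holding score."""
    i = bisect_left(BOUNDARIES, score)  # index of the first boundary >= score
    lower = BOUNDARIES[i - 1] if i > 0 else float("-inf")
    upper = BOUNDARIES[i] if i < len(BOUNDARIES) else float("inf")
    return lower, upper


for score in (0.35, 0.4, 0.41):
    print(score, bucket(score), score >= THRESHOLD)
# 0.35 (0.3, 0.4) False
# 0.4 (0.3, 0.4) True
# 0.41 (0.4, 0.5) True
```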

            taxonomy_categories = []
            if result.taxonomy_categories:
                for tc in result.taxonomy_categories:
                    taxonomy_categories.append(
                        ContentCategoryScore(
                            id=tc["id"],
                            name=tc["name"],
                            score=tc["score"],
                            category_id=None,
                        )
                    )

            return [
                ContentCategoryResult(
                    category=cat,
                    positive=banger_initial_positive,
                    score=score,
                    reason=reasoning,
                    summary=result.description,
                    tags=result.tags,
                    taxonomy_categories=taxonomy_categories,
                    tweet_bool_metadata=result.tweet_bool_metadata,
                    is_image_editable_by_grok=result.is_image_editable_by_grok,
                    slop_score=result.slop_score,
                    has_minor_score=result.has_minor_score,
                )
                for cat in self.categories
            ]
        else:
            raise ValueError(f"Invalid output: {output}")

Transform the model's taxonomy_categories (raw dicts) into Pydantic objects. Then emit one ContentCategoryResult per category (both BANGER_INITIAL_SCREEN and GROK_RANKER get the same data — same score, same description, same tags).

No fallback parsing — if result_pattern doesn't match, we throw and the framework records an error.


classifiers/content/post_safety_screen_deluxe.py (92 lines)

A narrow follow-up to the banger classifier (its output schema is a single tweet_bool_metadata field). It runs on already-popular posts to set safety bool flags.

class PostSafetyScreenResult(BaseModel):
    tweet_bool_metadata: TweetBoolMetadata


class PostSafetyDeluxeClassifier(ContentClassifier):
    result_pattern = re.compile(r"(.*)<json>(.*)</json>", re.DOTALL)

    def __init__(self):
        vlm_config = grox_config.get_model(ModelName.VLM_PRIMARY_CRITICAL)
        vlm_config.temperature = 0.000001
        vlm = VisionSampler(GrokModelConfig(**vlm_config.model_dump()))
        super().__init__(
            categories=[
                ContentCategoryType.POST_SAFETY_SCREEN,
            ],
            llm=vlm,
        )

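It uses VLM_PRIMARY_CRITICAL, the critical tier of the primary VLM, where the banger screen uses plain VLM_PRIMARY: safety calls get the critical tier even though banger scoring doesn't.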

    @staticmethod
    def build_convo(post: Post) -> Conversation:
        convo = Conversation(conversation_id=uuid.uuid4().hex)
        convo.messages.append(
            Message(role=Role.SYSTEM, content=[PostSafetyDeluxe().render()])
        )

        user_msg = Message(role=Role.USER, content=[])
        user_msg.content.extend(UserRenderer.render(post.user))
        user_msg.content.extend(PostRenderer.render(post))
        user_msg.content.append(
            f"\n\nAnalyze the post {post.id} and provide the requested JSON object for the post."
        )
        convo.messages.append(user_msg)

        convo.messages.append(Message(role=Role.ASSISTANT, content=[]))
        return convo

Standard build. Note the assistant message has content=[] (no empty-string prefix, no separator override) — keeps the default thinking mode enabled (model can chain-of-thought before emitting JSON).

    async def _parse(self, post: Post, output: str) -> list[ContentCategoryResult]:
        match = self.result_pattern.search(output)
        if match:
            reasoning = match.group(1).strip()
            logger.info(
                f"Post Safety Screen reasoning for post {post.id} : {reasoning}"
            )
            result = PostSafetyScreenResult.model_validate_json(match.group(2).strip())
            logger.info(
                f"Post Safety Screen result for post {post.id} : {result}"
            )
            return [
                ContentCategoryResult(
                    category=ContentCategoryType.POST_SAFETY_SCREEN,
                    positive=False,
                    score=0.0,
                    tweet_bool_metadata=result.tweet_bool_metadata,
                )
            ]
        else:
            raise ValueError(f"Invalid output: {output}")

Note positive=False and score=0.0, always. Those fields carry no meaning for this classifier; the actual signal is inside tweet_bool_metadata (the same TweetBoolMetadata type the banger screen returns). Downstream consumers ignore positive/score for this category and read the bool metadata.


classifiers/content/reply_ranking.py (158 lines) — ReplyScorer

Doesn't inherit from ContentClassifier! It's a standalone class for scoring how good a reply is.

class ReplyScorer:
    model_name = "GROK"

    def __init__(self):
        vlm_config = grox_config.get_model(ModelName.VLM_MINI_CRITICAL)
        vlm_config.temperature = 0.000001
        self.vlm = VisionSampler(GrokModelConfig(**vlm_config.model_dump()))

        vlm_fallback_config = grox_config.get_model(
            ModelName.VLM_PRIMARY_CRITICAL
        )
        vlm_fallback_config.temperature = 0.000001
        self.vlm_fallback = VisionSampler(
            GrokModelConfig(**vlm_fallback_config.model_dump())
        )

Two model tiers wired up. The primary sampler uses mini-critical (cheap, fast); the fallback uses primary-critical (more capable, more expensive). The fallback fires when the mini's output contains no brace-delimited span mentioning "score", which is a looser test than "valid JSON".

    async def score(self, post: Post) -> list[ReplyScoreResult]:
        convo = await self._to_convo(post)
        result = await self._sample(convo, post)
        parsed = await self._parse(result)
        Metrics.histogram(
            "ranked_replies_scores",
            explicit_bucket_boundaries_advisory=[0.0, 1.0, 2.0, 3.0],
        ).record(parsed[0].score)

        return parsed

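The score histogram has boundaries at 0/1/2/3, which suggests the reply score is on a 0-3 scale rather than 0-1 like the banger score. Since ReplyScorer doesn't inherit from ContentClassifier, none of the base class's request/success/error/latency metrics fire; the only metrics are this histogram and the counters inside _parse.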

    async def _to_convo(self, post: Post, non_reasoning: bool = False) -> Conversation:
        convo = Conversation(conversation_id=uuid.uuid4().hex)
        system_prompt = ReplyScoringSystem().render(
            params={"large_account_follower_threshold": ""}
        )
        if non_reasoning:
            system_prompt = "" + system_prompt
        convo.messages.append(Message(role=Role.SYSTEM, content=[system_prompt]))
        convo.messages.append(
            ThreadRenderer.render(
                post, role=Role.HUMAN, separator="\n\n", include_signals=True
            )
        )
        if non_reasoning:
            convo.messages.append(
                Message(role=Role.ASSISTANT, content=[""], separator="")
            )
        else:
            convo.messages.append(Message(role=Role.ASSISTANT, content=[]))
        return convo

non_reasoning toggle: when True, it forces the empty-assistant trick (no thinking), and the system prompt gets a literal "" prepended. That no-op is most likely a redaction in the open-source dump (compare _THINKING_RESTRICTION_LINES below), where the real string would be a no-thinking instruction.

include_signals=True in ThreadRenderer — pass engagement metrics (likes, replies, etc.) into the rendered thread, so the model can factor them into the reply score.

The large_account_follower_threshold="" — empty string sentinel. The system prompt template references this parameter; passing empty might disable a follower-threshold-based code path.

    async def _sample(self, convo: Conversation, post: Post) -> str:
        output = await self.vlm.sample(
            convo.interleave(),
            conversation_id=convo.conversation_id,
            json_schema=json.dumps(ReplyScoreResult.model_json_schema()),
        )
        match = re.search(r"\{.*\}", output, re.DOTALL)
        if not (match and "score" in match.group(0)):
            fallback_convo = await self._to_convo(post, non_reasoning=True)
            output = await self.vlm_fallback.sample(
                fallback_convo.interleave(),
                conversation_id=fallback_convo.conversation_id,
                json_schema=json.dumps(ReplyScoreResult.model_json_schema()),
            )
        return output

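Constrained generation: the sampler is given json_schema=... built from the Pydantic model, presumably so the backend can constrain decoding to the schema. The code doesn't treat that as a guarantee, though: it still regex-checks the output right after sampling, and the parser below keeps json_repair and regex fallbacks.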

Then the fallback: regex-check for a JSON object containing "score". If absent, rebuild the convo in non-reasoning mode and resample with the more powerful fallback model.

This is essentially "if the mini model failed to follow the format even with constrained decoding, fall back to the big model in fast (non-thinking) mode." Two changes at once: model and mode.
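A minimal sketch of the escalation control flow with two stand-in async samplers (mini, big, and sample_with_escalation are hypothetical names). It simplifies one thing: the real code also rebuilds the conversation in non-reasoning mode before calling the fallback, whereas this sketch reuses the prompt. It also shows how loose the check is: any brace span containing the substring "score" passes, even when "score" appears only inside the reason text.

```python
import asyncio
import re

# Same check as ReplyScorer._sample: a span from the first "{" to the last "}" mentioning "score".
_BRACE_SPAN = re.compile(r"\{.*\}", re.DOTALL)


def looks_usable(output: str) -> bool:
    match = _BRACE_SPAN.search(output)
    return bool(match and "score" in match.group(0))


async def sample_with_escalation(primary, fallback, prompt: str) -> str:
    """Call the cheap sampler first; escalate only if its output fails the check."""
    output = await primary(prompt)
    if not looks_usable(output):
        output = await fallback(prompt)
    return output


CALLS: list[str] = []


async def mini(prompt: str) -> str:
    CALLS.append("mini")
    return "This reply seems pretty good."  # prose, no JSON object


async def big(prompt: str) -> str:
    CALLS.append("big")
    return '{"score": 2, "reason": "specific and on-topic"}'


out = asyncio.run(sample_with_escalation(mini, big, "rate this reply"))
print(out)    # {"score": 2, "reason": "specific and on-topic"}
print(CALLS)  # ['mini', 'big']
```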

    async def _clean_output(self, output: str) -> str:
        if output.endswith("<|eos|>"):
            output = output.removesuffix("<|eos|>")
        output = output.strip()
        if output.startswith("```json"):
            output = output[7:]
        elif output.startswith("```"):
            output = output[3:]
        if output.endswith("```"):
            output = output[:-3]
        output = output.strip()
        return output

Same as the spam classifier. Duplicated rather than shared via a util — minor DRY violation.
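A minimal sketch of a shared helper both classes could call instead (the name clean_llm_output and where it would live are hypothetical). Because the original method awaits nothing, a plain function is enough. The fence string is built with "`" * 3 only to keep the snippet readable inside Markdown:

```python
FENCE = "`" * 3  # a Markdown code fence


def clean_llm_output(output: str, eos: str = "<|eos|>") -> str:
    """Strip a trailing EOS marker, then any surrounding Markdown code fence."""
    output = output.removesuffix(eos).strip()
    if output.startswith(FENCE + "json"):
        output = output[len(FENCE) + len("json"):]
    elif output.startswith(FENCE):
        output = output[len(FENCE):]
    if output.endswith(FENCE):
        output = output[: -len(FENCE)]
    return output.strip()


wrapped = FENCE + 'json\n{"score": 1}\n' + FENCE + "<|eos|>"
print(clean_llm_output(wrapped))  # {"score": 1}
```

Behavior matches the duplicated method, including the ordering detail that the EOS marker is removed before whitespace is stripped.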

    async def _parse(self, output: str) -> list[ReplyScoreResult]:
        score = None
        reason = ""

        match = re.search(r"\{.*\}", output, re.DOTALL)
        if match and "score" in match.group(0):
            raw_result = match.group(0).strip()
        else:
            raw_result = output

        cleaned_result = await self._clean_output(raw_result)

        try:
            result = ReplyScoreResult.model_validate_json(cleaned_result)
            score = result.score
            reason = result.reason
        except (ValidationError, ValueError):
            try:
                repaired = json_repair.repair_json(cleaned_result, return_objects=True)
                if isinstance(repaired, dict) and "score" in repaired:
                    result = ReplyScoreResult.model_validate(repaired)
                    score = result.score
                    reason = result.reason
                    Metrics.counter("task.reply_ranker.json_repaired.count").add(1)
            except Exception:
                pass

            if score is None:
                score_match = re.search(r'"score":\s*([\d.]+)', cleaned_result)
                if score_match:
                    try:
                        score = float(score_match.group(1).strip())
                    except ValueError:
                        score = None
                        Metrics.counter("task.reply_ranker.invalid.count").add(
                            1,
                            attributes={
                                "filter": "reply_ranking",
                                "reason": "invalid_score_format",
                            },
                        )
            if not reason:
                reason_match = re.search(
                    r'"reason":\s*"((?:[^"\\]|\\.)*)"', cleaned_result, re.DOTALL
                )
                if reason_match:
                    reason = reason_match.group(1)

        if not score and score != 0:
            logger.error(f"Invalid output format: {output}")
            Metrics.counter("task.reply_ranker.invalid.count").add(
                1, attributes={"filter": "reply_ranking", "reason": "invalid_format"}
            )
            raise ValueError(f"Invalid output: {output}")

        return [ReplyScoreResult(score=score, reason=reason)]

The most elaborate parser. It first narrows the output to the span from the first { to the last } if that span mentions "score", cleans it, and then runs a four-tier fallback:

  1. Pydantic validation of the cleaned JSON.
  2. The json_repair library, which fixes common JSON malformations (trailing commas, single quotes, missing braces). Success increments the json_repaired.count counter so repair frequency can be monitored; failures are swallowed silently.
  3. Regex extraction of "score": <number> and "reason": "..." from the cleaned text.
  4. Give up: log an error, increment invalid.count, raise.

if not score and score != 0: the explicit "and score != 0" keeps a literal 0.0 score (falsy in Python) from being rejected. We want None to fail but 0.0 to succeed; since score is only ever None or a number at this point, the condition is equivalent to score is None.

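The escape-aware reason regex "reason":\s*"((?:[^"\\]|\\.)*)" correctly handles escaped quotes inside the reason string, where a naive "(.*?)" would stop at the first \". Small thing, but it matters when the reason quotes the post. The captured text keeps its escape sequences as-is; it isn't JSON-unescaped.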
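A small demonstration of the difference, using a truncated output of the kind that actually reaches this fallback (the example text is invented). The final json.loads call, which turns the escaped capture back into plain text, is an extra step the real parser does not perform:

```python
import json
import re

ESCAPE_AWARE = re.compile(r'"reason":\s*"((?:[^"\\]|\\.)*)"', re.DOTALL)
NAIVE = re.compile(r'"reason":\s*"(.*?)"')

# Truncated JSON (no closing brace), so strict parsing would fail.
raw = r'{"score": 1, "reason": "quotes the OP: \"touch grass\" and adds nothing"'

print(NAIVE.search(raw).group(1))   # quotes the OP: \
captured = ESCAPE_AWARE.search(raw).group(1)
print(captured)                     # quotes the OP: \"touch grass\" and adds nothing
print(json.loads(f'"{captured}"'))  # quotes the OP: "touch grass" and adds nothing
```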


classifiers/content/safety_ptos.py (288 lines)

The two-stage safety classifier. Category classification first, then per-category policy classification. This is the heart of Grox's content-moderation pipeline.

Thinking-mode helpers

_THINKING_RESTRICTION_LINES = {
    "",
    "",
}


def _strip_thinking_restrictions(text: str) -> str:
    lines = text.splitlines(keepends=True)
    return "".join(
        line for line in lines if line.strip() not in _THINKING_RESTRICTION_LINES
    ).lstrip("\n")


def _render_safety_ptos_for_reasoning() -> str:
    return _strip_thinking_restrictions(SafetyPtos().render())

_THINKING_RESTRICTION_LINES is written as a set of two empty strings, which Python collapses to {""}. Most likely the real lines were redacted in the open-source dump; they would be lines like "Do not show your reasoning" that get stripped in deluxe (thinking) mode. As dumped, the filter would instead drop every blank line from the prompt.

_strip_thinking_restrictions filters those lines out of a prompt — when running deluxe mode, we want the model to reason, so we remove the "don't reason" lines from the prompt.

Stage 1: Category classifier

class SafetyPtosCategoryClassifier(ContentClassifier):
    result_pattern = re.compile(r"(.*)<json>(.*)</json>", re.DOTALL)

    def __init__(
        self,
        model_name: ModelName = ModelName.VLM_SAFETY,
        deluxe: bool = False,
    ):
        self.deluxe = deluxe
        vlm_config = grox_config.get_model(model_name)
        vlm_config.temperature = 0.000001
        vlm = VisionSampler(GrokModelConfig(**vlm_config.model_dump()))
        super().__init__(categories=[ContentCategoryType.SAFETY_PTOS], llm=vlm)

VLM_SAFETY model tier, which by its name is a model tuned specifically for safety classification. The deluxe flag toggles thinking mode.

    def build_convo(self, post: Post) -> Conversation:
        convo = Conversation(conversation_id=uuid.uuid4().hex)

        if self.deluxe:
            convo.messages.append(
                Message(role=Role.SYSTEM, content=[_render_safety_ptos_for_reasoning()])
            )
        else:
            convo.messages.append(
                Message(role=Role.SYSTEM, content=[SafetyPtos().render()])
            )

Deluxe → reasoning-enabled prompt. Default → as-is prompt with thinking suppressed.

        user_msg = Message(role=Role.USER, content=[])
        user_msg.content.extend(UserRenderer.render(post.user))
        user_msg.content.extend(PostRenderer.render(post, include_reply_to=True))
        user_msg.content.append(
            f"\n\nAnalyze the post {post.id} and provide the requested JSON object for the post."
        )
        convo.messages.append(user_msg)

        if self.deluxe:
            convo.messages.append(Message(role=Role.ASSISTANT, content=[]))
        else:
            convo.messages.append(
                Message(role=Role.ASSISTANT, content=[""], separator="")
            )

        return convo

include_reply_to=True — context matters for replies. A reply that says "yeah, kill it" is different if the parent is about a video game vs about a person.

Assistant message: deluxe = thinking allowed, default = empty-prefix trick to disable thinking.

    async def classify_post(self, post: Post) -> SafetyPostAnnotations:
        convo = await self._to_convo(post)
        result = await self._sample(convo, post)
        mode = "deluxe" if self.deluxe else "standard"
        logger.info(
            f"safety ptos category classifier ({mode}) result for {post.id}: {result}"
        )

        match = self.result_pattern.search(result)
        if match:
            json_str = match.group(2).strip()
            return SafetyPostAnnotations.model_validate_json(json_str)
        else:
            raise ValueError(
                f"Invalid output for safety ptos category classifier ({mode}): {result}"
            )

Separate from _classify: this is the rich-result entry point. It returns the full SafetyPostAnnotations Pydantic object (which contains the list of violated policies). The framework's _classify returns only ContentCategoryResult, which loses that detail, so classify_post exists for callers that need the policy list. Calling it directly also bypasses ContentClassifier.classify, so none of the base-class metrics are emitted for these calls.

    async def _to_convo(self, post: Post) -> Conversation:
        return self.build_convo(post)

    async def _sample(self, convo: Conversation, post: Post = None) -> str:
        return await self.llm.sample(
            convo.interleave(), conversation_id=convo.conversation_id
        )

    async def _parse(self, post: Post, output: str) -> List[ContentCategoryResult]:
        match = self.result_pattern.search(output)
        if match:
            return [
                ContentCategoryResult(
                    category=ContentCategoryType.SAFETY_PTOS, positive=True, score=0.0
                )
            ]
        else:
            mode = "deluxe" if self.deluxe else "standard"
            raise ValueError(
                f"Invalid parsing for safety ptos category classifier ({mode}): {output}"
            )

The framework _parse returns a minimal ContentCategoryResult (positive=True, score=0.0) whenever the output contains a <json>...</json> block, and raises otherwise. The annotations are not actually parsed here. Callers that need them get them from classify_post() instead.

Stage 2: Policy classifier

class SafetyPtosPolicyClassifier(ContentClassifier):
    result_pattern = re.compile(r"(.*)<json>(.*)</json>", re.DOTALL)

    def __init__(self, deluxe: bool = False):
        self.deluxe = deluxe

        vlm_config = grox_config.get_model(ModelName.VLM_PRIMARY_CRITICAL)
        vlm_config.temperature = 0.000001
        vlm = VisionSampler(GrokModelConfig(**vlm_config.model_dump()))
        super().__init__(categories=[ContentCategoryType.SAFETY_PTOS], llm=vlm)

        if deluxe:
            eapi_config_reasoning = grox_config.get_eapi_model(
                ModelName.EAPI_REASONING_INTERNAL
            )
            self.eapi_reasoning = EapiSampler(
                EapiModelConfig(**eapi_config_reasoning.model_dump())
            )

            eapi_config_reasoning_x_algo = grox_config.get_eapi_model(
                ModelName.EAPI_REASONING
            )
            self.eapi_reasoning_x_algo = EapiSampler(
                EapiModelConfig(**eapi_config_reasoning_x_algo.model_dump())
            )

Three samplers:

  • Primary: VLM_PRIMARY_CRITICAL — the base policy-detection model.
  • eapi_reasoning (deluxe only): EAPI_REASONING_INTERNAL — internal reasoning-tuned EAPI model.
  • eapi_reasoning_x_algo (deluxe only): EAPI_REASONING, the external/published reasoning model. The config name is version-agnostic, but the code calls this path "4.2" (_sample_4_2, DELUXE_4_2_CATEGORIES), i.e. Grok 4.2.

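In deluxe mode, the 4.2 reasoning model takes over for the most safety-sensitive categories (DELUXE_4_2_CATEGORIES). Only eapi_reasoning_x_algo is actually called in this class, from _sample_4_2. eapi_reasoning is constructed but never referenced anywhere in the class.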

Policy → prompt mapping

    @staticmethod
    def _get_policy_prompt(violation: SafetyPtosViolatedPolicy) -> str:
        if violation.category == SafetyPolicyCategory.ViolentMedia:
            return ViolentMediaPolicy().render()
        elif violation.category == SafetyPolicyCategory.AdultContent:
            return AdultContentPolicy().render()
        elif violation.category == SafetyPolicyCategory.Spam:
            return SpamPolicy().render()
        elif violation.category == SafetyPolicyCategory.IllegalAndRegulatedBehaviors:
            return IllegalAndRegulatedBehaviorsPolicy().render()
        elif violation.category == SafetyPolicyCategory.HateOrAbuse:
            return HateOrAbusePolicy().render()
        elif violation.category == SafetyPolicyCategory.ViolentSpeech:
            return ViolentSpeechPolicy().render()
        elif violation.category == SafetyPolicyCategory.SuicideOrSelfHarm:
            return SuicideOrSelfHarmPolicy().render()
        else:
            raise ValueError(
                f"No policy prompt available for category: {violation.category.value}"
            )

Seven category → prompt mappings. Each *Policy is a separate template — the model is shown only the policy-specific prompt for the category that the stage-1 classifier flagged. Smaller prompt = faster inference + less context for the model to ignore.

    def build_convo(
        self, post: Post, violation: SafetyPtosViolatedPolicy
    ) -> Conversation:
        content = self._get_policy_prompt(violation)
        if self.deluxe:
            content = _strip_thinking_restrictions(content)

        convo = Conversation(conversation_id=uuid.uuid4().hex)
        convo.messages.append(Message(role=Role.SYSTEM, content=[content]))

        user_msg = Message(role=Role.USER, content=[])
        user_msg.content.extend(UserRenderer.render(post.user))
        user_msg.content.extend(PostRenderer.render(post, include_reply_to=True))
        user_msg.content.append(
            f"\n\nAnalyze the post {post.id} for the specific safety policy violation category: {violation.category.value}"
        )
        user_msg.content.append(
            f"\n\nProvide the requested JSON object for the specific safety policy type."
        )
        convo.messages.append(user_msg)

        if self.deluxe:
            convo.messages.append(Message(role=Role.ASSISTANT, content=[]))
        else:
            convo.messages.append(
                Message(role=Role.ASSISTANT, content=[""], separator="")
            )

        return convo

The user message gets two appended instructions: first the analysis prompt naming the flagged category, then a follow-up asking for the JSON object. This double-prompt nudge is a common prompt-engineering trick for keeping the model focused on the output format.

    SUPPORTED_POLICY_CATEGORIES = {
        SafetyPolicyCategory.ViolentMedia,
        SafetyPolicyCategory.AdultContent,
        SafetyPolicyCategory.Spam,
        SafetyPolicyCategory.IllegalAndRegulatedBehaviors,
        SafetyPolicyCategory.HateOrAbuse,
        SafetyPolicyCategory.ViolentSpeech,
        SafetyPolicyCategory.SuicideOrSelfHarm,
    }

    DELUXE_4_2_CATEGORIES = {
        SafetyPolicyCategory.AdultContent,
        SafetyPolicyCategory.ViolentMedia,
    }

SUPPORTED_POLICY_CATEGORIES mirrors _get_policy_prompt (kept in sync manually).
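One way to remove the manual sync is to let a single dict drive both the prompt lookup and the supported set. The sketch below shows that idea. It is not how the source is written. The enum values, the extra Other member, and the placeholder prompt strings are all hypothetical stand-ins for SafetyPolicyCategory and the ViolentMediaPolicy().render()-style templates.

```python
from enum import Enum
from typing import Callable, Dict


class SafetyPolicyCategory(Enum):
    # Hypothetical stand-in: member names match the article, values are placeholders.
    ViolentMedia = "ViolentMedia"
    AdultContent = "AdultContent"
    Spam = "Spam"
    IllegalAndRegulatedBehaviors = "IllegalAndRegulatedBehaviors"
    HateOrAbuse = "HateOrAbuse"
    ViolentSpeech = "ViolentSpeech"
    SuicideOrSelfHarm = "SuicideOrSelfHarm"
    Other = "Other"  # hypothetical category with no policy prompt


C = SafetyPolicyCategory

# Placeholder renderers; the real code calls e.g. ViolentMediaPolicy().render().
POLICY_PROMPTS: Dict[SafetyPolicyCategory, Callable[[], str]] = {
    C.ViolentMedia: lambda: "<violent media policy prompt>",
    C.AdultContent: lambda: "<adult content policy prompt>",
    C.Spam: lambda: "<spam policy prompt>",
    C.IllegalAndRegulatedBehaviors: lambda: "<illegal behaviors policy prompt>",
    C.HateOrAbuse: lambda: "<hate or abuse policy prompt>",
    C.ViolentSpeech: lambda: "<violent speech policy prompt>",
    C.SuicideOrSelfHarm: lambda: "<suicide or self-harm policy prompt>",
}

# Derived from the mapping, so the two can never drift apart.
SUPPORTED_POLICY_CATEGORIES = frozenset(POLICY_PROMPTS)


def get_policy_prompt(category: SafetyPolicyCategory) -> str:
    """Render the policy-specific system prompt for one flagged category."""
    try:
        render = POLICY_PROMPTS[category]
    except KeyError:
        raise ValueError(
            f"No policy prompt available for category: {category.value}"
        ) from None
    return render()
```

Adding an eighth policy then becomes a one-line change. The if/elif chain plus a hand-maintained set works just as well at seven entries, at the cost of a second place to update.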

DELUXE_4_2_CATEGORIES is the subset that routes to the Grok 4.2 reasoning model in deluxe mode. Adult content and violent media are presumably the highest-stakes categories, where reasoning helps most: the model has to tell gratuitous violent media apart from war journalism, for example.

    async def classify_policy_for_violation(
        self, post: Post, violation: SafetyPtosViolatedPolicy
    ) -> SafetyPolicy | None:

        if violation.category not in self.SUPPORTED_POLICY_CATEGORIES:
            return None

        convo = await self._to_convo(post, violation)

        if (
            and self.deluxe
            and violation.category in self.DELUXE_4_2_CATEGORIES
        ):
            mode = "deluxe-4.2"
            result = await self._sample_4_2(convo, post)
        else:
            mode = "deluxe" if self.deluxe else "standard"
            result = await self._sample(convo, post)

        logger.info(
            f"safety ptos policy classifier ({mode}) result for post {post.id}, violation {violation.category}: {result}"
        )

        match = self.result_pattern.search(result)
        if match:
            json_str = match.group(2).strip()
            return SafetyPolicy.model_validate_json(json_str)
        else:
            raise ValueError(
                f"Invalid output for safety ptos policy ({mode}): {result}"
            )

The main entry. Three modes:

  • Unsupported category → return None.
  • Deluxe + 4.2 category → use the EAPI reasoning model, falling back to the standard VLM if that call fails.
  • Default → use the standard VLM.

Syntax error in the open-source dump: the condition opens with if (\n and self.deluxe ...), a leading and with no left operand, so the file as published won't parse. The most likely explanation is that a condition was redacted from the public release. The original probably read something like if (use_4_2_flag and self.deluxe and ...), where use_4_2_flag is a guessed name that does not appear in the source. For the rest of this walkthrough, assume that a flag check was removed.
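With the redacted flag restored, the routing decision in classify_policy_for_violation reduces to a small pure function. The sketch below makes that explicit. route is a hypothetical helper. use_4_2_flag is the guessed name from above, not a real config key. The category set is abridged to keep the example short.

```python
from enum import Enum
from typing import Optional


class Category(Enum):
    # Hypothetical, abridged stand-in for SafetyPolicyCategory.
    AdultContent = "AdultContent"
    ViolentMedia = "ViolentMedia"
    Spam = "Spam"
    Other = "Other"


SUPPORTED = {Category.AdultContent, Category.ViolentMedia, Category.Spam}  # abridged
DELUXE_4_2 = {Category.AdultContent, Category.ViolentMedia}


def route(category: Category, deluxe: bool, use_4_2_flag: bool) -> Optional[str]:
    """Return the sampling mode, or None when the category has no policy prompt.

    use_4_2_flag is an assumption standing in for the redacted condition.
    """
    if category not in SUPPORTED:
        return None
    if use_4_2_flag and deluxe and category in DELUXE_4_2:
        return "deluxe-4.2"  # EAPI reasoning model, VLM fallback on error
    return "deluxe" if deluxe else "standard"
```

For example, route(Category.AdultContent, deluxe=True, use_4_2_flag=False) falls through to "deluxe". Turning the flag off keeps deluxe prompting but skips the EAPI model.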

    async def _to_convo(
        self, post: Post, violation: SafetyPtosViolatedPolicy
    ) -> Conversation:
        return self.build_convo(post, violation)

    async def _sample_4_2(self, convo: Conversation, post: Post) -> str:
        try:
            return await self.eapi_reasoning_x_algo.sample(
                convo.interleaveToEapi(), conversation_id=convo.conversation_id
            )
        except Exception:
            logger.error(
                f"Failed to call 4.2 reasoning, error: {traceback.format_exc()}"
            )
            return await self.llm.sample(
                convo.interleave(), conversation_id=convo.conversation_id
            )

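_sample_4_2 tries the 4.2 EAPI reasoning model first, serializing the conversation with convo.interleaveToEapi(), because EAPI uses a different format from the VLM's interleave(). On any exception (a timeout, rate limit, or server error, for example), it logs the traceback and falls back to the primary VLM sampler, self.llm. As a result, a failed 4.2 call degrades to the standard model instead of failing the task.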

    async def _sample(self, convo: Conversation, post: Post = None) -> str:
        return await self.llm.sample(
            convo.interleave(), conversation_id=convo.conversation_id
        )

    async def _parse(self, post: Post, output: str) -> List[ContentCategoryResult]:
        return []

_parse returns [] — this policy classifier is invoked by the task code, not the framework's classify path. There's no ContentCategoryResult to produce; the result is a SafetyPolicy object returned by classify_policy_for_violation. The framework hooks exist for interface compatibility but are vestigial here.

How the two stages compose

In PlanSafetyPtos (Session 19):

  • task_safety_ptos_category_detection runs the category classifier, gets SafetyPostAnnotations (with a list of violated categories).
  • task_safety_ptos_policy_detection runs the policy classifier once per violated category, gathering specific policy violations.
  • task_write_safety_post_annotations_result_sink writes everything to Manhattan.

So stage 1 fans out to at most 7 stage-2 calls. The alternative would be running 7 per-category classifiers on every post from the start. The two-stage design is cheaper for three reasons: stage 1 is cheap (small model, single prompt), stage 2 runs only for the categories that were flagged, and each per-category prompt can be much more focused.
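The two-stage flow can be sketched with stub stages. The stubs below are hypothetical stand-ins for classify_post (stage 1) and classify_policy_for_violation (stage 2), with keyword checks in place of model calls. Running the stage-2 calls concurrently with asyncio.gather is an assumption of this sketch. The article only says the policy classifier runs once per violated category.

```python
import asyncio
from typing import Dict, List, Optional

SUPPORTED = {
    "ViolentMedia", "AdultContent", "Spam", "IllegalAndRegulatedBehaviors",
    "HateOrAbuse", "ViolentSpeech", "SuicideOrSelfHarm",
}


async def detect_categories(post_text: str) -> List[str]:
    """Stage 1 stub: one call per post returning the flagged categories."""
    flagged = []
    if "buy now" in post_text:
        flagged.append("Spam")
    if "gore" in post_text:
        flagged.append("ViolentMedia")
    return flagged


async def classify_policy(post_text: str, category: str) -> Optional[Dict[str, str]]:
    """Stage 2 stub: one focused call per flagged category, None if unsupported."""
    if category not in SUPPORTED:
        return None
    return {"category": category, "policy": f"{category}-policy-detail"}


async def classify_post_two_stage(post_text: str) -> List[Dict[str, str]]:
    categories = await detect_categories(post_text)
    # Fan out: one stage-2 call per flagged category; gather keeps input order.
    results = await asyncio.gather(
        *(classify_policy(post_text, c) for c in categories)
    )
    return [r for r in results if r is not None]
```

A clean post costs a single stage-1 call. asyncio.gather() with no arguments simply resolves to an empty list.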


Summary

Embedders: V2 is the eclectic multi-client embedder with 5 model choices, optional summaries, and document-version selection. V5 is the streamlined post-Grok-4 embedder that takes one input (text-with-pads + images + optional transcript) and produces a 1024-d L2-normalized vector via a single chat-template HTTP client. The migration from V2 to V5 is the consolidation that happens once you have a strong enough underlying multimodal LLM.

Summarizers: simple abstract base + concrete PostEmbeddingSummarizer that runs a VLM call with an external prompt file. The summary feeds the V3 embedder's _post_embedding_summarizer pipeline.

Classifiers: six concrete classifiers all following a template-method pattern (system prompt → render User+Post → assistant slot → sample → parse <json>...</json>). Variations:

  • Spam — simple binary decision with regex-fallback parsing.
  • Banger — rich 8-field result with reasoning, tags, taxonomy, slop-score, minor-detection.
  • Post safety deluxe — booleans-only, runs on popular posts.
  • Safety PTOS — two-stage (category, then per-category policy), with deluxe mode using Grok 4.2 reasoning for adult-content / violent-media.
  • Reply scorer — 4-tier parser (Pydantic → json_repair → regex → fail), with fallback from mini model to primary model on JSON failure.

Common patterns:

  • temperature = 0.000001 for near-greedy decoding — classifications must be reproducible.
  • <json>...</json> regex with reasoning prefix for structured-output parsing.
  • Empty-assistant-prefix trick (Message(role=ASSISTANT, content=[""], separator="")) to disable chain-of-thought when speed is required.
  • Model tier escalation: mini → primary, primary → primary-critical, primary-critical → EAPI-4.2. Each step costs more but provides better reliability/capability for the highest-stakes calls.
  • JSON schema constrained decoding (json_schema=... in sampler.sample) to guarantee parseable output.
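The near-greedy temperature is easy to see with a plain temperature-scaled softmax. Dividing the logits by T = 0.000001 before exponentiating pushes essentially all probability onto the argmax token. The sketch below is the textbook formula, not the sampler's actual code. It also shows one limitation: exact ties would still split the mass.

```python
import math
from typing import List


def softmax_with_temperature(logits: List[float], temperature: float) -> List[float]:
    """Temperature-scaled softmax, shifted by the max logit for numerical stability."""
    m = max(logits)
    # Subtracting the max keeps exp() from overflowing when temperature is tiny;
    # large negative exponents underflow to 0.0 instead.
    exps = [math.exp((x - m) / temperature) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


logits = [2.0, 1.5, 0.3]

if __name__ == "__main__":
    print(softmax_with_temperature(logits, 1.0))       # spread across all tokens
    print(softmax_with_temperature(logits, 0.000001))  # [1.0, 0.0, 0.0]
```

At T = 1.0 the second token still gets a substantial share. At T = 0.000001 sampling always picks the top token, which is what makes repeated classifications of the same post consistent.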

Next session

Session 21 — Grox tasks part 1 (~1,500 LOC). The actual task implementations that the plans (Session 19) declare — including TaskFilter, TaskRateLimit, TaskMediaHydration, TaskPublishKafka, TaskPublishManhattan, TaskWrite*, and friends. This is where the abstract plan-DAG meets the concrete service calls.