X For You algorithm, line by line — Part 18: Grox core (dispatcher, engine, generators)
Part 18 — Grox is the LLM-driven content-understanding pipeline that produces safety labels, content categories, and multimodal embeddings consumed by the rest of the system. Three Python processes (main / dispatcher / engine) cooperate through a multiprocessing Manager. We walk through main.py, engine.py, dispatcher.py, the schedule context, and all 16 Kafka task generators.
First Grox session. Grox is the LLM-driven content-understanding pipeline that produces the safety labels, content-category classifications, multimodal embeddings, and reply-ranking scores that the rest of the system consumes. From the home-mixer side we saw a lot of code reading grox_* fields off posts — this is where those fields are written from.
Architecturally, Grox is three cooperating Python processes:
- Main (main.py): owns the gRPC server (live-request path) and the multiprocessing context, handles signals and graceful shutdown.
- Dispatcher (dispatcher.py): pulls tasks from Kafka topics via task generators, submits them to the engine, processes results back, retries on failure, acks on success.
- Engine (engine.py): receives tasks via a multiprocessing queue, runs PlanMaster.exec(task) (we'll cover plans in Session 19), pushes results back to the dispatcher.
The three processes share state through a multiprocessing.Manager-backed dict containing four queues (a task/response pair for the Kafka path and a second pair for the live gRPC path) and two event flags (shutdown + queue-connection-shutdown). The engine is its own process so heavy ML inference doesn't starve the dispatcher's I/O loop, and the dispatcher is its own process so Kafka backpressure doesn't block the gRPC server.
This session covers 10 files, 1,143 LOC:
| File | LOC |
|---|---|
| main.py | 54 |
| engine.py | 137 |
| dispatcher.py | 370 |
| schedules/context.py | 47 |
| schedules/init.py | 35 |
| schedules/types.py | 73 |
| generators/task_generator.py | 153 |
| generators/stream_generator.py | 222 |
| lib/stream.py | 46 |
| lib/utils.py | 6 |
We start with the data types, then context/init plumbing, then walk through main → engine → dispatcher → generators.
schedules/types.py (73 lines)
import time
from enum import Enum
from typing import Any
from pydantic import Field, BaseModel
from grox.config.config import TaskGeneratorType
from grox.data_loaders.data_types import (
Post,
User,
UserContext,
ContentCategoryResult,
GroxContentAnalysis,
ReplyScoreResult,
SafetyPostAnnotations,
)
from grox.classifiers.content.classifier_data_collection import (
ClassifierDataCollectionResult,
)
class TaskEligibility(str, Enum):
    SPAM_COMMENT = "spam_comment"
    BANGER_INITIAL_SCREEN = "banger_initial_screen"
    POST_EMBEDDING_WITH_SUMMARY = "post_embedding_with_summary"
    POST_EMBEDDING_WITH_SUMMARY_FOR_REPLY = "post_embedding_with_summary_for_reply"
    MM_EMB_V4 = "mm_emb_v4"
    MM_EMB_V5 = "mm_emb_v5"
    MM_EMB_V5_FOR_REPLY = "mm_emb_v5_for_reply"
    REPLY_RANKING = "reply_ranking"
    SAFETY_PTOS = "safety_ptos"
    POST_SAFETY = "post_safety"
TaskEligibility is the steering enum for the plan engine. A task carries a set of eligibilities; the plan engine (Session 19) routes through different stages based on which eligibilities are present. So if a task has {POST_SAFETY}, it runs only the safety-classifier branch; if it has {SPAM_COMMENT, REPLY_RANKING} it runs both branches concurrently.
The set is the per-stream type tag — different Kafka topics inject different eligibility sets (we'll see exactly which mapping in stream_generator.py).
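To make the steering idea concrete, here is a minimal sketch of how a set of eligibilities could select which branches run. The branch names and the select_branches helper are hypothetical; the real routing lives in the plan engine covered in Session 19, and the enum below is a trimmed copy of the one above.

```python
from enum import Enum


class TaskEligibility(str, Enum):
    # Trimmed copy of the enum above; three members are enough for the sketch.
    SPAM_COMMENT = "spam_comment"
    REPLY_RANKING = "reply_ranking"
    POST_SAFETY = "post_safety"


# Hypothetical mapping from eligibility to a branch name.
BRANCHES = {
    TaskEligibility.SPAM_COMMENT: "spam_branch",
    TaskEligibility.REPLY_RANKING: "reply_ranking_branch",
    TaskEligibility.POST_SAFETY: "safety_branch",
}


def select_branches(eligibilities: set[TaskEligibility]) -> list[str]:
    """Return the branches to run, in enum definition order."""
    return [BRANCHES[e] for e in TaskEligibility if e in eligibilities]


print(select_branches({TaskEligibility.POST_SAFETY}))
print(select_branches({TaskEligibility.SPAM_COMMENT, TaskEligibility.REPLY_RANKING}))
```

Because the enum subclasses str, a value read from configuration such as "post_safety" converts back with TaskEligibility("post_safety").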
class TaskPayload(BaseModel):
    payload_id: str
    post: Post | None = None
    user: User | None = None
    user_context: UserContext | None = None
    attempt: int = 0
    eligibilities: set[TaskEligibility] = Field(default_factory=set)
    deadline_ts_secs: int | None = None
    task_type: TaskGeneratorType | None = None
    grox_content_analysis: GroxContentAnalysis | None = None
TaskPayload is what travels the dispatcher → engine queue. Pydantic gives us serialization for free (it has to cross a multiprocessing.Queue, which pickles).
- payload_id: opaque string ID, used by the dispatcher to track in-flight tasks and route results back.
- post / user / user_context: one of these is always set; what's present depends on the task type. Most stream generators set post; reply-ranking style tasks set user_context too.
- attempt: incremented on retry by the dispatcher.
- deadline_ts_secs: soft deadline propagated from the upstream message; tasks past deadline are dropped in the engine's plan logic.
- task_type: which TaskGeneratorType enum produced this; used for metric attribution and for the failure-ack path in the priority generator.
- grox_content_analysis: for recovery tasks where we already have prior analysis state to extend.
class TaskResult(BaseModel):
    task: TaskPayload
    task_started_at: float
    task_finished_at: float = Field(default_factory=time.perf_counter)
    content_categories: list[ContentCategoryResult] = Field(default_factory=list)
    multimodal_post_embedding: list[float] | None = None
    reason: str = ""
    success: bool = Field(default=True)
    error: str | None = Field(default=None)
TaskResult travels back: engine → response queue → dispatcher. The dispatcher only really cares about success and task.payload_id (for ack/retry routing); the other fields exist for the live-RPC path (when a gRPC client is waiting on the result).
class TaskContext:
    def __init__(self, task: TaskPayload):
        self.payload = task
        self.eligibilities: set[TaskEligibility] = task.eligibilities.copy()
        self.content_categories: list[ContentCategoryResult] = []
        self.summary: str = ""
        self.multimodal_post_embedding: list[float] | None = None
        self.multimodal_post_embedding_dict: dict[str, list[float]] = {}
        self.reply_ranking_results: list[ReplyScoreResult] = []
        self.reason: str = ""
        self.available_topics: list | None = None
        self.start_time = time.perf_counter()
        self.errors: list[Exception] = []
        self.safety_annotations: SafetyPostAnnotations | None = None

class TaskError(Exception):
    pass
TaskContext is the mutable workspace for one task as it flows through the plan pipeline. It is distinct from TaskPayload (the input, which plans treat as read-only; the dispatcher does bump its attempt counter on retry) and TaskResult (the output handed back to the dispatcher). Plans read from payload, mutate summary / content_categories / etc., and at the end the engine turns the context into a TaskResult.
eligibilities starts as a copy because plans remove eligibilities as they're satisfied — that's how the plan master decides whether more work is needed.
TaskError is the only custom exception; plans raise it to signal a clean abort vs unexpected exceptions.
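The copy-then-remove pattern can be sketched with small stand-ins. Payload, Context and run_until_done are hypothetical simplifications, not the real TaskPayload, TaskContext or plan master; the point is only that work is tracked on the copy while the payload's own set stays untouched.

```python
class Payload:
    """Stand-in for TaskPayload; only the field this sketch needs."""

    def __init__(self, eligibilities: set[str]):
        self.eligibilities = eligibilities


class Context:
    """Stand-in for TaskContext: works on a copy of the eligibility set."""

    def __init__(self, payload: Payload):
        self.payload = payload
        self.eligibilities = payload.eligibilities.copy()
        self.completed: list[str] = []


def run_until_done(ctx: Context) -> None:
    # Hypothetical driver: satisfy one eligibility per pass until none remain.
    while ctx.eligibilities:
        current = sorted(ctx.eligibilities)[0]
        ctx.completed.append(current)
        ctx.eligibilities.discard(current)


payload = Payload({"post_safety", "reply_ranking"})
ctx = Context(payload)
run_until_done(ctx)
print(ctx.completed, payload.eligibilities)
```

One consequence of copying: a payload that is resubmitted after a failure still carries its full eligibility set, so a fresh context would start from the complete list again.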
schedules/context.py (47 lines)
import signal
import logging
from typing import Any
from multiprocessing import Manager
from multiprocessing.managers import DictProxy, SyncManager
type ScheduleContext = DictProxy[str, Any]
logger = logging.getLogger(__name__)
_manager: SyncManager | None = None
The type ScheduleContext = ... is Python 3.12+ explicit type-alias syntax. The actual context is a DictProxy — a dict-like object whose contents live in a separate manager process, and reads/writes from any of our three worker processes are RPC'd to it.
def get_manager() -> SyncManager:
    global _manager
    if _manager is None:
        _manager = Manager()
    return _manager

def new_context() -> ScheduleContext:
    manager = get_manager()
    return manager.dict(
        task_queue=manager.Queue(),
        resp_queue=manager.Queue(),
        live_task_queue=manager.Queue(),
        live_resp_queue=manager.Queue(),
        shutdown_event=manager.Event(),
        queue_connection_shutdown_event=manager.Event(),
    )
The context is six shared objects:
- task_queue: dispatcher pushes Kafka-sourced tasks here; engine pops.
- resp_queue: engine pushes results; dispatcher pops.
- live_task_queue / live_resp_queue: same pattern but for the gRPC-live path. This separation means a backlog of Kafka tasks can't starve a live RPC reply.
- shutdown_event: set when the entire system should stop. All loops check this.
- queue_connection_shutdown_event: set first, before shutdown_event. The dispatcher's task-generator loop watches this one and stops accepting new Kafka tasks; the engine continues to drain whatever's already in task_queue. This is the soft-shutdown phase. Look at main.py and you'll see a 300-second sleep between these two signals: that's the drain window.
def prevent_default() -> None:
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)

def shutdown_context(context: ScheduleContext) -> None:
    context["shutdown_event"].set()

def queue_connection_shutdown_context(context: ScheduleContext) -> None:
    context["queue_connection_shutdown_event"].set()

def cleanup() -> None:
    if _manager is not None:
        logger.warning("Shutting down context manager")
        _manager.shutdown()
prevent_default() is important: every process calls it at startup (via init_proc). Without it, Ctrl-C would deliver SIGINT to every process in the group and they'd race to die. Instead, the main process later installs its own handlers on the event loop (see main.py), sets the shutdown event when a signal arrives, and the children notice through the event flag and exit cleanly.
schedules/init.py (35 lines)
import asyncio
import gc
import logging
import random
import time
import setproctitle
from grox.config.config import grox_config
from grox.schedules.context import prevent_default
from monitor.logging import Logging
from monitor.metrics import Metrics
logger = logging.getLogger(__name__)
async def init_proc(proc_name: str):
    prevent_default()
    Logging.config(grox_config.logging)
    Metrics.init(proc_name, grox_config.metrics)
    setproctitle.setproctitle(proc_name)
    logger.info(f"Changed process title to {proc_name}")
    asyncio.create_task(periodic_gc(proc_name))
Every process (main, engine, dispatcher) calls this first. Four things:
- Ignore SIGINT/SIGTERM (the main process handles them; children listen to the shutdown event instead).
- Initialize logging and metrics with the process name as a tag, so OpenTelemetry / Prometheus can attribute samples.
- setproctitle: change what ps shows, so an operator sees the name passed to init_proc (main, engine or dispatcher) instead of python main.py. Tiny quality of life.
- Start the periodic GC task.
async def periodic_gc(proc_name: str):
    while True:
        seconds = grox_config.periodic_gc.interval + random.randint(
            0, int(grox_config.periodic_gc.jitter)
        )
        await asyncio.sleep(seconds)
        logger.info(f"Running periodic GC for {proc_name}")
        start = time.perf_counter()
        gc.collect()
        end = time.perf_counter()
        logger.info(f"GC for {proc_name} took {end - start:.2f} seconds")
Force a full GC on a fixed cadence (with jitter so all three processes don't GC simultaneously). In long-running ML pipelines, large temporary tensors caught in reference cycles can survive into the older generations, which the default generational collector scans only occasionally; an explicit gc.collect() runs a full collection and reclaims them. The duration is logged for monitoring: if GC starts taking seconds, that's a leak signal.
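A self-contained sketch of the same cadence-plus-jitter loop, with two changes so it can be run as a script: the interval and jitter are plain arguments rather than config values, and a max_runs bound (an addition for this sketch) makes the loop terminate.

```python
import asyncio
import gc
import random
import time


def jittered_delay(interval: int, jitter: int) -> int:
    """Base interval plus a uniform integer jitter in [0, jitter]."""
    return interval + random.randint(0, int(jitter))


async def periodic_gc(interval: int, jitter: int, max_runs: int) -> list[float]:
    """Run gc.collect() max_runs times and return each run's duration.

    max_runs exists only so this sketch terminates; the original loops forever.
    """
    durations = []
    for _ in range(max_runs):
        await asyncio.sleep(jittered_delay(interval, jitter))
        start = time.perf_counter()
        gc.collect()  # full collection across all generations
        durations.append(time.perf_counter() - start)
    return durations


durations = asyncio.run(periodic_gc(interval=0, jitter=0, max_runs=2))
print(len(durations))
```

With an interval of 10 and a jitter of 5, for instance, each process would sleep between 10 and 15 seconds per cycle, which spreads the collections out over time.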
main.py (54 lines)
import signal
import asyncio
import logging
from grox.engine import Engine
from grox.service import GrpcServer
from grox.dispatcher import Dispatcher
from grox.config.config import grox_config
from grox.schedules.init import init_proc
from grox.schedules.context import (
cleanup,
new_context,
shutdown_context,
queue_connection_shutdown_context,
)
logger = logging.getLogger(__name__)
shutdown = asyncio.Event()
Three subsystems: Engine, GrpcServer, Dispatcher. The gRPC server is in a grox.service module that's outside our scope here (it handles ad-hoc /live RPC requests, parallel to the Kafka dispatcher path).
async def serve():
    await init_proc("main")
    logger.info(f"Starting grox server...")
    context = new_context()
    engine = Engine(context)
    dispatcher = Dispatcher(context)
    grpc_server = GrpcServer(context)
    await engine.start()
    await dispatcher.start()
    await grpc_server.start()
Start order matters. The engine has to be running before the dispatcher because the dispatcher will start dispatching to a queue the engine consumes from. The gRPC server starts last so we don't accept external requests until the internal pipeline is healthy.
    logger.info("Grox server started")
    event_loop = asyncio.get_running_loop()
    event_loop.add_signal_handler(signal.SIGINT, lambda: shutdown.set())
    event_loop.add_signal_handler(signal.SIGTERM, lambda: shutdown.set())
    await shutdown.wait()
Signal handlers run on the event loop (not via the signal module's default behavior), so they're cooperative and can safely set an asyncio Event. We then park on shutdown.wait().
    logger.warning("Grox server shutting down...")
    queue_connection_shutdown_context(context)
    await asyncio.sleep(300)
    shutdown_context(context)
    await asyncio.gather(
        grpc_server.stop(),
        dispatcher.stop(),
        engine.stop(),
    )
    cleanup()
    logger.warning("Grox server stopped")

if __name__ == "__main__":
    asyncio.run(serve())
This is the two-phase shutdown I mentioned. Step 1: set the queue-connection-shutdown event. The dispatcher's task generators stop polling new tasks from Kafka. Sleep 300 seconds — that's the drain window: any task already in flight gets a chance to finish before we kill anything.
Step 2: set the hard shutdown event. All loops bail. Stop the three subsystems in parallel. Shut down the multiprocessing manager.
300s is hardcoded here, not config. The choice presumably ties to deadlines: if most tasks have deadlines on the order of minutes, 5 minutes is enough to drain most of them without making operators wait too long during rollouts.
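The two-phase shutdown can be sketched in a single process with asyncio primitives standing in for the manager-backed queue and events. The producer and consumer here are hypothetical stand-ins for the dispatcher's fill loop and the engine, and the drain window is shortened from 300 seconds to a fraction of a second.

```python
import asyncio


async def producer(queue: asyncio.Queue, soft_stop: asyncio.Event) -> int:
    """Enqueue work until the soft-stop flag is set; return items produced."""
    produced = 0
    while not soft_stop.is_set():
        await queue.put(produced)
        produced += 1
        await asyncio.sleep(0.001)
    return produced


async def consumer(queue: asyncio.Queue, hard_stop: asyncio.Event) -> int:
    """Keep consuming until hard stop is set AND the queue is empty."""
    consumed = 0
    while not hard_stop.is_set() or not queue.empty():
        try:
            queue.get_nowait()
            consumed += 1
        except asyncio.QueueEmpty:
            await asyncio.sleep(0.001)
    return consumed


async def main(drain_window: float = 0.05) -> tuple[int, int]:
    queue: asyncio.Queue = asyncio.Queue()
    soft_stop, hard_stop = asyncio.Event(), asyncio.Event()
    prod = asyncio.create_task(producer(queue, soft_stop))
    cons = asyncio.create_task(consumer(queue, hard_stop))
    await asyncio.sleep(0.02)          # normal operation
    soft_stop.set()                    # phase 1: stop taking new work
    await asyncio.sleep(drain_window)  # drain window (300 s in main.py)
    hard_stop.set()                    # phase 2: everything exits
    return await prod, await cons


produced, consumed = asyncio.run(main())
print(produced, consumed)
```

Everything produced before the soft stop is consumed before the hard stop takes effect, which is the property the drain window is there to provide.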
engine.py (137 lines)
The engine is the heavy worker. It runs in its own subprocess, owns the ML models (via the plans), and runs each task it dequeues as its own asyncio task, so many tasks can be in flight concurrently within the one process.
import os
import time
import asyncio
import logging
import traceback
import multiprocessing
from queue import Empty, Queue
from threading import Event
from multiprocessing import Process
from monitor.logging import Logging
from monitor.metrics import Metrics
from grox.config.config import grox_config
from grox.schedules.init import init_proc
from grox.schedules.types import TaskResult, TaskPayload
from grox.plans.plan_master import PlanMaster
from grox.schedules.context import ScheduleContext
from grox.data_loaders.media_processor import MediaProcessor
from grox.data_loaders.asr_processor import ASRProcessor
logger = logging.getLogger(__name__)
PlanMaster.exec is the entry point we'll cover in Session 19 — it dispatches a task to the right sequence of plan stages.
MediaProcessor and ASRProcessor are heavy thread-pool services for downloading/decoding images/video frames and running automatic speech recognition respectively. They live in the engine process and are referenced by some plans.
class Engine:
    def __init__(self, context: ScheduleContext):
        self.config = grox_config.engine
        self.context = context
        self._task_queue: Queue[TaskPayload] = self.context["task_queue"]
        self._resp_queue: Queue[TaskResult] = self.context["resp_queue"]
        self._shutdown_event: Event = self.context["shutdown_event"]
        self._process = None
Engine grabs its three needed handles from the context. Note: live-queues are not in the engine — the gRPC server has its own consumer for live_task_queue / live_resp_queue. Different code path entirely.
    def _is_shutdown(self) -> bool:
        try:
            return self._shutdown_event.is_set()
        except BrokenPipeError:
            logger.error("Broken pipe error, assuming shutdown")
            return True
        except Exception:
            logger.error(
                f"Error checking shutdown event, assuming shutdown: {traceback.format_exc()}"
            )
            return True
Because the event is a manager-backed proxy, is_set() is actually an RPC to the manager process. If the manager process is dying, that RPC raises BrokenPipeError. Treat that as "shutdown" — there's nothing useful to do otherwise.
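The fail-closed behaviour can be exercised without a real manager. In this sketch a threading.Event plays the healthy proxy and DeadProxy is a hypothetical stand-in for an event proxy whose manager process has already exited; is_set_or_assume_shutdown is an illustrative free-function version of the method above.

```python
import logging
import threading

logger = logging.getLogger(__name__)


def is_set_or_assume_shutdown(event) -> bool:
    """Return event.is_set(), treating any proxy failure as 'shut down'."""
    try:
        return event.is_set()
    except BrokenPipeError:
        logger.error("Broken pipe error, assuming shutdown")
        return True
    except Exception:
        logger.exception("Error checking shutdown event, assuming shutdown")
        return True


class DeadProxy:
    """Hypothetical stand-in for an Event proxy whose manager has exited."""

    def is_set(self) -> bool:
        raise BrokenPipeError("manager is gone")


healthy = threading.Event()
print(is_set_or_assume_shutdown(healthy))      # flag not set yet
healthy.set()
print(is_set_or_assume_shutdown(healthy))      # flag set
print(is_set_or_assume_shutdown(DeadProxy()))  # failure counts as shutdown
```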
    async def _init_run(self):
        await init_proc("engine")
        MediaProcessor.start()
        ASRProcessor.start()
Runs in the engine process after fork. Note MediaProcessor.start() and ASRProcessor.start() — these are class-method singletons (we'll see them in Session 19/20). They spin up worker threads for media and ASR work.
    async def _process_task(self, task: TaskPayload) -> TaskResult:
        logger.debug(f"engine started processing task")
        start = time.perf_counter()
        res = await PlanMaster.exec(task)
        end = time.perf_counter()
        logger.debug(f"engine finished processing task in {end - start:.2f} seconds")
        Metrics.histogram("engine.task.processing_time").record(end - start)
        return res
The actual work. PlanMaster.exec does everything (model inference, classification, embedding, summarization). The engine just wraps it with a histogram metric.
    async def _run_task(self, task: TaskPayload):
        start = time.perf_counter()
        with Metrics.tracer("engine").start_as_current_span("task.root"):
            Logging.set_context(task=task.payload_id)
            if task.post:
                Logging.set_context(post=task.post.id)
            if task.user:
                Logging.set_context(user=task.user.id)
            if task.user_context:
                Logging.set_context(user=task.user_context.user.id)
            try:
                res = await self._process_task(task)
                self._resp_queue.put(res)
                Metrics.counter("engine.task.success.count").add(1)
            except Exception as e:
                logger.error(f"failed to process task, error: {traceback.format_exc()}")
                self._resp_queue.put(
                    TaskResult(
                        task=task,
                        success=False,
                        error=str(e),
                        task_finished_at=start,
                        task_started_at=time.perf_counter(),
                    )
                )
                Metrics.counter("engine.task.failed.count").add(1)
Wrap each task in a tracing span and set logging context (so all log lines emitted during this task carry task=, post=, user= tags — essential for grepping production logs).
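Try/except is mandatory: a Python exception escaping an asyncio task wouldn't kill the engine process, but it would surface only as asyncio's "Task exception was never retrieved" message when the task object is garbage-collected, and the dispatcher would be left waiting indefinitely. Any exception → success=False TaskResult on the response queue, so the dispatcher can retry or give up. One oddity in the failure branch: start is passed as task_finished_at and a fresh perf_counter() as task_started_at, so the two timestamps appear swapped relative to their names.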
    async def _poll_task(self) -> TaskPayload | None:
        logger.debug(f"engine polling task, queue size: {self._task_queue.qsize()}")
        try:
            task = self._task_queue.get(block=False)
            logger.debug(f"engine received task: {task.payload_id}")
            Metrics.counter("engine.task.received.count").add(1)
            return task
        except Empty:
            logger.debug("engine polling task returned None")
            return None
        except BrokenPipeError:
            logger.error("Broken pipe error, shutting down")
            return None
        except Exception:
            logger.error(f"failed to poll task: {traceback.format_exc()}")
            return None
Non-blocking poll. Three failure modes: queue empty (normal), pipe broken (manager dying; return None and let the _is_shutdown() check in the main loop end things), other exception (log and treat as empty).
    async def _run(self, started_event: Event):
        await self._init_run()
        started_event.set()
        while not self._is_shutdown() or not self._task_queue.empty():
            task = await self._poll_task()
            if task is None:
                await asyncio.sleep(0.1)
                continue
            asyncio.create_task(self._run_task(task))
        logger.warning("engine stopped")
The main loop. Two-condition exit: shutdown is set AND task queue is empty. So when the main process sends shutdown, the engine still drains whatever the dispatcher has already put on the queue. (Recall the dispatcher stopped polling Kafka 300 seconds ago at this point, so the queue is bounded.)
Important: asyncio.create_task(self._run_task(task)) — tasks run concurrently as asyncio tasks within the engine process. The polling loop never awaits the task. This is how Grox achieves throughput — many tasks in flight at once even though they're in a single process. The plans use thread pools internally for the actually-blocking ML inference.
await asyncio.sleep(0.1) when queue is empty — back off so we don't burn CPU spinning.
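The loop's shape (non-blocking poll, fire-and-forget task creation, exit only when shutdown is set and the queue is empty) can be sketched with queue.Queue and threading.Event, the same interfaces the engine annotates its handles with. handle is a hypothetical stand-in for _run_task. Two things are additions in this sketch and not in the code above: it keeps references to the spawned tasks and it waits for them before returning.

```python
import asyncio
import queue
import threading


async def handle(item: int, results: list[int]) -> None:
    """Stand-in for _run_task: pretend to do some awaitable work."""
    await asyncio.sleep(0.01)
    results.append(item * 2)


async def run_loop(tasks: queue.Queue, shutdown: threading.Event) -> list[int]:
    results: list[int] = []
    in_flight: set[asyncio.Task] = set()
    # Exit only when shutdown is set AND the queue has been drained.
    while not shutdown.is_set() or not tasks.empty():
        try:
            item = tasks.get(block=False)
        except queue.Empty:
            await asyncio.sleep(0.01)  # back off instead of spinning
            continue
        task = asyncio.create_task(handle(item, results))
        in_flight.add(task)  # keep a reference so the task is not collected
        task.add_done_callback(in_flight.discard)
    # Addition in this sketch: wait for tasks still running at exit.
    await asyncio.gather(*in_flight)
    return results


tasks: queue.Queue = queue.Queue()
for i in range(5):
    tasks.put(i)
shutdown = threading.Event()
shutdown.set()  # shutdown already requested, but five items are still queued
results = asyncio.run(run_loop(tasks, shutdown))
print(sorted(results))
```

Even with the shutdown flag set before the loop starts, all five queued items are picked up, which is the drain behaviour the two-condition exit is meant to give.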
    def run(self, started_event: Event):
        asyncio.run(self._run(started_event))
        os._exit(0)
This is the multiprocessing target. os._exit(0) is deliberate vs a normal sys.exit — it skips Python cleanup (atexit handlers, finalizers, threading shutdown). After running ML models for hours, those finalizers can hang on stuck threads or CUDA cleanup. os._exit is the bazooka: kill the process now.
    async def start(self):
        logger.info("Starting Grox engine...")
        started_event = multiprocessing.Event()
        self._process = Process(
            target=self.run, args=(started_event,), name="grox-engine"
        )
        self._process.start()
        started_event.wait()
        logger.info("Grox engine started")
Fork the engine process. started_event is a multiprocessing Event the child sets when its async loop is ready — the parent waits on it, so by the time start() returns, the engine is genuinely ready to accept tasks. (Without this, the dispatcher might start putting tasks on the queue before the engine's pollers are awake. Probably wouldn't break anything because the queue buffers, but it's cleaner this way.)
    async def stop(self):
        logger.warning("Stopping Grox engine...")
        if self._process and self._process.is_alive():
            self._process.join(self.config.graceful_shutdown_timeout)
        else:
            logger.warning("Engine process is not alive, skipping join")
        await MediaProcessor.stop()
        await ASRProcessor.stop()
        logger.warning("Engine stopped")
Wait for the child to exit, capped by a configured timeout (after which the parent simply continues without terminating the child). Then stop the Media/ASR processors. Note these are class-level methods called from the main process, which suggests they hold state that the main process also has to tear down. We'll see them in detail in Session 19/20.
dispatcher.py (370 lines)
The dispatcher is the orchestration brain. It owns the task generators, submits tasks to the engine queue, processes results, retries failed tasks, and acks the upstream Kafka queue when tasks succeed (or terminally fail).
Imports and class setup
import asyncio
import logging
import traceback
import multiprocessing
from queue import Empty, Queue
from threading import Event
from multiprocessing import Process
from tenacity import retry, wait_incrementing, stop_after_attempt
from monitor.metrics import Metrics
from grox.config.config import TaskGeneratorType, grox_config
from grox.schedules.init import init_proc
from grox.schedules.types import TaskResult, TaskPayload
from grox.schedules.context import ScheduleContext
from grox.generators.task_generator import TaskGenerator, PriorityTaskGenerator
from grox.generators.stream_generator import (
PostStreamTaskGenerator,
PostStreamRecoveryTaskGenerator,
PostStreamTestTaskGenerator,
PostStreamDelayedTaskGenerator,
PostSafetyStreamTaskGenerator,
ReplyRankingRecoveryTaskGenerator,
PostEmbeddingRequestWithSummaryStreamTaskGenerator,
PostEmbeddingRequestWithSummaryRecoveryStreamTaskGenerator,
MinTractionPostStreamForGroxTaskGenerator,
MinTractionPostStreamForGroxPtosTaskGenerator,
PostEmbeddingV5StreamTaskGenerator,
PostEmbeddingV5ForReplyStreamTaskGenerator,
MinTractionPostStreamForGroxMultiModalTaskGenerator,
PostEmbeddingRequestWithSummaryForReplyRecoveryStreamTaskGenerator,
SafetyPtosRecoveryStreamTaskGenerator,
SafetyPtosDeluxeStreamTaskGenerator,
)
That's 16 stream task generators imported. Each one corresponds to a Kafka topic with a specific eligibility set.
class Dispatcher:
    def __init__(self, context: ScheduleContext):
        self.config = grox_config.dispatcher
        self.context = context
        self._task_queue: Queue[TaskPayload] = self.context["task_queue"]
        self._resp_queue: Queue[TaskResult] = self.context["resp_queue"]
        self._shutdown_event: Event = self.context["shutdown_event"]
        self._queue_connection_shutdown_event: Event = self.context[
            "queue_connection_shutdown_event"
        ]
        self._process = None
Grabs four context handles. Note the queue-connection-shutdown event — that's the soft signal that says "stop accepting new Kafka tasks but keep processing results."
Shutdown checks
    def _is_shutdown(self) -> bool:
        try:
            return self._shutdown_event.is_set()
        except BrokenPipeError:
            logger.error("Broken pipe error, assuming shutdown")
            return True
        except Exception:
            logger.error(
                f"Error checking shutdown event, assuming shutdown: {traceback.format_exc()}"
            )
            return True

    def _is_queue_connection_shutdown(self) -> bool:
        try:
            return self._queue_connection_shutdown_event.is_set()
        except BrokenPipeError:
            logger.error("Broken pipe error, assuming queue connection shutdown")
            return True
        except Exception:
            logger.error(
                f"Error checking shutdown event, assuming queue connection shutdown: {traceback.format_exc()}"
            )
            return True
Same pattern as engine. Two flags = two checkers.
Init with retry
    @retry(
        stop=stop_after_attempt(3), wait=wait_incrementing(start=1, increment=3, max=9)
    )
    async def _init_run(self):
        await init_proc("dispatcher")
        self._in_flights: set[str] = set()
        self._task_generator = self._get_task_generators()
        await self._task_generator.start()
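The tenacity.retry decorator allows 3 attempts in total. wait_incrementing(start=1, increment=3, max=9) yields waits of 1s, 4s, 7s and so on, capped at 9s, but with only three attempts just the first two waits are ever used (1s after the first failure, 4s after the second). Init is where Kafka connections get established, and Kafka cluster startup can be flaky. If init fails three times, the process dies and an external orchestrator restarts it, which is much safer than a process that thinks it started but actually didn't.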
self._in_flights: set[str] — payload IDs currently submitted to the engine but not yet acked. Used for backpressure (the dispatcher won't submit more if this set hits max) and for routing results back.
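To check the retry schedule on _init_run by hand, the sketch below re-derives the arithmetic in plain Python rather than calling tenacity. It assumes wait_incrementing computes start + increment * (attempt_number - 1) clamped to max, and that no wait follows the final attempt; incrementing_waits is a hypothetical helper, not part of tenacity.

```python
def incrementing_waits(
    attempts: int, start: float, increment: float, max_wait: float
) -> list[float]:
    """Waits between consecutive attempts: start + increment * (n - 1), capped."""
    # After the final attempt there is no further wait, hence attempts - 1 entries.
    return [
        max(0, min(start + increment * (n - 1), max_wait))
        for n in range(1, attempts)
    ]


print(incrementing_waits(3, start=1, increment=3, max_wait=9))  # [1, 4]
print(incrementing_waits(5, start=1, increment=3, max_wait=9))  # [1, 4, 7, 9]
```

Under these assumptions, the configuration above spends at most 5 seconds waiting before init gives up; the 9-second cap would only matter with four or more retries.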
The task-generator factory
    def _get_task_generators(self) -> TaskGenerator:
        generators: list[tuple[TaskGenerator, int]] = []
        for task_generator_config in self.config.task_generators:
            match task_generator_config.type:
                case TaskGeneratorType.POST_STREAM:
                    generators.append(
                        (
                            PostStreamTaskGenerator(task_generator_config.max_qps),
                            task_generator_config.weight,
                        )
                    )
                case TaskGeneratorType.POST_STREAM_RECOVERY:
                    generators.append(...)
                # ... 14 more match arms, one per generator type ...
                case _:
                    raise ValueError(
                        f"Invalid task generator type: {task_generator_config.type}"
                    )
        return PriorityTaskGenerator(generators)
Configuration-driven: the config has a list of task-generator descriptors (type, max_qps, weight); for each, we instantiate the right class and pair it with its weight. Wrap them all in PriorityTaskGenerator so that downstream code sees a single uniform stream. PriorityTaskGenerator.poll() will weight-sample between the underlying streams (we'll see how in task_generator.py).
The 16 match arms each construct a different generator subclass from max_qps and pair it with the configured weight; the only variation between arms is the class name.
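Since the arms differ only in the class name, the same mapping can be expressed as a lookup table. This is an alternative sketch, not how dispatcher.py is written; GeneratorType, BaseGenerator, the two subclasses, REGISTRY and build are all hypothetical stand-ins.

```python
from enum import Enum


class GeneratorType(str, Enum):
    # Hypothetical two-member stand-in for TaskGeneratorType.
    POST_STREAM = "post_stream"
    POST_STREAM_RECOVERY = "post_stream_recovery"


class BaseGenerator:
    def __init__(self, max_qps):
        self.max_qps = max_qps


class PostStream(BaseGenerator):
    pass


class PostStreamRecovery(BaseGenerator):
    pass


# One entry per generator type replaces one match arm.
REGISTRY = {
    GeneratorType.POST_STREAM: PostStream,
    GeneratorType.POST_STREAM_RECOVERY: PostStreamRecovery,
}


def build(configs):
    """configs: iterable of (type, max_qps, weight) tuples."""
    generators = []
    for gen_type, max_qps, weight in configs:
        try:
            cls = REGISTRY[gen_type]
        except KeyError:
            raise ValueError(f"Invalid task generator type: {gen_type}") from None
        generators.append((cls(max_qps), weight))
    return generators


pairs = build(
    [(GeneratorType.POST_STREAM, 50, 3), (GeneratorType.POST_STREAM_RECOVERY, 10, 1)]
)
print([(type(g).__name__, g.max_qps, w) for g, w in pairs])
```

The trade-off is that a match statement keeps each arm free to diverge later (extra constructor arguments, say), while a table makes the uniformity explicit.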
Submit a task to the engine
    async def _submit_task(self, task_payload: TaskPayload) -> None:
        inflight_gauge = Metrics.gauge("dispatcher.inflight.count")
        self._in_flights.add(task_payload.payload_id)
        inflight_gauge.set(len(self._in_flights))
        Metrics.counter("dispatcher.task.sent.count").add(
            1,
            attributes={
                "task_type": task_payload.task_type.value
                if task_payload.task_type
                else "none"
            },
        )
        self._task_queue.put(task_payload)
        logger.debug(
            f"Submitted task: {task_payload.payload_id}, queue size: {self._task_queue.qsize()}"
        )
Add to in-flights, update gauge, increment counter with task_type attribute (so the metrics back-end can break down throughput per generator type), then put on the task queue.
The fill loop
    async def _fill_loop(self):
        logger.info("Starting fill loop")
        while not self._is_shutdown():
            try:
                async for task_payload in self._task_generator.poll():
                    if task_payload is None:
                        await asyncio.sleep(0.01)
                        continue
                    while len(self._in_flights) >= self.config.max_in_flight:
                        await asyncio.sleep(0.01)
                        continue
                    await self._submit_task(task_payload)
            except Exception:
                logger.error(
                    f"Error polling from task queues: {traceback.format_exc()}"
                )
The producer side: pull from priority generator and feed the engine queue.
- task_payload is None: generator says no tasks available right now; sleep briefly and retry.
- The inner while len(self._in_flights) >= self.config.max_in_flight: ... is the backpressure: when too many tasks are submitted to the engine but not yet returned, we stop pulling more from Kafka. This prevents the engine from being overwhelmed (and prevents the dispatcher from holding onto too many Kafka messages without acking; Kafka has its own client-side commit/ack semantics that the loader handles).
- The outer try/except wraps everything: a malformed Kafka message that crashes a generator shouldn't bring down the dispatcher; log and re-enter the outer while.
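The backpressure rule can be sketched on its own with an in-flight set and a cap. fill and complete are hypothetical stand-ins for the fill loop and the result loop; peaks records the size of the in-flight set after every submission so the cap can be checked.

```python
import asyncio


async def fill(
    source: list[int], in_flight: set[int], max_in_flight: int, peaks: list[int]
) -> None:
    """Submit items from source, never exceeding max_in_flight outstanding."""
    for item in source:
        while len(in_flight) >= max_in_flight:
            await asyncio.sleep(0.001)  # wait for the result side to free a slot
        in_flight.add(item)
        peaks.append(len(in_flight))


async def complete(in_flight: set[int], total: int) -> None:
    """Stand-in for the result loop: retire one in-flight item at a time."""
    done = 0
    while done < total:
        if in_flight:
            in_flight.pop()
            done += 1
        await asyncio.sleep(0.002)


async def main() -> list[int]:
    in_flight: set[int] = set()
    peaks: list[int] = []
    await asyncio.gather(
        fill(list(range(20)), in_flight, 3, peaks),
        complete(in_flight, 20),
    )
    return peaks


peaks = asyncio.run(main())
print(max(peaks), len(peaks))
```

All 20 items are eventually submitted, but the in-flight set never grows past the cap of 3: the producer's pace is set by how fast results come back.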
Result polling
    async def _poll_result(self) -> TaskResult | None:
        try:
            res = self._resp_queue.get(block=False)
            logger.debug(f"Dispatcher received result: {res.task.payload_id}")
            Metrics.counter("dispatcher.result.received.count").add(1)
            return res
        except Empty:
            return None
        except BrokenPipeError:
            logger.error("Broken pipe error, shutting down")
            return None
        except Exception:
            logger.error(f"failed to poll result: {traceback.format_exc()}")
            return None
Mirror of _poll_task. Non-blocking.
The result loop
    async def _result_loop(self) -> None:
        logger.info("Starting result loop")
        max_attempts = self.config.max_attempts
        inflight_gauge = Metrics.gauge("dispatcher.inflight.count")
        while not self._is_shutdown() or self._in_flights:
            try:
                result = await self._poll_result()
                if result is None:
                    await asyncio.sleep(0.01)
                    continue
                task = result.task
                if result.success:
                    Metrics.counter("dispatcher.result.success.count").add(1)
                    if task.payload_id in self._in_flights:
                        self._in_flights.remove(task.payload_id)
                        inflight_gauge.set(len(self._in_flights))
                    await self._task_generator.ack(result)
Successful result → remove from in-flights, ack the source generator (which acks the underlying Kafka consumer). The conditional if task.payload_id in self._in_flights covers the edge case where a result came in for a task we already removed (shouldn't happen normally, but defensive).
                else:
                    if task.attempt < max_attempts:
                        Metrics.counter("dispatcher.result.failed.count").add(1)
                        logger.warning(
                            f"Task {task.payload_id} failed, retrying... (attempt {task.attempt})"
                        )
                        task.attempt += 1
                        await self._submit_task(task)
                    else:
                        if task.payload_id in self._in_flights:
                            self._in_flights.remove(task.payload_id)
                            inflight_gauge.set(len(self._in_flights))
                        logger.error(
                            f"Task {task.payload_id} failed after {max_attempts} attempts, error is {result.error}"
                        )
                        origin = self._task_generator.identify_task_origin(result)
                        Metrics.counter("dispatcher.result.failed.final.count").add(
                            1,
                            attributes={
                                "origin": origin.value if origin else "unknown"
                            },
                        )
                        if origin is None:
                            logger.warning(
                                f"No origin found for task {task.payload_id}, skipping ack"
                            )
                        else:
                            await self._task_generator.ack(result)
Failed result. Two cases:
- Retryable: task.attempt < max_attempts. Bump attempt and re-submit to the engine queue. The task stays in _in_flights during retry (because we haven't removed it). Note Kafka has not been acked yet either: the underlying Kafka consumer still holds this offset uncommitted.
- Terminal failure: out of attempts. Remove from in-flights, identify the originating generator (so we can attribute the failure for metrics; recall PriorityTaskGenerator looks up which underlying generator produced the task via its _result_cache), then ack so Kafka commits the offset. This is critical: a poisonous message would otherwise loop forever.
The loop condition is not _is_shutdown() or self._in_flights, so the loop exits only once shutdown is signaled and the in-flight set is empty: even after shutdown, it keeps processing results until everything drains. Mirror of the engine's drain logic.
            except Empty:
                await asyncio.sleep(0.1)
            except BrokenPipeError:
                logger.error("Broken pipe error, shutting down")
                break
        logger.warning("Result loop finished")
Exception arms cover pipe-broken (queue manager died — bail) and Empty (shouldn't happen since _poll_result catches it, but defensive).
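The three outcomes of the result loop (ack, retry, terminal ack) reduce to a small decision function. The sketch below uses hypothetical names and leaves out the origin lookup and metrics; it also counts how often an always-failing task is executed, which follows from attempt starting at 0 and the check being attempt < max_attempts.

```python
def next_action(success: bool, attempt: int, max_attempts: int) -> str:
    """Mirror the three outcomes of the result loop as plain strings."""
    if success:
        return "ack"
    if attempt < max_attempts:
        return "retry"       # attempt is bumped and the task resubmitted
    return "ack_failed"      # terminal: ack so the offset can move on


def executions_until_terminal(max_attempts: int) -> int:
    """Count how many times an always-failing task is executed."""
    attempt, runs = 0, 0
    while True:
        runs += 1
        if next_action(False, attempt, max_attempts) == "ack_failed":
            return runs
        attempt += 1


print(executions_until_terminal(3))  # 4
```

So with max_attempts set to 3, a poisonous message is executed four times (the initial run plus three retries) before it is acked and dropped.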
Wait for soft shutdown
    async def _wait_for_queue_connection_shutdown(self):
        while not self._is_queue_connection_shutdown():
            await asyncio.sleep(1)
        logger.warning("Shutdowning task generators")
        await self._task_generator.stop()
        logger.warning("Task generators stopped")
This runs concurrently with _fill_loop and _result_loop. When the queue-connection-shutdown event is set (by main, before the 300s drain sleep), it stops the task generators, so no new tasks are pulled from Kafka. The fill loop then exits its async for (when the generator exhausts) and leaves the outer while once the hard shutdown event is set at the end of the drain window.
Main run
    async def _run(self, started_event: Event):
        await self._init_run()
        started_event.set()
        await asyncio.gather(
            self._fill_loop(),
            self._result_loop(),
            self._wait_for_queue_connection_shutdown(),
        )

    def run(self, started_event: Event):
        asyncio.run(self._run(started_event))
Three concurrent coroutines. asyncio.gather waits for all of them; they coordinate through shared state (_in_flights, the queues, the events).
Notice this run doesn't call os._exit(0) (unlike the engine). The dispatcher doesn't own ML model state, so normal Python cleanup is safe.
Start/stop boilerplate
    async def start(self):
        logger.info("Starting Grox dispatcher...")
        started_event = multiprocessing.Event()
        self._process = Process(
            target=self.run, args=(started_event,), name="grox-dispatcher"
        )
        self._process.start()
        started_event.wait()
        logger.info("Grox dispatcher started")

    async def stop(self):
        logger.warning("Stopping Grox dispatcher...")
        if self._process and self._process.is_alive():
            self._process.join(self.config.graceful_shutdown_timeout)
        else:
            logger.warning("Dispatcher process is not alive, skipping join")
        logger.warning("Dispatcher stopped")
Same pattern as engine. Fork, wait for the child to signal ready, return.
generators/task_generator.py (153 lines)
The base class for everything pluggable into the dispatcher, plus the PriorityTaskGenerator that fans out across multiple sources.
import asyncio
import logging
import random
import traceback
from abc import ABC, abstractmethod
from grox.config.config import TaskGeneratorType
from grox.schedules.types import TaskResult, TaskPayload
from limits import RateLimitItemPerSecond, storage, strategies
from typing import AsyncGenerator
logger = logging.getLogger(__name__)
limiter = strategies.FixedWindowRateLimiter(storage.MemoryStorage())
limits is a third-party rate-limiting library. We use a fixed-window strategy with an in-memory backend, both because we're inside a single process and because we don't need cross-process coordination of the limit. Note that the limiter key is the class name (see _limiter_key below), so the budget is per generator class: two instances of the same class would share one limit.
Base class
class TaskGenerator(ABC):
    TASK_GENERATOR_TYPE: TaskGeneratorType | None = None

    def __init__(self, max_qps: int | None):
        self._shutdown_event = asyncio.Event()
        self._limiter_key = self.__class__.__name__
        self._limit = RateLimitItemPerSecond(max_qps, 1) if max_qps else None
TASK_GENERATOR_TYPE is a class attribute each subclass sets — so the dispatcher's task_type metric attribute is automatic.
max_qps: if specified, create a "max_qps requests per 1 second" limit using the class name as the limiter key. If None (used by PriorityTaskGenerator, which is unlimited because its children have their own limits), no limit.
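To make the fixed-window semantics concrete, here is a toy limiter written with the standard library only. It is a stand-in for illustration, not the limits library's implementation; FixedWindowLimiter and its injectable clock parameter are hypothetical.

```python
import time
from typing import Callable


class FixedWindowLimiter:
    """Toy fixed-window limiter; a stand-in, not the limits library."""

    def __init__(
        self,
        max_per_window: int,
        window_secs: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._max = max_per_window
        self._window = window_secs
        self._clock = clock
        # Old buckets are never evicted; acceptable for a short demo only.
        self._counts: dict[tuple[str, int], int] = {}

    def _bucket(self, key: str) -> tuple[str, int]:
        # All calls that fall inside the same window share one counter.
        return key, int(self._clock() // self._window)

    def test(self, key: str) -> bool:
        """True if a hit would currently be allowed; consumes nothing."""
        return self._counts.get(self._bucket(key), 0) < self._max

    def hit(self, key: str) -> bool:
        """Consume one unit of the current window if any is left."""
        bucket = self._bucket(key)
        if self._counts.get(bucket, 0) >= self._max:
            return False
        self._counts[bucket] = self._counts.get(bucket, 0) + 1
        return True
```

The counter is looked up by key, so anything sharing a key shares a budget. In TaskGenerator the key is the class name, which is why the limit applies per generator class rather than per instance.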
    def is_shutdown(self) -> bool:
        try:
            return self._shutdown_event.is_set()
        except Exception:
            logger.error(
                f"Error checking if task generator is shutdown: {traceback.format_exc()}"
            )
            return True

    async def start(self) -> None:
        pass

    async def stop(self) -> None:
        logger.info(f"Stopping task generator {self.__class__.__name__}")
        self._shutdown_event.set()
Default no-op start, stop sets the local shutdown event. The asyncio.Event is per-instance (not shared via manager) since each generator runs in the dispatcher process.
    async def poll(self) -> AsyncGenerator[TaskPayload | None, None]:
        async for payload in self._poll():
            if not payload:
                yield None
                continue
            if self._limit:
                while not limiter.test(self._limit, self._limiter_key):
                    yield None
                    await asyncio.sleep(0.01)
                limiter.hit(self._limit, self._limiter_key)
            yield payload

    @abstractmethod
    def _poll(self) -> AsyncGenerator[TaskPayload | None, None]:
        pass
The public poll does rate limiting around the subclass-provided _poll. When the subclass yields a payload and we have a limit, we poll in a loop (yielding None back to the caller so it can do something else, then sleeping 10 ms) until the limiter's window has capacity, then hit the limiter and yield the payload.
Yielding None while waiting is important: the caller's async for loop sees a None and can interleave other work (like the dispatcher's backpressure check or even shutdown). If we only awaited asyncio.sleep without yielding, the dispatcher's fill loop would be stuck here, unable to check for shutdown.
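A small sketch of why the None ticks matter. Here throttled and consume are hypothetical names, and an asyncio.Event that is never set stands in for a limiter that never has capacity; the consumer still gets control on every tick and can decide to bail out.

```python
import asyncio
from typing import AsyncGenerator, Optional


async def throttled(
    items: list[str], ready: asyncio.Event
) -> AsyncGenerator[Optional[str], None]:
    """Yield items, but yield None while `ready` is not set (hypothetical)."""
    for item in items:
        while not ready.is_set():
            yield None  # hand control back to the consumer
            await asyncio.sleep(0.01)
        yield item


async def consume(max_idle_ticks: int) -> tuple[list[str], int]:
    ready = asyncio.Event()  # never set: the "limiter" stays full
    got: list[str] = []
    idle = 0
    stream = throttled(["a", "b"], ready)
    try:
        async for item in stream:
            if item is None:
                idle += 1
                if idle >= max_idle_ticks:  # stands in for a shutdown check
                    break
                continue
            got.append(item)
    finally:
        await stream.aclose()  # finalize the generator we abandoned early
    return got, idle
```

Had throttled only slept inside its loop, the async for in consume would never see a value and the break could never run.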
    async def ack(self, result: TaskResult):
        pass

    def identify_task_origin(self, result: TaskResult) -> TaskGeneratorType | None:
        return self.TASK_GENERATOR_TYPE
Default ack is a no-op (only stream generators need to ack the underlying Kafka). Default origin identification returns the class attribute.
PriorityTaskGenerator — weighted random multiplexing
class PriorityTaskGenerator(TaskGenerator):
    def __init__(self, generators: list[tuple[TaskGenerator, int]]):
        if not generators:
            raise ValueError("No generators provided")
        if any(weight <= 0 for _, weight in generators):
            raise ValueError("All weights must be positive")
        super().__init__(None)
        self._generators: dict[str, TaskGenerator] = {}
        self._weights: dict[str, int] = {}
        for i, (gen, weight) in enumerate(generators):
            label = f"GEN_{i}"
            self._generators[label] = gen
            self._weights[label] = weight
        self._result_cache: dict[str, str] = {}
        logger.info(
            f"Initialized priority task generator with {list(zip(self._generators.keys(), [gen.__class__.__name__ for gen in self._generators.values()], self._weights.values(), strict=True))}"
        )
Each child is given a string label GEN_0, GEN_1, etc., separate from its class name (so two instances of the same class would be different generators). _result_cache maps task payload IDs to the label that produced them — so when a result comes back, we know which generator to route the ack to.
The init log line is verbose but useful: it tells the operator which generators are wired up with which weights at startup.
    async def start(self) -> None:
        logger.info(f"Starting priority task generators")
        await asyncio.gather(*[gen.start() for gen in self._generators.values()])
        self._streams = {label: gen.poll() for label, gen in self._generators.items()}
        logger.info(f"Priority task generators started")
Start all children in parallel, then create their poll async-generators. self._streams is a dict from label to the actual async generator iterator — we hold these references so we can anext() them on demand.
    async def stop(self) -> None:
        logger.warning(f"Stopping priority task generators")
        await asyncio.gather(*[gen.stop() for gen in self._generators.values()])
        await super().stop()
        logger.warning(f"Priority task generators stopped")
Stop all children, then call super().stop() to set the local shutdown event.
    async def _poll(self) -> AsyncGenerator[TaskPayload | None, None]:
        if not self._streams:
            raise RuntimeError("Task generators not started")
        while self._weights:
            _weights = self._weights.copy()
            polled = False
            while _weights:
                labels = list(_weights.keys())
                weights = list(_weights.values())
                labels = random.choices(labels, weights, k=1)
                label = labels[0]
                stream = self._streams[label]
                try:
                    payload = await anext(stream)
                    if payload:
                        self._result_cache[payload.payload_id] = label
                        yield payload
                        polled = True
                        break
                    else:
                        del _weights[label]
                except StopAsyncIteration:
                    logger.warning(
                        f"Task generator {label} exhausted, removing from pool"
                    )
                    if label in _weights:
                        del _weights[label]
                    if label in self._weights:
                        del self._weights[label]
                    continue
                except Exception:
                    logger.error(
                        f"Error polling task generator {label}: {traceback.format_exc()}"
                    )
                    if label in _weights:
                        del _weights[label]
                    continue
            if not polled:
                yield None
This is the weighted-random multiplex with skip-the-empties algorithm:
- Outer loop: while we have any generators left at all (self._weights).
- Take a local copy _weights for this iteration's selection.
- Inner loop: weighted-random sample a label, try anext on its stream.
  - Got a payload: cache the label, yield, break out of the inner loop, restart the outer iteration (so weights are recomputed fresh). This matters: if a generator returns None once, we don't re-try it this iteration, but next iteration it gets a fresh chance at the full weight.
  - Got None: remove that label from the local _weights, i.e., don't pick it again this iteration, but it could be picked in a future iteration (when fresh weights are sampled).
  - StopAsyncIteration: the generator is permanently done. Remove from both local and global weights.
  - Other exception: log, remove from local only, so the label is eligible again next iteration. The intent is leniency toward transient errors. Note, though, that an async generator that has raised is finished: the next anext on it raises StopAsyncIteration and the label is then removed for good. The leniency only helps if the child handles transient errors internally instead of letting them escape its poll.
- If the inner loop exits without polling anything (all generators returned None this round), yield None so the dispatcher's async for gets a chance to react.
The reason for the _weights.copy() per outer iteration: so a generator that says "no" once gets ignored for the rest of this round, but gets a fresh shot next round. Without this, you'd either re-poll the same generator forever (if you didn't remove it) or permanently shut it down (if you removed it). Compromise: skip it for this round only.
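The per-round copy is easier to see in a synchronous simplification. In this sketch poll_round is a hypothetical helper, sources are plain callables that return a payload or None, and the async and exception handling of the real method is left out.

```python
import random
from typing import Callable, Optional


def poll_round(
    sources: dict[str, Callable[[], Optional[str]]],
    weights: dict[str, int],
    rng: random.Random,
) -> Optional[tuple[str, str]]:
    """One round: weighted pick, dropping sources that return None."""
    remaining = dict(weights)  # local copy, like _weights.copy()
    while remaining:
        labels = list(remaining)
        label = rng.choices(labels, [remaining[l] for l in labels], k=1)[0]
        payload = sources[label]()
        if payload is not None:
            return label, payload
        del remaining[label]  # skip this source for the rest of the round only
    return None  # every source was empty this round
```

Even when an empty source carries 99% of the weight, the round still finds the one source that has data, and the caller's weights dict is untouched for the next round.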
    async def ack(self, result: TaskResult):
        logger.debug(f"Acknowledging task {result.task.payload_id}")
        label = self._result_cache.pop(result.task.payload_id, None)
        if not label:
            logger.warning(
                f"No label found for task {result.task.payload_id}, skipping ack"
            )
            return
        gen = self._generators[label]
        await gen.ack(result)

    def identify_task_origin(self, result: TaskResult) -> TaskGeneratorType | None:
        logger.debug(f"Identifying task origin for {result.task.payload_id}")
        label = self._result_cache.get(result.task.payload_id)
        if not label:
            logger.warning(
                f"No label found for task {result.task.payload_id}, cannot identify origin"
            )
            return None
        gen = self._generators[label]
        return gen.identify_task_origin(result)
ack pops the label from the cache (pop rather than get, because we won't need it after the ack) and forwards the ack to the right generator. identify_task_origin does the same lookup but with get, because on a terminal failure it is called before ack and the entry must still be there when the ack arrives.
Note: on a cache miss, both functions log a warning; ack then returns without acking and identify_task_origin returns None. This shouldn't happen in normal operation but could during edge cases at shutdown.
generators/stream_generator.py (222 lines)
The Kafka-backed concrete generators. They all inherit from a single abstract StreamTaskGenerator that wraps a MessageQueueLoader (which we'll see in Session 19).
import abc
import logging
from collections.abc import AsyncGenerator
from grox.config.config import KafkaTopicName, TaskGeneratorType
from grox.schedules.types import TaskResult, TaskPayload, TaskEligibility
from grox.data_loaders.kafka_loader import (
    KafkaAdPostLoader,
    KafkaPostLoader,
    KafkaPostEmbeddingRequestLoader,
)
from grox.generators.task_generator import TaskGenerator
from grox.data_loaders.message_queue_loader import MessageQueueLoader
logger = logging.getLogger(__name__)
Abstract base
class StreamTaskGenerator(TaskGenerator, metaclass=abc.ABCMeta):
    ELIGIBILITIES_TO_INJECT: set[TaskEligibility]

    def __init__(self, max_qps: int | None):
        super().__init__(max_qps)
        self._loader = self._get_loader()

    @abc.abstractmethod
    def _get_loader(self) -> MessageQueueLoader:
        pass
ELIGIBILITIES_TO_INJECT is the second per-subclass class attribute (alongside TASK_GENERATOR_TYPE). It declares "tasks coming from this stream are eligible for these plan stages." The subclass provides the loader.
    async def start(self) -> None:
        logger.info("Starting StreamTaskGenerator")
        await self._loader.start()

    async def stop(self) -> None:
        logger.info("Stopping StreamTaskGenerator")
        await super().stop()
        await self._loader.stop()
Delegate start/stop to the loader.
    async def _poll(self) -> AsyncGenerator[TaskPayload | None, None]:
        async for payload in self._loader.poll():
            if not payload:
                yield None
                continue
            yield TaskPayload(
                payload_id=payload.mid,
                post=payload.post,
                user=payload.user,
                user_context=payload.user_context,
                deadline_ts_secs=payload.deadline_ts_secs,
                task_type=self.TASK_GENERATOR_TYPE,
                eligibilities=self.ELIGIBILITIES_TO_INJECT.copy(),
                grox_content_analysis=payload.grox_content_analysis,
            )

    async def ack(self, result: TaskResult):
        await self._loader.ack(result.task.payload_id, result.success)
The crucial transform: a Kafka message has mid (message ID — used as Kafka offset key for ack), post, user, etc. We wrap that into a TaskPayload, copying the class's eligibility set so plan-stage mutations don't leak across tasks.
ack forwards to the loader with the success flag — the loader uses this to decide whether to commit the offset (success) or NACK / retry (failure).
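The reason for the .copy() on the eligibility set is ordinary Python aliasing: a class-level set is one shared object. The sketch below uses a hypothetical Generator class with string eligibilities to show what would leak without the copy.

```python
class Generator:
    # Class attribute: one set object shared by every instance and every task.
    ELIGIBILITIES = {"SPAM_COMMENT", "REPLY_RANKING"}

    def make_shared(self) -> set[str]:
        return self.ELIGIBILITIES  # hands out the class's own set

    def make_copied(self) -> set[str]:
        return self.ELIGIBILITIES.copy()  # fresh set per task


gen = Generator()

copied = gen.make_copied()
copied.discard("SPAM_COMMENT")  # a stage mutates its own task's set
assert Generator.ELIGIBILITIES == {"SPAM_COMMENT", "REPLY_RANKING"}

shared = gen.make_shared()
shared.discard("SPAM_COMMENT")  # this mutates the class attribute itself
assert Generator.ELIGIBILITIES == {"REPLY_RANKING"}
```

Without the copy, the first task to drop an eligibility would silently drop it for every later task from the same generator class.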
The 16 concrete generator subclasses
These are mostly identical: pick a topic, pick an eligibility set. I'll skip repeating every one — here's a representative selection and a summary table.
class PostStreamTaskGenerator(StreamTaskGenerator):
    TASK_GENERATOR_TYPE = TaskGeneratorType.POST_STREAM
    ELIGIBILITIES_TO_INJECT = {
        TaskEligibility.SPAM_COMMENT,
        TaskEligibility.REPLY_RANKING,
    }

    def _get_loader(self):
        return KafkaPostLoader(
            KafkaTopicName.CONTENT_UNDERSTANDING_REALTIME_UNIFIED_POSTS
        )
The "primary" post stream. Tasks here run spam-comment detection AND reply-ranking — two plan branches concurrently. The KafkaPostLoader reads from the unified posts topic, which is the firehose of all new posts in the system.
class MinTractionPostStreamForGroxTaskGenerator(StreamTaskGenerator):
    TASK_GENERATOR_TYPE = TaskGeneratorType.POST_MIN_TRACTION_STREAM_FOR_GROX
    ELIGIBILITIES_TO_INJECT = {TaskEligibility.BANGER_INITIAL_SCREEN}

    def _get_loader(self):
        return KafkaPostLoader(
            KafkaTopicName.CONTENT_UNDERSTANDING_REALTIME_UNIFIED_POSTS_MIN_TRACTION_FOR_GROX
        )


class MinTractionPostStreamForGroxPtosTaskGenerator(StreamTaskGenerator):
    TASK_GENERATOR_TYPE = TaskGeneratorType.POST_MIN_TRACTION_STREAM_FOR_GROX_PTOS
    ELIGIBILITIES_TO_INJECT = {TaskEligibility.SAFETY_PTOS}

    def _get_loader(self):
        return KafkaPostLoader(
            KafkaTopicName.CONTENT_UNDERSTANDING_REALTIME_UNIFIED_POSTS_MIN_TRACTION_FOR_GROX_PTOS
        )


class MinTractionPostStreamForGroxMultiModalTaskGenerator(StreamTaskGenerator):
    TASK_GENERATOR_TYPE = (
        TaskGeneratorType.POST_MIN_TRACTION_STREAM_FOR_GROX_MULTI_MODAL
    )
    ELIGIBILITIES_TO_INJECT = {TaskEligibility.POST_EMBEDDING_WITH_SUMMARY_FOR_REPLY}

    def _get_loader(self):
        return KafkaPostLoader(
            KafkaTopicName.CONTENT_UNDERSTANDING_REALTIME_UNIFIED_POSTS_MIN_TRACTION_FOR_GROX_MULTI_MODAL
        )
"Min traction" means posts that have at least some engagement (likes, retweets, replies above a threshold). The system doesn't run the full safety / banger / multimodal classifier set on every post — many low-quality posts aren't worth the GPU. Instead they're filtered upstream and only the ones above the minimum-traction bar hit these streams.
Three different "min traction" streams send the same post (or close to it) through three different plan branches:
- Banger (BANGER_INITIAL_SCREEN): predict whether this post is a candidate for the "banger" initial-screen surface.
- PTOS (SAFETY_PTOS): Pillars-of-Safety classification (we saw pillars_of_safety referenced in home-mixer filters back in Session 5).
- Multi-modal (POST_EMBEDDING_WITH_SUMMARY_FOR_REPLY): compute the multimodal embedding that feeds reply-ranking models.
Splitting the work across three topics rather than running all three in one task means each can have its own QPS limit, error rate, and on-call rotation.
class PostEmbeddingRequestWithSummaryStreamTaskGenerator(StreamTaskGenerator):
    TASK_GENERATOR_TYPE = TaskGeneratorType.POST_EMBEDDING_REQUEST_STREAM_WITH_SUMMARY
    ELIGIBILITIES_TO_INJECT = {TaskEligibility.POST_EMBEDDING_WITH_SUMMARY}

    def _get_loader(self):
        return KafkaPostLoader(
            KafkaTopicName.GROX_MULTIMODAL_EMBEDDING_REQUESTS_WITH_SUMMARY
        )


class PostEmbeddingRequestWithSummaryRecoveryStreamTaskGenerator(StreamTaskGenerator):
    TASK_GENERATOR_TYPE = (
        TaskGeneratorType.POST_EMBEDDING_REQUEST_STREAM_WITH_SUMMARY_RECOVERY
    )
    ELIGIBILITIES_TO_INJECT = {TaskEligibility.POST_EMBEDDING_WITH_SUMMARY}

    def _get_loader(self):
        return KafkaPostLoader(
            KafkaTopicName.GROX_MULTIMODAL_EMBEDDING_REQUESTS_WITH_SUMMARY_RECOVERY
        )
Note the same ELIGIBILITIES_TO_INJECT = {POST_EMBEDDING_WITH_SUMMARY} but a different TASK_GENERATOR_TYPE and a different topic. The "recovery" topic is presumably for re-processing posts whose embeddings failed in the primary pipeline. The task content is the same and both classes use KafkaPostLoader, so any difference in ack semantics (for example dead-letter routing) would have to be keyed on the topic inside the loader.
class PostEmbeddingV5StreamTaskGenerator(StreamTaskGenerator):
    TASK_GENERATOR_TYPE = TaskGeneratorType.POST_EMBEDDING_V5_STREAM
    ELIGIBILITIES_TO_INJECT = {TaskEligibility.MM_EMB_V5}

    def _get_loader(self):
        return KafkaPostLoader(
            KafkaTopicName.GROX_MULTIMODAL_EMBEDDING_REQUESTS_WITH_SUMMARY
        )


class PostEmbeddingV5ForReplyStreamTaskGenerator(StreamTaskGenerator):
    TASK_GENERATOR_TYPE = TaskGeneratorType.POST_EMBEDDING_V5_FOR_REPLY_STREAM
    ELIGIBILITIES_TO_INJECT = {TaskEligibility.MM_EMB_V5_FOR_REPLY}

    def _get_loader(self):
        return KafkaPostLoader(
            KafkaTopicName.CONTENT_UNDERSTANDING_REALTIME_UNIFIED_POSTS_MIN_TRACTION_FOR_GROX_MULTI_MODAL
        )
V5 is the newer multimodal embedding model. Note PostEmbeddingV5StreamTaskGenerator reads from the same topic as PostEmbeddingRequestWithSummaryStreamTaskGenerator, but with a different eligibility (MM_EMB_V5 vs POST_EMBEDDING_WITH_SUMMARY). This looks like how the system runs the previous embedding (presumably v4) and v5 in parallel during the model rollout: provided the two loaders use different Kafka consumer groups, both consumers see every message and each runs its own version of the embedding. (Presumably v4 stays the production embedding until v5 is fully validated, and is retired once v5 wins the A/B test.)
Here's the full registry in summary form (names abbreviated):
| Generator class | Topic | Eligibilities |
|---|---|---|
| PostStreamTaskGenerator | UNIFIED_POSTS | SPAM_COMMENT, REPLY_RANKING |
| MinTractionPostStreamForGrox | UNIFIED_POSTS_MIN_TRACTION_FOR_GROX | BANGER_INITIAL_SCREEN |
| MinTractionPostStreamForGroxPtos | UNIFIED_POSTS_MIN_TRACTION_FOR_GROX_PTOS | SAFETY_PTOS |
| MinTractionPostStreamForGroxMultiModal | UNIFIED_POSTS_MIN_TRACTION_FOR_GROX_MULTI_MODAL | POST_EMBEDDING_WITH_SUMMARY_FOR_REPLY |
| PostEmbeddingRequestWithSummaryForReplyRecovery | GROX_MULTIMODAL_EMBEDDING_REQUESTS_WITH_SUMMARY_FOR_REPLY_RECOVERY | POST_EMBEDDING_WITH_SUMMARY_FOR_REPLY |
| PostStreamRecovery | UNIFIED_POSTS_RECOVERY | BANGER_INITIAL_SCREEN |
| SafetyPtosRecovery | SAFETY_PTOS_RECOVERY | SAFETY_PTOS |
| SafetyPtosDeluxe | SAFETY_PTOS_DELUXE | SAFETY_PTOS |
| PostStreamTest | UNIFIED_POSTS_TEST | BANGER_INITIAL_SCREEN |
| PostSafetyStream | UNIFIED_POSTS_POPULAR | POST_SAFETY |
| PostStreamDelayed | UNIFIED_POSTS_DELAYED | {} (empty!) |
| ReplyRankingRecovery | REPLY_RANKING_RECOVERY | REPLY_RANKING |
| PostEmbeddingRequestWithSummary | GROX_MULTIMODAL_EMBEDDING_REQUESTS_WITH_SUMMARY | POST_EMBEDDING_WITH_SUMMARY |
| PostEmbeddingRequestWithSummaryRecovery | GROX_MULTIMODAL_EMBEDDING_REQUESTS_WITH_SUMMARY_RECOVERY | POST_EMBEDDING_WITH_SUMMARY |
| PostEmbeddingV5Stream | GROX_MULTIMODAL_EMBEDDING_REQUESTS_WITH_SUMMARY | MM_EMB_V5 |
| PostEmbeddingV5ForReplyStream | UNIFIED_POSTS_MIN_TRACTION_FOR_GROX_MULTI_MODAL | MM_EMB_V5_FOR_REPLY |
A few oddities:
- PostStreamDelayed has an empty eligibility set. That means tasks from this stream don't trigger any plan stage by default. Something in the plan engine (probably "auto-promote eligibilities based on payload fields") must inject them at run time, or this is genuinely a no-op stream used for offset advancement.
- PostSafetyStream reads from UNIFIED_POSTS_POPULAR: popular posts get an additional "post safety" pass not done on the main stream. That makes sense: rendering bad content to many users is worse than rendering bad content to few.
- SafetyPtosDeluxe and SafetyPtosRecovery both have SAFETY_PTOS eligibility; likely they use different upstream filters but the same plan stage. "Deluxe" might be a higher-tier (more expensive) variant; without the topic-config code we can't say more.
Key takeaway from this file
The dispatcher's task generators form a stream multiplexer. The 16 generators read from 14 distinct Kafka topics (two topics are each consumed by two generators), each generator with its own optional QPS limit. The dispatcher samples among them weighted-randomly, one poll at a time, fanning all tasks into the single engine queue. The eligibility tags carried by each task tell the engine which plan branches to run.
lib/stream.py (46 lines)
A small but useful async-stream utility.
from typing import AsyncIterator, AsyncGenerator, TypeVar
from asyncio import Queue, create_task
from enum import Enum
import logging

T = TypeVar("T")


class StreamStatus(Enum):
    STOP = "Stop"


logger = logging.getLogger(__name__)


async def parallel_merge(*streams: AsyncIterator[T]) -> AsyncGenerator[T, None]:
    if not streams:
        return
    queue: Queue[T | StreamStatus | Exception] = Queue()

    async def enqueue(ait: AsyncIterator[T]):
        try:
            async for item in ait:
                await queue.put(item)
        except GeneratorExit:
            pass
        except Exception as e:
            await queue.put(e)
        finally:
            await queue.put(StreamStatus.STOP)

    _enq_tasks = [create_task(enqueue(s)) for s in streams]
    nstreams_done = 0
    while True:
        item = await queue.get()
        if item == StreamStatus.STOP:
            nstreams_done += 1
        elif isinstance(item, Exception):
            raise item
        else:
            yield item
        queue.task_done()
        if nstreams_done == len(streams):
            break
This is parallel_merge — merge N async iterators into a single async generator that yields items as they arrive from any source. Like asyncio.as_completed but for streams instead of futures.
How it works:
- One shared asyncio queue.
- Spawn one consumer task per input stream that pushes items onto the queue, plus a sentinel StreamStatus.STOP when done, plus the exception (if one occurred) before STOP.
- The main loop pulls from the queue, tracks how many STOPs we've seen, and re-raises any exception immediately.
The GeneratorExit arm — async for cleanup paths can raise this when the caller stops consuming. Swallow it; just stop pushing.
_enq_tasks is held in a local variable on purpose. The event loop keeps only weak references to tasks, so a task with no other strong reference can be garbage-collected before it finishes. Stashing the tasks in a list keeps them alive for the lifetime of the generator.
This utility isn't used in the dispatcher code we walked through; it shows up in some of the data loaders (Session 19) for merging messages from multiple Kafka partitions.
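A usage sketch may help here. The block carries a condensed copy of parallel_merge so it runs on its own; ticker and collect are hypothetical helpers, and the delays are arbitrary.

```python
import asyncio
from enum import Enum
from typing import AsyncGenerator, AsyncIterator, TypeVar

T = TypeVar("T")


class StreamStatus(Enum):
    STOP = "Stop"


async def parallel_merge(*streams: AsyncIterator[T]) -> AsyncGenerator[T, None]:
    # Condensed copy of the function above so the example is self-contained.
    if not streams:
        return
    queue: asyncio.Queue = asyncio.Queue()

    async def enqueue(ait: AsyncIterator[T]) -> None:
        try:
            async for item in ait:
                await queue.put(item)
        except Exception as e:
            await queue.put(e)
        finally:
            await queue.put(StreamStatus.STOP)

    # Keep strong references so the tasks are not garbage-collected.
    tasks = [asyncio.create_task(enqueue(s)) for s in streams]
    done = 0
    while done < len(streams):
        item = await queue.get()
        if item is StreamStatus.STOP:
            done += 1
        elif isinstance(item, Exception):
            raise item
        else:
            yield item


async def ticker(name: str, count: int, delay: float) -> AsyncGenerator[str, None]:
    """Hypothetical source: emits `count` labelled items, `delay` seconds apart."""
    for i in range(count):
        await asyncio.sleep(delay)
        yield f"{name}{i}"


async def collect() -> list[str]:
    fast = ticker("fast", 3, 0.01)
    slow = ticker("slow", 2, 0.05)
    return [item async for item in parallel_merge(fast, slow)]
```

Items come out in arrival order, so with these delays the fast items will usually precede the slow ones, but the exact interleaving depends on timing and should not be relied on.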
lib/utils.py (6 lines)
def camel_to_snake(s: str) -> str:
    return "".join(["_" + c.lower() if c.isupper() else c for c in s]).lstrip("_")


def snake_to_camel(s: str) -> str:
    return "".join(word.capitalize() for word in s.split("_"))
Two case-converters. camel_to_snake walks character-by-character (inserting _ before each uppercase char, then .lstrip("_") to remove a leading underscore if the string started with uppercase). snake_to_camel splits on _ and capitalizes each piece.
Note snake_to_camel("foo_bar") returns "FooBar" — that's actually PascalCase (also called UpperCamelCase). True camelCase would be fooBar. The name is slightly misleading but the behavior is consistent within the codebase.
Used in plan/task naming serialization — class names get converted to snake-case for config keys, config keys converted back to class lookups.
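A few concrete inputs show the round trip and one awkward edge. The two functions are copied from above so the block runs on its own; the sample strings are illustrative choices, not values taken from the Grox config.

```python
def camel_to_snake(s: str) -> str:
    return "".join(["_" + c.lower() if c.isupper() else c for c in s]).lstrip("_")


def snake_to_camel(s: str) -> str:
    return "".join(word.capitalize() for word in s.split("_"))


# A typical class name round-trips cleanly.
print(camel_to_snake("PostStreamTaskGenerator"))     # post_stream_task_generator
print(snake_to_camel("post_stream_task_generator"))  # PostStreamTaskGenerator

# Acronyms get one underscore per capital letter.
print(camel_to_snake("HTTPServer"))                  # h_t_t_p_server

# Doubled underscores are lost on the way back.
print(snake_to_camel("foo__bar"))                    # FooBar
```

The acronym case still round-trips (h_t_t_p_server converts back to HTTPServer), it just makes for an ugly config key.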
Summary
Grox's core is three Python processes (main, dispatcher, engine) coordinating through a multiprocessing.Manager-backed shared dictionary of queues and events. A two-phase shutdown gives in-flight tasks 300 seconds to drain before a hard stop.
The dispatcher is a stream multiplexer: 16 Kafka-backed task generators, wrapped in a single PriorityTaskGenerator that does weighted-random sampling, feed a single task queue. Each task carries an eligibility set declaring which plan stages should run.
The engine is a task executor: pull from the queue, run PlanMaster.exec(task) (which we'll cover in Session 19), push the result back. Tasks run as concurrent asyncio tasks within a single process, so a task waiting on I/O doesn't block the others. (The reverse does not hold automatically: CPU-bound work running directly on the event loop would stall every other task until it yields.)
The retry path is owned by the dispatcher: a failed task is re-submitted up to max_attempts times before it gets ack'd to Kafka. This means Kafka offsets don't advance for transient failures, so a process restart would pick the same tasks up again.
Where this fits in the larger system
Looking back at home-mixer:
- The grox_* fields on Post objects (e.g., grox_content_categories, grox_safety_pillars): Grox's engine produces these.
- The ads_brand_safety filter we walked through in Session 5: its safety labels come from Grox PTOS classifier outputs.
- The multimodal embeddings used in Phoenix retrieval (Session 16): Grox's MM_EMB_V5 generator produces these and writes them back to the post store.
Grox is essentially the enrichment pipeline for the recommendation system. The home-mixer reads finished annotations; Grox computes them.
Next session
Session 19 — Grox plans + data loaders (~1,300 LOC).
- grox/plans/*: the PlanMaster and its plan-stage classes. This is where task.eligibilities becomes actual ML inference calls.
- grox/data_loaders/*: Kafka loaders, message-queue abstraction, post/user data types, media/ASR processors.
The plans are where the system's safety logic lives, so this should be one of the more interesting sessions.