X For You algorithm, line by line — Part 21: Grox tasks part 1 (base + filters + classifier wrappers)
Part 21 — the task layer where plan declarations meet concrete service calls. Base Task classes with retry/skip semantics, env-based disable rules, eligibility filters (spam-vs-reply-ranking follower-bucket split), TTL-cache rate limiters, media+ASR hydration, post reload with not-found retry, and six classifier-wrapper tasks plus the moderation-action trigger.
The penultimate session. Tasks are the imperative units of work that plans (Session 19) compose into DAGs. Each plan declares a TASKS: dict[str, type[Task]] mapping logical name → Task class; the plan executor calls Task.exec(ctx) at the right time, with ctx being the shared TaskContext.
In this session we cover the foundation and the classifier-wrapper layer (filters, rate limits, media hydration, post loading, ASR, and the six classifier-driven tasks). Session 22 will cover the publishing layer (Kafka pubs, Manhattan sinks, multimodal embedding write).
Files in this session, in walk-order:
| File | LOC | Role |
|---|---|---|
| task.py | 150 | Base Task class hierarchy |
| disable_rules.py | 56 | Env-based task disabling |
| task_filters.py | 370 | Per-plan eligibility filters |
| task_rate_limit.py | 191 | TTL-cache per-post deduplication |
| task_media.py | 21 | Media hydration via MediaProcessor |
| task_asr.py | 74 | Audio transcription via ASRProcessor |
| task_load_post_with_not_found_retry.py | 35 | Re-fetch the post (with retry) |
| task_load_post_with_summary.py | 27 | Re-fetch post + pre-computed summary |
| task_load_user_recent_posts.py | 23 | Hydrate user's posting history |
| task_summarizer_for_post_embedding.py | 25 | Run the post summarizer |
| task_spam_detection.py | 57 | Run the spam classifier |
| task_banger_screen.py | 79 | Run the banger classifier (with topic cache) |
| task_post_safety_screen_deluxe.py | 25 | Run the post-safety classifier |
| task_safety_ptos_category.py | 61 | Run the PTOS category classifier |
| task_safety_ptos_policy.py | 89 | Run the per-category policy classifier |
| task_rank_replies.py | 31 | Run the reply scorer |
| task_grok_upa_action_with_labels.py | 57 | Apply moderation actions from results |
Total: 1,371 LOC.
task.py (150 lines)
The Task abstract base class and four specialized subclasses (TaskWithPost, TaskWithUser, TaskWithUserContext, TaskWithContentAnalysis).
```python
import logging
import traceback
from abc import ABC, abstractmethod
from enum import Enum
from functools import cache

from tenacity import retry, wait_fixed, stop_after_attempt

from grox.lib.utils import camel_to_snake
from monitor.metrics import Metrics
from grox.schedules.types import TaskContext
from grox.tasks.disable_rules import DisableTaskRule
from grox.data_loaders.data_types import GroxContentAnalysis, Post, User, UserContext

logger = logging.getLogger(__name__)


class TaskResultCategory(str, Enum):
    SUCCESS = "success"
    SKIPPED = "skipped"


class TaskStopExecution(Exception):
    pass
```
TaskResultCategory is the two-state result enum that plans use for downstream SKIP propagation (Session 19's Plan._execute_task looks for SKIPPED in the dependency results).
TaskStopExecution is the clean-skip exception — any task can raise this to say "I'm not going to run, but this isn't an error." Distinct from a real exception, which triggers retry / error handling.
The Task base — the most-used class in the system
```python
class Task(ABC):
    DISABLE_RULES: list[type[DisableTaskRule]] = []

    @classmethod
    @retry(stop=stop_after_attempt(2), wait=wait_fixed(1))
    async def exec(cls, ctx: TaskContext) -> TaskResultCategory:
        metrics_attributes = {"task_name": cls.get_name()}
        Metrics.counter("task.exec.count").add(1, attributes=metrics_attributes)
        logger.debug(f"[{cls.get_name()}] starting task")

        if cls.should_skip(ctx):
            Metrics.counter("task.exec.skipped.count").add(
                1, attributes=metrics_attributes
            )
            logger.info(f"[{cls.get_name()}] skipping task")
            return TaskResultCategory.SKIPPED

        Metrics.counter("task.exec.intaken.count").add(1, attributes=metrics_attributes)
        try:
            await cls._exec(ctx)
        except TaskStopExecution:
            Metrics.counter("task.exec.skipped.count").add(
                1, attributes=metrics_attributes
            )
            logger.info(f"[{cls.get_name()}] skipping task")
            return TaskResultCategory.SKIPPED
        except Exception:
            Metrics.counter("task.exec.failed.count").add(
                1, attributes=metrics_attributes
            )
            logger.error(
                f"[{cls.get_name()}] failed to execute task with error: {traceback.format_exc()}"
            )
            raise

        Metrics.counter("task.exec.success.count").add(1, attributes=metrics_attributes)
        return TaskResultCategory.SUCCESS
```
The contract:
- Disable-rule check first. If any DisableTaskRule says "skip in this environment," return SKIPPED immediately.
- _should_skip(ctx) check second. Subclasses can override it to skip on payload-dependent conditions (without raising).
- Run _exec(ctx). Three possible outcomes:
  - Clean return → SUCCESS.
  - TaskStopExecution → SKIPPED. Used by filters and rate-limiters.
  - Any other exception → re-raise (counted as failed).
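The @retry(stop=stop_after_attempt(2), wait=wait_fixed(1)) on exec allows 2 total attempts (one initial call plus one retry after 1 s); tenacity reads stop_after_attempt(2) as "stop after 2 attempts." Because tenacity's default is reraise=False, a task that fails both attempts reaches the plan executor as a tenacity.RetryError wrapping the last exception, not as the original exception.
Important nuance: the retry wraps exec, not _exec. So if _exec raises a real exception, the whole exec body runs again, including should_skip and the task.exec.count / task.exec.failed.count counters, which therefore count each attempt. But if a filter raises TaskStopExecution, there is no retry: that exception is caught inside exec and converted into a normal SKIPPED return.
Each metric counter carries a task_name attribute (from get_name()), so dashboards can break down by task class.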
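The contract is easiest to see in motion. Below is a stdlib-only sketch of it, not the Grox code: tenacity is replaced by a hand-rolled two-attempt loop (without the 1 s pause), metrics and disable rules are dropped, and MiniTask, SkippingTask and FlakyTask are hypothetical names. One deliberate difference: after the last failed attempt the sketch re-raises the original exception, whereas tenacity's default (reraise=False) raises RetryError.

```python
import asyncio
from enum import Enum


class TaskResultCategory(str, Enum):
    SUCCESS = "success"
    SKIPPED = "skipped"


class TaskStopExecution(Exception):
    """Clean skip: not an error, never retried."""


class MiniTask:
    # Hypothetical stand-in for Task; stop_after_attempt(2) becomes a loop.
    MAX_ATTEMPTS = 2
    attempts = 0

    @classmethod
    async def exec(cls, ctx) -> TaskResultCategory:
        last_error = None
        for _ in range(cls.MAX_ATTEMPTS):
            cls.attempts += 1
            try:
                await cls._exec(ctx)
            except TaskStopExecution:
                # Caught inside the retried body, so it never triggers a retry.
                return TaskResultCategory.SKIPPED
            except Exception as exc:
                # A real failure: remember it and try again.
                last_error = exc
                continue
            return TaskResultCategory.SUCCESS
        raise last_error

    @classmethod
    async def _exec(cls, ctx) -> None:
        raise NotImplementedError


class SkippingTask(MiniTask):
    attempts = 0

    @classmethod
    async def _exec(cls, ctx) -> None:
        raise TaskStopExecution("nothing to do")


class FlakyTask(MiniTask):
    attempts = 0

    @classmethod
    async def _exec(cls, ctx) -> None:
        # Fails on the first attempt only.
        if cls.attempts == 1:
            raise RuntimeError("transient")
```

The point to notice is where the skip is caught: inside the retried body, so a "not eligible" verdict costs exactly one attempt, while a transient RuntimeError gets its second chance.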
```python
    @classmethod
    @abstractmethod
    async def _exec(cls, ctx: TaskContext) -> None:
        raise NotImplementedError()

    @classmethod
    def should_skip(cls, ctx: TaskContext) -> bool:
        if cls.should_disable(ctx):
            return True
        return cls._should_skip(ctx)

    @classmethod
    def _should_skip(cls, ctx: TaskContext) -> bool:
        return False

    @classmethod
    def should_disable(cls, ctx: TaskContext) -> bool:
        for rule in cls.DISABLE_RULES:
            if rule.should_disable(ctx):
                logger.debug(
                    f"[{cls.get_name()}] skipping task because {rule.disable_reason()}"
                )
                return True
        return False

    @classmethod
    @cache
    def get_name(cls) -> str:
        return camel_to_snake(cls.__name__)
```
- _exec: the subclass implements the actual work.
- should_skip / _should_skip: subclasses can override _should_skip to add a payload-dependent skip condition (e.g., "skip if post has no media").
- should_disable: iterates DISABLE_RULES and returns True on the first match. Disable rules see the context too, so they can be more than just env-based.
- get_name: the class name in snake_case, cached.
Pattern note: this entire class is @classmethod-only — Task is used as a static dispatcher, not instantiated. Every "Task" is really a class with class methods. Subclasses get state via class attributes (classifier = SpamClassifier(), etc.).
Type-specialized subclasses
```python
class TaskWithUser(Task):
    @classmethod
    async def _exec(cls, ctx: TaskContext) -> None:
        user = ctx.payload.user
        if not user:
            raise TaskStopExecution("No user for task")
        await cls._exec_with_user(ctx, user)

    @classmethod
    @abstractmethod
    async def _exec_with_user(cls, ctx: TaskContext, user: User) -> None:
        raise NotImplementedError()
```
Convenience subclasses that destructure the payload and skip if the expected field is missing. Four concrete forms:
- TaskWithUser: expects ctx.payload.user.
- TaskWithUserContext: expects ctx.payload.user_context.
- TaskWithPost: expects ctx.payload.post.
- TaskWithContentAnalysis: expects ctx.payload.grox_content_analysis.
Each follows the same pattern: implement _exec, raise TaskStopExecution on missing field, otherwise call into the typed _exec_with_* hook.
This is the most common base — TaskWithPost covers ~90% of all concrete tasks in the codebase.
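Only TaskWithUser's source is shown above; the other three follow the same shape. As an illustration, here is TaskWithPost reconstructed by analogy (an assumption, not the verbatim source), with minimal stand-in Post / Payload / TaskContext dataclasses plus a hypothetical run driver and RecordPostId task so that the skip path can be exercised.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


class TaskStopExecution(Exception):
    pass


@dataclass
class Post:
    id: int


@dataclass
class Payload:
    post: Optional[Post] = None


@dataclass
class TaskContext:
    payload: Payload = field(default_factory=Payload)


class TaskWithPost:
    # Reconstructed by analogy with TaskWithUser; not the verbatim Grox source.
    @classmethod
    async def _exec(cls, ctx: TaskContext) -> None:
        post = ctx.payload.post
        if not post:
            raise TaskStopExecution("No post for task")
        await cls._exec_with_post(ctx, post)

    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        raise NotImplementedError()


class RecordPostId(TaskWithPost):
    # Hypothetical concrete task: remembers which posts it saw.
    seen: list[int] = []

    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        cls.seen.append(post.id)


async def run(task, ctx: TaskContext) -> str:
    """Tiny driver: maps the skip exception to a label, as Task.exec does."""
    try:
        await task._exec(ctx)
    except TaskStopExecution:
        return "skipped"
    return "success"
```

The typed hook never sees a None post, which is why concrete tasks can use post.id without guarding.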
disable_rules.py (56 lines)
Env-based task disabling.
```python
from abc import ABC

from grox.config.env import is_dev, is_prod, is_local, is_mm_emb_prod, is_ptos_prod
from grox.schedules.types import TaskContext


class DisableTaskRule(ABC):
    DISABLE_REASON: str = ""

    @classmethod
    def should_disable(cls, ctx: TaskContext) -> bool:
        return False

    @classmethod
    def disable_reason(cls) -> str | None:
        return cls.DISABLE_REASON


class DisableTaskForLocal(DisableTaskRule):
    DISABLE_REASON = "Task is disabled for local mode"

    @classmethod
    def should_disable(cls, ctx: TaskContext) -> bool:
        return is_local


class DisableTaskForDev(DisableTaskRule):
    DISABLE_REASON = "Task is disabled for dev mode"

    @classmethod
    def should_disable(cls, ctx: TaskContext) -> bool:
        return is_dev


class DisableTaskForNonProd(DisableTaskRule):
    DISABLE_REASON = "Task is disabled for non-prod mode"

    @classmethod
    def should_disable(cls, ctx: TaskContext) -> bool:
        return not is_prod


class DisableTaskForNonMmEmbProd(DisableTaskRule):
    DISABLE_REASON = "Task is disabled for non-mm-emb-prod mode"

    @classmethod
    def should_disable(cls, ctx: TaskContext) -> bool:
        return not is_mm_emb_prod


class DisableTaskForNonPtosProd(DisableTaskRule):
    DISABLE_REASON = "Task is disabled for non-ptos-prod mode"

    @classmethod
    def should_disable(cls, ctx: TaskContext) -> bool:
        return not is_ptos_prod
```
Five environment flags:
- is_local: local dev machine (no Strato calls, no Kafka writes).
- is_dev: staging environment.
- is_prod: production (covers all three production environments).
- is_mm_emb_prod: the production cluster specifically running the multimodal-embedding pipeline.
- is_ptos_prod: the production cluster specifically running the safety-PTOS pipeline.
A task like TaskPublishKafka would have DISABLE_RULES = [DisableTaskForNonProd] so it doesn't actually write to Kafka in dev or local. The classifier tasks don't typically disable — they're cheap enough to run in dev for testing.
We saw exactly one usage in the files we've read so far: TaskGrokUpaActionWithLabels.DISABLE_RULES = [DisableTaskForNonProd] — applying moderation actions in dev/local would be dangerous (it'd modify real production state via Strato).
The split into mm_emb_prod / ptos_prod reflects the production cluster sharding. The Grox service runs as separate clusters per workload — splitting "make embeddings" from "moderate content" means they can be scaled and rolled out independently. Tasks gate themselves on which cluster they're in.
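A minimal sketch of how DISABLE_RULES evaluation behaves. Assumption: the environment is passed in as a hypothetical Env dataclass instead of being read from the module-level is_* booleans in grox.config.env, so the rules can be exercised without a real deployment; first_disabling_rule and SIDE_EFFECT_RULES are illustrative names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Env:
    # Hypothetical stand-in for grox.config.env's module-level flags.
    is_local: bool = False
    is_dev: bool = False
    is_prod: bool = False


class DisableTaskRule:
    DISABLE_REASON = ""

    @classmethod
    def should_disable(cls, env: Env) -> bool:
        return False


class DisableTaskForNonProd(DisableTaskRule):
    DISABLE_REASON = "Task is disabled for non-prod mode"

    @classmethod
    def should_disable(cls, env: Env) -> bool:
        return not env.is_prod


class DisableTaskForLocal(DisableTaskRule):
    DISABLE_REASON = "Task is disabled for local mode"

    @classmethod
    def should_disable(cls, env: Env) -> bool:
        return env.is_local


def first_disabling_rule(rules, env: Env):
    """Mirror of Task.should_disable: the first matching rule wins."""
    for rule in rules:
        if rule.should_disable(env):
            return rule
    return None


# Hypothetical config in the spirit of TaskGrokUpaActionWithLabels.
SIDE_EFFECT_RULES = [DisableTaskForNonProd]
```

Returning the matching rule (rather than a bare bool) is what lets the real should_disable log rule.disable_reason().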
task_filters.py (370 lines)
The filter tasks decide "is this post eligible for this plan?" and propagate SKIP downstream if not.
Base filter classes
```python
class TaskFilter(Task):
    @classmethod
    async def _exec(cls, ctx: TaskContext) -> None:
        if not await cls._eligible(ctx):
            raise TaskStopExecution()

    @classmethod
    @abstractmethod
    async def _eligible(cls, ctx: TaskContext) -> bool:
        raise NotImplementedError()


class TaskFilterWithUser(TaskFilter):
    @override
    @classmethod
    async def _eligible(cls, ctx: TaskContext) -> bool:
        if not ctx.payload.user:
            return False
        return await cls._eligible_with_user(ctx.payload.user, ctx)

    @classmethod
    @abstractmethod
    async def _eligible_with_user(cls, user: User, ctx: TaskContext) -> bool:
        raise NotImplementedError()


class TaskFilterWithPost(TaskFilter):
    @override
    @classmethod
    async def _eligible(cls, ctx: TaskContext) -> bool:
        if not ctx.payload.post:
            return False
        return await cls._eligible_with_post(ctx.payload.post, ctx)

    @classmethod
    @abstractmethod
    async def _eligible_with_post(cls, post: Post, ctx: TaskContext) -> bool:
        raise NotImplementedError()
```
TaskFilter is a thin adapter over TaskStopExecution: subclasses implement _eligible to return True (continue) or False, and _exec turns False into the skip exception. Filters always reduce to a boolean condition, so this is a convenience: the framework lifts that boolean into SKIP propagation.
TaskFilterWithUser / TaskFilterWithPost follow the same destructuring pattern as TaskWithUser / TaskWithPost.
TaskSpamFilter — the most-elaborate filter (80 lines)
```python
class TaskSpamFilter(TaskFilterWithPost):
    # Redacted to "" in the open-source dump. It must be an int in practice:
    # comparing an int follower count against "" raises TypeError.
    FOLLOWER_COUNT_THRESHOLD_FOR_SPAM_DETECTION = ""

    @override
    @classmethod
    async def _eligible_with_post(cls, post: Post, ctx: TaskContext) -> bool:
        if not post.ancestors:
            Metrics.counter("task.filter.skipped.count").add(
                1, attributes={"filter": "spam_detection", "reason": "not_reply"}
            )
            return False
```
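FOLLOWER_COUNT_THRESHOLD_FOR_SPAM_DETECTION is an empty string because the value was redacted from the open-source dump. The real value is presumably an integer (something like 1000): the > comparisons further down would raise TypeError against a string.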
First check: only run spam detection on replies (posts with ancestors). Top-level posts get a different policy (Banger / Post Safety).
```python
        if not post.user:
            Metrics.counter("task.filter.skipped.count").add(
                1, attributes={"filter": "spam_detection", "reason": "no_user"}
            )
            return False

        if post.user.id == 0:
            Metrics.counter("task.filter.skipped.count").add(
                1,
                attributes={"filter": "spam_detection", "reason": "is_system_account"},
            )
            return False

        if post.user.id == 0:
            Metrics.counter("task.filter.skipped.count").add(
                1,
                attributes={"filter": "spam_detection", "reason": "is_system_account"},
            )
            return False
```
Posts from system accounts (user ID 0) are exempt from spam detection. The same if post.user.id == 0 block appears twice in a row (lines 71-82 of the source file). Either it's a copy-paste slip, or the redaction process collapsed two different checks into duplicates (the second was probably checking some other system-user condition, like a banned admin).
```python
        if not post.ancestors[-1].user:
            Metrics.counter("task.filter.skipped.count").add(
                1,
                attributes={
                    "filter": "spam_detection",
                    "reason": "previous_post_no_user",
                },
            )
            return False

        if post.user.id == post.ancestors[-1].user.id:
            logger.info(
                f"Skipping reply spam since the replier is same as reply target post {post.id}"
            )
            Metrics.counter("task.filter.skipped.count").add(
                1, attributes={"filter": "spam_detection", "reason": "same_user_reply"}
            )
            return False

        if not post.ancestors[0].user:
            Metrics.counter("task.filter.skipped.count").add(
                1,
                attributes={"filter": "spam_detection", "reason": "root_post_no_user"},
            )
            return False

        if post.user.id == post.ancestors[0].user.id:
            logger.info(
                f"Skipping reply spam since the replier is same as reply root post {post.id}"
            )
            Metrics.counter("task.filter.skipped.count").add(
                1,
                attributes={
                    "filter": "spam_detection",
                    "reason": "same_user_reply_as_root",
                },
            )
            return False
```
Self-reply detection: if the replier is the same as the immediate-parent author OR the root-post author, skip spam detection. Self-replies are conversational continuations, not spam in the targeted-reply sense. (ancestors[-1] is the direct parent; ancestors[0] is the root post of the thread.)
The two ancestor-user-existence checks ("previous_post_no_user" and "root_post_no_user") protect against partial-hydration cases — sometimes the Strato fetch returns posts without user data attached.
```python
        in_reply_user_follower_count = post.ancestors[-1].user.follower_count or 0
        root_user_follower_count = post.ancestors[0].user.follower_count or 0
        if (
            in_reply_user_follower_count
            > cls.FOLLOWER_COUNT_THRESHOLD_FOR_SPAM_DETECTION
            or root_user_follower_count
            > cls.FOLLOWER_COUNT_THRESHOLD_FOR_SPAM_DETECTION
        ):
            Metrics.counter("task.filter.skipped.count").add(
                1,
                attributes={
                    "filter": "spam_detection",
                    "reason": "reply_ranking_target",
                },
            )
            return False

        return True
```
The critical decision: only run spam detection if the target (immediate parent or root) is a low-follower account.
This is the complement of reply-ranking: high-follower targets get full reply-ranking (which already weeds out spam-like replies), low-follower targets get focused spam detection. The reason is in the metric attribute: "reply_ranking_target" — skipped because the target is already handled by reply ranking.
The dual-condition > check uses OR — if EITHER the direct parent or root post is high-follower, skip spam (assume reply ranking will handle it).
TaskReplyRankingFilter — the inverse
```python
class TaskReplyRankingFilter(TaskFilterWithPost):
    FOLLOWER_COUNT_THRESHOLD_FOR_REPLY_RANKING = ""
```
Same redacted threshold. Probably the same value as spam-detection's threshold — the two filters partition the input space.
The filter has the same checks as spam (must be a reply, must have user, parent must have user, replier ≠ target/root) — then the follower check is inverted:
```python
        in_reply_user_follower_count = post.ancestors[-1].user.follower_count or 0
        root_user_follower_count = post.ancestors[0].user.follower_count or 0
        if (
            in_reply_user_follower_count
            <= cls.FOLLOWER_COUNT_THRESHOLD_FOR_REPLY_RANKING
            and root_user_follower_count
            <= cls.FOLLOWER_COUNT_THRESHOLD_FOR_REPLY_RANKING
        ):
            Metrics.counter("task.filter.skipped.count").add(
                1, attributes={"filter": "reply_ranking", "reason": "low_blast_radius"}
            )
            return False
```
Skip if both are low-follower (<=, AND). The reasoning: reply ranking is expensive (LLM inference); only worth doing when there's a high-visibility "blast radius" target. The metric reason is exactly that — "low_blast_radius".
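Combined, and assuming the two redacted thresholds are equal: among replies that pass the shared preconditions, spam detection runs on low-follower targets and reply ranking on high-follower targets, with no overlap and no gap. If the thresholds differed, a band of follower counts would fall into neither plan (spam threshold lower) or into both (spam threshold higher).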
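Whether the two filters really partition the input depends on the redacted thresholds. The sketch below isolates just the follower-count checks (all earlier preconditions assumed passed) with a hypothetical THRESHOLD of 1,000, and checks that equal thresholds put every reply in exactly one plan while unequal ones open a gap.

```python
# THRESHOLD is hypothetical: the real values are redacted ("" in the dump).
THRESHOLD = 1_000


def spam_eligible(parent_followers: int, root_followers: int, t: int = THRESHOLD) -> bool:
    # TaskSpamFilter: skip if EITHER target is above the threshold.
    return not (parent_followers > t or root_followers > t)


def reply_ranking_eligible(parent_followers: int, root_followers: int, t: int = THRESHOLD) -> bool:
    # TaskReplyRankingFilter: skip if BOTH targets are at or below the threshold.
    return not (parent_followers <= t and root_followers <= t)


def check_partition(t: int = THRESHOLD) -> bool:
    """True if every sampled (parent, root) pair lands in exactly one plan."""
    samples = [0, t - 1, t, t + 1, 10 * t]
    return all(
        spam_eligible(p, r, t) != reply_ranking_eligible(p, r, t)
        for p in samples
        for r in samples
    )
```

By De Morgan, reply_ranking_eligible is exactly "parent > t or root > t", the negation of spam_eligible, so with one shared t the two can never agree.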
TaskPostEmbeddingWithSummaryFilter — top-level posts only
```python
class TaskPostEmbeddingWithSummaryFilter(TaskFilterWithPost):
    @override
    @classmethod
    async def _eligible_with_post(cls, post: Post, ctx: TaskContext) -> bool:
        if post.ancestors:
            Metrics.counter("task.filter.skipped.count").add(
                1,
                attributes={
                    "filter": "post_embedding_with_summary",
                    "reason": "is_reply",
                },
            )
            return False

        if not post.user:
            ...
            return False

        if post.user.is_protected:
            Metrics.counter("task.filter.skipped.count").add(
                1,
                attributes={
                    "filter": "post_embedding_with_summary",
                    "reason": "private_account",
                },
            )
            return False

        Metrics.counter("task.post_embedding_with_summary.eligible.count").add(1)
        return True
```
Three checks: not a reply, has user, not from a protected (private) account. The reasoning:
- Replies don't need their own embedding — their parent's embedding covers them in retrieval.
- Protected accounts shouldn't be indexed in the public recommendation graph.
Note the eligible.count counter at the end — only the happy path increments this. Most filters in this file have this pattern: failure paths increment task.filter.skipped.count with a reason attribute; success increments a class-specific counter.
TaskPostEmbeddingWithSummaryForReplyFilter — the inverse
```python
class TaskPostEmbeddingWithSummaryForReplyFilter(TaskFilterWithPost):
    @override
    @classmethod
    async def _eligible_with_post(cls, post: Post, ctx: TaskContext) -> bool:
        if not post.ancestors:
            ...  # is_original: skipped
            return False

        if not post.user:
            ...
            return False

        # As shown, the ancestors' users are not None-checked here (unlike in
        # TaskSpamFilter), so a partially hydrated ancestor would raise
        # AttributeError instead of being skipped.
        in_reply_user_protected = post.ancestors[-1].user.is_protected
        root_user_protected = post.ancestors[0].user.is_protected
        if in_reply_user_protected or root_user_protected or post.user.is_protected:
            ...  # private_account: skipped
            return False

        Metrics.counter(
            "task.post_embedding_with_summary_for_reply.eligible.count"
        ).add(1)
        return True
```
Reply-specific embedding filter: must be a reply (has ancestors), must have user, and all three of replier/parent/root must be public. The reasoning: if any participant in the thread is private, indexing the reply embedding leaks private context.
TaskSafetyPtosFilter — minimal
```python
class TaskSafetyPtosFilter(TaskFilterWithPost):
    @override
    @classmethod
    async def _eligible_with_post(cls, post: Post, ctx: TaskContext) -> bool:
        is_deluxe = ctx.payload.task_type == TaskGeneratorType.SAFETY_PTOS_DELUXE
        filter_name = (
            "safety_ptos_deluxe_detection" if is_deluxe else "safety_ptos_detection"
        )
        Metrics.counter(f"task.{filter_name}.request.count").add(1)

        if not post.user:
            Metrics.counter("task.filter.skipped.count").add(
                1, attributes={"filter": filter_name, "reason": "no_user"}
            )
            return False

        Metrics.counter(f"task.{filter_name}.eligible.count").add(1)
        return True
```
Safety detection runs on everything with a user. No language filter, no protected-account filter. Replies, originals, private accounts — all get safety classification. (Of course, results from protected accounts don't get distributed to the public; safety must still flag them so internal moderation can act.)
Note the metric prefix swap based on task_type (deluxe vs standard).
TaskPostSafetyDeluxeFilter and TaskInitialBangerFilter
These two are nearly identical:
```python
class TaskPostSafetyDeluxeFilter(TaskFilterWithPost):
    @override
    @classmethod
    async def _eligible_with_post(cls, post: Post, ctx: TaskContext) -> bool:
        if not post.user:
            ...
            return False

        if post.ancestors:
            ...  # reply: skipped
            return False

        filter_reason = cls._get_hardcoded_filter_reason(post)
        if filter_reason:
            ...
            return False

        Metrics.counter("task.post_safety_deluxe.eligible.count").add(1)
        return True

    @classmethod
    def _get_hardcoded_filter_reason(cls, post: Post) -> str | None:
        if not post.user:
            return None
        if post.user.is_protected:
            return "private_account"
        return None
```
Top-level posts (no ancestors), with user, not from protected accounts. _get_hardcoded_filter_reason is a hook that subclasses could override with more elaborate filtering — in TaskInitialBangerFilter it's the same implementation:
```python
class TaskInitialBangerFilter(TaskFilterWithPost):
    @override
    @classmethod
    async def _eligible_with_post(cls, post: Post, ctx: TaskContext) -> bool:
        Metrics.counter("task.initial_banger_filter.count").add(1)

        if not post.user:
            return False

        if post.ancestors:
            ...
            return False

        filter_reason = cls._get_hardcoded_filter_reason(post)
        if filter_reason:
            ...
            return False

        logger.info(f"Post {post.id} is eligible for initial banger")
        Metrics.counter("task.initial_banger_filter.eligible.count").add(1)
        return True

    @classmethod
    def _get_hardcoded_filter_reason(cls, post: Post) -> str | None:
        if not post.user:
            return None
        if post.user.is_protected:
            return "private_account"
        return None
```
Two near-duplicate classes that could share a base but don't: they are copy-paste with minor differences in metric names (plus an extra eligibility log line in the banger filter). A minor architectural blemish, but tolerable.
The _get_hardcoded_filter_reason indirection exists so future filters can add more rules (e.g., "skip posts in language X", "skip posts shorter than N chars") without restructuring the rest of the method. Hasn't been used yet.
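One way the duplication could be folded into a shared base, sketched synchronously and without metrics. TopLevelPublicPostFilter, skip_reason and FILTER_NAME are hypothetical names, and User / Post are minimal stand-ins. Because the real metric names aren't uniform (task.initial_banger_filter.* vs task.post_safety_deluxe.*), a real refactor would also need per-class metric overrides.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class User:
    is_protected: bool = False


@dataclass
class Post:
    id: int
    user: Optional[User] = None
    ancestors: tuple = ()


class TopLevelPublicPostFilter:
    # Hypothetical shared base; FILTER_NAME would feed the metric attributes.
    FILTER_NAME = "top_level_public_post"

    @classmethod
    def skip_reason(cls, post: Post) -> Optional[str]:
        if not post.user:
            return "no_user"
        if post.ancestors:
            return "is_reply"
        return cls._get_hardcoded_filter_reason(post)

    @classmethod
    def _get_hardcoded_filter_reason(cls, post: Post) -> Optional[str]:
        # The extension hook: subclasses could append more rules here.
        if post.user and post.user.is_protected:
            return "private_account"
        return None

    @classmethod
    def eligible(cls, post: Post) -> bool:
        return cls.skip_reason(post) is None


class PostSafetyDeluxeFilter(TopLevelPublicPostFilter):
    FILTER_NAME = "post_safety_deluxe"


class InitialBangerFilter(TopLevelPublicPostFilter):
    FILTER_NAME = "initial_banger"
```

Returning a reason string instead of a bool keeps the "skipped.count with reason" metric pattern in one place.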
task_rate_limit.py (191 lines)
Rate-limit tasks ensure a post isn't processed twice in close succession. Important because the same post can appear on multiple Kafka topics (UNIFIED_POSTS + UNIFIED_POSTS_POPULAR + UNIFIED_POSTS_MIN_TRACTION_FOR_GROX), and we don't want to embed/classify it many times.
Base classes
```python
class TaskRateLimit(Task):
    @classmethod
    async def _exec(cls, ctx: TaskContext) -> None:
        eligible = await cls._eligible(ctx)
        Metrics.counter("task.rate_limit.count").add(
            1, attributes={"task_name": cls.get_name(), "passed": eligible}
        )
        if not eligible:
            raise TaskStopExecution()

    @classmethod
    @abstractmethod
    def _eligible(cls, ctx: TaskContext) -> Awaitable[bool]:
        raise NotImplementedError()


class TaskRateLimitWithPost(TaskRateLimit):
    @override
    @classmethod
    async def _eligible(cls, ctx: TaskContext) -> bool:
        if not ctx.payload.post:
            return False
        return await cls._eligible_with_post(ctx.payload.post, ctx)

    @classmethod
    @abstractmethod
    def _eligible_with_post(cls, post: Post, ctx: TaskContext) -> Awaitable[bool]:
        raise NotImplementedError()


class TaskRateLimitWithUser(TaskRateLimit):
    @override
    @classmethod
    async def _eligible(cls, ctx: TaskContext) -> bool:
        if not ctx.payload.user:
            return False
        return await cls._eligible_with_user(ctx.payload.user, ctx)

    @classmethod
    @abstractmethod
    def _eligible_with_user(cls, user: User, ctx: TaskContext) -> Awaitable[bool]:
        raise NotImplementedError()
```
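Identical structure to the filter classes, except that the metric carries a passed: bool attribute rather than a reason: every rate-limit rejection has the same cause, "already seen this post."
The abstract _eligible / _eligible_with_* declarations are plain def methods annotated -> Awaitable[bool], while the concrete overrides are async def ... -> bool. The two are compatible, because calling an async def returns a coroutine, which is an Awaitable of its return type. It's a slight inconsistency with the filters (whose abstract methods are declared async def), but functionally equivalent.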
The eight concrete rate-limiters
All eight follow the exact same template:
```python
class TaskRateLimitEmbeddingWithPostSummary(TaskRateLimitWithPost):
    POST_CACHE_FOR_MM_EMB_SUMMARY = TTLCache(maxsize=10_000, ttl=60)

    @override
    @classmethod
    async def _eligible_with_post(cls, post: Post, ctx: TaskContext) -> bool:
        post_id = post.id
        if post_id not in cls.POST_CACHE_FOR_MM_EMB_SUMMARY:
            cls.POST_CACHE_FOR_MM_EMB_SUMMARY[post_id] = True
            return True
        logger.info(f"Post {post_id} already hit rate limit for mm emb with summary")
        return False
```
- A class-attribute TTLCache (10k entries, 60s TTL).
- Check if the post ID is in the cache.
- If not, add it and return True (run the plan).
- If yes, return False (skip — already processed recently).
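Critical detail: this is a first-write-wins cache. The first call within the 60 s window goes through; later calls are dropped and, because they never rewrite the entry, they don't extend the window. It's not a token bucket: once a post has passed, it is locked out until 60 s after that first pass. One more behaviour worth knowing, assuming TTLCache here is cachetools.TTLCache: when the cache is full (10,000 live entries), inserting a new post evicts the least-recently-used (here, effectively the oldest) entry early, so under very high throughput a duplicate can slip through before its 60 s are up.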
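To pin down first-write-wins, here is a stdlib stand-in for the TTLCache pattern with an injectable clock, so the 60 s window can be tested without sleeping. FirstWriteWinsWindow and demo are hypothetical names, and unlike cachetools.TTLCache the sketch has no maxsize or LRU eviction.

```python
import time
from typing import Callable, Hashable


class FirstWriteWinsWindow:
    """Admit a key once, then reject it until ttl seconds after that admission."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl
        self._clock = clock
        self._expires_at: dict[Hashable, float] = {}

    def admit(self, key: Hashable) -> bool:
        now = self._clock()
        expiry = self._expires_at.get(key)
        if expiry is not None and now < expiry:
            # Seen inside the window: drop it, and do NOT extend the window.
            return False
        self._expires_at[key] = now + self._ttl
        return True


def demo() -> list[bool]:
    now = [0.0]
    window = FirstWriteWinsWindow(ttl=60, clock=lambda: now[0])
    results = []
    for t in (0.0, 30.0, 59.9, 60.0):
        now[0] = t
        results.append(window.admit("post-123"))
    return results
```

demo() yields [True, False, False, True]: the rejections at 30 s and 59.9 s do not push the expiry past 60 s.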
The eight rate-limiters are clones of each other, one per plan flow:
- TaskRateLimitEmbeddingWithPostSummary: POST_EMBEDDING_WITH_SUMMARY.
- TaskRateLimitEmbeddingWithPostSummaryForReply: POST_EMBEDDING_WITH_SUMMARY_FOR_REPLY.
- TaskRateLimitEmbeddingV5: POST_EMBEDDING_V5.
- TaskRateLimitEmbeddingV5ForReply: POST_EMBEDDING_V5_FOR_REPLY.
- TaskRateLimitBangerAnnotationWithPost: INITIAL_BANGER.
- TaskRateLimitReplySpamAnnotationWithPost: SPAM_COMMENT.
- TaskRateLimitReplyRankingAnnotationWithPost: REPLY_RANKING.
- TaskRateLimitPostSafetyAnnotationWithPost: POST_SAFETY.
Each limiter has its own dedicated cache, so the windows don't interfere. That matters because one post can legitimately be processed by several plans: a top-level post from a public account is eligible for both INITIAL_BANGER and POST_SAFETY, and a single shared cache would let whichever plan ran first silently suppress the other.
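The eight clones could be generated by a class factory that still gives every plan a private window. make_rate_limiter is a hypothetical helper, not part of Grox; each call closes over its own seen dict, which mirrors the one-cache-per-class layout.

```python
import time


def make_rate_limiter(plan: str, ttl: float = 60.0, clock=time.monotonic):
    """Hypothetical factory: one limiter class per plan, each with its own window."""
    seen: dict[int, float] = {}  # post_id -> expiry, private to this class

    class RateLimiter:
        PLAN = plan

        @classmethod
        def eligible(cls, post_id: int) -> bool:
            now = clock()
            if seen.get(post_id, float("-inf")) > now:
                return False  # still inside this plan's window
            seen[post_id] = now + ttl
            return True

    RateLimiter.__name__ = f"TaskRateLimit_{plan}"
    return RateLimiter


BangerLimiter = make_rate_limiter("INITIAL_BANGER")
PostSafetyLimiter = make_rate_limiter("POST_SAFETY")
```

The same post passes PostSafetyLimiter even right after BangerLimiter has locked it out, which is the independence the separate caches provide.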
TaskRateLimitSafetyPtosAnnotationWithPost — two-cache variant
```python
class TaskRateLimitSafetyPtosAnnotationWithPost(TaskRateLimitWithPost):
    POST_CACHE_FOR_SAFETY_PTOS = TTLCache(maxsize=10_000, ttl=60)
    POST_CACHE_FOR_SAFETY_PTOS_DELUXE = TTLCache(maxsize=10_000, ttl=60)

    @override
    @classmethod
    async def _eligible_with_post(cls, post: Post, ctx: TaskContext) -> bool:
        post_id = post.id
        is_deluxe = ctx.payload.task_type == TaskGeneratorType.SAFETY_PTOS_DELUXE
        cache = (
            cls.POST_CACHE_FOR_SAFETY_PTOS_DELUXE
            if is_deluxe
            else cls.POST_CACHE_FOR_SAFETY_PTOS
        )
        label = "safety ptos deluxe" if is_deluxe else "safety ptos"
        if post_id not in cache:
            cache[post_id] = True
            return True
        logger.info(f"Post {post_id} already hit rate limit for {label}")
        return False
```
PTOS has two cache namespaces (standard vs deluxe) because the same post might legitimately be processed by both — they have different upstream Kafka topics with different selection criteria (deluxe gets more thorough analysis).
Architectural note on this design
The caches are in-process, per-worker. Multiple Grox engine processes don't share these caches. So if you scale up to 4 workers, a post arriving simultaneously across the dispatcher's 4 workers could pass the rate limit 4 times.
In practice this is acceptable:
- The dispatcher's PriorityTaskGenerator (Session 18) distributes tasks pseudo-randomly across workers, so duplicate arrivals are rare.
- The upstream Kafka topic itself has consumer-group semantics: only one worker reads a given message.
- The downstream Manhattan writes are idempotent (keyed by post ID), so even if a post were processed twice, the final state would be the same.
The rate-limit task is a cost reduction, not a correctness mechanism.
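A toy model of that argument: each worker keeps a private seen set (like the in-process TTLCache) and writes to a sink keyed by post ID (standing in for the Manhattan write). simulate is a hypothetical helper. It shows the worst case, every worker admitting the same post, and that the keyed sink still ends in a single final state.

```python
def simulate(num_workers: int, post_id: int) -> tuple[int, dict[int, str]]:
    """Deliver the same post to every worker once; count how often it runs."""
    worker_caches = [set() for _ in range(num_workers)]  # not shared across workers
    sink: dict[int, str] = {}
    runs = 0
    for cache in worker_caches:
        if post_id in cache:
            continue
        cache.add(post_id)
        runs += 1
        # Idempotent write: same key, same value, however many times it runs.
        sink[post_id] = f"embedding-for-{post_id}"
    return runs, sink
```

The run count scales with the number of workers (wasted compute), while the sink contents do not (correct state).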
task_media.py (21 lines)
```python
from grox.tasks.task import TaskWithPost
from grox.schedules.types import TaskContext
from grox.data_loaders.data_types import Post
from grox.data_loaders.media_processor import MediaProcessor
from monitor.metrics import Metrics


class TaskMediaHydration(TaskWithPost):
    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        Metrics.counter("task.media_hydration.total.count").add(1)
        await MediaProcessor.process(post, video_duration_limit_minutes=360)
        Metrics.counter("task.media_hydration.passed.count").add(1)


class TaskMediaHydrationBanger(TaskWithPost):
    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        Metrics.counter("task.media_hydration_banger.total.count").add(1)
        await MediaProcessor.process(post, video_duration_limit_minutes=360)
        Metrics.counter("task.media_hydration_banger.passed.count").add(1)
```
Two near-duplicate classes. Both call MediaProcessor.process(post, video_duration_limit_minutes=360) — same args, different metric names. The duplication exists only so banger and non-banger can be monitored independently.
MediaProcessor is a separate-process media pipeline (referenced at engine.py startup, but its source isn't in the open-source dump). It downloads images and videos, decodes them, and attaches the bytes to the post object (post.media[i].convo_image.content = <bytes>).
video_duration_limit_minutes=360 is 6 hours, a cap that keeps memory bounded. Longer videos presumably get only their first 6 hours processed; since MediaProcessor's source isn't available, whether they are truncated or rejected outright can't be confirmed.
The class-method is async; MediaProcessor.process internally fans out to thread workers, similar to the ASR processor we saw in Session 19. The await returns once all media is hydrated.
task_asr.py (74 lines)
```python
import logging

from grox.data_loaders.asr_processor import ASRProcessor
from grox.data_loaders.data_types import Post, Video
from grox.schedules.types import TaskContext
from grox.tasks.task import TaskWithPost
from monitor.metrics import Metrics

logger = logging.getLogger(__name__)


def _get_video_url(video: Video) -> tuple[str | None, bool]:
    if video.animatedGifInfo:
        v = video.animatedGifInfo.get_best_variant()
        if v and v.url:
            return v.url, True
    if video.videoInfo:
        v = video.videoInfo.get_best_variant()
        if v and v.url:
            return v.url, False
    if video.url:
        return video.url, False
    return None, False
```
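Pick a video URL in a fixed order of preference: animatedGifInfo first, then videoInfo (each via get_best_variant(), which picks the lowest-bitrate variant that still meets quality), and finally the raw video.url. Returns (url, is_animated_gif). The flag matters because animated GIFs have no audio, so ASR skips them.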
```python
class TaskASRTranscription(TaskWithPost):
    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        Metrics.counter("task.asr_transcription.total.count").add(1)

        if not post.media:
            logger.debug(f"Post {post.id} has no media, skipping ASR")
            Metrics.counter("task.asr_transcription.skipped.count").add(
                1, attributes={"reason": "no_media"}
            )
            return

        videos = [m for m in post.media if isinstance(m, Video)]
        if not videos:
            logger.debug(f"Post {post.id} has no video, skipping ASR")
            Metrics.counter("task.asr_transcription.skipped.count").add(
                1, attributes={"reason": "no_video"}
            )
            return

        Metrics.counter("task.asr_transcription.has_video.count").add(1)
```
Two early returns, each emitting a distinct skip reason for monitoring.
Note: return here doesn't propagate SKIPPED to plan dependencies — it returns SUCCESS from the task framework's perspective. ASR is optional; "no video to transcribe" is success, not skip. Downstream tasks (like the V5 embedder) check video.convo_video.asr_transcript themselves.
```python
        for video in videos:
            video_url, is_animated_gif = _get_video_url(video)
            if not video_url:
                continue

            if is_animated_gif:
                logger.debug(
                    f"Post {post.id} video {video.id} is an animated GIF, skipping ASR (no audio)"
                )
                Metrics.counter("task.asr_transcription.skipped.count").add(
                    1, attributes={"reason": "animated_gif"}
                )
                continue

            transcript = await ASRProcessor.process(post.id, video_url)
            if transcript:
                if video.convo_video is not None:
                    video.convo_video.asr_transcript = transcript
                Metrics.counter("task.asr_transcription.success.count").add(1)
                logger.info(
                    f"ASR completed for post {post.id} video {video.id}, transcript_len={len(transcript)}"
                )
            else:
                Metrics.counter("task.asr_transcription.failed.count").add(
                    1, attributes={"reason": "processor_error"}
                )
```
Iterate through every video in the post. For each: get URL, skip if animated GIF, call ASRProcessor (which we walked through in Session 19), stash the transcript on video.convo_video.asr_transcript if successful.
The asr_transcript slot is what MultimodalPostEmbedderV5.embed() reads. So the V5 pipeline order is: media hydrate → ASR transcript → V5 embedding — each task mutates the post in place; downstream tasks pick up the additions.
Note: await ASRProcessor.process(post.id, video_url) is per-video. Multiple videos in one post → serial ASR. Could be parallel via asyncio.gather, but most posts have 1 video; the optimization isn't worth the complexity.
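If multi-video posts ever became common, the serial loop could fan out with asyncio.gather. A minimal sketch, assuming a hypothetical fake_asr in place of ASRProcessor.process; return_exceptions=True keeps one failing video from discarding its siblings' transcripts, and results come back in input order so they can be matched back to the videos.

```python
import asyncio
from typing import Optional


async def fake_asr(post_id: int, video_url: str) -> str:
    # Hypothetical stand-in for ASRProcessor.process(post_id, video_url).
    await asyncio.sleep(0.01)
    if video_url == "bad":
        raise RuntimeError("decode failed")
    return f"transcript of {video_url}"


async def transcribe_all(post_id: int, urls: list[str]) -> list[Optional[str]]:
    """Run one ASR call per video concurrently; results keep input order."""
    results = await asyncio.gather(
        *(fake_asr(post_id, url) for url in urls), return_exceptions=True
    )
    # Exceptions come back as values; map them to None (no transcript).
    return [r if isinstance(r, str) else None for r in results]
```

The trade-off is load: the ASR processor runs on thread workers (Session 19), so several concurrent calls from one post would compete with other posts for those workers.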
task_load_post_with_not_found_retry.py (35 lines)
```python
import time
from grox.tasks.task import Task, TaskWithPost, TaskResultCategory
from grox.schedules.types import TaskContext
from grox.data_loaders.data_types import Post
from grox.data_loaders.strato_loader import TweetStratoLoader
from grox.schedules.types import TaskPayload
from grox.tasks.task import TaskStopExecution
from monitor.metrics import Metrics
from tenacity import retry, wait_chain, wait_fixed, stop_after_attempt


class TaskLoadPostWithNotFoundRetry(TaskWithPost):
    @classmethod
    @retry(stop=stop_after_attempt(3), wait=wait_chain(wait_fixed(1), wait_fixed(1)))
    async def exec(cls, ctx: TaskContext) -> TaskResultCategory:
        return await Task.exec.__wrapped__(cls, ctx)
```
Override exec to use a custom retry policy: 3 attempts with wait_chain(wait_fixed(1), wait_fixed(1)) — wait 1s after the first failure, 1s after the second. (The chain provides waits for the first N retries; subsequent retries reuse the last wait.)
Task.exec.__wrapped__ reaches the undecorated exec method, because the base class wraps exec in its own @retry. Calling the decorated Task.exec instead would stack the two retry decorators: 3 outer attempts × 2 inner attempts = 6 calls in the worst case. That is a multiplicative blow-up, not an exponential one. The __wrapped__ attribute is set by functools.wraps, which tenacity uses internally.
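To make the stacking arithmetic concrete, here is a stdlib-only sketch. The retry decorator is a simplified, hypothetical stand-in for tenacity's: it counts attempts only and has no waits. Like tenacity, it uses functools.wraps, so the undecorated function stays reachable through __wrapped__.

```python
import functools


def retry(attempts: int):
    """Simplified stand-in for tenacity.retry: re-run on any exception."""
    def decorator(fn):
        @functools.wraps(fn)  # also sets wrapper.__wrapped__ = fn
        def wrapper(*args, **kwargs):
            last_exc = None
            for _ in range(attempts):
                try:
                    return fn(*args, **kwargs)
                except Exception as exc:  # retry on anything
                    last_exc = exc
            raise last_exc
        return wrapper
    return decorator


class Task:
    calls = 0  # how many times the real work actually ran

    @classmethod
    @retry(attempts=2)  # base policy: 2 attempts
    def exec(cls, ctx):
        return cls._exec(ctx)

    @classmethod
    def _exec(cls, ctx):
        Task.calls += 1
        raise ValueError("post not found")  # always fails, to exhaust retries


class StackedRetryTask(Task):
    @classmethod
    @retry(attempts=3)
    def exec(cls, ctx):
        return super().exec(ctx)  # also goes through the base @retry


class BypassRetryTask(Task):
    @classmethod
    @retry(attempts=3)
    def exec(cls, ctx):
        return Task.exec.__wrapped__(cls, ctx)  # undecorated base body only


def count_attempts(task_cls) -> int:
    Task.calls = 0
    try:
        task_cls.exec(ctx=None)
    except ValueError:
        pass
    return Task.calls


for task_cls in (Task, StackedRetryTask, BypassRetryTask):
    print(task_cls.__name__, count_attempts(task_cls))
# Task 2
# StackedRetryTask 6
# BypassRetryTask 3
```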
```python
    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        start_time = time.perf_counter_ns()
        loaded_post = await TweetStratoLoader.load_post(post.id)
        if loaded_post is None:
            task_type = (
                ctx.payload.task_type.value
                if ctx.payload and ctx.payload.task_type
                else "None"
            )
            if "recovery" in task_type:
                raise TaskStopExecution(f"Post not found: {post.id}")
            else:
                raise ValueError(f"Post not found: {post.id}")
        ctx.payload.post = loaded_post
        duration_ms = (time.perf_counter_ns() - start_time) / 1_000
        Metrics.histogram("task.embedding_load_post.duration_ms").record(duration_ms)
```
Re-fetch the post from Strato (the Kafka loader gives us a shallow Post — this task fetches the full version with quote metadata, ancestors, etc.).
Behavior split based on task_type:
- If the task_type contains "recovery" → not-found is a SKIP. The post was deleted between when Kafka emitted the message and now; we don't need to do anything.
- Otherwise → not-found is an ERROR. We raise ValueError, the retry mechanism kicks in (3 attempts), and if it still fails we propagate up.
Why the split? Recovery topics are designed for catch-up: they replay older messages, so by the time a recovery message is processed, the post may already have been deleted. Skip cleanly. The main streams should see fresh posts, so a not-found there is anomalous.
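Here is a minimal sketch of how the split could surface as result categories. It assumes an executor that maps TaskStopExecution to a skip and any other exception to a failure, as this session's summary describes. Result, raise_not_found, run_task, and the task-type strings are hypothetical names. The real TaskResultCategory members aren't shown in this excerpt, and retries are omitted.

```python
from enum import Enum


class TaskStopExecution(Exception):
    """Stand-in for grox.tasks.task.TaskStopExecution (a clean skip)."""


class Result(Enum):
    # Hypothetical names; not the real TaskResultCategory members.
    SUCCESS = "success"
    SKIPPED = "skipped"
    FAILED = "failed"


def raise_not_found(post_id: int, task_type: str) -> None:
    # Mirrors the split in _exec_with_post: recovery topics skip, others error.
    if "recovery" in task_type:
        raise TaskStopExecution(f"Post not found: {post_id}")
    raise ValueError(f"Post not found: {post_id}")


def run_task(post, post_id: int, task_type: str) -> Result:
    """Hypothetical executor wrapper mapping exceptions to a result category."""
    try:
        if post is None:
            raise_not_found(post_id, task_type)
        return Result.SUCCESS
    except TaskStopExecution:
        return Result.SKIPPED  # downstream tasks are skipped
    except Exception:
        return Result.FAILED  # surfaces as an error once retries are exhausted


print(run_task(None, 42, "post_recovery"))      # Result.SKIPPED
print(run_task(None, 42, "post_create"))        # Result.FAILED
print(run_task({"id": 42}, 42, "post_create"))  # Result.SUCCESS
```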
Note duration_ms = ... / 1_000: this converts nanoseconds to microseconds, not milliseconds (milliseconds would need / 1_000_000). The metric name says _ms, but the recorded value is in µs, so either the code or the name is wrong.
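The arithmetic behind the unit mismatch, as a small sketch with hypothetical helper names: a 2.5 ms load would be recorded in the _ms histogram as 2500.

```python
NS_PER_US = 1_000
NS_PER_MS = 1_000_000


def elapsed_ms(start_ns: int, end_ns: int) -> float:
    """Convert a perf_counter_ns() delta to milliseconds."""
    return (end_ns - start_ns) / NS_PER_MS


def elapsed_us(start_ns: int, end_ns: int) -> float:
    """What the quoted code actually computes: microseconds."""
    return (end_ns - start_ns) / NS_PER_US


delta_ns = 2_500_000  # 2.5 ms expressed in nanoseconds
print(elapsed_ms(0, delta_ns))  # 2.5
print(elapsed_us(0, delta_ns))  # 2500.0
```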
task_load_post_with_summary.py (27 lines)
```python
class TaskLoadPostWithSummary(TaskWithPost):
    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        loaded_post = await TweetStratoLoader.load_post(post.id)
        if loaded_post is None:
            raise TaskStopExecution(f"Post not found: {post.id}")
        stratoPostMultimodalEmbeddingGrokSummaryMh = (
            StratoPostMultimodalEmbeddingGrokSummaryMh()
        )
        summary = await stratoPostMultimodalEmbeddingGrokSummaryMh.fetch(
            int(post.id), "v3"
        )
        if summary is None:
            raise TaskStopExecution(f"Summary not found: {post.id}")
        loaded_post.summary = summary
        ctx.payload.post = loaded_post
```
Similar to the above but also fetches the pre-computed summary from Strato. Used by paths that need both the post and its summary loaded together (e.g., embedding tasks that take a summary as input).
fetch(int(post.id), "v3") — the "v3" is a summary-version key, since multiple summary versions can coexist in the KV store.
Both not-found cases (post or summary) are TaskStopExecution — clean skip, not retry. This task is used in flows where the upstream summary computation may not have completed yet; skipping is the right behavior.
task_load_user_recent_posts.py (23 lines)
```python
class TaskLoadUserRecentPosts(TaskWithPost):
    RECENT_POSTS_LIMIT = ""

    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        if not post.user or not post.user.id:
            raise TaskStopExecution("Post has no author user to load recent posts for")
        recent_posts = await UserRecentPostsLoader.load(
            post.user.id, limit=cls.RECENT_POSTS_LIMIT
        )
        post.user.recent_posts = recent_posts
        logger.info(f"Loaded {len(recent_posts)} recent posts for user {post.user.id}")
```
Hydrate the post's author with their recent posts (from UserRecentPostsLoader, Session 19). RECENT_POSTS_LIMIT = "" — redacted integer constant, probably 10 or 20.
Skip if there's no author user. Result is attached to post.user.recent_posts — downstream tasks (like spam detection) can use this for user-history context.
task_summarizer_for_post_embedding.py (25 lines)
```python
class TaskPostEmbeddingSummarizer(TaskWithPost):
    summarizer = PostEmbeddingSummarizer(prompt_file="")

    @classmethod
    async def exec(cls, ctx: TaskContext) -> TaskResultCategory:
        return await Task.exec.__wrapped__(cls, ctx)

    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        res = await cls.summarizer.summarize(post)
        assert res is not None
        post.summary = res
        Metrics.counter("task.post_embedding_summarizer.count").add(1)
```
summarizer = PostEmbeddingSummarizer(prompt_file="") — prompt_file is redacted; in production it's a path to the actual summarization prompt.
The summarizer class is instantiated at class definition time (one per process), so the VLM client connection pool is established before any task runs.
The exec override disables retries: Task.exec.__wrapped__ skips the base @retry, and no new retry is added on top. So summarization is one-shot — if it fails, the plan retry takes over (Session 19's plan-level error handling).
The assert res is not None — summarizer can technically return None for empty input, but for valid posts it should always return a string. This assert is a programming-error guard.
task_spam_detection.py (57 lines)
```python
class TaskSpamDetection(TaskWithPost):
    eapi_low_follower_classifier = SpamEapiLowFollowerClassifier()

    @classmethod
    def get_follower_bucket_string(cls, post: Post) -> str:
        if not post.ancestors:
            return "invalid"
        in_reply_user_follower_count = post.ancestors[-1].user.follower_count or 0
        root_user_follower_count = post.ancestors[0].user.follower_count or 0
        if in_reply_user_follower_count <= 100 and root_user_follower_count <= 100:
            return "lte_100"
        elif in_reply_user_follower_count <= 500 and root_user_follower_count <= 500:
            return "lte_500"
        elif in_reply_user_follower_count <= 1000 and root_user_follower_count <= 1000:
            return "lte_1000"
        else:
            return "gt_1000"
```
Discretize the follower count into 4 buckets: lte_100, lte_500, lte_1000, gt_1000. Used for per-bucket success/positive-rate metrics.
The "AND both ≤ N" logic puts a post in the smallest bucket that contains both the immediate-parent and root authors. So lte_100 is the strictest — both must be very low follower.
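Because both counts must be at most N, the larger of the two alone decides the bucket. Here is a minimal equivalent sketch using bisect. It is a hypothetical rewrite, not the Grox code, and it omits the "invalid" no-ancestors case.

```python
from bisect import bisect_left
from typing import Optional

# Upper bounds and labels copied from get_follower_bucket_string.
BOUNDS = [100, 500, 1000]
LABELS = ["lte_100", "lte_500", "lte_1000", "gt_1000"]


def follower_bucket(parent_followers: Optional[int], root_followers: Optional[int]) -> str:
    # "both <= N" is equivalent to "max(both) <= N"; None counts as 0, as in the source.
    worst = max(parent_followers or 0, root_followers or 0)
    # bisect_left gives the index of the first bound >= worst: the smallest bucket that fits.
    return LABELS[bisect_left(BOUNDS, worst)]


print(follower_bucket(50, 80))      # lte_100
print(follower_bucket(50, 600))     # lte_1000
print(follower_bucket(None, 1001))  # gt_1000
```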
```python
    @classmethod
    async def exec(cls, ctx: TaskContext) -> TaskResultCategory:
        return await Task.exec.__wrapped__(cls, ctx)
```
No retries — LLM classification calls are expensive and have their own server-side retries.
```python
    @override
    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        res = await cls.eapi_low_follower_classifier.classify(post)
        ctx.content_categories.extend(res)
        passed = any(
            r.positive for r in res if r.category == ContentCategoryType.SPAM_COMMENT
        )
        follower_bucket_string = cls.get_follower_bucket_string(post)
        if passed and follower_bucket_string != "gt_1000":
            logger.info(
                f"Reply Spam Found for lower than 1000 follower bucket. The post_id is {post.id} and the follower bucket is {follower_bucket_string}"
            )
        if passed:
            Metrics.counter("task.spam_comment_detection.positive.count").add(
                1, attributes={"reason": follower_bucket_string}
            )
        else:
            Metrics.counter("task.spam_comment_detection.negative.count").add(
                1, attributes={"reason": follower_bucket_string}
            )
```
Run the classifier, append results to context's content_categories list. Record per-bucket positive/negative counters.
Note the if passed and follower_bucket_string != "gt_1000" guard: spam findings are logged individually only for the smaller buckets. The likely reasoning is that spam replies are expected on low-follower threads (lte_100/500/1000), so each finding there earns a dedicated info log. gt_1000 threads draw so many replies that per-finding logging would be noisy. The positive/negative counters still cover every bucket.
task_banger_screen.py (79 lines)
The most-elaborate classifier wrapper — manages an in-process topic cache.
```python
class TaskBangerScreen(TaskWithPost):
    classifier = BangerInitialScreenClassifier()
    _cached_topics = None
    _cache_timestamp = None

    @classmethod
    async def exec(cls, ctx: TaskContext) -> TaskResultCategory:
        return await Task.exec.__wrapped__(cls, ctx)
```
Class-level topic cache: None initially, populated on the first call. As with the other classifier wrappers, the exec override bypasses the base retry.
```python
    @override
    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        Metrics.counter("task.banger_initial_screen.total.count").add(1)
        if cls._cached_topics is None or (
            cls._cache_timestamp is not None
            and time.time() - cls._cache_timestamp > CACHE_TTL_SECONDS
        ):
            logger.info("Fetching grok topics for cache")
            Metrics.counter("task.banger_initial_screen.grok_cache.new_load.count").add(
                1
            )
            query = StratoGrokTopics()
            fetched_topics = await query.fetch()
            if fetched_topics:
                cls._cached_topics = fetched_topics
                cls._cache_timestamp = time.time()
                logger.info(f"Cached {len(cls._cached_topics)} categories with topics")
                Metrics.counter(
                    "task.banger_initial_screen.grok_cache.new_load.success.count"
                ).add(1)
            else:
                logger.warning("Failed to fetch grok topics")
                Metrics.counter(
                    "task.banger_initial_screen.grok_cache.new_load.failure.count"
                ).add(1)
        else:
            Metrics.counter(
                "task.banger_initial_screen.grok_cache.cache_hit.count"
            ).add(1)
```
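The topic cache logic: CACHE_TTL_SECONDS = 3600 (1 hour), so as long as loads succeed, topics are reloaded from Strato at most once an hour. On reload failure, the task keeps using the stale cache rather than resetting it to None. Because _cache_timestamp is only updated on success, a failed reload is retried on the very next call.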
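Here is a sketch of that reload-on-expiry, keep-stale-on-failure behavior as a small class-level cache. It is a hypothetical reconstruction. TopicCache and the injected fetch/now parameters are assumptions made for testability; the real task calls StratoGrokTopics().fetch() and time.time() directly.

```python
import asyncio
from typing import Awaitable, Callable, Optional

CACHE_TTL_SECONDS = 3600  # value given in the text

Fetch = Callable[[], Awaitable[Optional[list]]]


class TopicCache:
    """Class-level TTL cache that serves stale data when a reload fails."""

    _topics: Optional[list] = None
    _timestamp: Optional[float] = None

    @classmethod
    async def get(cls, fetch: Fetch, now: float) -> Optional[list]:
        expired = (
            cls._timestamp is not None and now - cls._timestamp > CACHE_TTL_SECONDS
        )
        if cls._topics is None or expired:
            fetched = await fetch()
            if fetched:
                cls._topics = fetched
                cls._timestamp = now  # only a successful load restarts the TTL
            # On failure, fall through and serve the old value (possibly None).
        return cls._topics


async def demo():
    responses = iter([["Sports", "Politics"], None, ["Cooking"]])
    calls = 0

    async def fetch():
        nonlocal calls
        calls += 1
        return next(responses)

    a = await TopicCache.get(fetch, now=0)     # empty cache: load
    b = await TopicCache.get(fetch, now=100)   # fresh: cache hit, no fetch
    c = await TopicCache.get(fetch, now=4000)  # expired, reload fails: stale
    d = await TopicCache.get(fetch, now=4001)  # timestamp unchanged: retry succeeds
    return a, b, c, d, calls


result = asyncio.run(demo())
print(result)
# (['Sports', 'Politics'], ['Sports', 'Politics'], ['Sports', 'Politics'], ['Cooking'], 3)
```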
The topics list is the taxonomy of content categories — names like "Sports", "Politics", "Cooking", etc. — fetched from Strato so editors can update the taxonomy without redeploying Grox. The classifier prompt template (Session 20's BangerMiniVlmScreenScore) takes this topics list as a params={"topics": topics} argument.
```python
        if cls._cached_topics and len(cls._cached_topics) > 0:
            Metrics.counter("task.banger_initial_screen.with_cached_topics.count").add(
                1
            )
        else:
            Metrics.counter(
                "task.banger_initial_screen.without_cached_topics.count"
            ).add(1)
        res = await cls.classifier.classify(post, topics=cls._cached_topics)
        ctx.content_categories.extend(res)
        ctx.available_topics = cls._cached_topics
        passed = any(
            r.positive
            for r in res
            if r.category == ContentCategoryType.BANGER_INITIAL_SCREEN
        )
        if passed:
            Metrics.counter("task.banger_initial_screen.passed.count").add(1)
        else:
            Metrics.counter("task.banger_initial_screen.failed.count").add(1)
```
Track whether each call had cached topics or not. Call the classifier with topics. Stash the result on the context (both content_categories and available_topics). Counter the pass/fail.
ctx.available_topics = cls._cached_topics exposes the topic list to downstream tasks. The Grok UPA task (covered later in this session) doesn't use it, but other tasks (like topic-relevance scoring) might.
task_post_safety_screen_deluxe.py (25 lines)
```python
class TaskPostSafetyScreenDeluxe(TaskWithPost):
    classifier = PostSafetyDeluxeClassifier()

    @classmethod
    async def exec(cls, ctx: TaskContext) -> TaskResultCategory:
        return await Task.exec.__wrapped__(cls, ctx)

    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        Metrics.counter("task.post_safety_screen_deluxe.total.count").add(1)
        res = await cls.classifier.classify(post)
        ctx.content_categories.extend(res)
```
Minimal wrapper. Run the classifier, append results. No special handling. Counts request volume only — pass/fail is derived from content_categories downstream.
task_safety_ptos_category.py (61 lines)
```python
class TaskSafetyPtosCategoryDetection(TaskWithPost):
    classifier = SafetyPtosCategoryClassifier(ModelName.VLM_SAFETY)
    deluxe_classifier = SafetyPtosCategoryClassifier(
        ModelName.VLM_PRIMARY_CRITICAL, deluxe=True
    )
```
Two classifiers at the class level — standard and deluxe — instantiated up-front.
```python
    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        is_deluxe = ctx.payload.task_type == TaskGeneratorType.SAFETY_PTOS_DELUXE
        active_classifier = cls.deluxe_classifier if is_deluxe else cls.classifier
        metric_prefix = (
            "task.safety_ptos_deluxe_category"
            if is_deluxe
            else "task.safety_ptos_category"
        )
        safety_annotations = await active_classifier.classify_post(post)
        ctx.safety_annotations = safety_annotations
```
Pick standard vs deluxe by task_type. classify_post returns the full SafetyPostAnnotations (with the violated-policies list, not just a binary). Stash on the context for the policy detection task to pick up.
```python
        safety_categories = safety_annotations.violatedPolicies or []
        violation_count = len(safety_categories)
        has_violations = violation_count > 0
        Metrics.counter(f"{metric_prefix}.classified.count").add(1)
        if has_violations:
            Metrics.counter(f"{metric_prefix}.has_violations.count").add(1)
            Metrics.counter(f"{metric_prefix}.violations.count").add(violation_count)
            violation_details = []
            for violation in safety_categories:
                Metrics.counter(f"{metric_prefix}.violations_by_category.count").add(
                    1, attributes={"category": violation.category.value}
                )
                violation_details.append(
                    f"{violation.category.value}({violation.score})"
                )
            mode = " (deluxe)" if is_deluxe else ""
            logger.info(
                f"Post {post.id}: Found {violation_count} violations{mode} - Details: {', '.join(violation_details)}"
            )
        else:
            Metrics.counter(f"{metric_prefix}.no_violations.count").add(1)
            mode = " (deluxe)" if is_deluxe else ""
            logger.info(f"Post {post.id}: No safety violations detected{mode}")
```
Rich metric instrumentation:
- classified.count — total classifications.
- has_violations.count / no_violations.count — coarse binary.
- violations.count — total violations across all categories (a counter incremented by the count, useful for "average violations per post").
- violations_by_category.count — per-category breakdown.
Plus an info log for every classification, with the category list inline. Heavy logging for safety because audit is a hard requirement.
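Here is a worked example of the ratios these counters support. The per-post violation counts are made-up data, and a plain Counter stands in for the metrics backend. Only the metric-name suffixes come from the task.

```python
from collections import Counter

# Made-up batch: number of violated policies found for each classified post.
violations_per_post = [0, 0, 2, 1, 0, 3]

counters = Counter()
for n in violations_per_post:
    counters["classified.count"] += 1
    counters["has_violations.count" if n > 0 else "no_violations.count"] += 1
    counters["violations.count"] += n  # incremented by the count, not by 1

avg_all = counters["violations.count"] / counters["classified.count"]
avg_flagged = counters["violations.count"] / counters["has_violations.count"]
print(counters["violations.count"], round(avg_all, 2), avg_flagged)  # 6 1.0 2.0
```

Dividing by classified.count gives the average over all posts. Dividing by has_violations.count gives the average among flagged posts only.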
task_safety_ptos_policy.py (89 lines)
```python
class TaskSafetyPtosPolicyDetection(TaskWithPost):
    violated_policy_classifier = SafetyPtosPolicyClassifier()
    deluxe_violated_policy_classifier = SafetyPtosPolicyClassifier(deluxe=True)

    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        if not ctx.safety_annotations:
            return
```
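Skip if the category task left ctx.safety_annotations unset (e.g., it didn't run or produced nothing). An annotations object whose violatedPolicies list is empty is presumably still truthy (assuming it defines no __bool__ or __len__). Such a post therefore gets past this check. In standard mode the loop is then a no-op, while in deluxe mode the adult-content recheck is still injected.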
```python
        is_deluxe = ctx.payload.task_type == TaskGeneratorType.SAFETY_PTOS_DELUXE
        active_classifier = (
            cls.deluxe_violated_policy_classifier
            if is_deluxe
            else cls.violated_policy_classifier
        )
        metric_prefix = (
            "task.safety_ptos_deluxe_policy" if is_deluxe else "task.safety_ptos_policy"
        )
        violations = list(ctx.safety_annotations.violatedPolicies or [])
        injected_recheck = None
        if is_deluxe:
            if not any(
                v.category == SafetyPolicyCategory.AdultContent for v in violations
            ):
                injected_recheck = SafetyPtosViolatedPolicy(
                    category=SafetyPolicyCategory.AdultContent,
                    reason="adult content recheck",
                    score=50,
                )
                violations.append(injected_recheck)
```
Adult-content recheck injection in deluxe mode: if the category classifier didn't flag adult content, inject a fake violation for it anyway. This forces the policy classifier to double-check for adult content even when the cheap category classifier missed it.
Score=50 placeholder (mid-range). The actual policy classifier verdict overrides this.
This is a safety-leaning bias: prefer false positives (extra checks that say "no") over false negatives (missed adult content). Only applied in deluxe mode because the extra LLM call costs money.
```python
        for violation in violations:
            violation.safetyPolicy = (
                await active_classifier.classify_policy_for_violation(post, violation)
            )
            if violation.safetyPolicy:
                cls._record_policy_metrics(metric_prefix, violation)
        if injected_recheck is not None:
            policy = injected_recheck.safetyPolicy
            if not policy or policy.policyType == SafetyPolicyType.NoViolation:
                violations.remove(injected_recheck)
        ctx.safety_annotations.violatedPolicies = violations
```
For each violation, classify the policy (Session 20's classify_policy_for_violation). Attach the result to the violation. Record per-violation metrics.
Post-hoc remove the injected recheck if it came back with no actual violation. So the injection is invisible if it was a false alarm; only kept if the deluxe model confirmed adult content the cheap model missed.
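Here is the inject-then-prune pattern, reduced to a hypothetical, self-contained sketch. Category, PolicyType, and Violation are stand-ins for SafetyPolicyCategory, SafetyPolicyType, and SafetyPtosViolatedPolicy, and classify replaces the LLM call with a plain function. One deliberate deviation: the task uses violations.remove(...), while the sketch filters by identity.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Callable, Optional


class Category(Enum):  # hypothetical subset of SafetyPolicyCategory
    AdultContent = auto()
    ViolentMedia = auto()


class PolicyType(Enum):  # hypothetical subset of SafetyPolicyType
    NoViolation = auto()
    Restrict = auto()


@dataclass
class Violation:  # hypothetical stand-in for SafetyPtosViolatedPolicy
    category: Category
    reason: str
    score: int
    policy: Optional[PolicyType] = None


def classify_with_recheck(
    violations: list[Violation],
    classify: Callable[[Violation], Optional[PolicyType]],
    deluxe: bool,
) -> list[Violation]:
    violations = list(violations)  # copy, like list(... .violatedPolicies or [])
    injected = None
    if deluxe and not any(v.category == Category.AdultContent for v in violations):
        # Synthetic entry that forces the policy model to look for adult content.
        injected = Violation(Category.AdultContent, "adult content recheck", 50)
        violations.append(injected)
    for v in violations:
        v.policy = classify(v)
    if injected is not None and injected.policy in (None, PolicyType.NoViolation):
        # Identity filter, so an equal-looking real entry can never be dropped.
        violations = [v for v in violations if v is not injected]
    return violations


def clean_on_adult(v: Violation) -> PolicyType:
    # Confirms the real violation but clears the adult-content recheck.
    if v.category == Category.AdultContent:
        return PolicyType.NoViolation
    return PolicyType.Restrict


def flags_everything(v: Violation) -> PolicyType:
    return PolicyType.Restrict


found = [Violation(Category.ViolentMedia, "graphic injury", 80)]
print([v.category.name for v in classify_with_recheck(found, clean_on_adult, deluxe=True)])
# ['ViolentMedia']
print([v.category.name for v in classify_with_recheck(found, flags_everything, deluxe=True)])
# ['ViolentMedia', 'AdultContent']
```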
```python
    @classmethod
    def _record_policy_metrics(
        cls, metric_prefix: str, violation: SafetyPtosViolatedPolicy
    ) -> None:
        Metrics.counter(f"{metric_prefix}.classified_total.count").add(1)
        category_key = {
            SafetyPolicyCategory.ViolentMedia: "violent_media",
            SafetyPolicyCategory.AdultContent: "adult_content",
            SafetyPolicyCategory.Spam: "spam",
            SafetyPolicyCategory.IllegalAndRegulatedBehaviors: "illegal_and_regulated_behaviors",
            SafetyPolicyCategory.HateOrAbuse: "hate_or_abuse",
            SafetyPolicyCategory.ViolentSpeech: "violent_speech",
            SafetyPolicyCategory.SuicideOrSelfHarm: "suicide_or_self_harm",
        }.get(violation.category)
        if category_key:
            Metrics.counter(
                f"{metric_prefix}.classified_{category_key}_violations.count"
            ).add(1)
            Metrics.counter(
                f"{metric_prefix}.classified_{category_key}_policy_types.count"
            ).add(1, attributes={"policy_type": violation.safetyPolicy.policyType.name})
```
Translate enum to lowercase string, emit two counters per category: one for "we classified this category at all," one labeled by the policy-type the classifier returned (e.g., NoViolation, Hide, Restrict, etc.).
Note that the per-category counts use separate metric names rather than a category attribute: classified_adult_content_violations.count vs classified_violent_media_violations.count. The policy-type breakdown, by contrast, is an attribute. Separate names make each category directly addressable on dashboards, at the cost of a larger metric namespace. The trade-off here favors observability.
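The explicit dict in _record_policy_metrics doubles as an allowlist: a category missing from it gets no per-category counters, because .get() returns None. An alternative, sketched below, derives the key from the enum member name. This is hypothetical and not what Grox does. New categories would be picked up automatically, at the risk of unexpected metric names. The enum here is a stand-in that reuses the member names shown above.

```python
import re
from enum import Enum, auto


class SafetyPolicyCategory(Enum):  # stand-in reusing the member names above
    ViolentMedia = auto()
    AdultContent = auto()
    Spam = auto()
    IllegalAndRegulatedBehaviors = auto()
    HateOrAbuse = auto()
    ViolentSpeech = auto()
    SuicideOrSelfHarm = auto()


def metric_key(category: Enum) -> str:
    # Insert "_" before every capital letter except the first, then lowercase.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", category.name).lower()


print([metric_key(c) for c in SafetyPolicyCategory])
```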
task_rank_replies.py (31 lines)
```python
class TaskRankReplies(TaskWithPost):
    scorer = ReplyScorer()

    @classmethod
    async def exec(cls, ctx: TaskContext) -> TaskResultCategory:
        return await Task.exec.__wrapped__(cls, ctx)

    @classmethod
    async def _exec_with_post(cls, ctx: TaskContext, post: Post) -> None:
        user = post.user
        logger.info(
            f"[task_rank_replies] {post.id=} "
            f"is_pasted={post.is_pasted} "
            f"user_agent={post.user_agent!r} "
            f"composition_source={post.composition_source!r} "
            f"app_attestation_status={post.app_attestation_status!r} "
            f"has_risky_user_safety_label={user.has_risky_user_safety_label if user else None} "
            f"num_legit_blocks_received_last_24hrs={user.num_legit_blocks_received_last_24hrs if user else None}"
        )
        res = await cls.scorer.score(post)
        ctx.reply_ranking_results.extend(res)
```
Tiny task body but very revealing log line: this shows what features the reply ranker considers important enough to log per-call:
- is_pasted: was the post pasted from the clipboard?
- user_agent: the client used to post.
- composition_source: first-party, third-party, scheduled, etc.
- app_attestation_status: has the client app been attested as legitimate?
- has_risky_user_safety_label: is the user already flagged as risky?
- num_legit_blocks_received_last_24hrs: recent block velocity from non-spam users.
These are anti-spam signals that complement the LLM's content reasoning. The reply ranker uses both content (via the LLM) and these meta-signals (via ThreadRenderer.render(post, include_signals=True) from Session 20).
Results go into ctx.reply_ranking_results (a list — though typically just one entry per call).
task_grok_upa_action_with_labels.py (57 lines)
The action-taking task. After all classifications are done, this task applies moderation actions based on the cumulative results.
```python
class TaskGrokUpaActionWithLabels(Task):
    DISABLE_RULES = [DisableTaskForNonProd]
    _strato_grok_upa_action_with_labels = StratoGrokUpaActionWithLabels()
```
Disabled in non-prod environments — this task mutates production safety state via Strato. Dev would never run this.
The Strato client is a single instance stored as a class attribute, so it behaves as a process-wide singleton.
```python
    @classmethod
    async def _exec(cls, ctx: TaskContext) -> None:
        Metrics.counter("task.grok_upa_action_with_labels.count").add(1)
        post = ctx.payload.post
        if not post:
            return
        results = ctx.content_categories
        if not results:
            return
        grok_response = next((r for r in results if r.tweet_bool_metadata), None)
        if not grok_response or not grok_response.tweet_bool_metadata:
            return
```
Three early returns:
- No post → bail.
- No content categories produced → nothing to act on.
- None of the categories have tweet_bool_metadata (the booleans like contains_violence) → bail.
Note: This is a Task (not TaskWithPost) — it intentionally handles the post is None case rather than raising TaskStopExecution. The reason: this task runs after multiple other tasks; even if some upstream task didn't produce results, we shouldn't propagate a SKIP downstream (the publish task that depends on us still wants to run if there were any results).
```python
        tweet_id = int(post.id)
        tweet_bool_metadata = grok_response.tweet_bool_metadata.model_dump()
        action_result = await cls._strato_grok_upa_action_with_labels.execute(
            tweet_id, tweet_bool_metadata
        )
```
Send the tweet ID + the bool metadata to Strato. Strato's grokUpaActionWithLabels is a server-side decision-tree that takes the booleans and decides which labels to apply to the post (e.g., "Restricted", "Adult Content Warning", "Hidden from Recommendations").
The labels are then applied to the post in Manhattan / the X internal moderation system.
```python
        if action_result and len(action_result.applied_labels) > 0:
            logger.info(
                f"grokUpaActionWithLabels applied labels: debugString='{action_result.debug_string}', appliedLabels={action_result.applied_labels} for post {tweet_id}"
            )
            Metrics.counter("task.grok_upa_action_with_labels.applied.count").add(1)
            for label in action_result.applied_labels:
                Metrics.counter(
                    "task.grok_upa_action_with_labels.applied_label.count"
                ).add(1, attributes={"label": label})
        elif action_result:
            logger.info(
                f"grokUpaActionWithLabels no labels applied: debugString='{action_result.debug_string}' for post {tweet_id}"
            )
            Metrics.counter("task.grok_upa_action_with_labels.empty.count").add(1)
        else:
            logger.info(f"grokUpaActionWithLabels failed for post {tweet_id}")
            Metrics.counter("task.grok_upa_action_with_labels.failed.count").add(1)
```
Three outcomes:
- Labels applied → log + per-label counter. This is the audit log: when we applied label X to post Y, and what the reasoning was.
- No labels applied (the decision tree matched nothing) → empty.count.
- Strato call failed → failed.count.
The debug_string is server-side reasoning about why labels were/weren't applied — invaluable for moderation audits.
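The three-way branch reduces to a small pure function. This is a hypothetical sketch: ActionResult models only debug_string and applied_labels, the two fields the task reads, and the label strings are placeholders.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ActionResult:  # hypothetical: only the two fields the task reads
    debug_string: str
    applied_labels: list[str] = field(default_factory=list)


def classify_outcome(result: Optional[ActionResult]) -> tuple[str, list[str]]:
    """Return (outcome counter suffix, labels to count), mirroring the task's branches."""
    if result and len(result.applied_labels) > 0:
        return "applied", list(result.applied_labels)
    if result:
        return "empty", []  # call succeeded, decision tree matched nothing
    return "failed", []  # no result at all from the Strato call


print(classify_outcome(ActionResult("rule matched", ["label_a", "label_b"])))
# ('applied', ['label_a', 'label_b'])
print(classify_outcome(ActionResult("no rule matched")))
# ('empty', [])
print(classify_outcome(None))
# ('failed', [])
```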
This is the moment the Grox pipeline crosses from "make predictions" to "take actions". Everything upstream is observational: read the post, classify, write to KV. This task actually mutates moderation state via an RPC whose labels gate how the content is distributed.
Summary
The task layer is where plan declarations meet concrete service calls. The shape is consistent:
- Base classes (task.py) provide retry semantics, metric instrumentation, and payload destructuring conveniences.
- Filter tasks decide eligibility (raise TaskStopExecution to SKIP).
- Rate-limit tasks dedupe per post within a 60s in-process TTL cache.
- Hydration tasks (media, ASR, post reload) populate the post object in place.
- Classifier wrapper tasks call the corresponding ContentClassifier (Session 20) and append results to ctx.content_categories. The PTOS and reply-ranking tasks write to ctx.safety_annotations and ctx.reply_ranking_results instead.
- Action tasks read the cumulative classification results and trigger external side effects (Strato writes that apply moderation labels).
Key design points:
- Class methods everywhere: tasks are not instantiated; class attributes hold long-lived state (classifiers, caches, Strato clients).
- Task.exec.__wrapped__ trick: tasks that want a different retry policy use __wrapped__ to bypass the base class's @retry (2 attempts) and apply their own policy, or none at all.
- TaskStopExecution vs other exceptions: clean propagated SKIPs vs error-retryable failures. The plan executor (Session 19) reads this distinction to skip downstream tasks vs to fail the whole plan.
- Disable rules layer: env-gated tasks. Production-only tasks (TaskGrokUpaActionWithLabels) skip in dev/local without breaking the rest of the plan.
- In-process state: rate-limit caches, the topic cache, and classifier instances are all class-attribute singletons. This trades cross-process consistency for speed; the downstream sinks are idempotent, so double-writes don't corrupt anything.
The split between content-category writes (Session 22's pubs/sinks) and action triggers (this session's UPA task) is deliberate. The pubs/sinks write the observations; the UPA task interprets observations into actions. You can evolve the action policy in Strato without redeploying Grox.
Next session — the finale
Session 22 — Grox tasks part 2 (~1,255 LOC): the publishing layer.
- task_pub.py (555 LOC): the kitchen sink of Kafka/Manhattan publishers (TaskPublishKafka, TaskPublishUnifiedPostAnnotationsManhattan, TaskWriteReplySpamManhattan, etc.).
- task_embedding_pub.py (79 LOC): embedding-specific publishes.
- task_multimodal_post_embedding.py (86 LOC): runs the V2/V3 and V5 embedders.
- task_write_mm_embedding_sink.py (193 LOC): embedding-sink writes (Manhattan + optional Kafka).
- task_write_safety_post_annotations_result_sink.py (342 LOC): safety annotation writes (the most complex sink, with policy-type fan-out).
After Session 22, the line-by-line analysis is complete: 22 sessions, ~24,914 LOC across Rust home-mixer, Thunder, candidate-pipeline, Phoenix ML, and Grox.