How to Build a Share-of-Model Analytics Pipeline (From Scratch)
Traditional rank tracking is dead. Learn how to architect a statistically grounded 'Share of Model' pipeline using async Python probing, structured LLM evaluation, and time-series analysis.
The Deterministic Fallacy
We are accustomed to deterministic rank tracking. For two decades, if you searched for "best ci/cd pipelines" on Google from a specific IP in San Francisco, the result was a static DOM. We scraped it, parsed the div.g elements, and logged a rank of 3. Simple.
AI Search (SearchGPT, Perplexity, Gemini) has dismantled this predictability. These systems are non-deterministic, context-aware, and fundamentally probabilistic. If you ask Perplexity about your product three times, you might get three slightly different answers based on the seed, temperature, and RAG (Retrieval-Augmented Generation) latency.
This presents a massive observability gap for engineering and growth teams. We cannot "rank" a URL in a generated paragraph. We cannot "scrape" a dynamic stream effectively without headless overhead.
To solve this, we moved from Rank Tracking to Share of Model (SoM) measurement. We treat the LLM as a black-box function $f(x)$ and apply Monte Carlo-style probing to determine the probability of our entity appearing in the output.
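To make "Monte Carlo-style probing" concrete, here is a toy sketch (the function and sample responses are illustrative, not part of the pipeline): sample the same query N times and count the fraction of outputs that mention the brand.

```python
def estimate_mention_probability(responses: list[str], brand: str) -> float:
    """Treat each model response as one Monte Carlo sample and estimate
    P(brand appears in output) for a fixed query."""
    if not responses:
        return 0.0
    hits = sum(1 for r in responses if brand.lower() in r.lower())
    return hits / len(responses)


# Five probes of the same query at varying temperatures (made-up outputs)
samples = [
    "Top picks: Pinecone, Weaviate, AcmeDB.",
    "Pinecone and Weaviate lead the market.",
    "AcmeDB is a solid choice for smaller teams.",
    "Consider Milvus or Pinecone.",
    "AcmeDB and Pinecone both handle this well.",
]
print(estimate_mention_probability(samples, "AcmeDB"))  # 3 hits out of 5 -> 0.6
```

More samples tighten the estimate; the rest of the article is essentially this idea industrialized.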
Here is how we architected a pipeline to measure AI visibility programmatically, using Python, the Instructor library, and ClickHouse.
Architecture: The "Probe-Judge" Pattern
We faced two core challenges when building this visibility engine:

1. Variance: A single query is statistically insignificant.
2. Unstructured Output: The response is natural language, not a structured list.
To address this, we implemented the Probe-Judge Architecture.
1. The Probing Layer

This service acts as a synthetic user. It manages a queue of "Golden Queries" (high-value keywords) and dispatches them to various model endpoints (OpenAI for ChatGPT simulation, Perplexity API, Anthropic, etc.).
Crucially, we do not send a query once. We send it $N$ times (usually $N=5$) with varying temperature settings (0.1 to 0.7) to simulate the variance in user sessions and model instability.
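The fan-out described above is just a cross product of queries, models, and temperatures. A minimal sketch (the model names and temperature schedule are illustrative assumptions):

```python
from itertools import product

GOLDEN_QUERIES = ["best vector db", "enterprise search api"]
MODELS = ["perplexity/sonar", "openai/gpt-4o"]
TEMPERATURES = [0.1, 0.3, 0.5, 0.7]  # N runs per query via varied temperature


def build_probe_jobs(queries, models, temps):
    """Expand every (query, model, temperature) combination into a job dict."""
    return [
        {"query": q, "model": m, "temperature": t}
        for q, m, t in product(queries, models, temps)
    ]


jobs = build_probe_jobs(GOLDEN_QUERIES, MODELS, TEMPERATURES)
print(len(jobs))  # 2 queries x 2 models x 4 temperatures = 16 jobs
```

The job list then feeds the async dispatcher shown below in Phase 1; the cross product is why caching and concurrency matter so much.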
2. The Judgment Layer

We cannot use Regex to find our brand. If our product is "AcmeDB" and the LLM writes "Acme's new database," a strict string match might fail or miss context (e.g., negative sentiment).
We utilize an LLM-as-a-Judge. We feed the output of the Target LLM (e.g., Perplexity) into a stronger, strictly typed Evaluator LLM (e.g., GPT-4o) to extract structured metrics: Mention Presence, Sentiment Score, and Citation Rank.
3. The Aggregation Layer

We store these structured evaluations in a time-series database to calculate a moving average of visibility—our "Share of Model."
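For intuition on what the aggregation layer computes, here is a trailing moving average in plain Python (the daily visibility numbers are made up):

```python
def moving_average(values, window=7):
    """Trailing moving average with a warm-up (shorter windows at the start)."""
    out = []
    for i in range(len(values)):
        chunk = values[max(0, i - window + 1) : i + 1]
        out.append(sum(chunk) / len(chunk))
    return out


# Hypothetical daily visibility rates for one model
daily_visibility = [0.40, 0.50, 0.30, 0.60, 0.50, 0.55, 0.45]
smoothed = moving_average(daily_visibility, window=3)
```

In practice this smoothing happens in the database (see the ClickHouse queries in Phase 3), but the shape of the metric is the same.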
Implementation Phase 1: The Asynchronous Probe

We need high throughput. Waiting for a Perplexity response (which involves a web search) can take 5-10 seconds. Doing this synchronously for 1,000 keywords is non-viable.
We utilize Python's asyncio and httpx to build a concurrent dispatcher. We also implement a semantic caching layer manually (or via Redis) to prevent re-querying identical prompts within a 24-hour window, saving significant token costs.
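A minimal in-process sketch of that caching layer, hashing a normalized prompt with SHA-256. In production the same logic would sit in Redis with a TTL; the dict stand-in below just shows the idea:

```python
import hashlib
import time


class PromptCache:
    """Dict-backed stand-in for the Redis layer: identical prompts within
    the TTL window return the cached response instead of re-querying."""

    def __init__(self, ttl_seconds: int = 86_400):  # 24-hour window
        self.ttl = ttl_seconds
        self._store: dict[str, tuple[float, str]] = {}

    @staticmethod
    def _key(model: str, prompt: str) -> str:
        # Normalize so trivial whitespace/case differences still hit the cache
        normalized = " ".join(prompt.lower().split())
        return hashlib.sha256(f"{model}:{normalized}".encode()).hexdigest()

    def get(self, model: str, prompt: str):
        entry = self._store.get(self._key(model, prompt))
        if entry and time.time() - entry[0] < self.ttl:
            return entry[1]
        return None

    def put(self, model: str, prompt: str, response: str):
        self._store[self._key(model, prompt)] = (time.time(), response)


cache = PromptCache()
cache.put("openai/gpt-4o", "Best vector DB?", "Pinecone, Weaviate...")
print(cache.get("openai/gpt-4o", "best  vector db?"))  # hit despite casing/spacing
```

The dispatcher below would consult `cache.get(...)` before issuing a request and call `cache.put(...)` on every fresh response.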
Here is the core dispatcher logic that handles the multi-model probing:
```python
import asyncio
import httpx
from typing import List, Dict
from tenacity import retry, stop_after_attempt, wait_exponential

# Configuration
TARGET_MODELS = ["perplexity/sonar-reasoning-pro", "openai/gpt-4o", "google/gemini-1.5-pro"]
API_KEYS = {"perplexity": "pplx-...", "openai": "sk-..."}


class ModelProbe:
    def __init__(self):
        self.client = httpx.AsyncClient(timeout=30.0)

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def query_model(self, model: str, prompt: str, run_id: str) -> Dict:
        """Dispatches a prompt to a specific LLM API and captures metadata."""
        provider = model.split("/")[0]

        # Payload construction varies by provider (simplified for brevity)
        if provider == "perplexity":
            url = "https://api.perplexity.ai/chat/completions"
            headers = {"Authorization": f"Bearer {API_KEYS['perplexity']}"}
            payload = {
                "model": model.split("/")[1],
                "messages": [{"role": "user", "content": prompt}],
            }
        # ... handlers for OpenAI, Anthropic, etc. would go here
        else:
            raise NotImplementedError(f"No handler wired up for provider: {provider}")

        response = await self.client.post(url, json=payload, headers=headers)
        response.raise_for_status()

        data = response.json()
        return {
            "run_id": run_id,
            "model": model,
            "prompt": prompt,
            "raw_response": data["choices"][0]["message"]["content"],
            "citations": data.get("citations", []),  # Specific to Perplexity/SearchGPT
            "latency_ms": response.elapsed.total_seconds() * 1000,
        }


async def batch_probe(prompts: List[str]):
    probe = ModelProbe()
    tasks = []

    # Monte Carlo approach: N runs per prompt per model (N=3 here for brevity)
    for prompt in prompts:
        for model in TARGET_MODELS:
            for i in range(3):
                run_id = f"{model}-{hash(prompt)}-{i}"
                tasks.append(probe.query_model(model, prompt, run_id))

    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r for r in results if not isinstance(r, Exception)]
```
Implementation Phase 2: The Semantic Judge

Once we have the raw text from Perplexity or Gemini, we need to convert it into data.
We use the instructor library (built on Pydantic) to force the Evaluator LLM to output a JSON object adhering to a strict schema. This is critical. Without schema enforcement, the evaluator might output "Yes, AcmeDB is mentioned," which creates a parsing nightmare downstream.
We define a VisibilityAudit schema that captures nuances like "Sentiment" (is the LLM recommending us or warning against us?) and "Competitors" (who else is listed?).
```python
import instructor
from pydantic import BaseModel, Field
from openai import OpenAI
from typing import List, Optional


# Define the extraction schema
class CompetitorMention(BaseModel):
    name: str
    sentiment_score: float = Field(..., description="Float from -1.0 (Negative) to 1.0 (Positive)")
    is_primary_recommendation: bool


class VisibilityAudit(BaseModel):
    contains_target_brand: bool = Field(..., description="Is the target brand mentioned explicitly?")
    share_of_voice_rank: Optional[int] = Field(None, description="Rank in the list if applicable, else None")
    competitors: List[CompetitorMention]
    reasoning: str = Field(..., description="Why was the brand ranked this way?")


# The Evaluation Engine
class SemanticJudge:
    def __init__(self, target_brand: str):
        self.client = instructor.patch(OpenAI())
        self.target_brand = target_brand

    def evaluate_response(self, raw_llm_response: str, user_query: str) -> VisibilityAudit:
        """Uses GPT-4o to analyze the output of the target LLM (e.g., Perplexity)."""
        system_prompt = f"""
        You are an SEO Auditor. Analyze the following text generated by an
        AI Search Engine in response to the query: "{user_query}".

        Determine if the brand "{self.target_brand}" is present.
        Extract sentiment and ranking relative to competitors.
        """
        return self.client.chat.completions.create(
            model="gpt-4o",
            response_model=VisibilityAudit,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": raw_llm_response},
            ],
        )


# Usage
judge = SemanticJudge(target_brand="AcmeDB")
audit = judge.evaluate_response(
    raw_llm_response="For vector databases, Pinecone and Weaviate are top choices. AcmeDB is also viable for smaller workloads...",
    user_query="Best vector databases for enterprise",
)

print(f"Mentioned: {audit.contains_target_brand}")
print(f"Sentiment: {audit.competitors[0].sentiment_score}")  # Accessing structured data
```
Implementation Phase 3: The Metrics Layer

With the data structured, we move to storage and analysis. We chose ClickHouse for this workload because it handles high-cardinality aggregations exceptionally well, and we are effectively logging event streams.
The core metric we track is Weighted Share of Citation (WSoC). Unlike simple "Share of Voice" (which is binary), WSoC accounts for rank and sentiment.
Formula: $$ WSoC = \frac{\sum (Sentiment \times \frac{1}{Rank})}{\text{Total Queries}} $$
(Note: If no rank is detected, we assume a rank of 1 for solo answers or use a default weight).
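A direct translation of the formula, including the rank fallback from the note above (the audit-record shape here is illustrative):

```python
def weighted_share_of_citation(audits, default_rank=1):
    """WSoC = sum(sentiment * 1/rank) over mentions, divided by total queries.
    Unranked mentions fall back to default_rank, per the note above."""
    if not audits:
        return 0.0
    score = 0.0
    for a in audits:
        if not a["mentioned"]:
            continue
        rank = a["rank"] or default_rank  # None or 0 -> default weight
        score += a["sentiment"] * (1.0 / rank)
    return score / len(audits)


audits = [
    {"mentioned": True,  "sentiment": 0.9,  "rank": 1},     # strong #1 mention
    {"mentioned": True,  "sentiment": 0.5,  "rank": 2},     # lukewarm #2
    {"mentioned": True,  "sentiment": -0.4, "rank": None},  # negative, unranked
    {"mentioned": False, "sentiment": 0.0,  "rank": None},  # absent entirely
]
print(weighted_share_of_citation(audits))  # (0.9 + 0.25 - 0.4) / 4 = 0.1875
```

Note that a negative mention actively subtracts from WSoC, which is the point: being cited while being warned against is worse than a neutral mention.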
Here is the SQL schema and aggregation query to track this over time:
```sql
-- 1. The Event Log Table
CREATE TABLE ai_visibility_logs (
    timestamp DateTime DEFAULT now(),
    query_id String,
    target_model LowCardinality(String),  -- e.g., 'perplexity', 'chatgpt'
    query_text String,
    brand_mentioned UInt8,
    sentiment_score Float32,              -- -1.0 to 1.0
    detected_rank UInt8,                  -- 0 if not ranked
    competitors Array(String)
) ENGINE = MergeTree()
ORDER BY (target_model, timestamp);

-- 2. Calculating Daily "Share of Model"
SELECT
    toDate(timestamp) AS date,
    target_model,

    -- Basic Visibility %
    countIf(brand_mentioned = 1) / count() AS visibility_rate,

    -- Sentiment-Adjusted Visibility
    avgIf(sentiment_score, brand_mentioned = 1) AS avg_sentiment,

    -- Top-of-Mind Awareness (how often are we #1?)
    countIf(detected_rank = 1) / count() AS top_rank_rate
FROM ai_visibility_logs
WHERE query_text IN ('best vector db', 'enterprise search api')
GROUP BY date, target_model
ORDER BY date DESC;
```
Handling Non-Determinism and Drift

During our rollout, we encountered significant "Model Drift." A prompt that returned a structured list on Tuesday returned a conversational paragraph on Wednesday. This broke our initial Regex-based judges, validating the move to LLM-based judges.
However, LLM Judges are not free. Running GPT-4o to evaluate thousands of Perplexity responses gets expensive.
Optimization: The Tiered Judge System

We implemented a tiered evaluation strategy to manage costs:

1. Tier 1 (Fast/Cheap): Simple string matching. If the brand name isn't in the text at all, mark visibility=0 and skip the LLM Judge.
2. Tier 2 (The Judge): Only if the brand (or a competitor) is detected, invoke the SemanticJudge to extract sentiment and rank.
3. Tier 3 (Human Review): We sample 5% of evaluations where sentiment_score is below 0.0 (negative) to verify if the model is hallucinating complaints about our product.
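The first two tiers can be sketched as a small dispatcher. Here `judge_fn` is a stand-in for the `SemanticJudge` call, injected as a parameter so the flow is testable without API spend:

```python
def evaluate_tiered(raw_text: str, brand: str, competitors: list[str], judge_fn):
    """Tier 1: cheap substring screen. Tier 2: only invoke the expensive
    LLM judge (judge_fn) when the brand or a competitor appears."""
    hay = raw_text.lower()
    names = [brand, *competitors]
    if not any(n.lower() in hay for n in names):
        # Nothing relevant in the text: record zero visibility, skip the judge
        return {"brand_mentioned": False, "tier": 1, "audit": None}
    return {
        "brand_mentioned": brand.lower() in hay,
        "tier": 2,
        "audit": judge_fn(raw_text),
    }


# Stub judge standing in for the real GPT-4o evaluator
stub_judge = lambda text: {"sentiment_score": 0.2}

miss = evaluate_tiered("Use Postgres full-text search.", "AcmeDB", ["Pinecone"], stub_judge)
hit = evaluate_tiered("AcmeDB works for small loads.", "AcmeDB", ["Pinecone"], stub_judge)
print(miss["tier"], hit["tier"])  # 1 2
```

In our experience most probe responses short-circuit at Tier 1, which is exactly where the cost savings come from.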
Managing Rate Limits & Browser Integrity

While APIs like Perplexity's sonar are easy to query, measuring visibility on "SearchGPT" or Google's SGE often requires headless browsing (Puppeteer/Playwright) because official APIs for the search interface often lag behind the consumer UI.
When scraping interfaces:
- Context Contamination: Ensure every probe starts with a fresh session/context. AI models carry context. If you ask about "Shoes" then "Socks", the "Socks" answer is biased.
- Geo-Location: AI Search results are heavily geo-biased. Use residential proxies to rotate IPs for every batch of probes to simulate a distributed user base.
Summary

Building a pipeline for AI Search Visibility requires accepting ambiguity. You are no longer measuring a static index; you are sampling the output distribution of a probabilistic model.
By architecting a system that probes asynchronously, evaluates semantically, and aggregates probabilistically, you move from "guessing" to actionable engineering metrics. You can definitively tell your Product VP: "Our engineering efforts on documentation increased our citation probability on Perplexity by 14% this quarter."
That is a metric you can optimize.