Complete reference for the Smartflow Python SDK — async client, chat, completions, embeddings, VAS audit logs, routing, compliance, MCP tools, and A2A agents.
Version 0.4.0 ·
pip install smartflow-sdk · Python 3.9+
Smartflow is an enterprise AI orchestration layer that sits between your application and AI providers (OpenAI, Anthropic, Google, and others). It provides:
┌─────────────────────────────────────────────────────────────────┐
│ YOUR APPLICATION │
│ │ │
│ pip install smartflow-sdk │
│ │ │
│ ┌────────▼────────┐ │
│ │ Smartflow SDK │ │
│ └────────┬────────┘ │
└─────────────────────────────┼───────────────────────────────────┘
│
┌──────────▼──────────┐
│ SMARTFLOW PROXY │
│ ┌────────────────┐ │
│ │ MetaCache │ │ ← 60-80% cost savings
│ │ ML Compliance │ │ ← Adaptive PII detection
│ │ Smart Routing │ │ ← Best provider selection
│ │ VAS Logging │ │ ← Complete audit trail
│ │ MCP Gateway │ │ ← Tool orchestration
│ │ A2A Gateway │ │ ← Agent-to-agent tasks
│ └────────────────┘ │
└──────────┬──────────┘
│
┌───────────┬───────┴───────┬───────────┬───────────┐
▼ ▼ ▼ ▼ ▼
┌───────┐ ┌─────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│OpenAI │ │Anthropic│ │ Gemini │ │ Cohere │ │ Local │
└───────┘ └─────────┘ └────────┘ └────────┘ └────────┘
Smartflow SDK v0.4.0 introduces a dual-mode architecture that gives developers the freedom to start building immediately — even without a deployed Smartflow instance — and seamlessly upgrade to full enterprise gateway mode whenever they are ready.
Gateway mode: connect to a deployed Smartflow instance for the full feature set.
Direct mode: call AI providers directly with the same API surface, no infrastructure required.
Mode is selected automatically in priority order — no manual configuration required:
# Priority 1: Explicit URL argument (backward compatible — always gateway mode)
sf = SmartflowClient("https://yourco.langsmart.app", api_key="sk-sf-...")
# Priority 2: SMARTFLOW_GATEWAY_URL environment variable
# export SMARTFLOW_GATEWAY_URL="https://yourco.langsmart.app"
sf = SmartflowClient() # detects env var → gateway mode
# Priority 3: ~/.smartflow/config.yaml (written by `smartflow configure`)
sf = SmartflowClient() # reads config file → gateway or direct mode
# Priority 4: No gateway configured → direct mode (calls providers directly)
sf = SmartflowClient() # direct mode with env var keys (OPENAI_API_KEY, etc.)
# Check which mode you're in
print(sf.mode) # "gateway" or "direct"
print(sf.is_gateway_mode()) # True / False
Run the interactive wizard once to configure your environment. It saves settings to
~/.smartflow/config.yaml and is read automatically on every subsequent
SmartflowClient() instantiation.
# CLI wizard (recommended — asks gateway URL or provider keys)
smartflow configure
# Check current config and test connectivity
smartflow status
# Quick chat test
smartflow chat "Hello, which mode am I using?"
Or trigger the wizard from Python:
import smartflow
smartflow.configure() # same interactive wizard
In direct mode, the model string controls which provider is called. The same prefix notation works in gateway mode too (the gateway translates it).
sf = SmartflowClient() # or SmartflowClient("https://gateway...")
# OpenAI (default)
await sf.chat("Hello", model="gpt-4o")
await sf.chat("Hello", model="openai/gpt-4o") # explicit prefix
# Anthropic Claude
await sf.chat("Hello", model="claude-sonnet-4-6")
await sf.chat("Hello", model="anthropic/claude-3-5-haiku-20241022")
# Google Gemini (via openai-compat endpoint)
await sf.chat("Hello", model="gemini-1.5-pro")
await sf.chat("Hello", model="gemini/gemini-2.0-flash")
# Ollama (local — OLLAMA_BASE_URL env var or default http://localhost:11434)
await sf.chat("Hello", model="ollama/llama3")
await sf.chat("Hello", model="ollama/mistral")
# Custom OpenAI-compat server (LOCAL_BASE_URL env var)
await sf.chat("Hello", model="local/my-fine-tuned-model")
Provider prefix notation (anthropic/claude-*, gemini/..., ollama/...) also works in gateway mode: the Smartflow proxy translates and routes automatically. The same code works in both modes.
Calling a gateway-only method (compliance scan, VAS logs, routing override, etc.) in
direct mode raises a clear DirectModeError with instructions:
from smartflow import SmartflowClient
from smartflow.direct import DirectModeError
sf = SmartflowClient() # direct mode
try:
logs = await sf.get_logs()
except DirectModeError as e:
print(e)
# VAS audit logs requires a Smartflow Enterprise gateway.
# Run `smartflow configure` to connect to a gateway and unlock:
# • BERT KNN semantic cache (55–75% cost savings)
# • Real-time policy engine (PII detection, jailbreak guard)
# • SSO identity integration (Entra ID, LDAP, SAML)
# • Full VAS audit trail (per-user request logging)
# • MCP gateway + A2A orchestration
pip install smartflow-sdk
From source:
pip install git+https://github.com/SRAGroupTX/SmartflowV3.git#subdirectory=sdk/python
Optional — sync client in async environments (Jupyter notebooks):
pip install nest_asyncio
import asyncio
from smartflow import SmartflowClient
async def main():
async with SmartflowClient("https://yourco.langsmart.app", api_key="sk-sf-...") as sf: # gateway mode
# Automatic caching, compliance scanning, multi-provider failover,
# and complete audit logging on every call.
response = await sf.chat("Explain quantum computing in simple terms")
print(response)
asyncio.run(main())
from smartflow import SyncSmartflowClient
sf = SyncSmartflowClient() # reads ~/.smartflow/config.yaml (gateway or direct)
response = sf.chat("What is machine learning?")
print(response)
stats = sf.get_cache_stats()
print(f"Cache hit rate: {stats.hit_rate:.1%}")
print(f"Tokens saved: {stats.tokens_saved:,}")
sf.close()
To use direct mode (no gateway), install the provider packages you need:
# OpenAI + Anthropic (most common)
pip install "smartflow-sdk[all]"
# Just OpenAI
pip install "smartflow-sdk[openai]"
# Just Anthropic
pip install "smartflow-sdk[anthropic]"
# Sync client in Jupyter notebooks
pip install "smartflow-sdk[nest]"
In gateway mode, no provider packages are required — the gateway handles provider calls server-side. httpx is the only dependency.
For Jupyter notebooks with an existing event loop:
import nest_asyncio
nest_asyncio.apply()
Zero code changes required — just update the base URL:
from openai import OpenAI
# Before: client = OpenAI()
# After: through Smartflow — caching, compliance, logging all apply transparently
client = OpenAI(
base_url="http://your-smartflow:7775/v1",
api_key="sk-sf-your-virtual-key"
)
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "Hello!"}]
)
| Feature | Gateway Mode | Direct Mode |
|---|---|---|
| chat() / chat_completions() | ✓ Full | ✓ Full |
| stream_chat() | ✓ Full | ✓ Full |
| embeddings() | ✓ Full | ✓ OpenAI only |
| claude_message() | ✓ Via proxy | ✓ Direct Anthropic |
| Multi-provider routing | ✓ 37+ providers | ✓ OpenAI / Anthropic / Gemini / Ollama |
| Semantic BERT cache (55–75%) | ✓ 4-phase MetaCache | ✗ Not available |
| check_compliance() / intelligent_scan() | ✓ ML-powered | ✗ Gateway only |
| get_logs() (VAS audit trail) | ✓ Per-user SSO | ✗ Gateway only |
| get_cache_stats() | ✓ Live MetaCache stats | ✓ Client-side counts |
| SSO / Enterprise Identity | ✓ Entra ID / LDAP / SAML | ✗ Gateway only |
| MCP Gateway / A2A Orchestration | ✓ Full | ✗ Gateway only |
| Prometheus metrics | ✓ Native /metrics | ✗ Gateway only |
| Policy engine / guardrails | ✓ Visual editor + no-code | ✗ Gateway only |
| Required dependencies | httpx only | openai / anthropic (optional) |
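The comparison above maps naturally onto a mode guard, so one script can run against either configuration. A minimal sketch — the `GATEWAY_ONLY` set and `fetch_audit_logs` helper are illustrative, not SDK members; only `sf.mode` and `get_logs()` come from the documented API:

```python
# Illustrative guard for gateway-only features, based on the table above.
GATEWAY_ONLY = {"get_logs", "check_compliance", "intelligent_scan", "force_provider"}

def needs_gateway(method_name: str) -> bool:
    """True if the named SDK method only works against a Smartflow gateway."""
    return method_name in GATEWAY_ONLY

async def fetch_audit_logs(sf):
    """Return recent VAS logs in gateway mode; an empty list in direct mode."""
    if sf.mode != "gateway":
        return []  # direct mode has no VAS audit trail
    return await sf.get_logs(limit=20)
```

In direct mode the helper degrades to an empty result instead of raising DirectModeError.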
Primary async client.
class SmartflowClient(
base_url: Optional[str] = None, # Gateway URL or None (direct/config mode)
api_key: Optional[str] = None, # Virtual key (sk-sf-...)
timeout: float = 30.0, # Request timeout in seconds
management_port: int = 7778, # Management API port
compliance_port: int = 7777, # Compliance API port
bridge_port: int = 3500, # Hybrid bridge port
)
Use as a context manager for automatic cleanup:
async with SmartflowClient("http://smartflow:7775", api_key="sk-sf-...") as sf:
...
# Or manual lifecycle
sf = SmartflowClient("http://smartflow:7775")
await sf._ensure_client()
# ... use sf ...
await sf.close()
chat()
Send a message, receive the reply as a plain string.
async def chat(
message: str,
model: str = "gpt-4o",
system_prompt: Optional[str] = None,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
**kwargs,
) -> str
async with SmartflowClient("http://smartflow:7775") as sf:
# Simple
response = await sf.chat("Explain Docker containers")
# With options
response = await sf.chat(
message="Write a Python function to sort a list",
model="gpt-4o",
system_prompt="You are an expert Python developer. Write clean, documented code.",
temperature=0.3,
max_tokens=1000,
)
chat_completions()
Full OpenAI-compatible completions. Returns a structured AIResponse.
async def chat_completions(
messages: List[Dict[str, str]],
model: str = "gpt-4o",
temperature: float = 0.7,
max_tokens: Optional[int] = None,
stream: bool = False,
**kwargs,
) -> AIResponse
response = await sf.chat_completions(
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is REST API?"},
],
model="gpt-4o",
)
print(response.content)
print(f"Tokens used: {response.usage.total_tokens}")
print(f"Cached: {response.cached}") # True if served from MetaCache
stream_chat()
Async generator that yields text delta strings as they stream.
async def stream_chat(
message: str,
model: str = "gpt-4o",
system_prompt: Optional[str] = None,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
**kwargs,
) -> AsyncIterator[str]
async for chunk in sf.stream_chat("Tell me a story about a robot"):
print(chunk, end="", flush=True)
print()
claude_message()
Send a message to Claude via the native Anthropic Messages API path. The proxy injects the API key automatically — no anthropic_key argument is required in production.
async def claude_message(
message: str,
model: str = "claude-sonnet-4-6",
max_tokens: int = 1024,
system: Optional[str] = None,
anthropic_key: Optional[str] = None,
) -> str
response = await sf.claude_message(
message="Analyze this code for security vulnerabilities",
model="claude-sonnet-4-6",
max_tokens=2000,
system="You are a senior security engineer.",
)
Routes to /anthropic/v1/messages (native Anthropic format). For multi-turn or multimodal use, call chat_completions() with model="claude-sonnet-4-6" using the OpenAI-compatible format.
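Multi-turn with Claude therefore goes through chat_completions(). A sketch under those assumptions — append_turn is a local helper, and the response fields follow the AIResponse shape shown earlier:

```python
# Multi-turn Claude via the OpenAI-compatible chat_completions() path.
def append_turn(history, role, content):
    """Accumulate OpenAI-style message dicts for a multi-turn exchange."""
    history.append({"role": role, "content": content})
    return history

async def claude_review(sf):
    messages = [{"role": "system", "content": "You are a senior security engineer."}]
    append_turn(messages, "user", "Review this snippet for injection flaws.")
    first = await sf.chat_completions(messages=messages, model="claude-sonnet-4-6")
    append_turn(messages, "assistant", first.content)  # keep context for turn 2
    append_turn(messages, "user", "Now suggest a safer rewrite.")
    second = await sf.chat_completions(messages=messages, model="claude-sonnet-4-6")
    return second.content
```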
embeddings()
Generate vector embeddings.
async def embeddings(
input: Union[str, List[str]],
model: str = "text-embedding-3-small",
encoding_format: str = "float",
dimensions: Optional[int] = None,
input_type: Optional[str] = None,
**kwargs,
) -> Dict[str, Any]
# Single text
result = await sf.embeddings("Hello, world!")
vector = result["data"][0]["embedding"]
# Batch
result = await sf.embeddings([
"First document",
"Second document",
"Third document",
])
vectors = [item["embedding"] for item in result["data"]]
# Cohere with input_type
result = await sf.embeddings(
["search query", "document text"],
model="cohere/embed-english-v3.0",
input_type="search_document",
)
# Reduce dimensions (OpenAI text-embedding-3+)
result = await sf.embeddings("Hello", model="text-embedding-3-large", dimensions=256)
image_generation()
Generate images.
async def image_generation(
prompt: str,
model: str = "dall-e-3",
n: int = 1,
size: str = "1024x1024",
quality: Optional[str] = None,
response_format: str = "url",
style: Optional[str] = None,
**kwargs,
) -> Dict[str, Any]
result = await sf.image_generation(
"A futuristic city at sunrise",
model="dall-e-3",
size="1792x1024",
quality="hd",
style="vivid",
)
print(result["data"][0]["url"])
audio_transcription()
Transcribe audio. Accepts a file-like object.
async def audio_transcription(
file: Any,
model: str = "whisper-1",
language: Optional[str] = None,
prompt: Optional[str] = None,
response_format: str = "json",
temperature: float = 0.0,
filename: str = "audio.mp3",
**kwargs,
) -> Dict[str, Any]
with open("recording.mp3", "rb") as f:
result = await sf.audio_transcription(f, model="whisper-1")
print(result["text"])
# Groq (faster, free tier available)
with open("recording.mp3", "rb") as f:
result = await sf.audio_transcription(f, model="groq/whisper-large-v3")
text_to_speech()
Synthesize speech. Returns raw audio bytes.
async def text_to_speech(
input: str,
model: str = "tts-1",
voice: str = "alloy",
response_format: str = "mp3",
speed: float = 1.0,
**kwargs,
) -> bytes
audio = await sf.text_to_speech("Hello, this is Smartflow.", voice="nova")
with open("output.mp3", "wb") as f:
f.write(audio)
rerank()
Rerank documents by relevance to a query.
async def rerank(
query: str,
documents: List[str],
model: str = "rerank-english-v3.0",
top_n: Optional[int] = None,
**kwargs,
) -> Dict[str, Any]
result = await sf.rerank(
"What is the return policy?",
["We accept returns within 30 days.", "Contact support@example.com."],
top_n=1,
)
list_models()
List available models across all enabled providers.
async def list_models() -> List[Dict[str, Any]]
models = await sf.list_models()
for m in models:
print(m["id"])
chatbot_query()
Query Smartflow’s built-in system chatbot for operational information. Answers natural-language questions about VAS logs, cache stats, cost analysis, and system health.
async def chatbot_query(query: str) -> Dict[str, Any]
result = await sf.chatbot_query("show me today's cache stats")
print(result["response"])
result = await sf.chatbot_query("which provider had the most errors this week?")
result = await sf.chatbot_query("what did we spend on OpenAI yesterday?")
All methods that accept a model parameter support provider prefix routing. Prefix the model name with provider/ to route to a specific provider. For the primary providers no prefix is needed: the provider is detected automatically from the model name.
Automatic detection (no prefix needed):
# OpenAI — detected from gpt-*, o1-*, o3-*, chatgpt-*, whisper-*, dall-e-*
reply = await sf.chat("Hello", model="gpt-4o")
reply = await sf.chat("Hello", model="gpt-4o-mini")
reply = await sf.chat("Hello", model="o3-mini")
# Anthropic — detected from claude-*
reply = await sf.chat("Hello", model="claude-sonnet-4-6")
reply = await sf.chat("Hello", model="claude-3-opus-20240229")
# Google Gemini — detected from gemini-*
reply = await sf.chat("Hello", model="gemini-1.5-pro")
reply = await sf.chat("Hello", model="gemini-2.0-flash")
Explicit prefix required:
reply = await sf.chat("Hello", model="xai/grok-2-latest")
reply = await sf.chat("Hello", model="mistral/mistral-large-latest")
reply = await sf.chat("Hello", model="cohere/command-r-plus")
reply = await sf.chat("Hello", model="groq/llama-3.1-70b-versatile")
reply = await sf.chat("Hello", model="openrouter/meta-llama/llama-3.1-405b")
reply = await sf.chat("Hello", model="ollama/llama3.2")
reply = await sf.chat("Hello", model="azure/my-gpt4o-deployment")
# Force native Anthropic Messages API path
reply = await sf.claude_message("Hello", model="claude-sonnet-4-6")
Full prefix table:
| Prefix | Provider | API Key Env Var |
|---|---|---|
| (none) | OpenAI | OPENAI_API_KEY |
| anthropic/ | Anthropic | ANTHROPIC_API_KEY |
| xai/ | xAI (Grok) | XAI_API_KEY |
| gemini/ | Google Gemini | GEMINI_API_KEY |
| vertex_ai/ | Google Vertex AI | VERTEXAI_API_KEY |
| openrouter/ | OpenRouter | OPENROUTER_API_KEY |
| azure/ | Azure OpenAI | AZURE_API_KEY + AZURE_API_BASE |
| mistral/ | Mistral AI | MISTRAL_API_KEY |
| cohere/ | Cohere | COHERE_API_KEY |
| nvidia_nim/ | NVIDIA NIM | NVIDIA_NIM_API_KEY |
| huggingface/ | HuggingFace | HUGGINGFACE_API_KEY |
| groq/ | Groq | GROQ_API_KEY |
| deepgram/ | Deepgram | DEEPGRAM_API_KEY |
| fireworks/ | Fireworks AI | FIREWORKS_API_KEY |
| novita/ | Novita AI | NOVITA_API_KEY |
| together/ | Together AI | TOGETHER_API_KEY |
| perplexity/ | Perplexity AI | PERPLEXITY_API_KEY |
| replicate/ | Replicate | REPLICATE_API_KEY |
| vercel_ai_gateway/ | Vercel AI Gateway | VERCEL_AI_GATEWAY_API_KEY |
| ollama/ | Ollama (local) | (none required) |
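In direct mode, the prefix decides which environment variable must hold a key. The resolver below is purely illustrative (not an SDK function) and copies only a few rows of the table; prefixes outside the abbreviated mapping resolve to None:

```python
# Abbreviated prefix → env-var mapping, copied from the table above.
PREFIX_ENV = {
    "anthropic": "ANTHROPIC_API_KEY",
    "xai": "XAI_API_KEY",
    "gemini": "GEMINI_API_KEY",
    "openrouter": "OPENROUTER_API_KEY",
    "mistral": "MISTRAL_API_KEY",
    "cohere": "COHERE_API_KEY",
    "groq": "GROQ_API_KEY",
    "ollama": None,  # local — no key required
}

def required_env(model: str):
    """Return (provider, env var) for a model string; no prefix means OpenAI."""
    if "/" not in model:
        return "openai", "OPENAI_API_KEY"
    prefix, _ = model.split("/", 1)  # handles openrouter/org/model too
    return prefix, PREFIX_ENV.get(prefix)
```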
Smartflow’s ML-powered compliance engine goes beyond regex. It learns and adapts based on user behavior and organizational baselines.
┌─────────────────────────────────────────────────────────────────┐
│ INTELLIGENT COMPLIANCE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ LAYER 1 │ │ LAYER 2 │ │ LAYER 3 │ │
│ │ Regex/Rules │ → │ ML Embeddings│ → │ Behavioral │ │
│ │ │ │ │ │ Analysis │ │
│ │ SSN │ │ Semantic │ │ User │ │
│ │ Credit Card │ │ similarity │ │ patterns │ │
│ │ Email │ │ Context │ │ Org │ │
│ │ Phone │ │ awareness │ │ baselines │ │
│ │ MRN │ │ Learned │ │ Anomaly │ │
│ │ Passport │ │ patterns │ │ detection │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ └─────────────────┼──────────────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ CORRELATION │ │
│ │ ENGINE │ │
│ │ │ │
│ │ Composite Risk │ │
│ │ Score + Action │ │
│ └──────────────────┘ │
│ │ │
│ ▼ │
│ Allow | AllowAndLog | Review | Block │
└─────────────────────────────────────────────────────────────────┘
intelligent_scan()
async def intelligent_scan(
content: str,
user_id: Optional[str] = None,
org_id: Optional[str] = None,
context: Optional[str] = None,
) -> IntelligentScanResult
result = await sf.intelligent_scan(
content="Please send payment to card 4111-1111-1111-1111",
)
print(f"Has violations: {result.has_violations}") # True
print(f"Risk score: {result.risk_score:.2f}") # 0.0 – 1.0
print(f"Action: {result.recommended_action}") # Allow/AllowAndLog/Block/Review
print(f"Explanation: {result.explanation}")
for v in result.regex_violations:
print(f" - {v['violation_type']}: {v['severity']}")
Enable behavioral analysis with user context:
result = await sf.intelligent_scan(
content="Customer email: john.doe@example.com",
user_id="support_agent_42", # Track individual behavior
org_id="acme_corporation", # Compare against org baseline
context="customer_support", # Context for better detection
)
check_compliance()
Rule-based compliance scan.
async def check_compliance(
content: str,
policy: str = "enterprise_standard",
) -> ComplianceResult
result = await sf.check_compliance("My SSN is 123-45-6789")
if result.has_violations:
print(f"Risk: {result.risk_level}")
print(f"PII: {result.pii_detected}")
print(f"Safe text: {result.redacted_content}")
redact_pii()
async def redact_pii(content: str) -> str
safe = await sf.redact_pii("Call me at 555-867-5309, email john@example.com")
# "Call me at [PHONE], email [EMAIL]"
submit_compliance_feedback()
Submit a true/false-positive correction to retrain the ML model.
async def submit_compliance_feedback(
scan_id: str,
is_false_positive: bool,
user_id: Optional[str] = None,
notes: Optional[str] = None,
) -> Dict[str, Any]
# Scan content, store the scan response dict to get the scan_id
response = await sf._post(
f"{sf.compliance_url}/api/compliance/intelligent/scan",
{"content": "Call me at 555-0100"}
)
scan_id = response.get("scan_id")
if scan_id:
await sf.submit_compliance_feedback(
scan_id=scan_id,
is_false_positive=True,
user_id="admin_user",
notes="555-0100 is a known test number, not real PII",
)
get_learning_summary()
Organization-wide learning progress.
async def get_learning_summary() -> LearningSummary
summary = await sf.get_learning_summary()
print(f"Total users tracked: {summary.total_users}")
print(f"Users with complete baselines: {summary.users_learning_complete}")
print(f"Learning period: {summary.config_learning_days} days")
get_learning_status()
Adaptive learning status for a specific user.
async def get_learning_status(user_id: str) -> LearningStatus
status = await sf.get_learning_status("user-alice")
print(f"Days tracked: {status.days_tracked}")
print(f"Progress: {status.progress_percent}%")
print(f"Complete: {status.learning_complete}")
get_ml_stats()
ML compliance engine statistics.
async def get_ml_stats() -> MLStats
ml_stats = await sf.get_ml_stats()
print(f"Total patterns: {ml_stats.total_patterns}")
print(f"Learned patterns: {ml_stats.learned_patterns}")
print(f"Pattern categories: {ml_stats.patterns_by_category}")
print(f"Average confidence: {ml_stats.average_confidence:.2f}")
get_org_baseline()
Organization behavioral baseline used for anomaly detection.
async def get_org_baseline(org_id: str) -> OrgBaseline
baseline = await sf.get_org_baseline("acme-corp")
print(f"Users: {baseline.user_count}")
print(f"Violation rate: {baseline.violation_rate:.2%}")
print(f"Top violations: {baseline.top_violation_types}")
| Method | Returns | Description |
|---|---|---|
| get_org_summary() | Dict | Organization-level aggregate compliance stats |
| get_persistence_stats() | PersistenceStats | Redis persistence stats for compliance data |
| save_compliance_data() | Dict | Trigger manual flush of compliance data to Redis |
| get_intelligent_health() | Dict | Health status of ML engine and embedding service |
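A usage sketch for two of the helpers above. The status key checked in is_engine_ready is an assumption about the returned Dict, not a documented schema:

```python
def is_engine_ready(health: dict) -> bool:
    """Treat the ML engine as ready only when it reports an explicit healthy status."""
    return health.get("status") == "healthy"  # assumed key, not a documented schema

async def flush_if_healthy(sf):
    """Flush compliance data to Redis only when the ML engine looks healthy."""
    health = await sf.get_intelligent_health()
    if is_engine_ready(health):
        return await sf.save_compliance_data()
    return None
```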
The SDK does not expose dedicated MCP wrapper methods. MCP tool calls
are direct HTTP requests to the proxy. Use httpx or any
HTTP client.
import httpx
async with httpx.AsyncClient() as client:
response = await client.post(
"http://smartflow:7775/github-tools/mcp/",
headers={
"Authorization": "Bearer sk-sf-...",
"Content-Type": "application/json",
},
json={
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "create_issue",
"arguments": {
"repo": "my-org/my-repo",
"title": "Bug: login fails on mobile",
"body": "Steps to reproduce...",
}
}
}
)
result = response.json()
print(result["result"]["content"])
async with httpx.AsyncClient() as client:
r = await client.get(
"http://smartflow:7775/api/mcp/tools/search",
params={"q": "create github issue", "k": 3},
headers={"Authorization": "Bearer sk-sf-..."},
)
for tool in r.json()["results"]:
print(f"{tool['server_id']}.{tool['name']}: {tool['description']}")
async with httpx.AsyncClient() as client:
r = await client.get(
"http://smartflow:7775/api/mcp/usage",
headers={"Authorization": "Bearer sk-sf-..."},
)
# Per-server call counts and cumulative cost totals
A2A (Agent-to-Agent) tasks are HTTP POST requests to the proxy. The proxy forwards to the registered agent, logs the exchange, and returns the result.
import httpx
async with httpx.AsyncClient() as client:
response = await client.post(
"http://smartflow:7775/a2a/summarizer-agent",
headers={
"Authorization": "Bearer sk-sf-...",
"Content-Type": "application/json",
"x-a2a-trace-id": "trace-abc-123", # correlate across agents
},
json={
"id": "task-uuid-001",
"message": {
"role": "user",
"parts": [{"type": "text", "text": "Summarise the Q4 earnings report."}]
}
}
)
result = response.json()
print(result["result"]["parts"][0]["text"])
async with httpx.AsyncClient() as client:
r = await client.get(
"http://smartflow:7775/a2a/summarizer-agent/.well-known/agent.json",
headers={"Authorization": "Bearer sk-sf-..."},
)
card = r.json()
print(card["name"], card["capabilities"])
The x-a2a-trace-id header is passed through every hop of a multi-agent chain, so the logs from all agents can be correlated by a single trace ID.
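Minting the ID on the first hop and reusing it afterwards can be sketched as follows. a2a_headers is a local helper and reviewer-agent is a hypothetical second agent; the payload shape follows the example above:

```python
import uuid
from typing import Optional

def a2a_headers(api_key: str, trace_id: Optional[str] = None) -> dict:
    """Build A2A headers, minting a fresh trace ID when none is supplied."""
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "x-a2a-trace-id": trace_id or f"trace-{uuid.uuid4()}",
    }

async def two_hop_chain(api_key: str):
    import httpx  # deferred so the sketch imports without httpx installed
    first = a2a_headers(api_key)  # hop 1 mints the trace ID
    task = {"id": "task-1", "message": {"role": "user",
            "parts": [{"type": "text", "text": "Summarise the Q4 earnings report."}]}}
    async with httpx.AsyncClient() as client:
        await client.post("http://smartflow:7775/a2a/summarizer-agent",
                          headers=first, json=task)
        # hop 2 reuses the same ID so both agents' logs correlate
        reused = a2a_headers(api_key, first["x-a2a-trace-id"])
        await client.post("http://smartflow:7775/a2a/reviewer-agent",
                          headers=reused, json={**task, "id": "task-2"})
```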
stats = await sf.get_cache_stats()
print("=== CACHE PERFORMANCE ===")
print(f"Hit Rate: {stats.hit_rate:.1%}")
print()
print("Layer Breakdown:")
print(f" L1 (Memory): {stats.l1_hits:,} hits")
print(f" L2 (Semantic): {stats.l2_hits:,} hits")
print(f" L3 (Exact): {stats.l3_hits:,} hits")
print()
print("Savings:")
print(f" Tokens saved: {stats.tokens_saved:,}")
print(f" Cost saved: ${stats.cost_saved_cents / 100:.2f}")
CacheStats fields: hits, misses, hit_rate, tokens_saved, cost_saved_cents, l1_hits, l2_hits, l3_hits, entries
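The headline numbers can be recomputed from the raw fields — plain arithmetic over the field names listed above:

```python
def cache_summary(hits: int, misses: int, cost_saved_cents: int) -> dict:
    """Derive hit rate and dollar savings from raw CacheStats counters."""
    total = hits + misses
    return {
        "hit_rate": hits / total if total else 0.0,
        "cost_saved_usd": cost_saved_cents / 100,
    }
```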
providers = await sf.get_provider_health()
print("=== PROVIDER STATUS ===")
for p in providers:
print(f"{p.provider}")
print(f" Status: {p.status}")
print(f" Latency: {p.latency_ms:.0f}ms")
print(f" Success: {p.success_rate:.1%}")
print(f" Requests: {p.requests_total:,}")
ProviderHealth fields: provider, status, latency_ms, success_rate, error_rate, requests_total, last_updated
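These fields are enough to pick a provider to pin with force_provider() (described below). The scoring in this sketch — healthy first, then success rate, then latency — is an arbitrary choice, not SDK behavior, and it works over plain dicts mirroring the ProviderHealth fields:

```python
def best_provider(providers: list) -> str:
    """Prefer healthy providers, then highest success rate, then lowest latency."""
    healthy = [p for p in providers if p["status"] == "healthy"]
    pool = healthy or providers  # fall back to everything if none are healthy
    return max(pool, key=lambda p: (p["success_rate"], -p["latency_ms"]))["provider"]
```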
health = await sf.health_comprehensive()
print(health.status) # "healthy" | "degraded" | "unhealthy"
print(health.uptime_seconds)
print(health.version)
Quick liveness check:
status = await sf.health()
assert status["status"] == "ok"
async def get_logs(
limit: int = 100,
offset: int = 0,
provider: Optional[str] = None,
model: Optional[str] = None,
days: int = 30,
) -> List[VASLog]
Retrieve VAS audit logs from the hot Redis tier (recent) and the cold MongoDB tier (archived). Logs are returned newest-first.
logs = await sf.get_logs(limit=50, provider="openai", days=7)
print("=== RECENT AI INTERACTIONS ===")
for log in logs:
route = log.routing_strategy or "direct"
cached_badge = f"[CACHE:{log.metacache.tokens_saved} tokens saved]" if log.metacache.hit else ""
print(f"[{log.timestamp}] {log.provider}/{log.model} {cached_badge}")
print(f" Latency: {log.latency_ms}ms | Tokens: {log.tokens_used} | Routing: {route}")
print(f" Compliance: {log.compliance_status} | Conv: {log.conversation_id}")
if log.compliance_violations:
print(f" Violations: {log.compliance_violations}")
Pagination example:
page_size = 100
page = 0
while True:
batch = await sf.get_logs(limit=page_size, offset=page * page_size)
if not batch:
break
process(batch)
page += 1
async def get_analytics(
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> Dict[str, Any]
data = await sf.get_analytics(
start_date="2026-02-01",
end_date="2026-02-19",
)
get_routing_status()
Current routing state: active provider, fallback chain, last failure.
status = await sf.get_routing_status()
force_provider()
Force all routing to a specific provider for a duration.
async def force_provider(
provider: str,
duration_seconds: int = 300,
) -> Dict[str, Any]
# Force to OpenAI for 10 minutes during an Anthropic outage
await sf.force_provider("openai", duration_seconds=600)
Higher-level agent with conversation memory, compliance scanning, and tool support.
from smartflow import SmartflowClient, SmartflowAgent
async with SmartflowClient("http://smartflow:7775") as sf:
agent = SmartflowAgent(
client=sf,
name="TechSupport",
model="gpt-4o",
system_prompt="""You are a senior technical support engineer.
Guidelines:
- Be patient and thorough
- Ask clarifying questions when needed
- Provide step-by-step solutions
- Never ask for or repeat sensitive information""",
temperature=0.7,
compliance_policy="enterprise_standard",
enable_compliance_scan=True, # Auto-scan inputs and outputs
user_id="support_session_123",
org_id="tech_company",
)
# Conversation with full context memory
print(await agent.chat("My application keeps crashing"))
print(await agent.chat("It's a Python web app using Flask"))
print(await agent.chat("Here's the error: MemoryError"))
print(f"Messages exchanged: {agent.message_count}")
agent.clear_history()
| Method | Description |
|---|---|
| chat(message, scan_input=True, scan_output=True) | Send message; raises ComplianceError if blocked |
| clear_history() | Reset conversation, preserve system prompt |
| get_history() | Return copy of message history |
| message_count | Number of messages in history |
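The history and error surfaces in the table combine naturally. summarize_history is a local helper over the message dicts returned by get_history(); safe_chat assumes the ComplianceError class from the SDK's exception hierarchy:

```python
def summarize_history(history: list) -> dict:
    """Count messages per role in a get_history()-style list of message dicts."""
    counts: dict = {}
    for msg in history:
        counts[msg["role"]] = counts.get(msg["role"], 0) + 1
    return counts

async def safe_chat(agent, text: str) -> str:
    """Chat with the agent, degrading gracefully when compliance blocks a turn."""
    from smartflow import ComplianceError  # deferred import
    try:
        return await agent.chat(text)
    except ComplianceError:
        return "[message blocked by compliance policy]"
```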
Chain AI operations with branching logic.
from smartflow import SmartflowClient, SmartflowWorkflow
async with SmartflowClient("http://smartflow:7775") as sf:
workflow = SmartflowWorkflow(sf, name="ContentPipeline")
workflow.add_step(
name="analyze",
action="chat",
config={
"prompt": "Analyze the tone and intent of this text: {input}",
"model": "gpt-4o-mini",
},
next_steps=["compliance_check"],
)
workflow.add_step(
name="compliance_check",
action="compliance_check",
config={"content": "{input}"},
next_steps=["route"],
)
workflow.add_step(
name="route",
action="condition",
config={
"field": "output",
"cases": {
"positive": "enhance",
"negative": "review",
"neutral": "publish",
},
},
)
result = await workflow.execute({"input": "This product exceeded my expectations!"})
print(f"Success: {result.success}")
print(f"Path taken: {' -> '.join(result.steps_executed)}")
print(f"Execution time: {result.execution_time_ms:.0f}ms")
print(f"Total tokens: {result.total_tokens}")
Step actions:
| Action | Config fields | Description |
|---|---|---|
| "chat" | prompt, model, temperature | Chat completion; {input} / {output} are template variables |
| "compliance_check" | content | Rule-based compliance scan |
| "condition" | field, cases, default | Branch on a context value |
Synchronous wrapper. Every async method is available without
await.
from smartflow import SyncSmartflowClient
sf = SyncSmartflowClient("http://smartflow:7775", api_key="sk-sf-...")
reply = sf.chat("Hello!")
emb = sf.embeddings("Hello", model="text-embedding-3-small")
img = sf.image_generation("A sunset", model="dall-e-3")
transcript = sf.audio_transcription(open("audio.mp3", "rb"), model="whisper-1")
audio = sf.text_to_speech("Hello!", voice="nova")
ranked = sf.rerank("What is the return policy?", ["doc1", "doc2"])
stats = sf.get_cache_stats()
logs = sf.get_logs(limit=20)
sf.close()
sf = SmartflowClient(
base_url="http://smartflow:7775", # Proxy endpoint
api_key="sk-sf-...", # Virtual key for authentication
timeout=30.0, # Request timeout in seconds
management_port=7778, # Health, metrics, routing API
compliance_port=7777, # Compliance API
bridge_port=3500, # Hybrid bridge (cross-instance logs)
)
import os
from smartflow import SmartflowClient
sf = SmartflowClient(
base_url=os.environ["SMARTFLOW_URL"],
api_key=os.environ.get("SMARTFLOW_API_KEY"),
)
from smartflow import (
SmartflowClient,
SmartflowError,
ConnectionError,
ComplianceError,
RateLimitError,
TimeoutError,
)
import asyncio
try:
async with SmartflowClient("http://smartflow:7775") as sf:
response = await sf.chat("Hello!")
except ConnectionError:
print("Cannot connect to Smartflow proxy")
except ComplianceError as e:
print(f"Blocked by compliance policy: {e}")
except RateLimitError:
print("Rate limited — backing off")
await asyncio.sleep(60)
except TimeoutError:
print("Request timed out")
except SmartflowError as e:
print(f"Smartflow error: {e}")
| Exception | Condition |
|---|---|
| SmartflowError | Base class for all SDK errors |
| ConnectionError | Cannot connect to proxy |
| AuthenticationError | 401 — invalid or missing key |
| RateLimitError | 429 — rate limit hit |
| ComplianceError | 403 — request blocked by compliance policy |
| ProviderError | Upstream provider error |
| TimeoutError | Request timeout |
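RateLimitError in particular invites a retry loop. A sketch with exponential backoff — backoff_delays is a plain helper; only the exception class comes from the table above:

```python
def backoff_delays(attempts: int, base: float = 1.0, cap: float = 30.0) -> list:
    """Exponential delays: base, 2*base, 4*base, ... capped at `cap` seconds."""
    return [min(base * (2 ** i), cap) for i in range(attempts)]

async def chat_with_retry(sf, message: str, attempts: int = 4) -> str:
    """Retry sf.chat() on 429s, sleeping a growing delay between attempts."""
    import asyncio
    from smartflow import RateLimitError  # deferred import
    for delay in backoff_delays(attempts):
        try:
            return await sf.chat(message)
        except RateLimitError:
            await asyncio.sleep(delay)
    raise RuntimeError(f"still rate limited after {attempts} attempts")
```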
Challenge: Build a customer support chatbot that handles sensitive information while maintaining PCI-DSS and GDPR compliance.
import asyncio
from smartflow import SmartflowClient, SmartflowAgent

class SecureCustomerSupportBot:
    """
    Customer support bot with built-in PII protection.
    - Automatic PII detection and blocking
    - Conversation memory
    - Audit trail for compliance
    - Behavioral analysis per customer
    """

    def __init__(self, smartflow_url: str):
        self.sf_url = smartflow_url

    async def handle_customer_session(self, customer_id: str, organization: str):
        async with SmartflowClient(self.sf_url) as sf:
            agent = SmartflowAgent(
                client=sf,
                name="SecureSupport",
                model="gpt-4o",
                system_prompt="""You are a helpful customer support agent for a financial services company.
CRITICAL RULES:
1. NEVER ask customers for full credit card numbers, SSNs, or passwords
2. If a customer shares sensitive info, acknowledge receipt but do not repeat it
3. For account verification, use last 4 digits only
4. Always offer secure channels for sensitive transactions""",
                compliance_policy="pci_dss_strict",
                enable_compliance_scan=True,
                user_id=f"customer_{customer_id}",
                org_id=organization,
            )
            print("SecureSupport: Hello! How can I help you today?")
            while True:
                user_input = input("Customer: ")
                if user_input.lower() == "quit":
                    break
                try:
                    response = await agent.chat(user_input)
                    print(f"SecureSupport: {response}")
                except Exception as e:
                    if "compliance" in str(e).lower():
                        print("SecureSupport: I noticed you shared some sensitive information.")
                        print("For your protection, please use our secure verification process.")
                    else:
                        print(f"Error: {e}")

async def main():
    bot = SecureCustomerSupportBot("http://smartflow:7775")
    await bot.handle_customer_session(customer_id="12345", organization="fintech_corp")

# asyncio.run(main())

What This Demonstrates:
- PII detection blocks sensitive data before it reaches the AI provider
- Behavioral tracking learns normal patterns per customer
- Complete audit trail for compliance audits
- Graceful handling of compliance violations
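The gateway's compliance engine does the real PII blocking, but a lightweight client-side pre-filter can mask obvious card numbers before a message is ever sent. A sketch using a regex plus Luhn check — illustrative only, not the SDK's detection logic:

```python
import re

# Candidate runs of 13-16 digits, optionally separated by spaces or dashes.
CARD_RE = re.compile(r"\b(?:\d[ -]?){13,16}\b")

def luhn_ok(digits: str) -> bool:
    """Standard Luhn checksum: doubles every second digit from the parity side."""
    total, parity = 0, len(digits) % 2
    for i, ch in enumerate(digits):
        d = int(ch)
        if i % 2 == parity:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0

def redact_cards(text: str) -> str:
    """Replace Luhn-valid card numbers with a masked form keeping the last 4 digits."""
    def repl(m):
        digits = re.sub(r"\D", "", m.group())
        if luhn_ok(digits):
            return "**** **** **** " + digits[-4:]
        return m.group()
    return CARD_RE.sub(repl, text)

print(redact_cards("My card is 4111 1111 1111 1111, can you check?"))
```

Keeping only the last four digits matches rule 3 of the agent's system prompt: enough for account verification, never the full PAN.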
Challenge: Generate thousands of product descriptions daily while minimizing API costs.
import asyncio
from dataclasses import dataclass
from typing import List
from smartflow import SmartflowClient

@dataclass
class Product:
    id: str
    name: str
    category: str
    features: List[str]
    price: float

class ContentGenerationPipeline:
    """
    High-volume content generation with intelligent caching.
    Cost optimization:
    1. Similar products hit the semantic cache (60-80% savings)
    2. Smaller models for simple tasks
    3. Structured prompts to maximize cache hit potential
    """

    def __init__(self, smartflow_url: str):
        self.sf_url = smartflow_url

    async def generate_description(self, sf: SmartflowClient, product: Product) -> dict:
        # Structure prompt to maximize cache hits across similar products
        prompt = f"""Write a compelling product description.
Category: {product.category}
Product: {product.name}
Key Features: {', '.join(product.features)}
Price Point: ${product.price:.2f}
Requirements:
- 2-3 sentences
- Highlight key benefits
- Include call-to-action
- Professional tone"""
        response = await sf.chat_completions(
            messages=[{"role": "user", "content": prompt}],
            model="gpt-4o-mini",
            temperature=0.7,
        )
        return {
            "product_id": product.id,
            "description": response.content,
            "cached": response.cached,
            "tokens": response.usage.total_tokens,
        }

    async def process_catalog(self, products: List[Product]) -> dict:
        async with SmartflowClient(self.sf_url) as sf:
            initial_stats = await sf.get_cache_stats()
            results = []
            cached_count = 0
            total_tokens = 0
            for i, product in enumerate(products):
                result = await self.generate_description(sf, product)
                results.append(result)
                if result["cached"]:
                    cached_count += 1
                total_tokens += result["tokens"]
                if (i + 1) % 10 == 0:
                    print(f"Processed {i + 1}/{len(products)} products...")
            final_stats = await sf.get_cache_stats()
            tokens_saved = final_stats.tokens_saved - initial_stats.tokens_saved
            cost_saved = final_stats.cost_saved_cents - initial_stats.cost_saved_cents
            cache_hit_rate = cached_count / len(products) if products else 0
            return {
                "results": results,
                "summary": {
                    "total_products": len(products),
                    "cache_hit_rate": f"{cache_hit_rate:.1%}",
                    "tokens_used": total_tokens,
                    "tokens_saved": tokens_saved,
                    "cost_saved": f"${cost_saved / 100:.2f}",
                },
            }

async def main():
    pipeline = ContentGenerationPipeline("http://smartflow:7775")
    products = [
        Product("SKU001", "Wireless Bluetooth Headphones", "Electronics",
                ["Noise cancelling", "40hr battery", "Premium sound"], 149.99),
        Product("SKU002", "Wireless Earbuds Pro", "Electronics",
                ["Active noise cancelling", "36hr battery", "Hi-Fi audio"], 129.99),
        Product("SKU003", "Over-Ear Gaming Headset", "Electronics",
                ["7.1 surround", "Noise isolation", "RGB lighting"], 89.99),
    ]
    result = await pipeline.process_catalog(products)
    print(f"Products processed: {result['summary']['total_products']}")
    print(f"Cache hit rate: {result['summary']['cache_hit_rate']}")
    print(f"Tokens saved: {result['summary']['tokens_saved']:,}")
    print(f"Cost saved: {result['summary']['cost_saved']}")

# asyncio.run(main())

What This Demonstrates:
- Semantic caching recognizes similar products and reuses responses
- Structured prompts maximize cache hit potential
- Real-time cost tracking via cost_saved_cents
- Batch processing for high-volume workloads
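The catalog loop above runs sequentially; for large catalogs you would typically issue requests concurrently with a cap on in-flight calls. A generic asyncio sketch — `fake_generate` is a stand-in for the real `generate_description` call, and the helper is not part of the SDK:

```python
import asyncio

async def bounded_gather(coros, limit=5):
    """Run coroutines concurrently, with at most `limit` in flight at once."""
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with sem:
            return await coro

    # gather() preserves input order, so results line up with products.
    return await asyncio.gather(*(guarded(c) for c in coros))

async def fake_generate(i):  # stand-in for generate_description(sf, product)
    await asyncio.sleep(0.001)
    return f"desc-{i}"

results = asyncio.run(bounded_gather([fake_generate(i) for i in range(20)], limit=5))
```

A concurrency cap matters here twice over: it respects provider rate limits, and it gives earlier requests a chance to populate the semantic cache before near-duplicates arrive.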
Challenge: Coordinate multiple specialized AI agents to produce a polished, auditable research report.
import asyncio
from datetime import datetime
from smartflow import SmartflowClient, SmartflowAgent

class ResearchOrchestrator:
    """
    Multi-agent research system.
    Agents:
    1. Researcher — gathers and summarizes information
    2. Analyst — identifies patterns and insights
    3. Writer — produces polished executive report
    4. Editor — reviews for accuracy and clarity (deep mode only)
    All interactions logged for full auditability.
    """

    def __init__(self, smartflow_url: str):
        self.sf_url = smartflow_url

    async def research_topic(self, topic: str, depth: str = "standard") -> dict:
        async with SmartflowClient(self.sf_url) as sf:
            timestamp = datetime.now().isoformat()
            researcher = SmartflowAgent(
                client=sf, name="Researcher", model="gpt-4o",
                system_prompt="""You are a thorough research analyst.
Provide structured findings covering: current state, recent developments,
key players, and challenges. Output as organized bullet points.""",
                user_id="research_system", org_id="analytics_dept",
            )
            print(f"Researcher: Investigating '{topic}'...")
            research_data = await researcher.chat(
                f"Research this topic: {topic}\n\n"
                f"Cover: current state and facts, recent developments, "
                f"key players, challenges and opportunities."
            )
            if depth == "quick":
                return {"topic": topic, "timestamp": timestamp,
                        "report": research_data, "agents_used": ["Researcher"]}
            analyst = SmartflowAgent(
                client=sf, name="Analyst", model="gpt-4o",
                system_prompt="""You are a strategic analyst.
Identify non-obvious patterns, provide data-driven insights,
make predictions, and highlight risks and opportunities.""",
                user_id="research_system", org_id="analytics_dept",
            )
            print("Analyst: Analyzing findings...")
            analysis = await analyst.chat(
                f"Analyze this research and provide strategic insights:\n\n"
                f"{research_data}\n\nFocus on hidden patterns, future implications, "
                f"and strategic recommendations."
            )
            writer = SmartflowAgent(
                client=sf, name="Writer", model="gpt-4o",
                system_prompt="""You are an expert business writer.
Synthesize research and analysis into a coherent executive narrative.
Lead with key findings and recommendations.""",
                user_id="research_system", org_id="analytics_dept",
            )
            print("Writer: Composing report...")
            draft_report = await writer.chat(
                f"Write an executive report from this research and analysis:\n\n"
                f"RESEARCH:\n{research_data}\n\nANALYSIS:\n{analysis}\n\n"
                f"Include: Executive Summary, Key Findings, "
                f"Strategic Analysis, Recommendations, Conclusion."
            )
            if depth == "standard":
                return {"topic": topic, "timestamp": timestamp,
                        "report": draft_report,
                        "agents_used": ["Researcher", "Analyst", "Writer"]}
            # Deep mode: add Editor
            editor = SmartflowAgent(
                client=sf, name="Editor", model="gpt-4o",
                system_prompt="""You are a senior business editor.
Review for accuracy and clarity, improve flow, ensure consistent tone,
fact-check against source research, polish for executive presentation.""",
                temperature=0.3,
                user_id="research_system", org_id="analytics_dept",
            )
            print("Editor: Polishing final report...")
            final_report = await editor.chat(
                f"Edit and polish this report:\n\n{draft_report}\n\n"
                f"Source research for fact-checking:\n{research_data}"
            )
            logs = await sf.get_logs(limit=10)
            return {
                "topic": topic,
                "timestamp": timestamp,
                "report": final_report,
                "agents_used": ["Researcher", "Analyst", "Writer", "Editor"],
                "audit_trail": [
                    {"timestamp": log.timestamp, "model": log.model,
                     "tokens": log.tokens_used, "cached": log.cached}
                    for log in logs
                ],
            }

async def main():
    orchestrator = ResearchOrchestrator("http://smartflow:7775")
    result = await orchestrator.research_topic(
        topic="The impact of AI agents on enterprise software development in 2026",
        depth="deep",
    )
    print("=" * 60)
    print(f"Topic: {result['topic']}")
    print(f"Generated: {result['timestamp']}")
    print(f"Agents: {' -> '.join(result['agents_used'])}")
    print("=" * 60)
    print(result["report"])
    print("\nAUDIT TRAIL:")
    for entry in result.get("audit_trail", []):
        cached = "[CACHED]" if entry["cached"] else ""
        print(f"  [{entry['timestamp']}] {entry['model']} — {entry['tokens']} tokens {cached}")

# asyncio.run(main())

What This Demonstrates:
- Coordinated multi-agent workflows with specialized roles
- Progressive refinement through an agent chain
- Complete per-request audit trail for every AI interaction
- Organizational context tracking for behavioral analysis
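Once you have the audit_trail entries, rolling them up into a per-model cost summary is a few lines. A sketch over sample rows shaped like the entries above (the values are invented for illustration):

```python
from collections import Counter

# Invented rows shaped like the audit_trail dicts built above.
logs = [
    {"model": "gpt-4o", "tokens": 812, "cached": False},
    {"model": "gpt-4o", "tokens": 640, "cached": True},
    {"model": "gpt-4o", "tokens": 905, "cached": False},
]

def summarize(entries):
    """Aggregate token usage per model and count cache hits."""
    tokens_by_model = Counter()
    cached = 0
    for e in entries:
        tokens_by_model[e["model"]] += e["tokens"]
        cached += e["cached"]  # bools sum as 0/1
    return {"tokens_by_model": dict(tokens_by_model), "cached_hits": cached}

summary = summarize(logs)
```

The same aggregation works over `get_logs()` results directly if you read the attributes (`log.model`, `log.tokens_used`, `log.cached`) instead of dict keys.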
| Field | Type | Description |
|---|---|---|
| content | str | First choice text |
| choices | list | Full choices array |
| usage | Usage | Token usage (prompt_tokens, completion_tokens, total_tokens) |
| model | str | Model used |
| id | str | Response ID |
| cached | bool | True if served from MetaCache |
| cache_hit_type | str | "exact", "semantic" (Phase 4 VectorLite BERT KNN hit), or None |
| provider | str | Provider that served the request |
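The `cached` and `cache_hit_type` fields combine naturally into a single label for dashboards or logs. A sketch that treats the response as a plain dict for illustration (the real object exposes these as attributes):

```python
def cache_label(resp: dict) -> str:
    """Collapse cached/cache_hit_type into one of: miss, exact, semantic, unknown."""
    if not resp.get("cached"):
        return "miss"
    return resp.get("cache_hit_type") or "unknown"

print(cache_label({"cached": True, "cache_hit_type": "semantic"}))
```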
| Field | Type |
|---|---|
| hit_rate | float |
| hits / misses | int |
| l1_hits / l2_hits / l3_hits | int |
| tokens_saved | int |
| cost_saved_cents | int |
| entries | int |
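The derived numbers shown in the earlier pipeline example follow directly from these fields. A sketch of the arithmetic with invented sample values (the SDK reports `hit_rate` as a ready-made float; recomputing it as hits over total lookups is one common definition):

```python
# Invented sample values shaped like the CacheStats fields above.
stats = {"hits": 720, "misses": 280, "tokens_saved": 1_250_000, "cost_saved_cents": 3875}

hit_rate = stats["hits"] / (stats["hits"] + stats["misses"])
dollars_saved = stats["cost_saved_cents"] / 100  # cents -> USD

print(f"hit rate: {hit_rate:.1%}, saved: ${dollars_saved:.2f}")
```

Note that `cost_saved_cents` is in cents, so divide by 100 before printing dollar amounts, exactly as the content-generation pipeline does.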
| Field | Type |
|---|---|
| has_violations | bool |
| compliance_score | float |
| violations | list[str] |
| pii_detected | list[str] |
| risk_level | str — "low" / "medium" / "high" / "critical" |
| recommendations | list[str] |
| redacted_content | str \| None |
| Field | Type |
|---|---|
| has_violations | bool |
| risk_score | float — 0.0 to 1.0 |
| recommended_action | str — "Allow" / "AllowAndLog" / "Review" / "Block" |
| explanation | str |
| regex_violations | list |
| ml_violations | list |
| behavior_deviations | list |
| processing_time_us | int |
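A gating policy over `risk_score` might map onto the four `recommended_action` values like this. The thresholds below are assumptions for illustration, not the engine's actual cutoffs — in practice you should act on the `recommended_action` the scan returns:

```python
# Illustrative mapping from risk_score to the recommended_action values
# listed above; the 0.25/0.5/0.8 thresholds are invented, not the engine's.
def action_for(risk_score: float) -> str:
    if risk_score < 0.25:
        return "Allow"
    if risk_score < 0.5:
        return "AllowAndLog"
    if risk_score < 0.8:
        return "Review"
    return "Block"

print(action_for(0.6))
```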
| Field | Type | Description |
|---|---|---|
| request_id | str | Unique ID — matches x-smartflow-request-id response header |
| timestamp | str | ISO-8601 UTC |
| provider | str | openai, anthropic, google, etc. |
| model | str | Actual model returned by provider (or requested model for cache hits) |
| model_provider | str | Provider name only — never contains API key material |
| tokens_used | int | Total tokens (prompt + completion) |
| cost | float | Estimated USD cost |
| latency_ms | int | End-to-end proxy latency. 5 for cache hits. |
| processing_time_ms | int | Total processing time |
| content_type | str | chat, completion, image, etc. |
| user_id | str \| None | Extracted from JWT, x-smartflow-user-id, or x-user-id header |
| conversation_id | str \| None | Set when x-conversation-id or x-session-id header is present |
| conversation_stage | str \| None | Conversation lifecycle stage |
| routing_strategy | str \| None | "direct", "cache", or configured strategy ("latency", "tag", etc.) |
| routing_reason | str \| None | Human-readable reason, e.g. "provider:openai" or "cache_hit:tier=L1" |
| compliance_status | str \| None | "compliant" or "violated" |
| compliance_violations | str \| None | JSON-encoded list of violation objects when compliance_status == "violated" |
| compliance | ComplianceInfo | Full compliance object including data_classification, compliance_score, violations_details, regulatory_frameworks |
| metacache | MetacacheData | Cache hit info: hit (bool), query (str), tokens_saved (int) |
| metrics | ProviderMetrics | Provider-level metrics: prompt_tokens, completion_tokens, processing_time_ms, success |
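Because `compliance_violations` is a JSON-encoded string rather than a list, consumers decode it before use. A sketch with an invented violation object, treating the log entry as a plain dict for illustration:

```python
import json

# A LogEntry-shaped dict; compliance_violations is a JSON-encoded string
# per the field table above (the sample violation object is invented).
entry = {
    "compliance_status": "violated",
    "compliance_violations": json.dumps([{"type": "pii", "pattern": "email"}]),
}

def violations(e):
    """Return the decoded violation list, or [] for compliant requests."""
    if e.get("compliance_status") != "violated":
        return []
    return json.loads(e["compliance_violations"])

print(violations(entry))
```

Guarding on `compliance_status` first avoids calling `json.loads` on None for the (common) compliant case.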
| Field | Type |
|---|---|
| provider | str |
| status | str — "healthy" / "degraded" / "unhealthy" |
| latency_ms | float |
| success_rate | float |
| error_rate | float |
| requests_total | int |
| last_updated | str |
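These fields support simple client-side selection on top of the gateway's own routing, e.g. preferring healthy providers and breaking ties on latency. A sketch with invented sample data (the real Smart Routing decision happens inside the gateway):

```python
# Invented ProviderHealth-shaped rows for illustration.
providers = [
    {"provider": "openai", "status": "healthy", "latency_ms": 420.0},
    {"provider": "anthropic", "status": "healthy", "latency_ms": 310.0},
    {"provider": "google", "status": "degraded", "latency_ms": 150.0},
]

# Order statuses best-first, then pick the lowest-latency provider in the best tier.
RANK = {"healthy": 0, "degraded": 1, "unhealthy": 2}

def best(ps):
    return min(ps, key=lambda p: (RANK[p["status"]], p["latency_ms"]))["provider"]

print(best(providers))
```

Note the tuple key: a degraded provider never wins on latency alone, which is usually the behavior you want for fallback logic.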
| Field | Type |
|---|---|
| status | str |
| uptime_seconds | int |
| version | str |
| providers | dict |
| cache | dict |
| timestamp | str |
| Field | Type |
|---|---|
| success | bool |
| output | str |
| steps_executed | list[str] |
| errors | list |
| total_tokens | int |
| total_cost_cents | int |
| execution_time_ms | float |
| Feature | Benefit |
|---|---|
| Semantic Cache (3-tier) | 60–80% cost reduction, no external vector DB |
| ML Compliance Engine | Real-time PII protection with adaptive learning |
| Smart Routing | Latency, cost, or priority-based provider selection |
| Full Audit Trail (VAS) | Complete compliance visibility across every request |
| MCP Tool Gateway | Register and invoke external tools with shared auth and budgeting |
| A2A Agent Orchestration | Route tasks across agents with full traceability |
| Agent Builder | Production-ready conversational AI with memory and compliance |
| Workflow Orchestration | Multi-step AI pipelines with branching and error handling |
- chatbot_query() — natural-language operational queries
- get_logs_hybrid() — unified audit log across all instances via hybrid bridge
- submit_compliance_feedback() — true/false-positive corrections for ML model retraining
- get_learning_status(), get_learning_summary() — adaptive learning progress
- get_ml_stats() — ML engine pattern counts and accuracy
- get_org_summary(), get_org_baseline() — organizational compliance baselines
- get_persistence_stats(), save_compliance_data(), get_intelligent_health()
- IntelligentScanResult field names (latency_ms, not avg_latency_ms; cost_saved_cents, not cost_saved_usd)
- get_analytics() signature: start_date/end_date parameters
- SystemHealth field names: status, uptime_seconds, providers
- image_generation() — multi-provider image generation
- audio_transcription() — multipart upload, Groq/Deepgram/Fireworks routing
- text_to_speech() — returns raw audio bytes
- stream_chat() — async SSE iterator
- rerank() — Cohere-compatible document reranking
- embeddings() with encoding_format, dimensions, input_type
- SmartflowAgent with compliance scanning and conversation memory
- SmartflowWorkflow for multi-step AI pipelines
- intelligent_scan, submit_compliance_feedback
- get_provider_health, get_cache_stats, health_comprehensive
- chat, chat_completions, embeddings, claude_message
- SyncSmartflowClient

© 2026 Langsmart, Inc. All rights reserved. Smartflow is a trademark of Langsmart, Inc.