LangSmart Smartflow · Platform v3.0


Smartflow — API and SDK Reference

Platform version: 3.0 | SDK version: 0.3.0

Smartflow is an enterprise AI gateway that proxies requests to multiple LLM providers, enforces compliance policy, caches semantically, and orchestrates MCP tools and A2A agents. This document covers every API surface the platform exposes: the proxy endpoints, management APIs, MCP gateway, A2A gateway, vector store, RAG pipeline, and the Python SDK.


Table of Contents

  1. Architecture Overview
  2. Authentication
  3. LLM Proxy Endpoints
  4. Routing and Provider Selection
  5. MetaCache — Semantic Caching
  6. MCP Gateway
  7. A2A Agent Gateway
  8. Vector Store API
  9. RAG Pipeline API
  10. Management API
  11. Compliance API
  12. Policy Perfect API
  13. Alerting
  14. Observability
  15. Python SDK
  16. Environment Variables
  17. Response Headers
  18. Error Reference

Architecture Overview

Smartflow runs as five cooperating services:

Service Default Port Purpose
Proxy (smartflow) 7775 LLM proxy, MCP gateway, A2A gateway, semantic caching, pre/post-call compliance hooks
Management API (api_server) 7778 Virtual keys, routing chains, audit logs, analytics
Compliance API (compliance_api_server) 7777 ML content scanning, PII redaction, adaptive learning, intelligent scan
Policy Perfect API (policy_perfect_api) 7782 Policy and preset CRUD, AI document-to-policy generation, assignment management
Hybrid Bridge (smartflow-hybrid-bridge) 3500 Cross-datacenter Redis log aggregation

All five services share a single Redis instance for shared state: routing tables, semantic cache, VAS logs, provider latency metrics, virtual key budgets, and the MCP server registry. The Policy Perfect API additionally requires PostgreSQL for durable policy and preset storage. In production the proxy sits behind a TLS-terminating reverse proxy (Caddy or nginx), while the management, compliance, and policy APIs remain backend surfaces.


Authentication

Virtual Keys

The primary credential type. Issue sk-sf-{48-hex} tokens through the management API. Each key carries optional spend limits and model restrictions.

Authorization: Bearer sk-sf-a1b2c3...

Provider API Keys

Stored server-side. Clients never send raw provider keys. The proxy resolves the correct key from the server’s key store when forwarding to providers.

Passthrough Keys

For Anthropic native passthrough (/anthropic/*), the proxy injects the configured ANTHROPIC_API_KEY automatically. Clients do not need to supply x-api-key.

JWT (SafeChat / Dashboard)

The SafeChat product uses smartflow_token cookie-based JWT for browser sessions. JWT validation is handled by the application layer, not the proxy itself.


LLM Proxy Endpoints

The proxy listens on port 7775 by default.

POST /v1/chat/completions

OpenAI-compatible chat completions. Accepts any OpenAI-format request body. Provider and model are resolved automatically from the model name or explicit prefix.

Request body:

{
  "model": "gpt-4o",
  "messages": [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is the capital of France?"}
  ],
  "temperature": 0.7,
  "max_tokens": 256,
  "stream": false
}

Model prefix routing:

Prefix / Pattern Provider
gpt-*, o1-*, o3-*, chatgpt-* OpenAI
claude-* Anthropic
gemini-* Google Gemini
grok-* xAI
mistral-*, mixtral-* Mistral AI
command-*, c4ai-* Cohere
llama-*, groq/* Groq
openrouter/* OpenRouter
ollama/* Local Ollama
azure/* Azure OpenAI

Explicit prefix example:

{"model": "anthropic/claude-sonnet-4-6", ...}

No prefix is needed for the primary supported providers — the model-name heuristic detects gemini-*, claude-*, gpt-*, and similar names automatically.
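The resolution order described above can be sketched as follows. This is an illustrative stand-in, not the proxy's actual implementation; the function name and the `"default"` fallback are hypothetical, and the prefix/heuristic tables mirror the ones in this document.

```python
# Illustrative sketch of provider resolution: explicit prefix wins,
# then model-name heuristics. Names here are hypothetical.

EXPLICIT_PREFIXES = {"anthropic", "openrouter", "ollama", "azure", "groq"}

HEURISTICS = [
    (("gpt-", "o1-", "o3-", "chatgpt-"), "openai"),
    (("claude-",), "anthropic"),
    (("gemini-",), "google"),
    (("grok-",), "xai"),
    (("mistral-", "mixtral-"), "mistral"),
    (("command-", "c4ai-"), "cohere"),
    (("llama-",), "groq"),
]

def resolve_provider(model: str) -> tuple[str, str]:
    """Return (provider, bare_model). An explicit prefix takes precedence."""
    if "/" in model:
        prefix, bare = model.split("/", 1)
        if prefix in EXPLICIT_PREFIXES:
            return prefix, bare
    for patterns, provider in HEURISTICS:
        if model.startswith(patterns):
            return provider, model
    return "default", model
```
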

Multimodal content:

Send content as an array of parts to include images, audio, or documents:

{
  "model": "gpt-4o",
  "messages": [{
    "role": "user",
    "content": [
      {"type": "text", "text": "What is in this image?"},
      {"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}}
    ]
  }]
}

For audio with gpt-4o-audio-preview:

{
  "type": "input_audio",
  "input_audio": {"data": "<base64>", "format": "mp3"}
}

Response:

{
  "id": "chatcmpl-...",
  "object": "chat.completion",
  "model": "gpt-4o",
  "choices": [{
    "index": 0,
    "message": {"role": "assistant", "content": "Paris."},
    "finish_reason": "stop"
  }],
  "usage": {
    "prompt_tokens": 24,
    "completion_tokens": 3,
    "total_tokens": 27
  }
}

POST /anthropic/v1/messages

Native Anthropic Messages API passthrough. The proxy injects the API key from the server’s key store. The full Anthropic request and response format is preserved with no translation.

{
  "model": "claude-sonnet-4-6",
  "max_tokens": 1024,
  "system": "You are a helpful assistant.",
  "messages": [
    {"role": "user", "content": "Hello, Claude."}
  ]
}

Multimodal — image:

{
  "role": "user",
  "content": [
    {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": "..."}},
    {"type": "text", "text": "Describe this image."}
  ]
}

Multimodal — PDF document:

{
  "type": "document",
  "source": {"type": "base64", "media_type": "application/pdf", "data": "..."}
}

The extended-context suffix [1m] that Claude Code appends to model names is stripped automatically.

Also accessible as /cursor/v1/messages for Cursor IDE passthrough (identical behavior).


POST /v1/embeddings

Generate vector embeddings. Supports multi-provider routing via model prefix.

{
  "model": "text-embedding-3-small",
  "input": "Your text here"
}

Multi-input:

{
  "model": "text-embedding-3-small",
  "input": ["First sentence", "Second sentence"]
}

Response format matches the OpenAI embeddings response with data[].embedding float arrays.


POST /v1/audio/transcriptions

Transcribe audio files. Multipart form upload.

POST /v1/audio/transcriptions
Content-Type: multipart/form-data

file=@audio.mp3
model=whisper-1

Routes to OpenAI Whisper by default. Use groq/whisper-large-v3 for Groq, deepgram/nova-2 for Deepgram.


POST /v1/audio/speech

Text-to-speech synthesis. Returns raw audio bytes.

{
  "model": "tts-1",
  "input": "Hello, world.",
  "voice": "nova",
  "response_format": "mp3"
}

POST /v1/images/generations

Image generation. Routes based on model name.

{
  "model": "dall-e-3",
  "prompt": "A futuristic city at sunrise",
  "n": 1,
  "size": "1024x1024",
  "quality": "hd",
  "style": "vivid",
  "response_format": "url"
}

POST /v1/completions

Legacy text completions endpoint, forwarded to the configured provider.


GET /v1/models

List available models. Returns the registered model list from all enabled providers.


POST /v1/rerank

Document reranking. Compatible with Cohere’s rerank API.

{
  "model": "rerank-english-v3.0",
  "query": "What is the return policy?",
  "documents": ["Document one text.", "Document two text."],
  "top_n": 3
}

Routing and Provider Selection

Automatic Model-Name Heuristic

For requests to /v1/chat/completions with no explicit provider prefix, the proxy infers the provider from the model name using the prefix table shown under POST /v1/chat/completions.

When a prefix is present (provider/model), it always takes precedence.

Intelligent Routing

When no specific provider can be inferred, the intelligent router selects based on real-time provider health and configured strategy. Enable per-provider participation with environment flags (GEMINI_ENABLED=true, etc.).

Routing Strategies

Configured in Redis via the management API:

Strategy Behavior
round_robin Distribute requests across targets in order
weighted Traffic proportional to assigned weights
least_connections Send to provider with fewest in-flight requests
random Random selection among healthy providers
priority Try targets in order, only fall back on failure
latency Route to provider with lowest p95 rolling EMA latency
cost Route to provider with lowest per-token cost

Latency EMA is computed over a rolling window and stored per provider in Redis. Cost-based routing checks daily budget caps and skips providers that have exceeded their limit.

Fallback Chains

Named ordered lists of providers with retry logic. Configured at POST /api/routing/fallback-chains.

{
  "name": "production-chain",
  "targets": [
    {"provider": "openai",    "model": "gpt-4o",              "weight": 1},
    {"provider": "anthropic", "model": "claude-sonnet-4-6", "weight": 1},
    {"provider": "google",    "model": "gemini-1.5-pro",      "weight": 1}
  ],
  "retry_on": ["429", "500", "502", "503"],
  "max_retries": 2,
  "backoff_ms": 500
}

On 429 or 5xx, the proxy retries the next target in the chain with exponential backoff. Non-retryable 4xx errors bypass retry.
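The retry-and-fall-through behavior can be sketched like this. The target callables are stand-ins for provider calls; the real chain reads its targets, retry statuses, and backoff from the configuration stored via the management API.

```python
import time

RETRYABLE = {429, 500, 502, 503}

def call_with_fallback(targets, max_retries=2, backoff_ms=500):
    """Try each target in order. Retry on retryable statuses with
    exponential backoff; fail fast on other 4xx errors.
    Each target is a callable returning (status_code, body)."""
    delay = backoff_ms / 1000.0
    attempts = 0
    last_status = None
    for target in targets:
        status, body = target()
        if status == 200:
            return body
        if status not in RETRYABLE:
            raise RuntimeError(f"non-retryable status {status}")
        last_status = status
        attempts += 1
        if attempts > max_retries:
            break
        time.sleep(delay)
        delay *= 2  # exponential backoff between fallback attempts
    raise RuntimeError(f"all targets failed, last status {last_status}")
```
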

Provider Budget Caps

Each provider can carry a daily spend cap. Once the cap is hit, the provider is excluded from routing and the fallback chain activates. Spend is tracked in Redis and resets at UTC midnight.

{
  "provider": "openai",
  "daily_budget_usd": 50.00
}

MetaCache — Semantic Caching

The MetaCache intercepts every /v1/chat/completions request before any provider call is made.

Cache Lookup

  1. Embed the incoming query.
  2. Compute cosine similarity against stored embeddings.
  3. If similarity exceeds the configured threshold, return the cached response.
  4. Otherwise, forward to the provider and store the response.

Responses are semantically compressed before storage to minimize Redis footprint.
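The four-step lookup above can be sketched with plain cosine similarity. The in-memory list stands in for the embedder and the Redis-backed store; all names are illustrative.

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb)

def cache_lookup(query_vec, cache, threshold=0.9):
    """cache: list of (embedding, response) pairs. Return (response, score)
    for the best match above the threshold, else (None, best_score)."""
    best_score, best_resp = 0.0, None
    for vec, resp in cache:
        score = cosine(query_vec, vec)
        if score > best_score:
            best_score, best_resp = score, resp
    if best_score >= threshold:
        return best_resp, best_score  # cache hit
    return None, best_score          # cache miss: forward to provider, then store
```
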

Per-Request Cache Controls

These headers control caching behavior on a per-request basis:

Header Effect
Cache-Control: no-cache Bypass cache read; always query the provider
Cache-Control: no-store Bypass cache write; do not cache this response
x-smartflow-cache: no-cache Bypass cache read (Smartflow-specific shorthand)
x-smartflow-cache: no-store Bypass cache write (Smartflow-specific shorthand)
x-smartflow-cache: bypass Bypass both read and write
x-smartflow-no-cache: 1 Alias for no-cache
x-smartflow-cache-ttl: 3600 Override TTL in seconds for this response
x-smartflow-cache-namespace: <ns> Scope cache to a logical partition

Session and Conversation Tracking

Header Effect
x-conversation-id: <id> Attach a conversation/session ID to every VAS log entry for this request. Use your own opaque string (UUID, session token, etc.).
x-session-id: <id> Alias for x-conversation-id.

Pass one of these on every request in a session to correlate all log entries under the same conversation. The value appears as conversation_id in /api/vas/logs responses.

Cache Response Headers

Header Value
x-smartflow-cache-hit true when response is served from cache
x-smartflow-cache-key Cache key for client-side correlation
x-cache-similarity Similarity score (0–1) for semantic cache hits
x-tokens-saved Estimated tokens saved by this cache hit

MCP Gateway

Smartflow implements the Model Context Protocol (MCP) server gateway. Register external MCP servers and invoke their tools through the proxy with shared authentication, budgeting, and audit logging.

MCP Server Registry

GET /api/mcp/servers

List registered MCP servers.

POST /api/mcp/servers

Register an MCP server.

{
  "id": "github-tools",
  "name": "GitHub MCP Server",
  "base_url": "https://mcp.github.example.com",
  "auth_type": "bearer",
  "allowed_tools": ["list_repos", "create_issue"],
  "disallowed_tools": [],
  "cost_info": {"per_call_usd": 0.001},
  "guardrail_mode": "strict"
}

GET /api/mcp/catalog

Browse the tool catalog across all registered servers.

GET /api/mcp/tools/search?q={query}&k={n}

Semantic search over the tool catalog. Returns the top k tools matching the query.

GET /api/mcp/tools/index

Returns the full indexed tool list with embeddings metadata.

Tool Invocation

Tools are invoked through the standard MCP path. The proxy authenticates, applies per-tool access controls, and tracks cost:

POST /{server_id}/mcp/

Or via the MCP v1 path:

POST /mcp/v1/{server_id}/tools/call

MCP Usage and Logs

GET /api/mcp/usage

Aggregated cost and call counts per server and tool.

GET /api/mcp/logs

Per-invocation audit logs.

MCP Cache

GET /api/mcp/cache/stats

Cache hit rate and savings per server.

POST /api/mcp/cache/flush

Flush the full MCP tool response cache.

POST /api/mcp/cache/flush/{server_id}

Flush cache for a single server.

MCP Access Requests

Users can request access to restricted MCP servers. Admins approve or deny through the API.

GET /api/mcp/catalog/requests

List pending access requests.

POST /api/mcp/catalog/requests

Submit an access request.

{
  "server_id": "github-tools",
  "user_id": "user-123",
  "justification": "Need to create issues for incident tracking.",
  "tools_requested": ["create_issue"]
}

POST /api/mcp/catalog/requests/{id}/approve

Approve a request.

POST /api/mcp/catalog/requests/{id}/deny

Deny a request.

MCP OAuth Flow

For servers that require OAuth:

GET /api/mcp/auth/initiate?server_id={id}

Start the OAuth flow. Returns a redirect URL.

GET /api/mcp/auth/callback

OAuth callback handler.

GET /api/mcp/auth/tokens

List stored OAuth tokens.

MCP Tool Access Control

Per-server configuration fields:

Field Type Description
allowed_tools string[] If non-empty, only these tools may be called
disallowed_tools string[] These tools are always blocked
allowed_params object Per-tool parameter allowlists
guardrail_mode string "strict" blocks on any policy violation; "log" flags and continues
available_on_public_internet bool If false, only accessible from approved network segments

API Generation from OpenAPI Spec

POST /api/mcp/generate

Auto-generate an MCP server adapter from an OpenAPI specification.

{
  "spec": "<OpenAPI JSON or YAML string>",
  "server_id": "my-api",
  "server_name": "My REST API",
  "base_url": "https://api.example.com",
  "include_methods": ["GET", "POST"]
}

A2A Agent Gateway

Smartflow implements the A2A (Agent-to-Agent) protocol for inter-agent communication. Register external agents and invoke them with full logging and routing.

Agent Card

Each agent exposes a machine-readable capability card at:

GET /a2a/{agent_id}/.well-known/agent.json

Returns the agent’s name, capabilities, supported task types, and authentication requirements.

Task Invocation

POST /a2a/{agent_id}

Send a task to a registered agent. The proxy forwards the request, captures the response, and logs both.

{
  "id": "task-uuid",
  "message": {
    "role": "user",
    "parts": [{"type": "text", "text": "Summarize the latest earnings report."}]
  }
}

Supports both synchronous JSON responses and SSE streaming for long-running tasks.

Trace Header

Include x-a2a-trace-id to correlate task invocations across agents in distributed workflows.


Vector Store API

Built-in vector store backed by Redis. No external vector database required. All endpoints are on the proxy at port 7775.

POST /v1/vector_stores

Create a new vector store.

Request:

{
  "name": "product-documentation",
  "description": "Internal product docs",
  "metadata": {"team": "engineering"}
}

Response:

{
  "id": "vs_abc123",
  "name": "product-documentation",
  "description": "Internal product docs",
  "file_count": 0,
  "created_at": 1740000000
}

GET /v1/vector_stores

List all vector stores.

GET /v1/vector_stores/{id}

Get a specific vector store by ID.

DELETE /v1/vector_stores/{id}

Delete a vector store and all its files.

POST /v1/vector_stores/{id}/files

Add a text document to a vector store. The document is chunked and embedded automatically.

Request:

{
  "content": "Full document text goes here...",
  "filename": "architecture.md",
  "metadata": {"version": "3.0"}
}

Response:

{
  "id": "vf_xyz789",
  "store_id": "vs_abc123",
  "filename": "architecture.md",
  "bytes": 4096,
  "status": "completed",
  "created_at": 1740000000
}

GET /v1/vector_stores/{id}/files

List files in a vector store.

POST /v1/vector_stores/{id}/search

Semantic search over stored documents.

Request:

{
  "query": "How does the caching layer work?",
  "max_results": 5,
  "score_threshold": 0.7
}

Response:

{
  "results": [
    {
      "file_id": "vf_xyz789",
      "filename": "architecture.md",
      "content": "...relevant chunk text...",
      "score": 0.91
    }
  ],
  "total": 1
}

RAG Pipeline API

Built on top of the vector store. Ingest documents with automatic chunking, then retrieve context for LLM augmentation.

POST /v1/rag/ingest

Chunk a document, embed each chunk, and store in a named vector store.

Request:

{
  "content": "Full document text...",
  "vector_store_id": "vs_abc123",
  "filename": "report-q4.txt",
  "chunk_size": 512,
  "chunk_overlap": 64,
  "metadata": {"source": "internal"}
}
Field Type Default Description
content string required Full document text
vector_store_id string required Target store (must exist)
filename string "" Display name for the file
chunk_size int 512 Characters per chunk
chunk_overlap int 64 Overlap between consecutive chunks
metadata object {} Arbitrary key-value metadata
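Chunking with overlap works as follows: each chunk starts chunk_size − chunk_overlap characters after the previous one. A character-based sketch matching the defaults above (the service's actual splitter may be token- or sentence-aware):

```python
def chunk_text(content: str, chunk_size: int = 512, chunk_overlap: int = 64) -> list[str]:
    """Split text into chunks of up to chunk_size characters, with each
    chunk repeating the last chunk_overlap characters of the previous one."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(content), step):
        chunks.append(content[start:start + chunk_size])
        if start + chunk_size >= len(content):
            break  # last chunk reached the end of the document
    return chunks
```
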

Response:

{
  "store_id": "vs_abc123",
  "file_id": "vf_xyz789",
  "chunks_created": 12,
  "status": "completed"
}

POST /v1/rag/query

Embed a question, retrieve matching chunks, and optionally assemble a context string for injection into an LLM prompt.

Request:

{
  "query": "What were the Q4 revenue figures?",
  "vector_store_id": "vs_abc123",
  "max_results": 5,
  "score_threshold": 0.0,
  "include_context": true
}
Field Type Default Description
query string required Natural language question
vector_store_id string required Store to search
max_results int 5 Maximum chunks to return
score_threshold float 0.0 Minimum cosine similarity
include_context bool true Concatenate chunks into a context field

Response:

{
  "chunks": [
    {"content": "...relevant chunk...", "score": 0.88, "file_id": "vf_xyz789", "filename": "report-q4.txt"}
  ],
  "context": "...relevant chunk... [additional chunks concatenated]",
  "total": 3
}

Inject context into your LLM system prompt to ground the model’s answer in your documents.
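A typical grounding step places the returned context field in the system prompt before calling /v1/chat/completions. A sketch (the prompt wording is illustrative, not prescribed by the API):

```python
def build_grounded_messages(question: str, context: str) -> list[dict]:
    """Assemble an OpenAI-format message list with retrieved RAG
    context injected into the system prompt."""
    system = (
        "Answer using only the context below. "
        "If the answer is not in the context, say so.\n\n"
        f"Context:\n{context}"
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": question},
    ]
```
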


Management API

The management API runs on port 7778 and handles administrative operations: virtual keys, routing configuration, audit logs, and analytics.

Virtual Keys

GET /api/enterprise/vkeys

List all virtual keys.

POST /api/enterprise/vkeys

Create a virtual key.

{
  "alias": "team-alpha",
  "budget_period": "monthly",
  "max_budget_usd": 100.00,
  "model_restrictions": ["gpt-4o", "claude-sonnet-4-6"],
  "rpm_limit": 60,
  "tpm_limit": 100000
}

DELETE /api/enterprise/vkeys/{key}

Revoke a virtual key.

Routing

GET /api/routing/fallback-chains

List configured fallback chains.

POST /api/routing/fallback-chains

Create or update a fallback chain (see schema above).

DELETE /api/routing/fallback-chains/{name}

Delete a fallback chain.

GET /api/routing/status

Current routing state: active provider, fallback chain, last failure.

POST /api/routing/force-provider

Force all requests to a specific provider for a duration.

{
  "provider": "openai",
  "duration_seconds": 600
}

Audit Logs (VAS)

GET /api/vas/logs

Retrieve VAS (Value-Add Service) audit logs. Every request proxied through Smartflow produces a VAS log entry, stored in Redis (hot tier, default 2-hour TTL) and flushed to MongoDB by the hybrid bridge (cold tier, long-term retention).

Query parameters:

Parameter Default Description
limit 100 Max entries to return. Maximum 1000.
offset 0 Skip the first N entries (pagination).
provider Filter by provider name (openai, anthropic, etc.).
model Filter by model name.
days 30 Look-back window in days (1–365).

Example:

GET /api/vas/logs?limit=50&offset=0&provider=openai&days=7

Response:

{
  "success": true,
  "total": 50,
  "limit": 50,
  "offset": 0,
  "data": [
    {
      "request_id": "a1b2c3d4-...",
      "timestamp": "2026-03-13T20:12:51Z",
      "provider": "openai",
      "model": "gpt-4.1-mini-2025-04-14",
      "model_provider": "openai",
      "tokens_used": 29,
      "cost": 0.0000058,
      "latency_ms": 971,
      "processing_time_ms": 971,
      "content_type": "chat",
      "user_id": "user@example.com",
      "conversation_id": "conv-session-abc",
      "conversation_stage": "initial",
      "routing_strategy": "direct",
      "routing_reason": "provider:openai",
      "compliance_status": "compliant",
      "compliance_violations": null,
      "metacache": {
        "hit": false,
        "query": null,
        "tokens_saved": 0
      },
      "compliance": {
        "data_classification": "standard",
        "compliance_score": 1.0,
        "has_violations": false,
        "violations": [],
        "regulatory_frameworks": []
      },
      "metrics": {
        "prompt_tokens": 22,
        "completion_tokens": 7,
        "total_tokens": 29,
        "processing_time_ms": 971,
        "success": true
      }
    }
  ]
}

Cache hit log example (shows routing_strategy: "cache", metacache.tokens_saved, and model extracted from request):

{
  "model": "gpt-4.1-mini",
  "latency_ms": 5,
  "routing_strategy": "cache",
  "routing_reason": "cache_hit:tier=L1",
  "metacache": {
    "hit": true,
    "query": "Explain TLS handshake",
    "tokens_saved": 209
  }
}

Security note: Provider API keys are never stored in VAS logs. The metrics.custom_metrics.api_key field contains only a masked prefix (sk-proj-hAi0...).

Conversation tracking: Pass x-conversation-id or x-session-id as a request header to populate conversation_id in every log entry for that session.

GET /api/vas/metrics

Aggregate metrics over the VAS log window.

{
  "total_requests": 368,
  "success_rate": 100.0,
  "avg_latency_ms": 535,
  "total_tokens": 18240,
  "providers": ["openai", "anthropic"],
  "models": ["gpt-4.1-mini-2025-04-14", "claude-3-5-haiku-20241022"]
}

Analytics

GET /api/analytics?period=7d

Usage analytics: request volume, cost by provider, cache hit rate, top models, top users.

Provider Key Store

POST /api/enterprise/keys

Store a provider API key server-side.

{
  "provider": "openai",
  "api_key": "sk-..."
}

Compliance API

The compliance API runs on port 7777.

Scan Request

POST /v1/compliance/scan

Scan arbitrary content against the configured compliance policies.

{
  "content": "Text to scan",
  "policy": "enterprise_standard",
  "user_id": "user-123",
  "org_id": "acme"
}

Response:

{
  "has_violations": false,
  "compliance_score": 0.97,
  "risk_level": "low",
  "recommended_action": "Allow",
  "violations": [],
  "pii_detected": [],
  "redacted_content": null
}

Intelligent Scan (ML-based)

POST /v1/compliance/intelligent-scan

Run the Maestro ML policy engine. Unlike rule-based scanning, Maestro evaluates intent against your organization’s policy documents.

Response includes:

{
  "risk_score": 0.12,
  "risk_level": "low",
  "recommended_action": "Allow",
  "violations": [],
  "explanation": "Content is within organizational policy."
}

Compliance Feedback

POST /v1/compliance/feedback

Submit a correction to improve the ML model’s future predictions.

{
  "scan_id": "scan-xyz",
  "correct_action": "Allow",
  "correct_risk_level": "low",
  "notes": "False positive — internal terminology"
}

Redact PII

POST /v1/compliance/redact

Detect and redact personally identifiable information.

{"content": "Call me at 555-867-5309, email john@example.com"}

Returns: "Call me at [PHONE], email [EMAIL]"
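The substitution behavior can be illustrated client-side with simple patterns. This is a sketch only; the service's actual detectors are ML-based and cover far more PII types than these two regexes.

```python
import re

# Minimal illustrative patterns; the real service detects many more PII types.
PATTERNS = [
    (re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"), "[EMAIL]"),
    (re.compile(r"\b\d{3}-\d{3}-\d{4}\b"), "[PHONE]"),
]

def redact(text: str) -> str:
    """Replace each detected PII span with its placeholder token."""
    for pattern, token in PATTERNS:
        text = pattern.sub(token, text)
    return text
```
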

ML Learning Status

GET /v1/compliance/learning/status/{user_id}

Check the adaptive learning status for a specific user profile.

GET /v1/compliance/learning/summary

Organization-wide learning summary.

GET /v1/compliance/ml/stats

Model accuracy, precision, recall, and training data statistics.

GET /v1/compliance/org/baseline/{org_id}

Organization-specific behavioral baseline used for anomaly detection.


Policy Perfect API

The Policy Perfect API runs on port 7782. It manages the organization’s compliance policy library — the source documents the Maestro ML engine reads when evaluating requests. Backed by PostgreSQL.

GET /health

Liveness check for the Policy Perfect service.

GET /api/stats

Returns aggregate counts for the current state of the policy library.

{
  "total_policies": 42,
  "total_presets": 8,
  "total_applications": 1204,
  "compliance_violations": 3
}

Policies

Policies are named, versioned compliance rules attached to scopes (provider, model, team, role). The Maestro engine evaluates all active policies on every request.

Policy types:

Type Description
compliance Regulatory and legal compliance rules (HIPAA, GDPR, SOC 2, etc.)
brand Brand voice and communication standards
format Output format constraints
role Role-based access and behavior restrictions
industry Industry-specific usage rules
legal Legal department rules and disclaimers
security Security guardrails and data handling policies

GET /api/policies

List all active policies.

POST /api/policies

Create a policy.

{
  "name": "HIPAA PHI Protection",
  "description": "Prevent transmission of protected health information",
  "policy_type": "compliance",
  "content": "Do not include patient names, diagnoses, medical record numbers, or any PHI in AI responses.",
  "priority": 90,
  "applicable_providers": ["all"],
  "applicable_models": ["all"],
  "regulatory_framework": "HIPAA",
  "severity": "critical",
  "metadata": {
    "departments": ["clinical", "billing"],
    "ad_groups": ["clinicians", "admins"]
  }
}
Field Type Description
name string Policy display name
policy_type string One of the seven policy types above
content string The policy text read by the Maestro ML engine
priority int Evaluation order (0–100); higher values evaluated first
applicable_providers string[] Providers this policy applies to; ["all"] for universal
applicable_models string[] Models this policy applies to; ["all"] for universal
regulatory_framework string Optional — HIPAA, GDPR, SOC2, PCI-DSS, etc.
severity string critical, high, medium, low
metadata object Layer 2/3 targeting: source_ips, ad_groups, departments, applications
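Selecting the policies that apply to a request, ordered by priority descending, can be sketched as follows (hypothetical policy dicts mirroring the fields above; not the Maestro engine's actual code):

```python
def applicable_policies(policies, provider, model):
    """Filter active policies that apply to a provider/model pair and
    order them by priority descending (higher values evaluated first)."""
    def applies(p):
        provs = p.get("applicable_providers", ["all"])
        models = p.get("applicable_models", ["all"])
        return (
            p.get("is_active", True)
            and ("all" in provs or provider in provs)
            and ("all" in models or model in models)
        )
    return sorted(
        (p for p in policies if applies(p)),
        key=lambda p: p.get("priority", 0),
        reverse=True,
    )
```
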

GET /api/policies/{id}

Get a policy by ID.

PUT /api/policies/{id}

Update a policy. All fields optional; only supplied fields are changed. Set is_active: false to deactivate without deleting.

DELETE /api/policies/{id}

Delete a policy permanently.


Presets

Presets are named, ordered collections of policies. Assign a preset to a team, role, or virtual key instead of managing individual policies.

GET /api/presets

List all presets. Each entry includes the preset metadata and the ordered policy list.

POST /api/presets

Create a preset.

{
  "name": "Healthcare Standard",
  "description": "Default policy set for all clinical staff",
  "use_case": "Clinical AI assistant",
  "policy_ids": ["pol_hipaa_phi", "pol_brand_tone", "pol_no_diagnosis"]
}

Policy order in policy_ids determines evaluation priority.

GET /api/presets/{id}

Get a preset and its full ordered policy list.


AI Document-to-Policy Generation

Upload a compliance document (PDF, DOCX, TXT — up to 50 MB). The service uses GPT-4o to extract structured policy suggestions automatically. Processing is asynchronous; poll for progress with the returned job ID.

POST /api/policies/generate-from-document

Multipart form upload. Field name: file.

POST /api/policies/generate-from-document
Content-Type: multipart/form-data

file=@hipaa-policy-handbook.pdf

Immediate response:

{
  "success": true,
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "message": "Document processing started."
}

GET /api/documents/job/{job_id}/progress

Poll for processing status.

{
  "success": true,
  "job": {
    "id": "550e8400-...",
    "filename": "hipaa-policy-handbook.pdf",
    "status": "processing",
    "progress_pct": 62,
    "created_at": "2026-02-19T10:00:00Z"
  }
}

Status values: pending, processing, completed, failed.
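A polling loop against the progress endpoint can be sketched like this; the fetch function is a stand-in for an HTTP GET to /api/documents/job/{job_id}/progress, and the interval/timeout values are illustrative.

```python
import time

def wait_for_job(fetch_progress, poll_interval_s=2.0, timeout_s=300.0):
    """Poll until the job reaches a terminal status (completed or failed).
    fetch_progress() stands in for GET /api/documents/job/{id}/progress."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        job = fetch_progress()["job"]
        if job["status"] in ("completed", "failed"):
            return job
        time.sleep(poll_interval_s)
    raise TimeoutError("document processing did not finish in time")
```
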

GET /api/documents/job/{job_id}/results

Retrieve suggested policies once status is completed.

{
  "success": true,
  "job_id": "550e8400-...",
  "filename": "hipaa-policy-handbook.pdf",
  "total_policies": 7,
  "suggested_policies": [
    {
      "id": "sugg_abc",
      "name": "Minimum Necessary Standard",
      "type": "compliance",
      "content": "Limit PHI access and disclosure to the minimum necessary...",
      "priority": 85,
      "regulatory_framework": "HIPAA",
      "confidence": 0.94
    }
  ]
}

Review suggestions and create them as live policies via POST /api/policies.


Alerting

Smartflow fires webhooks when threshold events occur. Configuration is via environment variables on the proxy.

Alert Types

Type Trigger
BudgetThreshold Provider or virtual key spend exceeds configured cap
ProviderFailure Error rate for a provider exceeds spike threshold
SlowRequest Request latency exceeds slow-request threshold
Custom Programmatic alerts from the management API

Webhook Destinations

Set any combination of:

SLACK_WEBHOOK_URL=https://hooks.slack.com/services/...
TEAMS_WEBHOOK_URL=https://outlook.office.com/webhook/...
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/...
SMARTFLOW_ALERTS_ENABLED=true

Alerts are fire-and-forget — they do not block the request that triggered them.
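Fire-and-forget delivery means the webhook POST runs as a background task so the triggering request never waits on it. An asyncio sketch (the delivery record and payload shape are illustrative):

```python
import asyncio

DELIVERED = []  # record of deliveries, for illustration only

async def send_webhook(url: str, payload: dict) -> None:
    """Stand-in for an HTTP POST to a configured webhook destination."""
    await asyncio.sleep(0)  # the real network call would go here
    DELIVERED.append((url, payload["type"]))

def fire_alert(url: str, payload: dict) -> None:
    """Schedule delivery as a background task; the request that
    triggered the alert is never blocked on the webhook."""
    asyncio.get_running_loop().create_task(send_webhook(url, payload))
```
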


Observability

GET /health/liveliness

Returns 200 OK with {"status": "ok"} when the proxy process is running.

GET /health/readiness

Returns 200 OK when the proxy is ready to serve requests (Redis connected, providers reachable).

GET /metrics

Prometheus-compatible metrics endpoint.


Python SDK

Installation

pip install smartflow-sdk

Or from source:

pip install git+https://github.com/SRAGroupTX/SmartflowV3.git#subdirectory=sdk/python

Requirements: Python 3.10+, httpx >= 0.24


SmartflowClient

The primary async client.

class SmartflowClient(
    base_url: str,
    api_key: Optional[str] = None,
    timeout: float = 30.0,
    management_port: int = 7778,
    compliance_port: int = 7777,
    bridge_port: int = 3500,
)
Parameter Type Default Description
base_url str Proxy URL, e.g. "https://smartflow.example.com" or "http://localhost:7775"
api_key str None Virtual key (sk-sf-...) sent as Authorization: Bearer
timeout float 30.0 Request timeout in seconds
management_port int 7778 Management API port
compliance_port int 7777 Compliance API port
bridge_port int 3500 Hybrid bridge port

Usage:

from smartflow import SmartflowClient

async with SmartflowClient("https://smartflow.example.com", api_key="sk-sf-...") as sf:
    reply = await sf.chat("What is the capital of France?")
    print(reply)

Manual lifecycle:

sf = SmartflowClient("https://smartflow.example.com")
await sf._ensure_client()
reply = await sf.chat("Hello!")
await sf.close()

Core AI Methods

chat()

Send a message, receive the reply as a plain string.

async def chat(
    message: str,
    model: str = "gpt-4o",
    system_prompt: Optional[str] = None,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    **kwargs,
) -> str
reply = await sf.chat("Summarise this in one sentence.", model="claude-sonnet-4-6")

chat_completions()

Full OpenAI-compatible completions. Returns an AIResponse object.

async def chat_completions(
    messages: List[Dict[str, str]],
    model: str = "gpt-4o",
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    stream: bool = False,
    **kwargs,
) -> AIResponse
response = await sf.chat_completions(
    messages=[
        {"role": "system", "content": "You are a concise assistant."},
        {"role": "user", "content": "What is 2 + 2?"},
    ],
    model="gpt-4o-mini",
    max_tokens=50,
)
print(response.content)
print(response.usage.total_tokens)

stream_chat()

Async generator that yields text delta strings as they stream.

async def stream_chat(
    message: str,
    model: str = "gpt-4o",
    system_prompt: Optional[str] = None,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    **kwargs,
) -> AsyncIterator[str]
async for chunk in sf.stream_chat("Tell me a story about a robot"):
    print(chunk, end="", flush=True)

embeddings()

Generate vector embeddings.

async def embeddings(
    input: Union[str, List[str]],
    model: str = "text-embedding-3-small",
    encoding_format: str = "float",
    dimensions: Optional[int] = None,
    input_type: Optional[str] = None,
    **kwargs,
) -> Dict[str, Any]
result = await sf.embeddings("Hello world", model="text-embedding-3-small")
vector = result["data"][0]["embedding"]

# Cohere
result = await sf.embeddings(
    ["doc one", "doc two"],
    model="cohere/embed-english-v3.0",
    input_type="search_document",
)

# Reduce dimensions
result = await sf.embeddings("Hello", model="text-embedding-3-large", dimensions=256)

Supported embedding providers:

Prefix Example
(none) text-embedding-3-small (OpenAI)
cohere/ cohere/embed-english-v3.0
mistral/ mistral/mistral-embed
nvidia_nim/ nvidia_nim/nvidia/nv-embedqa-e5-v5
huggingface/ huggingface/BAAI/bge-large-zh
azure/ azure/my-embedding-deployment
gemini/ gemini/text-embedding-004
vertex_ai/ vertex_ai/textembedding-gecko
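Returned vectors can be compared with cosine similarity, the same measure the semantic cache reports in x-cache-similarity. A minimal standalone sketch (the helper is illustrative, not part of the SDK):

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors, in [-1, 1]."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb)

# Identical directions score 1.0; orthogonal vectors score 0.0.
print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0
```

In practice you would pass two embedding vectors from result["data"][...]["embedding"].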

image_generation()

Generate images.

async def image_generation(
    prompt: str,
    model: str = "dall-e-3",
    n: int = 1,
    size: str = "1024x1024",
    quality: Optional[str] = None,
    response_format: str = "url",
    style: Optional[str] = None,
    **kwargs,
) -> Dict[str, Any]
result = await sf.image_generation(
    "A mountain landscape at dawn",
    model="dall-e-3",
    size="1792x1024",
    quality="hd",
)
print(result["data"][0]["url"])

audio_transcription()

Transcribe audio. Accepts a file-like object.

async def audio_transcription(
    file: BinaryIO,
    model: str = "whisper-1",
    language: Optional[str] = None,
    prompt: Optional[str] = None,
    response_format: str = "json",
    temperature: float = 0.0,
    **kwargs,
) -> Dict[str, Any]
with open("recording.mp3", "rb") as f:
    result = await sf.audio_transcription(f, model="whisper-1")
print(result["text"])

# Groq Whisper (faster, same format)
with open("recording.mp3", "rb") as f:
    result = await sf.audio_transcription(f, model="groq/whisper-large-v3")

text_to_speech()

Synthesize speech. Returns raw audio bytes.

async def text_to_speech(
    input: str,
    model: str = "tts-1",
    voice: str = "alloy",
    response_format: str = "mp3",
    speed: float = 1.0,
    **kwargs,
) -> bytes
audio = await sf.text_to_speech("Hello, this is Smartflow.", voice="nova")
with open("output.mp3", "wb") as f:
    f.write(audio)

rerank()

Rerank documents by relevance to a query.

async def rerank(
    query: str,
    documents: List[str],
    model: str = "rerank-english-v3.0",
    top_n: Optional[int] = None,
    **kwargs,
) -> Dict[str, Any]
result = await sf.rerank(
    "What is the return policy?",
    ["We accept returns within 30 days.", "Contact us at support@example.com."],
    top_n=1,
)

list_models()

List available models.

async def list_models() -> List[Dict[str, Any]]
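The returned entries can be filtered client-side. A hedged sketch, assuming each entry follows the OpenAI /v1/models shape with an "id" field (an assumption, not confirmed by the SDK docs):

```python
def model_ids(models, prefix=""):
    """Return sorted model ids, optionally filtered by prefix.

    Assumes each entry is a dict with an "id" key (OpenAI /v1/models convention).
    """
    return sorted(m["id"] for m in models if m["id"].startswith(prefix))

# Against a hypothetical payload:
sample = [{"id": "gpt-4o"}, {"id": "gpt-4o-mini"}, {"id": "claude-sonnet-4-6"}]
print(model_ids(sample, prefix="gpt"))  # ['gpt-4o', 'gpt-4o-mini']
```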

claude_message()

Send a message to Claude via the native Anthropic Messages API path. The proxy injects the provider API key automatically; no anthropic_key is required in production.

async def claude_message(
    message: str,
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 1024,
    system: Optional[str] = None,
    anthropic_key: Optional[str] = None,
) -> str
reply = await sf.claude_message(
    "Summarise this contract in three bullet points.",
    model="claude-sonnet-4-6",
    system="You are a legal assistant.",
    max_tokens=512,
)

Routes to /anthropic/v1/messages (native Anthropic format). For multimodal or multi-turn use, call chat_completions() with model="claude-sonnet-4-6" and the OpenAI-compatible message format, or call /anthropic/v1/messages directly via httpx.

chatbot_query()

Query Smartflow’s built-in system chatbot. The chatbot answers natural-language questions about VAS logs, cache stats, cost analysis, and system health — useful for quick operational queries without building a dashboard.

async def chatbot_query(query: str) -> Dict[str, Any]
result = await sf.chatbot_query("show me today's cache stats")
print(result["response"])

result = await sf.chatbot_query("which provider had the most errors this week?")

Provider Routing Examples

All chat and completion methods accept a model argument that determines which provider is used. No additional configuration is required.

# OpenAI
reply = await sf.chat("Hello", model="gpt-4o")
reply = await sf.chat("Hello", model="gpt-4o-mini")
reply = await sf.chat("Hello", model="o3-mini")

# Anthropic — via model-name heuristic (no prefix needed)
reply = await sf.chat("Hello", model="claude-sonnet-4-6")
reply = await sf.chat("Hello", model="claude-3-opus-20240229")

# Google Gemini — via model-name heuristic
reply = await sf.chat("Hello", model="gemini-1.5-pro")
reply = await sf.chat("Hello", model="gemini-2.0-flash")

# xAI Grok — explicit prefix required
reply = await sf.chat("Hello", model="xai/grok-2-latest")

# Mistral — explicit prefix
reply = await sf.chat("Hello", model="mistral/mistral-large-latest")

# Cohere — explicit prefix
reply = await sf.chat("Hello", model="cohere/command-r-plus")

# Groq (fast Llama inference)
reply = await sf.chat("Hello", model="groq/llama-3.1-70b-versatile")

# OpenRouter (access 200+ models through one key)
reply = await sf.chat("Hello", model="openrouter/meta-llama/llama-3.1-405b")

# Local Ollama
reply = await sf.chat("Hello", model="ollama/llama3.2")

# Azure OpenAI — deployment name as suffix
reply = await sf.chat("Hello", model="azure/my-gpt4o-deployment")

# Force native Anthropic path (uses /anthropic/v1/messages)
reply = await sf.claude_message("Hello", model="claude-sonnet-4-6")

MCP Tool Invocation via SDK

The Python SDK does not expose dedicated MCP methods. MCP tool calls are made via direct HTTP requests to the proxy. Use the client’s internal HTTP session or httpx directly.

import httpx

# Call an MCP tool via the proxy
async with httpx.AsyncClient() as client:
    response = await client.post(
        "https://smartflow.example.com/github-tools/mcp/",
        headers={
            "Authorization": "Bearer sk-sf-...",
            "Content-Type": "application/json",
        },
        json={
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params": {
                "name": "create_issue",
                "arguments": {
                    "repo": "my-org/my-repo",
                    "title": "Bug: login fails on mobile",
                    "body": "Steps to reproduce..."
                }
            }
        }
    )
    result = response.json()
    print(result["result"]["content"])

Search the MCP tool catalog before calling:

import httpx

async with httpx.AsyncClient() as client:
    # Find tools matching a natural-language query
    r = await client.get(
        "https://smartflow.example.com/api/mcp/tools/search",
        params={"q": "create github issue", "k": 3},
        headers={"Authorization": "Bearer sk-sf-..."},
    )
    tools = r.json()["results"]
    for t in tools:
        print(f"{t['server_id']}.{t['name']}: {t['description']}")
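If you call several tools, the JSON-RPC envelope shown above can be factored into a small helper (illustrative, not part of the SDK):

```python
import itertools

_ids = itertools.count(1)  # monotonically increasing JSON-RPC request ids

def mcp_tool_call(name, arguments):
    """Build a JSON-RPC 2.0 tools/call request body for the MCP gateway."""
    return {
        "jsonrpc": "2.0",
        "id": next(_ids),
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }

body = mcp_tool_call("create_issue", {"repo": "my-org/my-repo", "title": "Bug: login fails on mobile"})
```

Pass body as the json= argument of the POST shown above.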

A2A Agent Invocation via SDK

A2A tasks are sent as HTTP POST requests to the proxy. The proxy forwards to the registered agent, logs the exchange, and returns the response.

import httpx

# Send a task to a registered A2A agent
async with httpx.AsyncClient() as client:
    response = await client.post(
        "https://smartflow.example.com/a2a/summarizer-agent",
        headers={
            "Authorization": "Bearer sk-sf-...",
            "Content-Type": "application/json",
            "x-a2a-trace-id": "trace-abc-123",   # optional: for cross-agent correlation
        },
        json={
            "id": "task-uuid-001",
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": "Summarise the attached earnings report."}]
            }
        }
    )
    result = response.json()
    print(result["result"]["parts"][0]["text"])

Retrieve the agent’s capability card:

r = await client.get(
    "https://smartflow.example.com/a2a/summarizer-agent/.well-known/agent.json",
    headers={"Authorization": "Bearer sk-sf-..."},
)
card = r.json()
print(card["capabilities"])
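The task body can likewise be built with a helper. A sketch that mirrors the single-part text payload shown above (the helper itself is illustrative, not part of the SDK):

```python
import uuid

def a2a_task(text, task_id=None):
    """Build a single-part text task in the A2A message format used above."""
    return {
        "id": task_id or f"task-{uuid.uuid4()}",
        "message": {
            "role": "user",
            "parts": [{"type": "text", "text": text}],
        },
    }

payload = a2a_task("Summarise the attached earnings report.", task_id="task-uuid-001")
```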

Compliance Methods

check_compliance()

Rule-based compliance scan.

async def check_compliance(
    content: str,
    policy: str = "enterprise_standard",
    user_id: Optional[str] = None,
    org_id: Optional[str] = None,
) -> ComplianceResult
result = await sf.check_compliance("User message text", policy="hipaa")
if result.has_violations:
    print(result.violations)

intelligent_scan()

ML-based compliance scan using the Maestro policy engine. Combines regex pattern matching, ML embedding similarity, behavioral analysis, and organization baselines.

async def intelligent_scan(
    content: str,
    user_id: Optional[str] = None,
    org_id: Optional[str] = None,
    context: Optional[str] = None,
) -> IntelligentScanResult
result = await sf.intelligent_scan(
    "My SSN is 123-45-6789",
    user_id="user-123",
    org_id="acme-corp",
    context="customer_support",
)
print(f"{result.risk_level}: {result.recommended_action}")
# "high: Block"

redact_pii()

Detect and redact PII from content.

async def redact_pii(content: str) -> str
clean = await sf.redact_pii("My SSN is 123-45-6789, email me at john@example.com")
# "My SSN is [SSN], email me at [EMAIL]"

submit_compliance_feedback()

Submit a true/false-positive correction to improve the ML model’s future predictions.

async def submit_compliance_feedback(
    scan_id: str,
    is_false_positive: bool,
    user_id: Optional[str] = None,
    notes: Optional[str] = None,
) -> Dict[str, Any]
await sf.submit_compliance_feedback(
    scan_id="scan-xyz",
    is_false_positive=True,
    notes="This was a test phone number, not real PII",
)

get_learning_status()

Learning progress for a specific user profile.

async def get_learning_status(user_id: str) -> LearningStatus

get_learning_summary()

Organization-wide learning summary across all users.

async def get_learning_summary() -> LearningSummary

get_ml_stats()

Statistics about the ML compliance engine: pattern counts, accuracy, categories.

async def get_ml_stats() -> MLStats

get_org_summary()

Organization-level compliance summary.

async def get_org_summary() -> Dict[str, Any]

get_org_baseline()

Behavioral baseline for a specific organization, used for anomaly detection.

async def get_org_baseline(org_id: str) -> OrgBaseline
baseline = await sf.get_org_baseline("acme-corp")

get_persistence_stats()

Redis persistence statistics for compliance data.

async def get_persistence_stats() -> PersistenceStats

save_compliance_data()

Trigger a manual flush of in-memory compliance data to Redis.

async def save_compliance_data() -> Dict[str, Any]

get_intelligent_health()

Health status of the ML compliance engine and all sub-components.

async def get_intelligent_health() -> Dict[str, Any]

Monitoring Methods

health()

Basic health check.

async def health() -> Dict[str, Any]

health_comprehensive()

Full health check including Redis, providers, and cache.

async def health_comprehensive() -> SystemHealth
h = await sf.health_comprehensive()
print(h.overall_status)          # "healthy"
print(h.redis_connected)         # True
print(h.providers_available)     # ["openai", "anthropic", "google"]

get_provider_health()

Latency and availability per provider.

async def get_provider_health() -> List[ProviderHealth]

get_cache_stats()

Cache hit rates, token savings, and cost savings.

async def get_cache_stats() -> CacheStats
stats = await sf.get_cache_stats()
print(f"Hit rate:     {stats.hit_rate:.1%}")
print(f"Tokens saved: {stats.tokens_saved:,}")
print(f"Cost saved:   ${stats.cost_saved_usd:.4f}")
print(f"L1/L2/L3:     {stats.l1_hits} / {stats.l2_hits} / {stats.l3_hits}")

get_logs()

VAS audit logs from the local instance.

async def get_logs(
    limit: int = 50,
    provider: Optional[str] = None,
) -> List[VASLog]

get_logs_hybrid()

VAS logs aggregated from all Smartflow instances via the hybrid bridge (Redis + MongoDB combined). Use this in multi-region or multi-instance deployments.

async def get_logs_hybrid(limit: int = 100) -> List[Dict[str, Any]]
all_logs = await sf.get_logs_hybrid(limit=500)
for log in all_logs:
    print(f"{log['timestamp']} | {log['provider']} | {log['model']}")
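Aggregating the returned logs client-side is straightforward. A sketch assuming each entry carries the provider field shown above:

```python
from collections import Counter

def count_by_provider(logs):
    """Tally log entries per provider (assumes each dict has a 'provider' key)."""
    return Counter(log["provider"] for log in logs)

# Against a hypothetical slice of logs:
sample = [{"provider": "openai"}, {"provider": "anthropic"}, {"provider": "openai"}]
print(count_by_provider(sample))  # Counter({'openai': 2, 'anthropic': 1})
```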

get_analytics()

Usage analytics over a period.

async def get_analytics(period: str = "7d") -> Dict[str, Any]

get_routing_status()

Current routing state.

async def get_routing_status() -> Dict[str, Any]

force_provider()

Force routing to a specific provider.

async def force_provider(
    provider: str,
    duration_seconds: int = 300,
) -> Dict[str, Any]

SmartflowAgent

Stateful agent with conversation memory and per-message compliance scanning.

class SmartflowAgent(
    client: SmartflowClient,
    name: str = "SmartflowAgent",
    model: str = "gpt-4o",
    system_prompt: Optional[str] = None,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    compliance_policy: str = "enterprise_standard",
    enable_compliance_scan: bool = True,
    user_id: Optional[str] = None,
    org_id: Optional[str] = None,
    tools: Optional[List[Dict]] = None,
)
async with SmartflowClient("https://smartflow.example.com", api_key="sk-...") as sf:
    agent = SmartflowAgent(
        client=sf,
        name="SupportBot",
        model="gpt-4o",
        system_prompt="You are a helpful customer support agent.",
        user_id="user-123",
        org_id="acme",
    )
    r1 = await agent.chat("How do I reset my password?")
    r2 = await agent.chat("What if I forgot my email too?")

    print(agent.message_count)
    agent.clear_history()
Method Description
chat(message, scan_input=True, scan_output=True) Send message, raises ComplianceError if blocked
clear_history() Reset conversation, keep system prompt
get_history() Return copy of message history
message_count Number of messages in history

SmartflowWorkflow

Chain AI operations with branching and error handling.

workflow = SmartflowWorkflow(client, name="TicketFlow")

workflow \
    .add_step("classify", action="chat",
              config={"prompt": "Classify this ticket: {input}", "model": "gpt-4o-mini"}) \
    .add_step("check", action="compliance_check",
              config={"content": "{output}"}) \
    .add_step("route", action="condition",
              config={"field": "output", "cases": {"billing": "billing_step"}, "default": "general_step"})

result = await workflow.execute({"input": ticket_text})
print(result.output)
print(result.steps_executed)
print(result.execution_time_ms)

Step actions:

Action Config fields Description
"chat" prompt, model, temperature Chat completion; {input} / {output} are template variables
"compliance_check" content Compliance scan
"condition" field, cases, default Branch on a context value
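The {input} / {output} placeholders behave like simple keyed string templates. A sketch of the substitution semantics (an assumption about the implementation, shown for intuition only):

```python
def render(template, context):
    """Substitute {key} placeholders from the workflow context dict."""
    return template.format(**context)

print(render("Classify this ticket: {input}", {"input": "Refund request"}))
# Classify this ticket: Refund request
```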

SyncSmartflowClient

Synchronous wrapper for scripts and Jupyter notebooks. Every async method is available without await.

from smartflow import SyncSmartflowClient

sf = SyncSmartflowClient("https://smartflow.example.com", api_key="sk-...")

reply      = sf.chat("Hello!")
emb        = sf.embeddings("Hello", model="text-embedding-3-small")
img        = sf.image_generation("A sunset", model="dall-e-3")
transcript = sf.audio_transcription(open("audio.mp3", "rb"), model="whisper-1")
audio      = sf.text_to_speech("Hello!", voice="nova")
ranked     = sf.rerank("What is the return policy?", ["doc1", "doc2"])

In Jupyter with an existing event loop, install nest_asyncio:

pip install nest_asyncio

import nest_asyncio
nest_asyncio.apply()

OpenAI Drop-in Replacement

Any code targeting the OpenAI API works unchanged by pointing base_url at Smartflow:

from openai import OpenAI

client = OpenAI(
    api_key="sk-sf-your-virtual-key",
    base_url="https://smartflow.example.com/v1"
)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}]
)

Requests sent this way still pass through MetaCache semantic caching, compliance scanning, VAS logging, and routing, all transparently.


Response Types

AIResponse

Field Type Description
content str First choice text
choices list Full choices array
usage Usage Token usage
model str Model used
id str Response ID

Usage

Field Type
prompt_tokens int
completion_tokens int
total_tokens int

CacheStats

Field Type
hit_rate float
total_requests int
tokens_saved int
cost_saved_usd float
l1_hits int
l2_hits int
l3_hits int

ComplianceResult

Field Type
has_violations bool
compliance_score float
violations list[str]
pii_detected list[str]
risk_level str — "low" / "medium" / "high" / "critical"
recommendations list[str]
redacted_content str \| None

IntelligentScanResult

Field Type
risk_score float — 0.0 to 1.0
risk_level str
recommended_action str — "Allow" / "Flag" / "Block"
violations list
explanation str

SystemHealth

Field Type
overall_status str — "healthy" / "degraded" / "unhealthy"
redis_connected bool
providers_available list[str]

Environment Variables

Server-side configuration. These are set on the Smartflow proxy and management API server, not in client code.

Provider Keys

Variable Provider
OPENAI_API_KEY OpenAI
ANTHROPIC_API_KEY Anthropic
GEMINI_API_KEY Google Gemini
XAI_API_KEY xAI / Grok
OPENROUTER_API_KEY OpenRouter
AZURE_API_KEY, AZURE_API_BASE, AZURE_API_VERSION Azure OpenAI
MISTRAL_API_KEY Mistral AI
COHERE_API_KEY Cohere
GROQ_API_KEY Groq
DEEPGRAM_API_KEY Deepgram
FIREWORKS_API_KEY Fireworks AI
NVIDIA_NIM_API_KEY, NVIDIA_NIM_API_BASE NVIDIA NIM
HUGGINGFACE_API_KEY, HUGGINGFACE_API_BASE HuggingFace
TOGETHER_API_KEY Together AI
PERPLEXITY_API_KEY Perplexity AI
REPLICATE_API_KEY Replicate
VERTEXAI_API_KEY, VERTEXAI_PROJECT, VERTEXAI_LOCATION Vertex AI
AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION AWS Bedrock
NOVITA_API_KEY Novita AI
VERCEL_AI_GATEWAY_API_KEY Vercel AI Gateway

Feature Flags

Variable Default Description
GEMINI_ENABLED false Enable Google Gemini in intelligent routing
SMARTFLOW_ALERTS_ENABLED true Enable webhook alerting
SLACK_WEBHOOK_URL Slack incoming webhook
TEAMS_WEBHOOK_URL Microsoft Teams webhook
DISCORD_WEBHOOK_URL Discord webhook

Ports

Variable Default Description
PROXY_PORT 7775 LLM proxy port
MANAGEMENT_PORT 7778 Management API port
COMPLIANCE_PORT 7777 Compliance API port
BRIDGE_PORT 3500 Hybrid bridge port

Response Headers

Every proxied response includes these headers:

Header Description
x-smartflow-provider Provider that served the request (openai, anthropic, google, etc.)
x-smartflow-model Actual model used
x-smartflow-request-id Unique request ID for log correlation — matches request_id in VAS logs
x-smartflow-call-id Alias for x-smartflow-request-id
x-smartflow-cache-hit true if response was served from MetaCache
x-smartflow-cache-key Cache key when x-smartflow-cache-hit: true
x-cache-similarity Cosine similarity score (0–1) for semantic cache hits
x-tokens-saved Estimated tokens saved by the cache hit
x-smartflow-latency-ms Total proxy latency in milliseconds
x-smartflow-cost-usd Estimated cost in USD for this request
x-smartflow-compliance-score Compliance score (0–1) when pre-call scan is enabled
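These headers are easy to fold into client-side telemetry. A minimal parsing sketch (the SmartflowMeta type and helper are hypothetical, and plain lowercase dict keys are assumed):

```python
from dataclasses import dataclass

@dataclass
class SmartflowMeta:
    provider: str
    cache_hit: bool
    latency_ms: float
    cost_usd: float

def parse_smartflow_headers(headers):
    """Extract Smartflow metadata from a response's headers.

    Assumes a plain dict with lowercase keys; httpx.Headers is already
    case-insensitive, so it works there too.
    """
    return SmartflowMeta(
        provider=headers.get("x-smartflow-provider", "unknown"),
        cache_hit=headers.get("x-smartflow-cache-hit", "false") == "true",
        latency_ms=float(headers.get("x-smartflow-latency-ms", 0)),
        cost_usd=float(headers.get("x-smartflow-cost-usd", 0)),
    )

meta = parse_smartflow_headers({
    "x-smartflow-provider": "openai",
    "x-smartflow-cache-hit": "true",
    "x-smartflow-latency-ms": "142.5",
    "x-smartflow-cost-usd": "0.0031",
})
```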

Error Reference

HTTP Status Codes

Code Meaning
400 Malformed request — check body format
401 Missing or invalid API key
402 Virtual key budget exceeded
403 Request blocked by compliance policy
404 Resource or route not found
429 Rate limit exceeded (RPM or TPM)
500 Proxy internal error
502 Upstream provider returned an error
503 No providers available — fallback chain exhausted
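When calling the proxy over raw HTTP, these codes split into retryable and terminal failures. A sketch of one reasonable policy (the retry classification and backoff are illustrative assumptions, not SDK behavior):

```python
# Transient conditions worth retrying; 401/402/403 are terminal by design.
RETRYABLE = {429, 500, 502, 503}

def should_retry(status, attempt, max_attempts=3):
    """Retry transient proxy/provider failures; never retry auth, budget, or policy blocks."""
    return status in RETRYABLE and attempt < max_attempts

def backoff_seconds(attempt, base=1.0):
    """Exponential backoff: 1s, 2s, 4s, ..."""
    return base * (2 ** attempt)
```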

SDK Exceptions

All importable from smartflow or smartflow.exceptions:

Exception Condition
SmartflowError Base class for all SDK errors
ConnectionError Cannot connect to proxy
AuthenticationError 401 — invalid or missing key
RateLimitError 429 — rate limit hit
ComplianceError 403 — request blocked by policy
ProviderError Upstream provider error
TimeoutError Request timeout
from smartflow import ComplianceError, RateLimitError
import asyncio

try:
    result = await sf.chat("sensitive message")
except ComplianceError as e:
    print(f"Blocked by policy: {e}")
except RateLimitError:
    await asyncio.sleep(60)
    # retry

Changelog

v3.0 (proxy) / v0.3.0 (SDK) — 2026

New in the proxy:

New in the SDK:

v2.0 (proxy) / v0.2.0 (SDK)

v1.0 (proxy) / v0.1.0 (SDK)