LangSmart Smartflow · Python SDK v0.4.0

Python SDK Reference

Complete reference for the Smartflow Python SDK — async client, chat, completions, embeddings, VAS audit logs, routing, compliance, MCP tools, and A2A agents.


Smartflow SDK for Python

Version 0.4.0 · pip install smartflow-sdk · Python 3.9+


What is Smartflow?

Smartflow is an enterprise AI orchestration layer that sits between your application and AI providers (OpenAI, Anthropic, Google, and others). It provides:

┌─────────────────────────────────────────────────────────────────┐
│                      YOUR APPLICATION                            │
│                             │                                    │
│                    pip install smartflow-sdk                     │
│                             │                                    │
│                    ┌────────▼────────┐                          │
│                    │  Smartflow SDK  │                          │
│                    └────────┬────────┘                          │
└─────────────────────────────┼───────────────────────────────────┘
                              │
                   ┌──────────▼──────────┐
                   │   SMARTFLOW PROXY   │
                   │  ┌────────────────┐ │
                   │  │ MetaCache      │ │  ← 55–75% cost savings
                   │  │ ML Compliance  │ │  ← Adaptive PII detection
                   │  │ Smart Routing  │ │  ← Best provider selection
                   │  │ VAS Logging    │ │  ← Complete audit trail
                   │  │ MCP Gateway    │ │  ← Tool orchestration
                   │  │ A2A Gateway    │ │  ← Agent-to-agent tasks
                   │  └────────────────┘ │
                   └──────────┬──────────┘
                              │
          ┌───────────┬───────┴───────┬───────────┬───────────┐
          ▼           ▼               ▼           ▼           ▼
      ┌───────┐  ┌─────────┐    ┌────────┐   ┌────────┐  ┌────────┐
      │OpenAI │  │Anthropic│    │ Gemini │   │ Cohere │  │ Local  │
      └───────┘  └─────────┘    └────────┘   └────────┘  └────────┘

Dual-Mode Operation — v0.4.0

New in v0.4.0
The SDK now works with or without a Smartflow gateway. Start with direct provider access and add a gateway later — zero code changes required.

Smartflow SDK v0.4.0 introduces a dual-mode architecture that gives developers the freedom to start building immediately — even without a deployed Smartflow instance — and seamlessly upgrade to full enterprise gateway mode whenever they are ready.

🏢 Gateway Mode (Enterprise)

Connect to a deployed Smartflow instance for the full feature set.

  • BERT KNN semantic cache (55–75% cost savings)
  • Real-time policy engine & PII detection
  • SSO identity (Entra ID, LDAP, SAML, OIDC)
  • Full VAS per-user audit trail
  • MCP gateway + A2A orchestration
  • Prometheus metrics + management dashboard
⚡ Direct Mode (no gateway needed)

Call AI providers directly — same API surface, no infrastructure required.

  • OpenAI, Anthropic, Gemini, Ollama
  • Multi-provider routing via model prefix
  • Streaming responses
  • Embeddings API
  • ~ Basic in-memory stats (no semantic cache)
  • ~ Gateway-only features raise DirectModeError

Mode Selection Logic

Mode is selected automatically in priority order — no manual configuration required:

# Priority 1: Explicit URL argument (backward compatible — always gateway mode)
sf = SmartflowClient("https://yourco.langsmart.app", api_key="sk-sf-...")

# Priority 2: SMARTFLOW_GATEWAY_URL environment variable
# export SMARTFLOW_GATEWAY_URL="https://yourco.langsmart.app"
sf = SmartflowClient()  # detects env var → gateway mode

# Priority 3: ~/.smartflow/config.yaml  (written by `smartflow configure`)
sf = SmartflowClient()  # reads config file → gateway or direct mode

# Priority 4: No gateway configured → direct mode (calls providers directly)
sf = SmartflowClient()  # direct mode with env var keys (OPENAI_API_KEY, etc.)

# Check which mode you're in
print(sf.mode)              # "gateway" or "direct"
print(sf.is_gateway_mode()) # True / False
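
The priority order above can be sketched in plain Python. This is an illustrative sketch only — `resolve_mode` is not part of the SDK, and the real client also validates the config file and provider keys:

```python
import os

def resolve_mode(base_url=None, env=None, config=None):
    """Return ("gateway", url) or ("direct", None) following the
    documented priority order. Illustrative sketch, not SDK code."""
    env = os.environ if env is None else env
    # Priority 1: explicit URL argument always wins
    if base_url:
        return ("gateway", base_url)
    # Priority 2: SMARTFLOW_GATEWAY_URL environment variable
    if env.get("SMARTFLOW_GATEWAY_URL"):
        return ("gateway", env["SMARTFLOW_GATEWAY_URL"])
    # Priority 3: ~/.smartflow/config.yaml (modelled here as a dict)
    if config and config.get("gateway_url"):
        return ("gateway", config["gateway_url"])
    # Priority 4: nothing configured -> direct mode
    return ("direct", None)
```

The key property: an explicit argument beats the environment, which beats the config file, so adding a gateway later never requires code changes.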

First-Run Setup

Run the interactive wizard once to configure your environment. It saves settings to ~/.smartflow/config.yaml and is read automatically on every subsequent SmartflowClient() instantiation.

# CLI wizard (recommended — asks gateway URL or provider keys)
smartflow configure

# Check current config and test connectivity
smartflow status

# Quick chat test
smartflow chat "Hello, which mode am I using?"

Or trigger the wizard from Python:

import smartflow
smartflow.configure()  # same interactive wizard

Provider Routing (Direct Mode)

In direct mode, the model string controls which provider is called. The same prefix notation works in gateway mode too (the gateway translates it).

sf = SmartflowClient()  # or SmartflowClient("https://gateway...")

# OpenAI (default)
await sf.chat("Hello", model="gpt-4o")
await sf.chat("Hello", model="openai/gpt-4o")  # explicit prefix

# Anthropic Claude
await sf.chat("Hello", model="claude-sonnet-4-6")
await sf.chat("Hello", model="anthropic/claude-3-5-haiku-20241022")

# Google Gemini (via openai-compat endpoint)
await sf.chat("Hello", model="gemini-1.5-pro")
await sf.chat("Hello", model="gemini/gemini-2.0-flash")

# Ollama (local — OLLAMA_BASE_URL env var or default http://localhost:11434)
await sf.chat("Hello", model="ollama/llama3")
await sf.chat("Hello", model="ollama/mistral")

# Custom OpenAI-compat server (LOCAL_BASE_URL env var)
await sf.chat("Hello", model="local/my-fine-tuned-model")
Gateway pass-through
Provider prefix notation (anthropic/claude-*, gemini/..., ollama/...) also works in gateway mode — the Smartflow proxy translates and routes automatically. The same code works in both modes.

Gateway-Only Features in Direct Mode

Calling a gateway-only method (compliance scan, VAS logs, routing override, etc.) in direct mode raises a clear DirectModeError with instructions:

from smartflow import SmartflowClient
from smartflow.direct import DirectModeError

sf = SmartflowClient()  # direct mode

try:
    logs = await sf.get_logs()
except DirectModeError as e:
    print(e)
    # VAS audit logs requires a Smartflow Enterprise gateway.
    # Run `smartflow configure` to connect to a gateway and unlock:
    #   • BERT KNN semantic cache  (55–75% cost savings)
    #   • Real-time policy engine  (PII detection, jailbreak guard)
    #   • SSO identity integration (Entra ID, LDAP, SAML)
    #   • Full VAS audit trail     (per-user request logging)
    #   • MCP gateway + A2A orchestration

Installation

pip install smartflow-sdk

From source:

pip install git+https://github.com/SRAGroupTX/SmartflowV3.git#subdirectory=sdk/python

Optional — sync client in async environments (Jupyter notebooks):

pip install nest_asyncio

Quick Start

import asyncio
from smartflow import SmartflowClient

async def main():
    async with SmartflowClient("https://yourco.langsmart.app", api_key="sk-sf-...") as sf:  # gateway mode
        # Automatic caching, compliance scanning, multi-provider failover,
        # and complete audit logging on every call.
        response = await sf.chat("Explain quantum computing in simple terms")
        print(response)

asyncio.run(main())

Synchronous — Scripts and Notebooks

from smartflow import SyncSmartflowClient

sf = SyncSmartflowClient()  # reads ~/.smartflow/config.yaml (gateway or direct)

response = sf.chat("What is machine learning?")
print(response)

stats = sf.get_cache_stats()
print(f"Cache hit rate: {stats.hit_rate:.1%}")
print(f"Tokens saved: {stats.tokens_saved:,}")

sf.close()

Optional — Direct Mode Provider Support

To use direct mode (no gateway), install the provider packages you need:

# OpenAI + Anthropic (most common)
pip install "smartflow-sdk[all]"

# Just OpenAI
pip install "smartflow-sdk[openai]"

# Just Anthropic
pip install "smartflow-sdk[anthropic]"

# Sync client in Jupyter notebooks
pip install "smartflow-sdk[nest]"

In gateway mode, no provider packages are required — the gateway handles provider calls server-side. httpx is the only dependency.


For Jupyter notebooks with an existing event loop:

import nest_asyncio
nest_asyncio.apply()

OpenAI Drop-in Replacement

Zero code changes required — just update the base URL:

from openai import OpenAI

# Before: client = OpenAI()
# After: through Smartflow — caching, compliance, logging all apply transparently

client = OpenAI(
    base_url="http://your-smartflow:7775/v1",
    api_key="sk-sf-your-virtual-key"
)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}]
)

Feature Availability by Mode

Feature                                   Gateway Mode                Direct Mode
chat() / chat_completions()               ✓ Full                      ✓ Full
stream_chat()                             ✓ Full                      ✓ Full
embeddings()                              ✓ Full                      ✓ OpenAI only
claude_message()                          ✓ Via proxy                 ✓ Direct Anthropic
Multi-provider routing                    ✓ 37+ providers             ✓ OpenAI / Anthropic / Gemini / Ollama
Semantic BERT cache (55–75%)              ✓ 4-phase MetaCache         ✗ Not available
check_compliance() / intelligent_scan()   ✓ ML-powered                ✗ Gateway only
get_logs() (VAS audit trail)              ✓ Per-user SSO              ✗ Gateway only
get_cache_stats()                         ✓ Live MetaCache stats      ✓ Client-side counts
SSO / Enterprise Identity                 ✓ Entra ID / LDAP / SAML    ✗ Gateway only
MCP Gateway / A2A Orchestration           ✓ Full                      ✗ Gateway only
Prometheus metrics                        ✓ Native /metrics           ✗ Gateway only
Policy engine / guardrails                ✓ Visual editor + no-code   ✗ Gateway only
Required dependencies                     httpx only                  openai / anthropic (optional)

SmartflowClient

Primary async client.

class SmartflowClient(
    base_url: Optional[str] = None, # Gateway URL or None (direct/config mode)
    api_key: Optional[str] = None,  # Virtual key (sk-sf-...)
    timeout: float = 30.0,          # Request timeout in seconds
    management_port: int = 7778,    # Management API port
    compliance_port: int = 7777,    # Compliance API port
    bridge_port: int = 3500,        # Hybrid bridge port
)

Use as a context manager for automatic cleanup:

async with SmartflowClient("http://smartflow:7775", api_key="sk-sf-...") as sf:
    ...

# Or manual lifecycle
sf = SmartflowClient("http://smartflow:7775")
await sf._ensure_client()
# ... use sf ...
await sf.close()

Core AI Methods

chat()

Send a message, receive the reply as a plain string.

async def chat(
    message: str,
    model: str = "gpt-4o",
    system_prompt: Optional[str] = None,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    **kwargs,
) -> str
async with SmartflowClient("http://smartflow:7775") as sf:

    # Simple
    response = await sf.chat("Explain Docker containers")

    # With options
    response = await sf.chat(
        message="Write a Python function to sort a list",
        model="gpt-4o",
        system_prompt="You are an expert Python developer. Write clean, documented code.",
        temperature=0.3,
        max_tokens=1000,
    )

chat_completions()

Full OpenAI-compatible completions. Returns a structured AIResponse.

async def chat_completions(
    messages: List[Dict[str, str]],
    model: str = "gpt-4o",
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    stream: bool = False,
    **kwargs,
) -> AIResponse
response = await sf.chat_completions(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is REST API?"},
    ],
    model="gpt-4o",
)

print(response.content)
print(f"Tokens used: {response.usage.total_tokens}")
print(f"Cached: {response.cached}")  # True if served from MetaCache

stream_chat()

Async generator that yields text delta strings as they stream.

async def stream_chat(
    message: str,
    model: str = "gpt-4o",
    system_prompt: Optional[str] = None,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    **kwargs,
) -> AsyncIterator[str]
async for chunk in sf.stream_chat("Tell me a story about a robot"):
    print(chunk, end="", flush=True)
print()

claude_message()

Send a message to Claude using the Anthropic Messages API native path. The proxy injects the API key automatically — no anthropic_key required in production.

async def claude_message(
    message: str,
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 1024,
    system: Optional[str] = None,
    anthropic_key: Optional[str] = None,
) -> str
response = await sf.claude_message(
    message="Analyze this code for security vulnerabilities",
    model="claude-sonnet-4-6",
    max_tokens=2000,
    system="You are a senior security engineer.",
)

Routes to /anthropic/v1/messages (native Anthropic format). For multi-turn or multimodal use, call chat_completions() with model="claude-sonnet-4-6" using the OpenAI-compatible format.


embeddings()

Generate vector embeddings.

async def embeddings(
    input: Union[str, List[str]],
    model: str = "text-embedding-3-small",
    encoding_format: str = "float",
    dimensions: Optional[int] = None,
    input_type: Optional[str] = None,
    **kwargs,
) -> Dict[str, Any]
# Single text
result = await sf.embeddings("Hello, world!")
vector = result["data"][0]["embedding"]

# Batch
result = await sf.embeddings([
    "First document",
    "Second document",
    "Third document",
])
vectors = [item["embedding"] for item in result["data"]]

# Cohere with input_type
result = await sf.embeddings(
    ["search query", "document text"],
    model="cohere/embed-english-v3.0",
    input_type="search_document",
)

# Reduce dimensions (OpenAI text-embedding-3+)
result = await sf.embeddings("Hello", model="text-embedding-3-large", dimensions=256)
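
The vectors come back as plain float lists in `result["data"][i]["embedding"]`, so similarity math needs no extra dependencies. A minimal cosine-similarity helper:

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm
```

Typical use: embed a query and a batch of documents, then rank documents by `cosine_similarity(query_vec, doc_vec)` descending.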

image_generation()

Generate images.

async def image_generation(
    prompt: str,
    model: str = "dall-e-3",
    n: int = 1,
    size: str = "1024x1024",
    quality: Optional[str] = None,
    response_format: str = "url",
    style: Optional[str] = None,
    **kwargs,
) -> Dict[str, Any]
result = await sf.image_generation(
    "A futuristic city at sunrise",
    model="dall-e-3",
    size="1792x1024",
    quality="hd",
    style="vivid",
)
print(result["data"][0]["url"])

audio_transcription()

Transcribe audio. Accepts a file-like object.

async def audio_transcription(
    file: Any,
    model: str = "whisper-1",
    language: Optional[str] = None,
    prompt: Optional[str] = None,
    response_format: str = "json",
    temperature: float = 0.0,
    filename: str = "audio.mp3",
    **kwargs,
) -> Dict[str, Any]
with open("recording.mp3", "rb") as f:
    result = await sf.audio_transcription(f, model="whisper-1")
print(result["text"])

# Groq (faster, free tier available)
with open("recording.mp3", "rb") as f:
    result = await sf.audio_transcription(f, model="groq/whisper-large-v3")

text_to_speech()

Synthesize speech. Returns raw audio bytes.

async def text_to_speech(
    input: str,
    model: str = "tts-1",
    voice: str = "alloy",
    response_format: str = "mp3",
    speed: float = 1.0,
    **kwargs,
) -> bytes
audio = await sf.text_to_speech("Hello, this is Smartflow.", voice="nova")
with open("output.mp3", "wb") as f:
    f.write(audio)

rerank()

Rerank documents by relevance to a query.

async def rerank(
    query: str,
    documents: List[str],
    model: str = "rerank-english-v3.0",
    top_n: Optional[int] = None,
    **kwargs,
) -> Dict[str, Any]
result = await sf.rerank(
    "What is the return policy?",
    ["We accept returns within 30 days.", "Contact support@example.com."],
    top_n=1,
)

list_models()

List available models across all enabled providers.

async def list_models() -> List[Dict[str, Any]]
models = await sf.list_models()
for m in models:
    print(m["id"])

chatbot_query()

Query Smartflow’s built-in system chatbot for operational information. Answers natural-language questions about VAS logs, cache stats, cost analysis, and system health.

async def chatbot_query(query: str) -> Dict[str, Any]
result = await sf.chatbot_query("show me today's cache stats")
print(result["response"])

result = await sf.chatbot_query("which provider had the most errors this week?")
result = await sf.chatbot_query("what did we spend on OpenAI yesterday?")

Provider Prefix Reference

All methods that accept a model parameter support provider prefix routing. Prefix the model name with provider/ to route to a specific provider. For the primary providers no prefix is needed — the provider is detected automatically from the model name.

Automatic detection (no prefix needed):

# OpenAI — detected from gpt-*, o1-*, o3-*, chatgpt-*, whisper-*, dall-e-*
reply = await sf.chat("Hello", model="gpt-4o")
reply = await sf.chat("Hello", model="gpt-4o-mini")
reply = await sf.chat("Hello", model="o3-mini")

# Anthropic — detected from claude-*
reply = await sf.chat("Hello", model="claude-sonnet-4-6")
reply = await sf.chat("Hello", model="claude-3-opus-20240229")

# Google Gemini — detected from gemini-*
reply = await sf.chat("Hello", model="gemini-1.5-pro")
reply = await sf.chat("Hello", model="gemini-2.0-flash")

Explicit prefix required:

reply = await sf.chat("Hello", model="xai/grok-2-latest")
reply = await sf.chat("Hello", model="mistral/mistral-large-latest")
reply = await sf.chat("Hello", model="cohere/command-r-plus")
reply = await sf.chat("Hello", model="groq/llama-3.1-70b-versatile")
reply = await sf.chat("Hello", model="openrouter/meta-llama/llama-3.1-405b")
reply = await sf.chat("Hello", model="ollama/llama3.2")
reply = await sf.chat("Hello", model="azure/my-gpt4o-deployment")

# Force native Anthropic Messages API path
reply = await sf.claude_message("Hello", model="claude-sonnet-4-6")

Full prefix table:

Prefix Provider API Key Env Var
(none) OpenAI OPENAI_API_KEY
anthropic/ Anthropic ANTHROPIC_API_KEY
xai/ xAI (Grok) XAI_API_KEY
gemini/ Google Gemini GEMINI_API_KEY
vertex_ai/ Google Vertex AI VERTEXAI_API_KEY
openrouter/ OpenRouter OPENROUTER_API_KEY
azure/ Azure OpenAI AZURE_API_KEY + AZURE_API_BASE
mistral/ Mistral AI MISTRAL_API_KEY
cohere/ Cohere COHERE_API_KEY
nvidia_nim/ NVIDIA NIM NVIDIA_NIM_API_KEY
huggingface/ HuggingFace HUGGINGFACE_API_KEY
groq/ Groq GROQ_API_KEY
deepgram/ Deepgram DEEPGRAM_API_KEY
fireworks/ Fireworks AI FIREWORKS_API_KEY
novita/ Novita AI NOVITA_API_KEY
together/ Together AI TOGETHER_API_KEY
perplexity/ Perplexity AI PERPLEXITY_API_KEY
replicate/ Replicate REPLICATE_API_KEY
vercel_ai_gateway/ Vercel AI Gateway VERCEL_AI_GATEWAY_API_KEY
ollama/ Ollama (local) (none required)
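
The routing rule reduces to "split on the first slash if the prefix is known, otherwise pattern-match the model name." A sketch of that logic (illustrative only — not the SDK's internal code, and the real router covers more patterns):

```python
# Known prefixes from the table above, plus "local".
KNOWN_PREFIXES = {
    "openai", "anthropic", "xai", "gemini", "vertex_ai", "openrouter",
    "azure", "mistral", "cohere", "nvidia_nim", "huggingface", "groq",
    "deepgram", "fireworks", "novita", "together", "perplexity",
    "replicate", "vercel_ai_gateway", "ollama", "local",
}

def split_provider(model: str):
    """Return (provider, model) for a model string."""
    if "/" in model:
        prefix, rest = model.split("/", 1)  # only the first slash counts
        if prefix in KNOWN_PREFIXES:
            return prefix, rest
    # Unprefixed primary providers are detected from the name.
    if model.startswith("claude-"):
        return "anthropic", model
    if model.startswith("gemini-"):
        return "gemini", model
    return "openai", model  # gpt-*, o1-*, o3-*, whisper-*, dall-e-*, ...
```

Note that only the first slash splits, which is why `openrouter/meta-llama/llama-3.1-405b` routes to OpenRouter with the rest of the string passed through as the model ID.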

Intelligent Compliance Engine

Smartflow’s ML-powered compliance engine goes beyond regex. It learns and adapts based on user behavior and organizational baselines.

┌─────────────────────────────────────────────────────────────────┐
│                    INTELLIGENT COMPLIANCE                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐        │
│  │   LAYER 1    │   │   LAYER 2    │   │   LAYER 3    │        │
│  │  Regex/Rules │ → │ ML Embeddings│ → │  Behavioral  │        │
│  │              │   │              │   │   Analysis   │        │
│  │  SSN         │   │  Semantic    │   │  User        │        │
│  │  Credit Card │   │  similarity  │   │  patterns    │        │
│  │  Email       │   │  Context     │   │  Org         │        │
│  │  Phone       │   │  awareness   │   │  baselines   │        │
│  │  MRN         │   │  Learned     │   │  Anomaly     │        │
│  │  Passport    │   │  patterns    │   │  detection   │        │
│  └──────────────┘   └──────────────┘   └──────────────┘        │
│           │                 │                  │                │
│           └─────────────────┼──────────────────┘                │
│                             ▼                                   │
│                  ┌──────────────────┐                          │
│                  │ CORRELATION      │                          │
│                  │ ENGINE           │                          │
│                  │                  │                          │
│                  │ Composite Risk   │                          │
│                  │ Score + Action   │                          │
│                  └──────────────────┘                          │
│                             │                                   │
│                             ▼                                   │
│            Allow | AllowAndLog | Review | Block                │
└─────────────────────────────────────────────────────────────────┘
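
The correlation engine's composite risk score maps to one of the four actions. The cutoffs are policy-configurable; the sketch below uses made-up thresholds purely to illustrate the shape of the decision:

```python
def recommended_action(risk_score: float) -> str:
    """Map a 0.0-1.0 composite risk score to an action.
    Thresholds here are illustrative, not Smartflow defaults."""
    if risk_score < 0.2:
        return "Allow"
    if risk_score < 0.5:
        return "AllowAndLog"
    if risk_score < 0.8:
        return "Review"
    return "Block"
```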

intelligent_scan()

async def intelligent_scan(
    content: str,
    user_id: Optional[str] = None,
    org_id: Optional[str] = None,
    context: Optional[str] = None,
) -> IntelligentScanResult
result = await sf.intelligent_scan(
    content="Please send payment to card 4111-1111-1111-1111",
)

print(f"Has violations: {result.has_violations}")      # True
print(f"Risk score: {result.risk_score:.2f}")          # 0.0 – 1.0
print(f"Action: {result.recommended_action}")          # Allow/AllowAndLog/Block/Review
print(f"Explanation: {result.explanation}")

for v in result.regex_violations:
    print(f"  - {v['violation_type']}: {v['severity']}")

Enable behavioral analysis with user context:

result = await sf.intelligent_scan(
    content="Customer email: john.doe@example.com",
    user_id="support_agent_42",   # Track individual behavior
    org_id="acme_corporation",    # Compare against org baseline
    context="customer_support",   # Context for better detection
)

check_compliance()

Rule-based compliance scan.

async def check_compliance(
    content: str,
    policy: str = "enterprise_standard",
) -> ComplianceResult
result = await sf.check_compliance("My SSN is 123-45-6789")
if result.has_violations:
    print(f"Risk: {result.risk_level}")
    print(f"PII: {result.pii_detected}")
    print(f"Safe text: {result.redacted_content}")

redact_pii()

async def redact_pii(content: str) -> str
safe = await sf.redact_pii("Call me at 555-867-5309, email john@example.com")
# "Call me at [PHONE], email [EMAIL]"
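
At the rule layer, redaction is essentially pattern substitution. A toy regex version conveys the idea (illustrative patterns only — far narrower than Smartflow's production rules):

```python
import re

# Toy patterns for illustration; real PII detection is much broader.
TOY_PATTERNS = [
    (re.compile(r"\b\d{3}-\d{3}-\d{4}\b"), "[PHONE]"),
    (re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.]+\b"), "[EMAIL]"),
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN]"),
]

def toy_redact(text: str) -> str:
    """Replace each matched pattern with its placeholder label."""
    for pattern, label in TOY_PATTERNS:
        text = pattern.sub(label, text)
    return text
```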

submit_compliance_feedback()

Submit a true/false-positive correction to retrain the ML model.

async def submit_compliance_feedback(
    scan_id: str,
    is_false_positive: bool,
    user_id: Optional[str] = None,
    notes: Optional[str] = None,
) -> Dict[str, Any]
# Scan content, store the scan response dict to get the scan_id
response = await sf._post(
    f"{sf.compliance_url}/api/compliance/intelligent/scan",
    {"content": "Call me at 555-0100"}
)
scan_id = response.get("scan_id")

if scan_id:
    await sf.submit_compliance_feedback(
        scan_id=scan_id,
        is_false_positive=True,
        user_id="admin_user",
        notes="555-0100 is a known test number, not real PII",
    )

get_learning_summary()

Organization-wide learning progress.

async def get_learning_summary() -> LearningSummary
summary = await sf.get_learning_summary()
print(f"Total users tracked: {summary.total_users}")
print(f"Users with complete baselines: {summary.users_learning_complete}")
print(f"Learning period: {summary.config_learning_days} days")

get_learning_status()

Adaptive learning status for a specific user.

async def get_learning_status(user_id: str) -> LearningStatus
status = await sf.get_learning_status("user-alice")
print(f"Days tracked: {status.days_tracked}")
print(f"Progress: {status.progress_percent}%")
print(f"Complete: {status.learning_complete}")

get_ml_stats()

ML compliance engine statistics.

async def get_ml_stats() -> MLStats
ml_stats = await sf.get_ml_stats()
print(f"Total patterns: {ml_stats.total_patterns}")
print(f"Learned patterns: {ml_stats.learned_patterns}")
print(f"Pattern categories: {ml_stats.patterns_by_category}")
print(f"Average confidence: {ml_stats.average_confidence:.2f}")

get_org_baseline()

Organization behavioral baseline used for anomaly detection.

async def get_org_baseline(org_id: str) -> OrgBaseline
baseline = await sf.get_org_baseline("acme-corp")
print(f"Users: {baseline.user_count}")
print(f"Violation rate: {baseline.violation_rate:.2%}")
print(f"Top violations: {baseline.top_violation_types}")

Other Compliance Methods

Method Returns Description
get_org_summary() Dict Organization-level aggregate compliance stats
get_persistence_stats() PersistenceStats Redis persistence stats for compliance data
save_compliance_data() Dict Trigger manual flush of compliance data to Redis
get_intelligent_health() Dict Health status of ML engine and embedding service

MCP Tool Invocation

The SDK does not expose dedicated MCP wrapper methods. MCP tool calls are direct HTTP requests to the proxy. Use httpx or any HTTP client.

Calling a Tool

import httpx

async with httpx.AsyncClient() as client:
    response = await client.post(
        "http://smartflow:7775/github-tools/mcp/",
        headers={
            "Authorization": "Bearer sk-sf-...",
            "Content-Type": "application/json",
        },
        json={
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params": {
                "name": "create_issue",
                "arguments": {
                    "repo": "my-org/my-repo",
                    "title": "Bug: login fails on mobile",
                    "body": "Steps to reproduce...",
                }
            }
        }
    )
    result = response.json()
    print(result["result"]["content"])
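
Since every tool call uses the same JSON-RPC 2.0 envelope, a small builder keeps call sites tidy. This is a hypothetical helper, not part of the SDK:

```python
import itertools

_ids = itertools.count(1)  # monotonically increasing request IDs

def mcp_call_payload(tool_name: str, arguments: dict) -> dict:
    """Build a JSON-RPC 2.0 'tools/call' request body."""
    return {
        "jsonrpc": "2.0",
        "id": next(_ids),
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": arguments},
    }
```

Usage mirrors the example above: `client.post(url, headers=..., json=mcp_call_payload("create_issue", {...}))`.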

Searching the Tool Catalog

async with httpx.AsyncClient() as client:
    r = await client.get(
        "http://smartflow:7775/api/mcp/tools/search",
        params={"q": "create github issue", "k": 3},
        headers={"Authorization": "Bearer sk-sf-..."},
    )
    for tool in r.json()["results"]:
        print(f"{tool['server_id']}.{tool['name']}: {tool['description']}")

MCP Usage and Cost

async with httpx.AsyncClient() as client:
    r = await client.get(
        "http://smartflow:7775/api/mcp/usage",
        headers={"Authorization": "Bearer sk-sf-..."},
    )
    # Per-server call counts and cumulative cost totals

A2A Agent Invocation

A2A (Agent-to-Agent) tasks are HTTP POST requests to the proxy. The proxy forwards to the registered agent, logs the exchange, and returns the result.

Sending a Task

import httpx

async with httpx.AsyncClient() as client:
    response = await client.post(
        "http://smartflow:7775/a2a/summarizer-agent",
        headers={
            "Authorization": "Bearer sk-sf-...",
            "Content-Type": "application/json",
            "x-a2a-trace-id": "trace-abc-123",  # correlate across agents
        },
        json={
            "id": "task-uuid-001",
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": "Summarise the Q4 earnings report."}]
            }
        }
    )
    result = response.json()
    print(result["result"]["parts"][0]["text"])

Agent Capability Card

async with httpx.AsyncClient() as client:
    r = await client.get(
        "http://smartflow:7775/a2a/summarizer-agent/.well-known/agent.json",
        headers={"Authorization": "Bearer sk-sf-..."},
    )
    card = r.json()
    print(card["name"], card["capabilities"])

The x-a2a-trace-id header is passed through all hops in a multi-agent chain so logs from every agent can be correlated by a single trace ID.
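
One workable pattern is to mint the trace ID once at the entry point and reuse it on every hop. A sketch (the `trace-` prefix and `a2a_headers` helper are conventions of this example, not requirements):

```python
import uuid

def a2a_headers(api_key: str, trace_id: str = None) -> dict:
    """Headers for an A2A hop; pass the same trace_id on every hop."""
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "x-a2a-trace-id": trace_id or f"trace-{uuid.uuid4()}",
    }

trace = f"trace-{uuid.uuid4()}"        # minted once per user request
h1 = a2a_headers("sk-sf-...", trace)   # hop 1
h2 = a2a_headers("sk-sf-...", trace)   # hop 2 — same trace ID, so logs correlate
```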


Monitoring and Analytics

Cache Performance

stats = await sf.get_cache_stats()

print("=== CACHE PERFORMANCE ===")
print(f"Hit Rate: {stats.hit_rate:.1%}")
print()
print("Layer Breakdown:")
print(f"  L1 (Memory):   {stats.l1_hits:,} hits")
print(f"  L2 (Semantic): {stats.l2_hits:,} hits")
print(f"  L3 (Exact):    {stats.l3_hits:,} hits")
print()
print("Savings:")
print(f"  Tokens saved:  {stats.tokens_saved:,}")
print(f"  Cost saved:    ${stats.cost_saved_cents / 100:.2f}")

CacheStats fields: hits, misses, hit_rate, tokens_saved, cost_saved_cents, l1_hits, l2_hits, l3_hits, entries

Provider Health

providers = await sf.get_provider_health()

print("=== PROVIDER STATUS ===")
for p in providers:
    print(f"{p.provider}")
    print(f"   Status:   {p.status}")
    print(f"   Latency:  {p.latency_ms:.0f}ms")
    print(f"   Success:  {p.success_rate:.1%}")
    print(f"   Requests: {p.requests_total:,}")

ProviderHealth fields: provider, status, latency_ms, success_rate, error_rate, requests_total, last_updated

System Health

health = await sf.health_comprehensive()
print(health.status)          # "healthy" | "degraded" | "unhealthy"
print(health.uptime_seconds)
print(health.version)

Quick liveness check:

status = await sf.health()
assert status["status"] == "ok"

Audit Logs (VAS)

async def get_logs(
    limit: int = 100,
    offset: int = 0,
    provider: Optional[str] = None,
    model: Optional[str] = None,
    days: int = 30,
) -> List[VASLog]

Retrieve VAS audit logs from the hot Redis tier (recent) and cold MongoDB tier (archived). Logs are returned newest-first.

logs = await sf.get_logs(limit=50, provider="openai", days=7)

print("=== RECENT AI INTERACTIONS ===")
for log in logs:
    route = log.routing_strategy or "direct"
    cached_badge = f"[CACHE:{log.metacache.tokens_saved} tokens saved]" if log.metacache.hit else ""
    print(f"[{log.timestamp}] {log.provider}/{log.model} {cached_badge}")
    print(f"  Latency: {log.latency_ms}ms | Tokens: {log.tokens_used} | Routing: {route}")
    print(f"  Compliance: {log.compliance_status} | Conv: {log.conversation_id}")
    if log.compliance_violations:
        print(f"  Violations: {log.compliance_violations}")

Pagination example:

page_size = 100
page = 0
while True:
    batch = await sf.get_logs(limit=page_size, offset=page * page_size)
    if not batch:
        break
    process(batch)
    page += 1
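
The loop above generalizes to an async generator. Sketched here against a stubbed fetcher so the shape is runnable in isolation (`fake_get_logs` stands in for `sf.get_logs`):

```python
import asyncio

async def iter_logs(fetch_page, page_size=100):
    """Yield log entries page by page until an empty page is returned."""
    page = 0
    while True:
        batch = await fetch_page(limit=page_size, offset=page * page_size)
        if not batch:
            return
        for entry in batch:
            yield entry
        page += 1

# Stub standing in for sf.get_logs — three entries across two pages.
async def fake_get_logs(limit, offset):
    data = ["log-a", "log-b", "log-c"]
    return data[offset:offset + limit]

async def main():
    return [entry async for entry in iter_logs(fake_get_logs, page_size=2)]
```

With the real client, pass `sf.get_logs` (or a `functools.partial` carrying provider/days filters) as `fetch_page`.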

Analytics

async def get_analytics(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
) -> Dict[str, Any]
data = await sf.get_analytics(
    start_date="2026-02-01",
    end_date="2026-02-19",
)

Routing

get_routing_status()

Current routing state: active provider, fallback chain, last failure.

status = await sf.get_routing_status()

force_provider()

Force all routing to a specific provider for a duration.

async def force_provider(
    provider: str,
    duration_seconds: int = 300,
) -> Dict[str, Any]
# Force to OpenAI for 10 minutes during an Anthropic outage
await sf.force_provider("openai", duration_seconds=600)

SmartflowAgent

Higher-level agent with conversation memory, compliance scanning, and tool support.

from smartflow import SmartflowClient, SmartflowAgent

async with SmartflowClient("http://smartflow:7775") as sf:

    agent = SmartflowAgent(
        client=sf,
        name="TechSupport",
        model="gpt-4o",
        system_prompt="""You are a senior technical support engineer.
Guidelines:
- Be patient and thorough
- Ask clarifying questions when needed
- Provide step-by-step solutions
- Never ask for or repeat sensitive information""",
        temperature=0.7,
        compliance_policy="enterprise_standard",
        enable_compliance_scan=True,   # Auto-scan inputs and outputs
        user_id="support_session_123",
        org_id="tech_company",
    )

    # Conversation with full context memory
    print(await agent.chat("My application keeps crashing"))
    print(await agent.chat("It's a Python web app using Flask"))
    print(await agent.chat("Here's the error: MemoryError"))

    print(f"Messages exchanged: {agent.message_count}")
    agent.clear_history()
Method Description
chat(message, scan_input=True, scan_output=True) Send message; raises ComplianceError if blocked
clear_history() Reset conversation, preserve system prompt
get_history() Return copy of message history
message_count Number of messages in history
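
Conceptually, the agent's memory is a growing message list with a pinned system prompt; clear_history() drops the turns but keeps the prompt. A stripped-down model of that behavior (not the real class):

```python
class ToyAgentMemory:
    """Minimal model of SmartflowAgent's conversation memory."""

    def __init__(self, system_prompt: str):
        self.system_prompt = system_prompt
        self.history = []          # user/assistant turns only

    def add_turn(self, role: str, content: str):
        self.history.append({"role": role, "content": content})

    def messages(self):
        """Full message list sent to the model on each chat() call."""
        return [{"role": "system", "content": self.system_prompt}, *self.history]

    def clear_history(self):
        self.history = []          # system prompt is preserved
```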

SmartflowWorkflow

Chain AI operations with branching logic.

from smartflow import SmartflowClient, SmartflowWorkflow

async with SmartflowClient("http://smartflow:7775") as sf:

    workflow = SmartflowWorkflow(sf, name="ContentPipeline")

    workflow.add_step(
        name="analyze",
        action="chat",
        config={
            "prompt": "Analyze the tone and intent of this text: {input}",
            "model": "gpt-4o-mini",
        },
        next_steps=["compliance_check"],
    )

    workflow.add_step(
        name="compliance_check",
        action="compliance_check",
        config={"content": "{input}"},
        next_steps=["route"],
    )

    workflow.add_step(
        name="route",
        action="condition",
        config={
            "field": "output",
            "cases": {
                "positive": "enhance",
                "negative": "review",
                "neutral": "publish",
            },
        },
    )

    result = await workflow.execute({"input": "This product exceeded my expectations!"})

    print(f"Success: {result.success}")
    print(f"Path taken: {' -> '.join(result.steps_executed)}")
    print(f"Execution time: {result.execution_time_ms:.0f}ms")
    print(f"Total tokens: {result.total_tokens}")

Step actions:

Action              Config fields               Description
"chat"              prompt, model, temperature  Chat completion; {input} / {output} are template variables
"compliance_check"  content                     Rule-based compliance scan
"condition"         field, cases, default       Branch on a context value
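The "condition" action picks the next step by matching a context value against its cases. A minimal sketch of that dispatch logic, written as an illustration of the config shape above rather than the SDK's actual implementation (the substring matching shown here is an assumption):

```python
from typing import Optional

def resolve_condition(config: dict, context: dict) -> Optional[str]:
    """Pick the next step: match the context field's value against each case."""
    value = str(context.get(config["field"], "")).lower()
    for key, next_step in config.get("cases", {}).items():
        if key.lower() in value:
            return next_step
    return config.get("default")

step = {
    "field": "output",
    "cases": {"positive": "enhance", "negative": "review", "neutral": "publish"},
    "default": "review",
}
resolve_condition(step, {"output": "Overall sentiment: positive"})  # -> "enhance"
```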

SyncSmartflowClient

Synchronous wrapper. Every async method is available without await.

from smartflow import SyncSmartflowClient

sf = SyncSmartflowClient("http://smartflow:7775", api_key="sk-sf-...")

reply      = sf.chat("Hello!")
emb        = sf.embeddings("Hello", model="text-embedding-3-small")
img        = sf.image_generation("A sunset", model="dall-e-3")
transcript = sf.audio_transcription(open("audio.mp3", "rb"), model="whisper-1")
audio      = sf.text_to_speech("Hello!", voice="nova")
ranked     = sf.rerank("What is the return policy?", ["doc1", "doc2"])
stats      = sf.get_cache_stats()
logs       = sf.get_logs(limit=20)

sf.close()
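close() should run even when a request raises. Since the sync client exposes a plain close() method (as above), contextlib.closing gives deterministic cleanup without a manual try/finally. The pattern is shown with a stand-in object so it is self-contained; StubClient is hypothetical and only mirrors the close() contract.

```python
from contextlib import closing

class StubClient:
    """Stand-in mirroring SyncSmartflowClient's close() contract (illustration only)."""

    def __init__(self):
        self.closed = False

    def chat(self, message: str) -> str:
        return f"echo: {message}"

    def close(self) -> None:
        self.closed = True

# closing() guarantees close() on exit, even if the body raises
with closing(StubClient()) as sf:
    reply = sf.chat("Hello!")
```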

Configuration Reference

Client Options

sf = SmartflowClient(
    base_url="http://smartflow:7775",   # Proxy endpoint
    api_key="sk-sf-...",                # Virtual key for authentication
    timeout=30.0,                       # Request timeout in seconds
    management_port=7778,               # Health, metrics, routing API
    compliance_port=7777,               # Compliance API
    bridge_port=3500,                   # Hybrid bridge (cross-instance logs)
)

From Environment Variables

import os
from smartflow import SmartflowClient

sf = SmartflowClient(
    base_url=os.environ["SMARTFLOW_URL"],
    api_key=os.environ.get("SMARTFLOW_API_KEY"),
)

Error Handling

from smartflow import (
    SmartflowClient,
    SmartflowError,
    ConnectionError,
    ComplianceError,
    RateLimitError,
    TimeoutError,
)
import asyncio

try:
    async with SmartflowClient("http://smartflow:7775") as sf:
        response = await sf.chat("Hello!")

except ConnectionError:
    print("Cannot connect to Smartflow proxy")

except ComplianceError as e:
    print(f"Blocked by compliance policy: {e}")

except RateLimitError:
    print("Rate limited — backing off")
    await asyncio.sleep(60)

except TimeoutError:
    print("Request timed out")

except SmartflowError as e:
    print(f"Smartflow error: {e}")

Exception            Condition
SmartflowError       Base class for all SDK errors
ConnectionError      Cannot connect to proxy
AuthenticationError  401 — invalid or missing key
RateLimitError       429 — rate limit hit
ComplianceError      403 — request blocked by compliance policy
ProviderError        Upstream provider error
TimeoutError         Request timeout
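A 429 usually deserves exponential backoff rather than a single fixed sleep. The helper below is a generic sketch that works with any exception type; the SDK itself does not necessarily ship anything like it, and the usage line pairing it with RateLimitError is an assumption based on the table above.

```python
import asyncio
import random

async def call_with_backoff(make_call, retry_on, max_attempts=4, base_delay=1.0, cap=60.0):
    """Retry an async call on the given exception, doubling the delay each attempt."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_call()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            delay = min(base_delay * 2 ** (attempt - 1), cap)
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))  # add jitter

# Usage with the SDK (hypothetical pairing):
# reply = await call_with_backoff(lambda: sf.chat("Hello!"), RateLimitError)
```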

Real-World Use Cases

Use Case 1: Secure Customer Support Bot

Challenge: Build a customer support chatbot that handles sensitive information while maintaining PCI-DSS and GDPR compliance.

import asyncio
from smartflow import SmartflowClient, SmartflowAgent, ComplianceError, SmartflowError

class SecureCustomerSupportBot:
    """
    Customer support bot with built-in PII protection.

    - Automatic PII detection and blocking
    - Conversation memory
    - Audit trail for compliance
    - Behavioral analysis per customer
    """

    def __init__(self, smartflow_url: str):
        self.sf_url = smartflow_url

    async def handle_customer_session(self, customer_id: str, organization: str):
        async with SmartflowClient(self.sf_url) as sf:
            agent = SmartflowAgent(
                client=sf,
                name="SecureSupport",
                model="gpt-4o",
                system_prompt="""You are a helpful customer support agent for a financial services company.

CRITICAL RULES:
1. NEVER ask customers for full credit card numbers, SSNs, or passwords
2. If a customer shares sensitive info, acknowledge receipt but do not repeat it
3. For account verification, use last 4 digits only
4. Always offer secure channels for sensitive transactions""",
                compliance_policy="pci_dss_strict",
                enable_compliance_scan=True,
                user_id=f"customer_{customer_id}",
                org_id=organization,
            )

            print("SecureSupport: Hello! How can I help you today?")

            while True:
                user_input = input("Customer: ")
                if user_input.lower() == "quit":
                    break

                try:
                    response = await agent.chat(user_input)
                    print(f"SecureSupport: {response}")
                except ComplianceError:
                    # agent.chat raises ComplianceError when a scan blocks the message
                    print("SecureSupport: I noticed you shared some sensitive information.")
                    print("For your protection, please use our secure verification process.")
                except SmartflowError as e:
                    print(f"Error: {e}")

async def main():
    bot = SecureCustomerSupportBot("http://smartflow:7775")
    await bot.handle_customer_session(customer_id="12345", organization="fintech_corp")

# asyncio.run(main())

What This Demonstrates:

- PII detection blocks sensitive data before it reaches the AI provider
- Behavioral tracking learns normal patterns per customer
- Complete audit trail for compliance audits
- Graceful handling of compliance violations


Use Case 2: Cost-Optimized Content Generation Pipeline

Challenge: Generate thousands of product descriptions daily while minimizing API costs.

import asyncio
from dataclasses import dataclass
from typing import List
from smartflow import SmartflowClient

@dataclass
class Product:
    id: str
    name: str
    category: str
    features: List[str]
    price: float

class ContentGenerationPipeline:
    """
    High-volume content generation with intelligent caching.

    Cost optimization:
    1. Similar products hit the semantic cache (60-80% savings)
    2. Smaller models for simple tasks
    3. Structured prompts to maximize cache hit potential
    """

    def __init__(self, smartflow_url: str):
        self.sf_url = smartflow_url

    async def generate_description(self, sf: SmartflowClient, product: Product) -> dict:
        # Structure prompt to maximize cache hits across similar products
        prompt = f"""Write a compelling product description.

Category: {product.category}
Product: {product.name}
Key Features: {', '.join(product.features)}
Price Point: ${product.price:.2f}

Requirements:
- 2-3 sentences
- Highlight key benefits
- Include call-to-action
- Professional tone"""

        response = await sf.chat_completions(
            messages=[{"role": "user", "content": prompt}],
            model="gpt-4o-mini",
            temperature=0.7,
        )

        return {
            "product_id": product.id,
            "description": response.content,
            "cached": response.cached,
            "tokens": response.usage.total_tokens,
        }

    async def process_catalog(self, products: List[Product]) -> dict:
        async with SmartflowClient(self.sf_url) as sf:
            initial_stats = await sf.get_cache_stats()
            results = []
            cached_count = 0
            total_tokens = 0

            for i, product in enumerate(products):
                result = await self.generate_description(sf, product)
                results.append(result)
                if result["cached"]:
                    cached_count += 1
                total_tokens += result["tokens"]

                if (i + 1) % 10 == 0:
                    print(f"Processed {i + 1}/{len(products)} products...")

            final_stats = await sf.get_cache_stats()
            tokens_saved = final_stats.tokens_saved - initial_stats.tokens_saved
            cost_saved = final_stats.cost_saved_cents - initial_stats.cost_saved_cents
            cache_hit_rate = cached_count / len(products) if products else 0

            return {
                "results": results,
                "summary": {
                    "total_products": len(products),
                    "cache_hit_rate": f"{cache_hit_rate:.1%}",
                    "tokens_used": total_tokens,
                    "tokens_saved": tokens_saved,
                    "cost_saved": f"${cost_saved / 100:.2f}",
                },
            }

async def main():
    pipeline = ContentGenerationPipeline("http://smartflow:7775")

    products = [
        Product("SKU001", "Wireless Bluetooth Headphones", "Electronics",
                ["Noise cancelling", "40hr battery", "Premium sound"], 149.99),
        Product("SKU002", "Wireless Earbuds Pro", "Electronics",
                ["Active noise cancelling", "36hr battery", "Hi-Fi audio"], 129.99),
        Product("SKU003", "Over-Ear Gaming Headset", "Electronics",
                ["7.1 surround", "Noise isolation", "RGB lighting"], 89.99),
    ]

    result = await pipeline.process_catalog(products)

    print(f"Products processed: {result['summary']['total_products']}")
    print(f"Cache hit rate:     {result['summary']['cache_hit_rate']}")
    print(f"Tokens saved:       {result['summary']['tokens_saved']:,}")
    print(f"Cost saved:         {result['summary']['cost_saved']}")

# asyncio.run(main())

What This Demonstrates:

- Semantic caching recognizes similar products and reuses responses
- Structured prompts maximize cache hit potential
- Real-time cost tracking via cost_saved_cents
- Batch processing for high-volume workloads
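The catalog loop above processes products one at a time. If the proxy can absorb parallel requests, a semaphore-bounded gather raises throughput without unbounded fan-out. This is a generic sketch; the concurrency limit of 5 is an assumption to tune against your provider rate limits, and the commented usage line refers to the pipeline defined above.

```python
import asyncio

async def bounded_gather(factories, limit: int = 5):
    """Run async callables concurrently, with at most `limit` in flight at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run(factory):
        async with semaphore:
            return await factory()

    # gather preserves input order in its results
    return await asyncio.gather(*(run(f) for f in factories))

# Hypothetical usage inside process_catalog:
# results = await bounded_gather(
#     [lambda p=p: self.generate_description(sf, p) for p in products], limit=5
# )
```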


Use Case 3: Multi-Agent Research and Report Generation

Challenge: Coordinate multiple specialized AI agents to produce a polished, auditable research report.

import asyncio
from datetime import datetime
from smartflow import SmartflowClient, SmartflowAgent

class ResearchOrchestrator:
    """
    Multi-agent research system.

    Agents:
    1. Researcher — gathers and summarizes information
    2. Analyst    — identifies patterns and insights
    3. Writer     — produces polished executive report
    4. Editor     — reviews for accuracy and clarity (deep mode only)

    All interactions logged for full auditability.
    """

    def __init__(self, smartflow_url: str):
        self.sf_url = smartflow_url

    async def research_topic(self, topic: str, depth: str = "standard") -> dict:
        async with SmartflowClient(self.sf_url) as sf:
            timestamp = datetime.now().isoformat()

            researcher = SmartflowAgent(
                client=sf, name="Researcher", model="gpt-4o",
                system_prompt="""You are a thorough research analyst.
Provide structured findings covering: current state, recent developments,
key players, and challenges. Output as organized bullet points.""",
                user_id="research_system", org_id="analytics_dept",
            )

            print(f"Researcher: Investigating '{topic}'...")
            research_data = await researcher.chat(
                f"Research this topic: {topic}\n\n"
                f"Cover: current state and facts, recent developments, "
                f"key players, challenges and opportunities."
            )

            if depth == "quick":
                return {"topic": topic, "timestamp": timestamp,
                        "report": research_data, "agents_used": ["Researcher"]}

            analyst = SmartflowAgent(
                client=sf, name="Analyst", model="gpt-4o",
                system_prompt="""You are a strategic analyst.
Identify non-obvious patterns, provide data-driven insights,
make predictions, and highlight risks and opportunities.""",
                user_id="research_system", org_id="analytics_dept",
            )

            print("Analyst: Analyzing findings...")
            analysis = await analyst.chat(
                f"Analyze this research and provide strategic insights:\n\n"
                f"{research_data}\n\nFocus on hidden patterns, future implications, "
                f"and strategic recommendations."
            )

            writer = SmartflowAgent(
                client=sf, name="Writer", model="gpt-4o",
                system_prompt="""You are an expert business writer.
Synthesize research and analysis into a coherent executive narrative.
Lead with key findings and recommendations.""",
                user_id="research_system", org_id="analytics_dept",
            )

            print("Writer: Composing report...")
            draft_report = await writer.chat(
                f"Write an executive report from this research and analysis:\n\n"
                f"RESEARCH:\n{research_data}\n\nANALYSIS:\n{analysis}\n\n"
                f"Include: Executive Summary, Key Findings, "
                f"Strategic Analysis, Recommendations, Conclusion."
            )

            if depth == "standard":
                return {"topic": topic, "timestamp": timestamp,
                        "report": draft_report,
                        "agents_used": ["Researcher", "Analyst", "Writer"]}

            # Deep mode: add Editor
            editor = SmartflowAgent(
                client=sf, name="Editor", model="gpt-4o",
                system_prompt="""You are a senior business editor.
Review for accuracy and clarity, improve flow, ensure consistent tone,
fact-check against source research, polish for executive presentation.""",
                temperature=0.3,
                user_id="research_system", org_id="analytics_dept",
            )

            print("Editor: Polishing final report...")
            final_report = await editor.chat(
                f"Edit and polish this report:\n\n{draft_report}\n\n"
                f"Source research for fact-checking:\n{research_data}"
            )

            logs = await sf.get_logs(limit=10)

            return {
                "topic": topic,
                "timestamp": timestamp,
                "report": final_report,
                "agents_used": ["Researcher", "Analyst", "Writer", "Editor"],
                "audit_trail": [
                    {"timestamp": log.timestamp, "model": log.model,
                     "tokens": log.tokens_used, "cached": log.cached}
                    for log in logs
                ],
            }

async def main():
    orchestrator = ResearchOrchestrator("http://smartflow:7775")

    result = await orchestrator.research_topic(
        topic="The impact of AI agents on enterprise software development in 2026",
        depth="deep",
    )

    print("=" * 60)
    print(f"Topic: {result['topic']}")
    print(f"Generated: {result['timestamp']}")
    print(f"Agents: {' -> '.join(result['agents_used'])}")
    print("=" * 60)
    print(result["report"])

    print("\nAUDIT TRAIL:")
    for entry in result.get("audit_trail", []):
        cached = "[CACHED]" if entry["cached"] else ""
        print(f"  [{entry['timestamp']}] {entry['model']}: {entry['tokens']} tokens {cached}")

# asyncio.run(main())

What This Demonstrates:

- Coordinated multi-agent workflows with specialized roles
- Progressive refinement through an agent chain
- Complete per-request audit trail for every AI interaction
- Organizational context tracking for behavioral analysis


Response Types

AIResponse

Field           Type   Description
content         str    First choice text
choices         list   Full choices array
usage           Usage  Token usage (prompt_tokens, completion_tokens, total_tokens)
model           str    Model used
id              str    Response ID
cached          bool   True if served from MetaCache
cache_hit_type  str    "exact", "semantic" (Phase 4 VectorLite BERT KNN hit), or None
provider        str    Provider that served the request
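The cached flag and usage fields make it easy to aggregate cache effectiveness across a batch. A small helper over AIResponse-shaped records, shown with plain dicts so the fields being read are explicit; whether a cache hit reports zero tokens, as assumed in the sample data, depends on the proxy's accounting.

```python
def summarize_cache_usage(responses):
    """Aggregate hit rate and token usage from AIResponse-like records."""
    hits = sum(1 for r in responses if r["cached"])
    tokens = sum(r["total_tokens"] for r in responses)
    return {
        "requests": len(responses),
        "hit_rate": hits / len(responses) if responses else 0.0,
        "total_tokens": tokens,
    }

batch = [
    {"cached": True,  "total_tokens": 0},
    {"cached": False, "total_tokens": 120},
    {"cached": True,  "total_tokens": 0},
]
summary = summarize_cache_usage(batch)  # hit_rate = 2/3
```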

CacheStats

Field                        Type
hit_rate                     float
hits / misses                int
l1_hits / l2_hits / l3_hits  int
tokens_saved                 int
cost_saved_cents             int
entries                      int

ComplianceResult

Field             Type
has_violations    bool
compliance_score  float
violations        list[str]
pii_detected      list[str]
risk_level        str — "low" / "medium" / "high" / "critical"
recommendations   list[str]
redacted_content  str | None

IntelligentScanResult

Field                Type
has_violations       bool
risk_score           float — 0.0 to 1.0
recommended_action   str — "Allow" / "AllowAndLog" / "Review" / "Block"
explanation          str
regex_violations     list
ml_violations        list
behavior_deviations  list
processing_time_us   int

VASLog

Field                  Type             Description
request_id             str              Unique ID — matches x-smartflow-request-id response header
timestamp              str              ISO-8601 UTC
provider               str              openai, anthropic, google, etc.
model                  str              Actual model returned by provider (or requested model for cache hits)
model_provider         str              Provider name only — never contains API key material
tokens_used            int              Total tokens (prompt + completion)
cost                   float            Estimated USD cost
latency_ms             int              End-to-end proxy latency; 5 for cache hits
processing_time_ms     int              Total processing time
content_type           str              chat, completion, image, etc.
user_id                str | None       Extracted from JWT, x-smartflow-user-id, or x-user-id header
conversation_id        str | None       Set when x-conversation-id or x-session-id header is present
conversation_stage     str | None       Conversation lifecycle stage
routing_strategy       str | None       "direct", "cache", or configured strategy ("latency", "tag", etc.)
routing_reason         str | None       Human-readable reason, e.g. "provider:openai" or "cache_hit:tier=L1"
compliance_status      str | None       "compliant" or "violated"
compliance_violations  str | None       JSON-encoded list of violation objects when compliance_status == "violated"
compliance             ComplianceInfo   Full compliance object: data_classification, compliance_score, violations_details, regulatory_frameworks
metacache              MetacacheData    Cache hit info: hit (bool), query (str), tokens_saved (int)
metrics                ProviderMetrics  Provider-level metrics: prompt_tokens, completion_tokens, processing_time_ms, success
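For audits, combining compliance_status with request_id makes it straightforward to pull flagged traffic out of a log batch. A sketch over VASLog-shaped records, written against plain dicts so the fields being read are explicit (with real SDK objects you would use attribute access instead):

```python
def flagged_request_ids(logs):
    """Return request IDs of log entries whose compliance_status is "violated"."""
    return [
        entry["request_id"]
        for entry in logs
        if entry.get("compliance_status") == "violated"
    ]

logs = [
    {"request_id": "req-001", "compliance_status": "compliant"},
    {"request_id": "req-002", "compliance_status": "violated"},
    {"request_id": "req-003", "compliance_status": None},
]
flagged_request_ids(logs)  # -> ["req-002"]
```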

ProviderHealth

Field           Type
provider        str
status          str — "healthy" / "degraded" / "unhealthy"
latency_ms      float
success_rate    float
error_rate      float
requests_total  int
last_updated    str

SystemHealth

Field           Type
status          str
uptime_seconds  int
version         str
providers       dict
cache           dict
timestamp       str

WorkflowResult

Field              Type
success            bool
output             str
steps_executed     list[str]
errors             list
total_tokens       int
total_cost_cents   int
execution_time_ms  float

Summary

Feature                  Benefit
Semantic Cache (3-tier)  60–80% cost reduction, no external vector DB
ML Compliance Engine     Real-time PII protection with adaptive learning
Smart Routing            Latency, cost, or priority-based provider selection
Full Audit Trail (VAS)   Complete compliance visibility across every request
MCP Tool Gateway         Register and invoke external tools with shared auth and budgeting
A2A Agent Orchestration  Route tasks across agents with full traceability
Agent Builder            Production-ready conversational AI with memory and compliance
Workflow Orchestration   Multi-step AI pipelines with branching and error handling


Changelog

v0.3.1 — 2026

v0.3.0

v0.2.0

v0.1.0


© 2026 Langsmart, Inc. All rights reserved. Smartflow is a trademark of Langsmart, Inc.