Preamble#
An agentic AI platform is only as robust as its client surface. The protocol stack—JSON-RPC at the application boundary, gRPC/Protobuf for internal execution, MCP for tool discovery—is inert without typed, ergonomic, transport-agnostic, fail-safe client libraries that expose those protocols to every runtime environment where agents must operate. This chapter defines the complete SDK architecture: from design philosophy and core abstractions through connection lifecycle, middleware chains, offline-first variants, code generation pipelines, testing harnesses, hostile-environment embedding, and telemetry emission. Every abstraction is specified with typed contracts, bounded resource budgets, and measurable quality gates. The objective is a universal agent client layer that is correct by construction, observable by default, and degradation-tolerant under arbitrary deployment conditions.
5.1 SDK Design Philosophy: Typed, Ergonomic, Transport-Agnostic, Fail-Safe#
5.1.1 Foundational Axioms#
The SDK is not a convenience wrapper around HTTP calls. It is a typed protocol compiler that translates platform semantics into language-native abstractions while preserving correctness invariants across transport, serialization, concurrency, and failure domains.
Axiom 1 — Type Safety as Correctness Enforcement. Every SDK entity—requests, responses, tool schemas, memory items, context payloads—must be represented as statically typed structures (or runtime-validated equivalents in dynamic languages). The type system serves as the first verification gate before any bytes cross a network boundary.
Axiom 2 — Ergonomic Minimalism. The API surface must satisfy the principle of least astonishment: a developer familiar with the domain should predict method signatures, error classes, and lifecycle semantics without consulting documentation. Overloads, optional parameters, and builder patterns are permitted only where they reduce cognitive overhead without introducing ambiguity.
Axiom 3 — Transport Agnosticism. The SDK must expose a single logical API that operates identically over JSON-RPC/HTTP, gRPC/H2, MCP stdio/SSE, WebSocket, and in-process function dispatch. Transport selection is a configuration concern, never an API concern.
Axiom 4 — Fail-Safe by Default. Every SDK operation must define explicit failure semantics: timeout, retry budget, circuit-breaker state, fallback behavior, and error classification. Silent failures are architectural defects.
5.1.2 Design Quality Function#
Define the SDK quality function as a multi-objective optimization:

Q_SDK = w_1·T + w_2·E + w_3·A + w_4·F − w_5·S

where:
- T: fraction of API paths with statically verified input/output contracts
- E: inverse of the average number of cognitive steps to complete a canonical task
- A: fraction of features available identically across all supported transports
- F: fraction of failure modes with explicit recovery semantics
- S: total number of public types, methods, and configuration knobs (penalized to enforce minimalism)
- w_1 … w_5: weighting coefficients calibrated per organizational priority
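A minimal sketch of computing such a quality score; the dimension names, default weights, and penalty coefficient are illustrative assumptions, not normative values:

```python
from dataclasses import dataclass

@dataclass
class SdkQualityInputs:
    """Measured quality dimensions; fractions are normalized to [0, 1]."""
    type_coverage: float      # T: API paths with statically verified contracts
    ergonomics: float         # E: inverse of mean cognitive steps per canonical task
    transport_parity: float   # A: features identical across all transports
    fail_safety: float        # F: failure modes with explicit recovery semantics
    surface_area: int         # S: count of public types, methods, config knobs

def sdk_quality(q: SdkQualityInputs,
                weights=(0.3, 0.2, 0.2, 0.3),
                surface_penalty=0.001) -> float:
    """Weighted sum of quality dimensions minus a minimalism penalty on S."""
    w1, w2, w3, w4 = weights
    return (w1 * q.type_coverage + w2 * q.ergonomics
            + w3 * q.transport_parity + w4 * q.fail_safety
            - surface_penalty * q.surface_area)
```

The subtractive surface-area term makes API bloat directly visible in the score, so adding a configuration knob must be justified by a measurable gain elsewhere.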
5.1.3 Layered Architecture Model#
The SDK is structured as a strict layered stack with upward-only dependency flow:
| Layer | Responsibility | Coupling |
|---|---|---|
| L0 — Wire | Serialization, framing, TLS, compression | Transport-specific |
| L1 — Protocol | JSON-RPC envelope, gRPC stub, MCP session | Protocol-specific |
| L2 — Contract | Typed request/response models, error taxonomy | Transport-agnostic |
| L3 — Service | AgentClient, ToolRegistry, MemoryStore, ContextBuilder | Domain-specific |
| L4 — Middleware | Interceptors, logging, auth, retry, rate-limiting | Cross-cutting |
| L5 — Ergonomic | Builders, fluent APIs, language idioms, sugar methods | Language-specific |
Invariant: No layer L_i may reference any type or function defined in layer L_j where j > i. Violations are detected by static analysis in CI.
5.1.4 Contract-First Development Model#
All SDK types are derived from a single source of truth: the platform's Protobuf and JSON Schema definitions. The derivation pipeline is:

Protobuf / JSON Schema definitions → code generator → generated L2 contract types → hand-authored L3–L5 layers

Hand-authored code may compose generated types but may never redefine wire-level semantics. This separation ensures that schema evolution propagates deterministically through the entire SDK stack.
5.1.5 Failure Taxonomy#
Every SDK operation classifies errors into a canonical taxonomy:
| Error Class | Retryable | User-Actionable | Example |
|---|---|---|---|
| `TransientNetwork` | Yes (with backoff) | No | TCP reset, DNS timeout |
| `RateLimited` | Yes (with delay from header) | No | 429 response |
| `InvalidRequest` | No | Yes | Schema validation failure |
| `AuthFailure` | No | Yes | Expired token, missing scope |
| `ServerError` | Yes (bounded) | No | 500, gRPC INTERNAL |
| `ResourceExhausted` | Yes (with backpressure) | Possibly | Token budget exceeded |
| `DeadlineExceeded` | No | Configurable | Operation timeout |
| `CircuitOpen` | No (until reset) | No | Breaker tripped |
| `Unavailable` | Yes (with jitter) | No | Service unreachable |
Each SDK must map transport-specific error signals (HTTP status codes, gRPC status codes, MCP error objects) into this canonical taxonomy before surfacing to the caller. This ensures uniform error handling regardless of transport.
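A sketch of such a mapping for the HTTP transport; the status-to-class table here is illustrative, and a full SDK would also translate gRPC status codes and MCP error objects before surfacing an error to the caller:

```python
from enum import Enum

class ErrorClass(Enum):
    TRANSIENT_NETWORK = "TransientNetwork"
    RATE_LIMITED = "RateLimited"
    INVALID_REQUEST = "InvalidRequest"
    AUTH_FAILURE = "AuthFailure"
    SERVER_ERROR = "ServerError"
    RESOURCE_EXHAUSTED = "ResourceExhausted"
    DEADLINE_EXCEEDED = "DeadlineExceeded"
    UNAVAILABLE = "Unavailable"

# Illustrative HTTP-status mapping; unlisted codes fall back by range.
_HTTP_MAP = {
    400: ErrorClass.INVALID_REQUEST,
    401: ErrorClass.AUTH_FAILURE,
    403: ErrorClass.AUTH_FAILURE,
    408: ErrorClass.DEADLINE_EXCEEDED,
    429: ErrorClass.RATE_LIMITED,
    500: ErrorClass.SERVER_ERROR,
    503: ErrorClass.UNAVAILABLE,
}

def classify_http(status: int) -> ErrorClass:
    """Map an HTTP status code into the canonical taxonomy."""
    if status in _HTTP_MAP:
        return _HTTP_MAP[status]
    if 400 <= status < 500:
        return ErrorClass.INVALID_REQUEST   # client-side, not retryable
    return ErrorClass.SERVER_ERROR          # server-side, bounded retry

# Retryability follows the taxonomy table, not the raw status code.
RETRYABLE = {ErrorClass.TRANSIENT_NETWORK, ErrorClass.RATE_LIMITED,
             ErrorClass.SERVER_ERROR, ErrorClass.RESOURCE_EXHAUSTED,
             ErrorClass.UNAVAILABLE}
```

Because retry policy keys off the canonical class rather than the transport signal, the same retry interceptor works unchanged over HTTP, gRPC, and MCP.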
5.2 Language-Specific SDK Design: Python, TypeScript/Node.js, Rust, Go, Java/Kotlin, C#/.NET, Swift#
5.2.1 Design Matrix#
Each language SDK must satisfy the universal contract while exploiting language-specific strengths. The following matrix captures key design decisions:
| Dimension | Python | TypeScript | Rust | Go | Java/Kotlin | C#/.NET | Swift |
|---|---|---|---|---|---|---|---|
| Type System | Runtime (Pydantic/dataclass) | Compile-time (TS interfaces) | Compile-time (algebraic) | Compile-time (struct) | Compile-time (sealed/data) | Compile-time (record) | Compile-time (struct/enum) |
| Async Model | asyncio coroutines | Promise/async-await | tokio futures | goroutines + channels | CompletableFuture / coroutines (Kotlin) | Task/ValueTask | async/await + Structured Concurrency |
| Error Model | Typed exceptions + Result wrapper | Discriminated union Result<T,E> | Result<T, SdkError> | (T, error) tuple | Checked exceptions / sealed Result | Result<T> / exceptions | throws + Result |
| Streaming | AsyncIterator | AsyncIterable / ReadableStream | Stream<Item=Result<T>> | <-chan T | Flow<T> (Kotlin) / Flux<T> (Reactor) | IAsyncEnumerable<T> | AsyncSequence |
| Cancellation | asyncio.Task.cancel() | AbortController | tokio::select! + CancellationToken | context.Context | CoroutineScope.cancel() | CancellationToken | Task cancellation |
| Package Distribution | PyPI (wheel + sdist) | npm (ESM + CJS) | crates.io | Go modules | Maven Central / Gradle | NuGet | Swift Package Manager |
| Min Supported | Python 3.10+ | Node 18+ / TS 5.0+ | Rust 1.70+ (MSRV) | Go 1.21+ | Java 17+ / Kotlin 1.9+ | .NET 8+ | Swift 5.9+ |
5.2.2 Language Idiom Compliance#
Each SDK must follow the canonical idioms of its target language:
- Python: Use `@dataclass` or Pydantic `BaseModel` for contracts; expose both sync and async variants through a unified client factory; support `with` (context manager) for lifecycle; use `typing` annotations throughout.
- TypeScript: Export ESM-first; use discriminated unions for error types; support tree-shaking; expose `Zod` or equivalent runtime schema validation for dynamic contexts.
- Rust: Use algebraic types (`enum`, `struct`) with `#[derive(Serialize, Deserialize)]`; expose `async` trait methods; integrate with `tower` middleware; support `no_std` for embedded/edge targets via feature flags.
- Go: Use struct embedding, interface satisfaction, and `context.Context` propagation; avoid generics unless they eliminate significant boilerplate; export errors as sentinel values and `errors.Is`/`errors.As` compatible types.
- Java/Kotlin: Use sealed interfaces for result/error types; provide Kotlin coroutine-native extensions alongside a Java `CompletableFuture` API; support Spring Boot auto-configuration and Micronaut integration.
- C#/.NET: Use `record` types for immutable contracts; integrate with `IHttpClientFactory`; support `IAsyncEnumerable<T>` for streaming; provide `Microsoft.Extensions.DependencyInjection` registration.
- Swift: Use `Codable` structs; integrate with `URLSession` and `AsyncSequence`; support SwiftUI `ObservableObject` wrappers for reactive UI binding.
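To make the Python row concrete, a sketch of the frozen-dataclass contract plus the context-managed lifecycle idiom; the class below is a hypothetical skeleton, not the real client implementation:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class AgentRequest:
    """Immutable contract type; a real SDK would generate this from the schema."""
    prompt: str
    max_tokens: int = 1024

class AgentClient:
    """Illustrative skeleton showing the async context-manager lifecycle idiom."""
    def __init__(self, endpoint: str):
        self.endpoint = endpoint
        self.connected = False

    async def __aenter__(self) -> "AgentClient":
        self.connected = True    # a real client would open its connection pool here
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.connected = False   # graceful shutdown runs on every exit path
```

Usage is `async with AgentClient(endpoint) as client: ...`, which guarantees shutdown even when the body raises, mirroring the lifecycle contract in §5.4.4.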
5.2.3 Pseudo-Algorithm: Language-Agnostic Client Initialization#
ALGORITHM: InitializeAgentClient(config: SdkConfig) → AgentClient
1. VALIDATE config against SdkConfigSchema
IF validation fails → RAISE InvalidRequest("Configuration schema violation", details)
2. RESOLVE transport := SelectTransport(config.endpoint, config.transport_preference)
// Returns: GRPC | JSON_RPC | MCP_STDIO | MCP_SSE | WEBSOCKET | IN_PROCESS
3. CONSTRUCT wire_layer := WireLayer.create(transport, config.tls, config.compression)
4. CONSTRUCT protocol_layer := ProtocolLayer.create(transport, wire_layer)
5. CONSTRUCT connection_pool := ConnectionPool.create(
min_connections = config.pool.min_size,
max_connections = config.pool.max_size,
idle_timeout = config.pool.idle_timeout,
health_check_interval = config.pool.health_interval
)
6. CONSTRUCT middleware_chain := MiddlewareChain.build([
AuthInterceptor(config.auth),
RetryInterceptor(config.retry_policy),
RateLimitInterceptor(config.rate_limit),
CircuitBreakerInterceptor(config.circuit_breaker),
LoggingInterceptor(config.logging),
MetricsInterceptor(config.metrics),
TracingInterceptor(config.tracing),
DeadlineInterceptor(config.default_deadline)
])
7. CONSTRUCT client := AgentClient(
protocol = protocol_layer,
pool = connection_pool,
middleware = middleware_chain,
tool_registry = ToolRegistry(protocol_layer, config.tool_discovery),
memory_store = MemoryStore(protocol_layer, config.memory),
context_builder = ContextBuilder(config.context),
orchestrator = OrchestratorHandle(protocol_layer, config.orchestrator)
)
8. PERFORM client.health_check() with timeout = config.init_timeout
IF health_check fails AND config.fail_fast = true → RAISE Unavailable(...)
IF health_check fails AND config.fail_fast = false → LOG warning, SET client.degraded = true
9. EMIT telemetry event: SDK_INITIALIZED {
transport, pool_size, middleware_count, degraded_state, latency_ms
}
10. RETURN client
5.2.4 Cross-Language Behavioral Contract#
Regardless of implementation language, all SDKs must satisfy the following behavioral contract, validated by a shared cross-language conformance test suite:
- Serialization Fidelity: Round-trip serialization of all contract types must be lossless across all supported transports.
- Error Classification Parity: Identical server-side errors must produce identical `SdkError` classifications across all SDKs.
- Retry Semantics: Given identical retry policies, all SDKs must produce the same retry schedule (within jitter bounds).
- Streaming Backpressure: All SDKs must respect server-initiated flow control signals without unbounded buffering.
- Cancellation Propagation: Client-side cancellation must propagate to the server within one round-trip time.
- Telemetry Compatibility: All SDKs must emit OpenTelemetry-compatible spans with identical attribute schemas.
5.3 Core Abstractions: AgentClient, ToolRegistry, MemoryStore, ContextBuilder, OrchestratorHandle#
5.3.1 Abstraction Dependency Graph#
The five core abstractions form a directed acyclic graph of dependencies:
OrchestratorHandle
├── AgentClient
│ ├── ToolRegistry
│ ├── MemoryStore
│ └── ContextBuilder
│ ├── ToolRegistry (schema retrieval)
│ └── MemoryStore (memory summaries)
└── ContextBuilder (plan context assembly)
Invariant: Circular dependencies between core abstractions are prohibited. The OrchestratorHandle is the sole composition root for multi-agent scenarios.
5.3.2 AgentClient#
The AgentClient is the primary entry point for all agent interactions. It encapsulates connection management, request dispatch, response deserialization, and error handling.
Typed Interface (Language-Agnostic Pseudo-Type):
TYPE AgentClient:
// Lifecycle
FUNC connect(config: ConnectionConfig) → Result<void, SdkError>
FUNC disconnect(grace_period: Duration) → Result<void, SdkError>
FUNC health() → Result<HealthStatus, SdkError>
// Core Agent Operations
FUNC execute(request: AgentRequest, deadline: Duration) → Result<AgentResponse, SdkError>
FUNC stream_execute(request: AgentRequest, deadline: Duration) → AsyncStream<AgentChunk, SdkError>
// Plan-Act-Verify Loop (Orchestrated)
FUNC run_loop(task: TaskSpec, options: LoopOptions) → Result<TaskResult, SdkError>
// Tool Operations (delegated)
PROPERTY tools: ToolRegistry
// Memory Operations (delegated)
PROPERTY memory: MemoryStore
// Context Operations (delegated)
PROPERTY context: ContextBuilder
// Orchestration (delegated)
PROPERTY orchestrator: OrchestratorHandle
// Middleware
FUNC use(interceptor: Interceptor) → void
Key Contract:
- `execute` is idempotent for read-only requests and at-most-once for state-mutating requests unless the request carries an explicit idempotency key.
- `stream_execute` delivers partial results via an `AsyncStream` that supports backpressure, cancellation, and resumption from a server-provided checkpoint token.
- `run_loop` implements the bounded control loop (§5.3.7) internally and returns only upon reaching exit criteria or exhausting the recursion budget.
5.3.3 ToolRegistry#
The ToolRegistry manages discovery, schema caching, invocation, and lifecycle of all tools available to the agent runtime.
TYPE ToolRegistry:
FUNC discover(filter: ToolFilter, pagination: PageRequest) → Result<Page<ToolDescriptor>, SdkError>
FUNC get_schema(tool_id: ToolId, version: SemanticVersion?) → Result<ToolSchema, SdkError>
FUNC invoke(tool_id: ToolId, input: TypedInput, options: InvokeOptions) → Result<ToolOutput, SdkError>
FUNC invoke_streaming(tool_id: ToolId, input: TypedInput) → AsyncStream<ToolChunk, SdkError>
FUNC subscribe_changes(filter: ToolFilter) → AsyncStream<ToolChangeEvent, SdkError>
FUNC validate_input(tool_id: ToolId, input: JsonValue) → Result<ValidationResult, SdkError>
FUNC cached_schemas() → Map<ToolId, CachedToolSchema>
FUNC invalidate_cache(tool_id: ToolId?) → void
Schema Caching Strategy:
Define the cache invalidation policy as:

TTL(t) = min(TTL_max, TTL_0 · γ^{h(t)})

where:
- TTL_0: initial TTL (e.g., 300 seconds)
- TTL_max: maximum TTL cap (e.g., 3600 seconds)
- γ > 1: exponential growth factor
- h(t): number of consecutive cache hits without a schema change at time t
Schemas that change frequently converge to TTL_0; stable schemas converge to TTL_max, minimizing discovery overhead.
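A sketch of this adaptive policy: the TTL starts at an initial value, grows exponentially with each consecutive unchanged-schema revalidation, and resets when the schema changes. Defaults mirror the example values above; the entry class is illustrative:

```python
def adaptive_ttl(hits: int, ttl0: float = 300.0, ttl_max: float = 3600.0,
                 gamma: float = 2.0) -> float:
    """TTL grows by gamma per consecutive unchanged hit, capped at ttl_max."""
    return min(ttl_max, ttl0 * gamma ** hits)

class SchemaCacheEntry:
    """Tracks consecutive unchanged revalidations for one tool schema."""
    def __init__(self, schema: dict):
        self.schema = schema
        self.hits = 0

    def on_revalidate(self, fresh_schema: dict) -> float:
        if fresh_schema == self.schema:
            self.hits += 1            # stable schema: TTL grows toward ttl_max
        else:
            self.schema = fresh_schema
            self.hits = 0             # changed schema: TTL resets toward ttl0
        return adaptive_ttl(self.hits)
```

With γ = 2 and the defaults above, a schema that survives five revalidations unchanged already sits at 3600 s, so stable tools cost almost no discovery traffic.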
5.3.4 MemoryStore#
The MemoryStore provides typed access to the platform's multi-tier memory system (working, session, episodic, semantic, procedural).
TYPE MemoryStore:
// Read Operations
FUNC query(query: MemoryQuery, tier: MemoryTier?) → Result<List<MemoryItem>, SdkError>
FUNC get(key: MemoryKey, tier: MemoryTier) → Result<MemoryItem?, SdkError>
FUNC summarize(scope: MemoryScope, budget: TokenBudget) → Result<MemorySummary, SdkError>
// Write Operations (validated, provenance-tagged)
FUNC write(item: MemoryWriteRequest) → Result<MemoryReceipt, SdkError>
FUNC update(key: MemoryKey, patch: MemoryPatch, precondition: ETag?) → Result<MemoryReceipt, SdkError>
FUNC delete(key: MemoryKey, reason: DeletionReason) → Result<void, SdkError>
// Policy
FUNC set_expiry(key: MemoryKey, policy: ExpiryPolicy) → Result<void, SdkError>
FUNC deduplicate(scope: MemoryScope) → Result<DeduplicationReport, SdkError>
Write Admission Control:
Every write operation passes through a validation pipeline before admission:
ALGORITHM: MemoryWriteAdmission(item: MemoryWriteRequest) → Result<MemoryReceipt, SdkError>
1. VALIDATE item.schema against tier-specific MemoryItemSchema
IF invalid → RETURN Err(InvalidRequest("Schema violation"))
2. CHECK provenance:
IF item.provenance IS EMPTY → RETURN Err(InvalidRequest("Provenance required"))
VALIDATE item.provenance.source ∈ ALLOWED_SOURCES
3. DEDUPLICATION check:
existing := memory_index.find_similar(item.content, threshold = 0.95)
IF existing IS NOT EMPTY:
IF item.merge_policy = SKIP → RETURN Ok(existing.receipt)
IF item.merge_policy = MERGE → item := merge(existing, item)
IF item.merge_policy = REPLACE → mark existing as superseded
4. EVALUATE expiry_policy:
IF item.expiry IS UNDEFINED → item.expiry := tier_default_expiry(item.tier)
5. COMPUTE item.embedding := embed(item.content) for semantic indexing
6. PERSIST item to durable store with CAS (compare-and-swap) semantics
IF CAS conflict → RETURN Err(ResourceExhausted("Concurrent write conflict"))
7. EMIT event: MEMORY_WRITTEN { key, tier, provenance, size_tokens, expiry }
8. RETURN Ok(MemoryReceipt { key, version, timestamp, tier })
5.3.5 ContextBuilder#
The ContextBuilder implements the prefill compiler: it assembles role policy, task state, retrieved evidence, tool affordances, and memory summaries into a deterministic, token-budgeted preamble.
TYPE ContextBuilder:
FUNC set_role(policy: RolePolicy) → ContextBuilder
FUNC set_task(objective: TaskObjective) → ContextBuilder
FUNC add_retrieved(evidence: List<RetrievedChunk>) → ContextBuilder
FUNC add_tool_affordances(tools: List<ToolDescriptor>) → ContextBuilder
FUNC add_memory_summary(summary: MemorySummary) → ContextBuilder
FUNC add_history(messages: List<Message>, compression: CompressionPolicy?) → ContextBuilder
FUNC set_budget(budget: TokenBudget) → ContextBuilder
FUNC compile() → Result<CompiledContext, SdkError>
Token Budget Allocation Algorithm:
Given a total token budget B and n context sections, each with priority weight w_i and minimum allocation m_i:
ALGORITHM: AllocateTokenBudget(B: int, sections: List<Section>) → Map<SectionId, int>
1. // Phase 1: Satisfy minimum allocations
total_min := SUM(s.min_tokens FOR s IN sections)
IF total_min > B → RAISE ResourceExhausted("Budget cannot satisfy minimums")
remaining := B - total_min
2. // Phase 2: Proportional allocation of remaining budget
eligible := [s FOR s IN sections WHERE s.content_tokens > s.min_tokens]
total_weight := SUM(s.priority_weight FOR s IN eligible)
FOR EACH s IN eligible:
s.bonus := FLOOR(remaining * (s.priority_weight / total_weight))
s.allocated := MIN(s.min_tokens + s.bonus, s.content_tokens)
3. // Phase 3: Redistribute unused tokens (sections at capacity)
unused := remaining - SUM(s.bonus FOR s IN eligible)
WHILE unused > 0 AND EXISTS s IN eligible WHERE s.allocated < s.content_tokens:
distribute unused proportionally among under-capacity sections
recalculate unused
4. // Phase 4: Construct allocation map
RETURN { s.id: s.allocated FOR s IN sections }
Formal Budget Constraint:

Σ_{i=1}^{n} a_i ≤ B

where a_i is the final allocation for section i, subject to:

a_i ≥ m_i for all i, choosing the a_i to maximize Σ_i u_i(a_i)

with u_i being the marginal utility function for section i (typically concave, reflecting diminishing returns of additional context).
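The four-phase allocation algorithm above can be sketched in Python; the `Section` fields mirror the pseudocode, and the redistribution loop folds Phases 2 and 3 together:

```python
from dataclasses import dataclass

@dataclass
class Section:
    id: str
    min_tokens: int
    content_tokens: int    # tokens the section would use if included in full
    priority_weight: float
    allocated: int = 0

def allocate_token_budget(budget: int, sections: list) -> dict:
    # Phase 1: satisfy minimum allocations (fail if even minimums don't fit)
    total_min = sum(s.min_tokens for s in sections)
    if total_min > budget:
        raise ValueError("Budget cannot satisfy minimums")
    remaining = budget - total_min
    for s in sections:
        s.allocated = min(s.min_tokens, s.content_tokens)
    # Phases 2-3: weighted proportional allocation, redistributing leftovers
    while remaining > 0:
        eligible = [s for s in sections if s.allocated < s.content_tokens]
        if not eligible:
            break                      # every section is at content capacity
        total_w = sum(s.priority_weight for s in eligible)
        spent = 0
        for s in eligible:
            bonus = min(int(remaining * s.priority_weight / total_w),
                        s.content_tokens - s.allocated)
            s.allocated += bonus
            spent += bonus
        if spent == 0:                 # integer rounding stalled: hand out one token
            eligible[0].allocated += 1
            spent = 1
        remaining -= spent
    # Phase 4: construct the allocation map
    return {s.id: s.allocated for s in sections}
```

Note the stall guard: with integer division, small remainders can round every bonus to zero, so the loop must force progress to terminate.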
5.3.6 OrchestratorHandle#
The OrchestratorHandle provides the SDK-level interface to the multi-agent orchestration engine.
TYPE OrchestratorHandle:
FUNC submit_task(task: TaskSpec) → Result<TaskHandle, SdkError>
FUNC get_status(task_id: TaskId) → Result<TaskStatus, SdkError>
FUNC cancel(task_id: TaskId, reason: CancellationReason) → Result<void, SdkError>
FUNC subscribe(task_id: TaskId) → AsyncStream<TaskEvent, SdkError>
FUNC list_agents(filter: AgentFilter) → Result<List<AgentDescriptor>, SdkError>
FUNC claim_work_unit(agent_id: AgentId, unit_id: WorkUnitId, lease: Duration) → Result<Lease, SdkError>
FUNC commit_result(lease: Lease, result: WorkUnitResult) → Result<CommitReceipt, SdkError>
FUNC rollback(lease: Lease, reason: RollbackReason) → Result<void, SdkError>
Lease-Based Work Claiming:
The lease mechanism ensures that no two agents operate on the same work unit concurrently:

ℓ = (unit_id, agent_id, t_issue, d),  valid iff now() < t_issue + d

where ℓ is the lease object, t_issue is the issue timestamp, and d is the lease duration. Expired or revoked leases cause `commit_result` to fail with `ResourceExhausted`, triggering compensating actions.
5.3.7 Integrated Agent Loop via SDK#
The SDK exposes the canonical bounded control loop as a first-class operation:
ALGORITHM: AgentLoop(client: AgentClient, task: TaskSpec, opts: LoopOptions) → TaskResult
1. plan := client.execute(PlanRequest(task, client.context.compile()), opts.deadline)
2. steps := decompose(plan, max_steps = opts.max_steps)
3. iteration := 0
4. WHILE steps IS NOT EMPTY AND iteration < opts.max_iterations:
step := steps.dequeue()
// Retrieve
evidence := client.context.retrieve(step.query, budget = opts.retrieval_budget)
// Act
action_result := MATCH step.type:
TOOL_CALL → client.tools.invoke(step.tool_id, step.input, opts.tool_options)
LLM_CALL → client.execute(step.prompt, opts.llm_deadline)
MEMORY_OP → client.memory.write(step.memory_item)
// Verify
verification := verify(action_result, step.expected_postconditions)
IF verification.passed:
// Critique (optional quality gate)
critique := client.execute(CritiqueRequest(step, action_result), opts.critique_deadline)
IF critique.severity < opts.critique_threshold:
commit(step, action_result)
ELSE:
// Repair
repair_steps := client.execute(RepairRequest(step, critique), opts.repair_deadline)
steps.prepend(repair_steps)
ELSE:
// Rollback and retry or escalate
IF step.retry_count < opts.max_step_retries:
step.retry_count += 1
steps.prepend(step)
ELSE:
persist_failure(step, verification)
IF opts.on_failure = ABORT → RETURN TaskResult.failed(step, verification)
IF opts.on_failure = SKIP → CONTINUE
IF opts.on_failure = ESCALATE → AWAIT human_approval(step)
iteration += 1
5. EMIT telemetry: AGENT_LOOP_COMPLETE { iterations, steps_completed, failures, total_tokens }
6. RETURN TaskResult.success(committed_results)
5.4 Connection Lifecycle Management: Pooling, Reconnection, Health Checks, Graceful Shutdown#
5.4.1 Connection Pool Model#
The connection pool maintains a bounded set of reusable connections with explicit lifecycle states:
State Transition Diagram:
IDLE ──acquire──→ ACTIVE ──release──→ IDLE
│ │
│ └──error──→ FAILED ──evict──→ CLOSED
│
└──health_fail──→ FAILED ──evict──→ CLOSED
│
└──shutdown──→ DRAINING ──drained──→ CLOSED
Pool Sizing Formulation:
Optimal pool size under steady-state load:

N* = ⌈λ · S · (1 + h)⌉

where:
- λ: mean request arrival rate (requests/second)
- S: mean service time per request (seconds)
- h: target headroom fraction (e.g., 0.2 for 20% spare capacity)
The pool enforces hard bounds: N_min ≤ N* ≤ N_max, with elastic scaling within these bounds.
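A sketch of this sizing rule: concurrency demand is arrival rate times service time (Little's law), inflated by the headroom fraction and clamped to the configured bounds. Parameter names are illustrative:

```python
import math

def optimal_pool_size(arrival_rate: float, service_time: float,
                      headroom: float, n_min: int, n_max: int) -> int:
    """Ceil of lambda * S * (1 + h), clamped to [n_min, n_max]."""
    target = math.ceil(arrival_rate * service_time * (1.0 + headroom))
    return max(n_min, min(n_max, target))
```

For example, 100 req/s at 50 ms per request with 20% headroom needs six connections; very light or very heavy loads pin to the floor and ceiling respectively.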
5.4.2 Pseudo-Algorithm: Connection Acquisition with Health-Aware Selection#
ALGORITHM: AcquireConnection(pool: ConnectionPool, deadline: Duration) → Result<Connection, SdkError>
1. start_time := now()
2. WHILE now() - start_time < deadline:
// Try to acquire an idle connection
c := pool.idle_queue.poll(timeout = MIN(100ms, deadline - elapsed))
IF c IS NOT NULL:
IF c.last_health_check > now() - pool.health_check_interval:
c.state := ACTIVE
RETURN Ok(c)
ELSE:
// Inline health check
health := c.ping(timeout = pool.health_check_timeout)
IF health.ok:
c.last_health_check := now()
c.state := ACTIVE
RETURN Ok(c)
ELSE:
c.state := FAILED
pool.evict(c)
pool.metrics.increment("connection.health_fail")
CONTINUE
// No idle connection available; try to create new
IF pool.size() < pool.max_size:
c := pool.create_new(timeout = deadline - elapsed)
IF c IS NOT NULL:
c.state := ACTIVE
RETURN Ok(c)
// Pool exhausted; apply backpressure
pool.metrics.increment("connection.pool_exhausted")
3. RETURN Err(DeadlineExceeded("Connection acquisition timed out"))
5.4.3 Reconnection Strategy#
Reconnection uses exponential backoff with decorrelated jitter (superior to full jitter for thundering-herd avoidance):

delay_n = min(delay_max, Uniform(delay_base, 3 · delay_{n−1}))

with delay_0 = delay_base and a maximum retry count R_max.
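A sketch of a decorrelated-jitter delay generator, following the AWS-style rule in which each delay is drawn uniformly between the base and three times the previous delay, then capped:

```python
import random

def decorrelated_jitter(base: float, cap: float, max_retries: int, rng=None):
    """Yield up to max_retries reconnect delays using decorrelated jitter."""
    rng = rng or random.Random()
    delay = base
    for _ in range(max_retries):
        # Each delay depends on the previous one, which decorrelates clients
        # that all started retrying at the same instant.
        delay = min(cap, rng.uniform(base, delay * 3.0))
        yield delay
```

Because successive delays are random functions of the previous draw rather than of the attempt number, simultaneously disconnected clients spread out quickly instead of reconnecting in synchronized waves.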
5.4.4 Graceful Shutdown Protocol#
ALGORITHM: GracefulShutdown(client: AgentClient, grace_period: Duration)
1. client.state := SHUTTING_DOWN
2. STOP accepting new requests; return Unavailable for queued requests
3. // Drain in-flight operations
in_flight := client.pool.active_connections()
AWAIT all_completed(in_flight, timeout = grace_period * 0.7)
4. // Force-close remaining
remaining := client.pool.active_connections()
FOR EACH c IN remaining:
c.send_goaway()
c.close(timeout = grace_period * 0.2)
5. // Flush telemetry buffers
client.telemetry.flush(timeout = grace_period * 0.1)
6. // Release resources
client.pool.destroy()
client.state := CLOSED
7. EMIT event: SDK_SHUTDOWN { in_flight_drained, force_closed, flush_success }
5.4.5 Health Check Mechanism#
The SDK implements active and passive health checking:
- Active: Periodic ping/health-check RPC at interval T_health (configurable, default 30 s). Uses a dedicated low-priority connection to avoid head-of-line blocking.
- Passive: Every successful response resets the connection's health timer. Consecutive failures (≥ F_max, e.g., 3) trigger immediate eviction.
The combined health score for a connection c:

H(c) = α · H_active(c) + (1 − α) · H_passive(c)

where H_active reflects recent ping results and H_passive the recent response success rate. Connections with H(c) < H_min are evicted and replaced.
5.5 Middleware and Interceptor Chains: Logging, Metrics, Auth Injection, Retry, Rate Limiting#
5.5.1 Interceptor Algebra#
The middleware chain is a composed function applied to every request-response pair:

Chain = I_1 ∘ I_2 ∘ … ∘ I_n ∘ Transport

where each interceptor has the signature:

I_k : (Request, Next) → Result<Response, SdkError>

and Next is the continuation representing the remainder of the chain plus the actual transport call. This allows each interceptor to:
- Inspect/modify the request before forwarding
- Inspect/modify the response after receiving
- Short-circuit the chain (e.g., rate limiter rejecting without network call)
- Retry by invoking `Next` multiple times (with idempotency guards)
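A sketch of this composition in Python: interceptors are folded right-to-left so the first entry is outermost, and each receives the rest of the chain as its `Next` continuation. The interceptor bodies below are illustrative:

```python
def build_chain(interceptors, transport):
    """Fold the list so interceptors[0] is outermost; each layer receives
    the remainder of the chain as its Next continuation."""
    handler = transport
    for interceptor in reversed(interceptors):
        def wrapped(request, icpt=interceptor, nxt=handler):
            return icpt(request, nxt)
        handler = wrapped
    return handler

def rate_limit_interceptor(request, nxt):
    # Short-circuit locally, without a network call, when over budget.
    if request.get("over_budget"):
        return {"status": 429}
    return nxt(request)

def auth_interceptor(request, nxt):
    # Inject credentials before the request reaches the transport.
    return nxt({**request, "authorization": "Bearer <token>"})

def transport(request):
    # Stand-in for the actual network call (innermost layer).
    return {"status": 200, "authed": "authorization" in request}

chain = build_chain([rate_limit_interceptor, auth_interceptor], transport)
```

Each layer can inspect or rewrite the request, post-process the response, call `nxt` more than once (retry), or never call it at all (short-circuit), which is exactly the algebra described above.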
5.5.2 Interceptor Ordering Constraints#
Interceptor ordering is not arbitrary. The canonical ordering enforces correctness:
| Position | Interceptor | Rationale |
|---|---|---|
| 1 (outermost) | Tracing | Must capture the full lifecycle, including retries |
| 2 | Metrics | Must measure wall-clock latency including retries |
| 3 | Logging | Must log the final request shape after auth injection |
| 4 | Auth Injection | Must add credentials before any network call |
| 5 | Rate Limiting | Must reject before consuming retry budget |
| 6 | Circuit Breaker | Must reject before attempting a known-failing endpoint |
| 7 | Retry | Must wrap the actual call and deadline interceptor |
| 8 | Deadline | Must enforce per-attempt timeout |
| 9 (innermost) | Transport | Actual network call |
Ordering Invariant:

pos(Tracing) < pos(Metrics) < pos(Logging) < pos(Auth) < pos(RateLimit) < pos(CircuitBreaker) < pos(Retry) < pos(Deadline) < pos(Transport)

Violations detected at initialization time cause the SDK to raise InvalidRequest("Interceptor ordering violation").
5.5.3 Retry Interceptor#
ALGORITHM: RetryInterceptor(request: Request, next: Next, policy: RetryPolicy) → Result<Response, SdkError>
1. attempt := 0
2. last_error := NULL
3. WHILE attempt < policy.max_attempts:
// Ensure idempotency key is set for retries
IF attempt > 0 AND request.idempotency_key IS NULL:
IF request.is_mutating → RETURN Err(last_error) // Cannot safely retry
// Read-only requests are inherently idempotent
TRY:
response := next(request)
IF response.is_success OR NOT policy.retry_on(response.status):
RETURN Ok(response)
last_error := classify_error(response)
CATCH error:
last_error := classify_error(error)
IF NOT policy.retry_on(last_error.class):
RETURN Err(last_error)
attempt += 1
// Compute backoff
delay := compute_backoff(attempt, policy)
EMIT metric: sdk.retry { attempt, delay_ms, error_class }
SLEEP(delay)
4. RETURN Err(last_error.with_context("Exhausted retry budget", attempts = attempt))
FUNCTION compute_backoff(attempt: int, policy: RetryPolicy) → Duration:
base := policy.initial_backoff * (policy.multiplier ^ (attempt - 1))
capped := MIN(base, policy.max_backoff)
jittered := capped * RANDOM(0.5, 1.0) // Half-jitter
RETURN jittered
5.5.4 Rate Limiting Interceptor#
The SDK-side rate limiter uses a token bucket algorithm:

tokens(t) = min(C, tokens(t_last) + r · (t − t_last))

where C is the bucket capacity, r is the refill rate (tokens/second), and t_last is the last refill timestamp.
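The refill rule can be sketched as a small token bucket with an injectable clock (useful for deterministic tests); this is an illustrative class, not the SDK's actual implementation:

```python
import time

class TokenBucket:
    """tokens(t) = min(C, tokens(t_last) + r * (t - t_last))."""
    def __init__(self, capacity: float, refill_rate: float, clock=time.monotonic):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.clock = clock
        self.tokens = capacity          # start full
        self.last_refill = clock()

    def _refill(self):
        now = self.clock()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + self.refill_rate * elapsed)
        self.last_refill = now

    def try_acquire(self, cost: float) -> bool:
        """Atomically take `cost` tokens if available; never blocks."""
        self._refill()
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

    def time_to_refill(self, cost: float) -> float:
        """Seconds until `cost` tokens will be available at the current rate."""
        self._refill()
        deficit = max(0.0, cost - self.tokens)
        return deficit / self.refill_rate
```

`time_to_refill` is what lets the interceptor decide between sleeping within the deadline and failing fast with `RateLimited`.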
ALGORITHM: RateLimitInterceptor(request: Request, next: Next, bucket: TokenBucket) → Result<Response, SdkError>
1. cost := estimate_token_cost(request) // Based on request size, operation class
2. IF bucket.try_acquire(cost):
RETURN next(request)
3. // Check server-provided retry-after header from previous 429
IF bucket.retry_after IS SET AND now() < bucket.retry_after:
wait_time := bucket.retry_after - now()
ELSE:
wait_time := bucket.time_to_refill(cost)
4. IF wait_time > request.deadline.remaining():
RETURN Err(RateLimited("Insufficient budget within deadline"))
5. SLEEP(wait_time)
6. IF bucket.try_acquire(cost):
RETURN next(request)
7. ELSE:
RETURN Err(RateLimited("Rate limit exceeded after wait"))
5.5.5 Circuit Breaker Interceptor#
Implements a three-state automaton: CLOSED (normal), OPEN (failing), HALF-OPEN (probing). Transitions:

CLOSED → OPEN when f_W > θ;  OPEN → HALF-OPEN after T_r elapses;  HALF-OPEN → CLOSED on probe success;  HALF-OPEN → OPEN on probe failure

where f_W is the failure rate over the sliding window, θ is the threshold (e.g., 0.5), and T_r is the recovery timeout.
Failure rate computation over sliding window:

f_W = |{o ∈ O_W : o is a failure}| / |O_W|,  defined only when |O_W| ≥ N_min

where O_W is the set of outcomes within the last W seconds and N_min is the minimum sample size to avoid premature tripping.
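A sketch of the three-state breaker over a sliding outcome window, including the minimum-sample guard against premature tripping; thresholds, state names, and the injectable clock are illustrative:

```python
import collections
import time

class CircuitBreaker:
    """Three-state breaker over a sliding window of (timestamp, ok) outcomes."""
    def __init__(self, window_s=60.0, threshold=0.5, min_samples=10,
                 recovery_s=30.0, clock=time.monotonic):
        self.window_s, self.threshold, self.min_samples = window_s, threshold, min_samples
        self.recovery_s, self.clock = recovery_s, clock
        self.outcomes = collections.deque()
        self.state = "CLOSED"
        self.opened_at = 0.0

    def _failure_rate(self) -> float:
        now = self.clock()
        while self.outcomes and self.outcomes[0][0] < now - self.window_s:
            self.outcomes.popleft()              # drop outcomes outside the window
        if len(self.outcomes) < self.min_samples:
            return 0.0                           # too few samples: never trip
        return sum(1 for _, ok in self.outcomes if not ok) / len(self.outcomes)

    def allow_request(self) -> bool:
        if self.state == "OPEN":
            if self.clock() - self.opened_at >= self.recovery_s:
                self.state = "HALF_OPEN"         # admit a single probe request
                return True
            return False
        return True

    def record(self, ok: bool):
        self.outcomes.append((self.clock(), ok))
        if self.state == "HALF_OPEN":
            self.state = "CLOSED" if ok else "OPEN"
            if not ok:
                self.opened_at = self.clock()
        elif self.state == "CLOSED" and self._failure_rate() > self.threshold:
            self.state = "OPEN"
            self.opened_at = self.clock()
```

In the interceptor, `allow_request` returning false maps directly to a `CircuitOpen` error without any network call, preserving the retry budget for endpoints that are actually reachable.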
5.5.6 Auth Injection Interceptor#
ALGORITHM: AuthInterceptor(request: Request, next: Next, provider: AuthProvider) → Result<Response, SdkError>
1. token := provider.get_cached_token()
2. IF token IS NULL OR token.expires_at < now() + REFRESH_BUFFER:
token := provider.refresh_token()
IF token IS Err → RETURN Err(AuthFailure("Token refresh failed"))
3. request.headers["Authorization"] := "Bearer " + token.value
4. request.headers["X-SDK-Client-Id"] := provider.client_id
5. response := next(request)
6. IF response.status = 401 (Unauthorized):
// Token may have been revoked; force refresh
provider.invalidate_cache()
token := provider.refresh_token()
IF token IS Err → RETURN Err(AuthFailure("Re-auth failed"))
request.headers["Authorization"] := "Bearer " + token.value
response := next(request)
7. RETURN response
5.6 Async-First Execution: Futures, Coroutines, Reactive Streams, Structured Concurrency#
5.6.1 Concurrency Model Requirements#
Agentic SDK workloads are inherently concurrent: tool invocations run in parallel, streaming responses arrive asynchronously, memory queries overlap with LLM inference, and multi-agent orchestration requires fan-out/fan-in patterns. The SDK must provide structured concurrency primitives that prevent resource leaks, dangling tasks, and unobserved failures.
5.6.2 Structured Concurrency Contract#
Define a task scope S with the following invariants:
- Bounded lifetime: S completes only when all child tasks have completed (successfully, via error, or via cancellation).
- Error propagation: An unhandled error in any child task cancels all sibling tasks and propagates to the scope's parent.
- Cancellation cascading: Cancellation of S cancels all child tasks transitively.
- Resource binding: Resources acquired within S are released when S exits, regardless of exit path.
Formal lifetime constraint:

∀ task τ spawned in S: lifetime(τ) ⊆ lifetime(S)
5.6.3 Pseudo-Algorithm: Parallel Tool Invocation with Structured Concurrency#
ALGORITHM: ParallelToolInvoke(
registry: ToolRegistry,
calls: List<ToolCall>,
max_concurrency: int,
deadline: Duration
) → Result<List<ToolOutput>, SdkError>
1. CREATE scope S with deadline and cancellation_token
2. semaphore := Semaphore(max_concurrency)
results := ConcurrentMap<int, Result<ToolOutput, SdkError>>()
3. FOR EACH (index, call) IN enumerate(calls):
SPAWN task IN scope S:
ACQUIRE semaphore (or AWAIT within deadline)
TRY:
output := registry.invoke(call.tool_id, call.input, InvokeOptions {
deadline = deadline,
cancellation = S.token
})
results[index] := output
CATCH error:
results[index] := Err(error)
IF error.class IN {AuthFailure, InvalidRequest}:
S.cancel("Non-retryable tool failure")
FINALLY:
RELEASE semaphore
4. AWAIT scope S completion
5. // Collect results in original order
ordered := [results[i] FOR i IN 0..len(calls)]
6. // Determine aggregate outcome
failures := [r FOR r IN ordered WHERE r IS Err]
IF len(failures) > 0 AND policy = FAIL_FAST:
RETURN Err(AggregateError(failures))
7. RETURN Ok(ordered)
5.6.4 Streaming Response Processing#
Streaming agent responses are modeled as async sequences with backpressure:
ALGORITHM: ProcessAgentStream(
stream: AsyncStream<AgentChunk>,
handler: ChunkHandler,
budget: TokenBudget
) → Result<AgentResponse, SdkError>
1. accumulated_tokens := 0
assembled_response := ResponseAssembler()
2. FOR EACH chunk IN stream:
// Backpressure: pause consumption if handler is slow
AWAIT handler.ready()
// Token budget enforcement
accumulated_tokens += chunk.token_count
IF accumulated_tokens > budget.max_output_tokens:
stream.cancel("Output token budget exceeded")
BREAK
// Deliver to handler
handler.on_chunk(chunk)
assembled_response.append(chunk)
// Check for tool-call chunks requiring interleaved execution
IF chunk.type = TOOL_CALL_REQUEST:
tool_result := AWAIT execute_tool_call(chunk.tool_call)
stream.inject_context(tool_result) // Feed back into stream
3. IF stream.error IS NOT NULL:
RETURN Err(stream.error)
4. RETURN Ok(assembled_response.finalize())
5.6.5 Language-Specific Concurrency Mapping#
| SDK Pattern | Python | TypeScript | Rust | Go | Kotlin | C# | Swift |
|---|---|---|---|---|---|---|---|
| Task scope | asyncio.TaskGroup | Custom TaskScope | tokio::task::JoinSet | errgroup.Group | coroutineScope | Task.WhenAll + CTS | withTaskGroup |
| Semaphore | asyncio.Semaphore | Custom / p-limit | tokio::sync::Semaphore | Buffered channel | Semaphore (coroutines) | SemaphoreSlim | AsyncSemaphore |
| Cancellation | Task.cancel() | AbortController | CancellationToken | context.WithCancel | Job.cancel() | CancellationToken | Task.cancel() |
| Stream | async for | for await...of | while let Some(x) = stream.next().await | for x := range ch | flow.collect | await foreach | for await x in seq |
5.7 Offline-First and Edge SDK Variants: Local Inference, Cached Tool Schemas, Sync-on-Reconnect#
5.7.1 Offline Capability Model#
Edge deployments (IoT, mobile, air-gapped, intermittent-connectivity) require SDKs that function without continuous platform access. The offline SDK variant operates as a degraded-but-functional subset of the full SDK.
Capability Classification:
| Capability | Online | Offline | Degradation Mode |
|---|---|---|---|
| Agent execution | Full LLM | Local model / cached responses | Reduced quality, bounded context |
| Tool discovery | Live MCP | Cached schema bundle | Stale schemas, no new tools |
| Tool invocation | Remote + local | Local-only tools | Subset of tools available |
| Memory read | Full platform | Local replica | Potentially stale |
| Memory write | Immediate persist | Write-ahead log (WAL) | Sync-on-reconnect |
| Context retrieval | Full RAG pipeline | Local vector store + cache | Reduced corpus |
| Telemetry | Real-time emission | Local buffer → batch upload | Delayed observability |
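The capability table above can be realized as a simple backend-resolution map consulted at dispatch time. A minimal sketch, with hypothetical capability and backend names:

```python
from enum import Enum

class Connectivity(Enum):
    ONLINE = "online"
    OFFLINE = "offline"

# Degradation table: capability -> (online backend, offline backend)
CAPABILITY_BACKENDS = {
    "tool_discovery": ("live_mcp", "cached_schema_bundle"),
    "memory_write":   ("immediate_persist", "write_ahead_log"),
    "telemetry":      ("realtime_emission", "local_buffer"),
}

def resolve_backend(capability: str, state: Connectivity) -> str:
    """Pick the active backend for a capability given current connectivity."""
    online, offline = CAPABILITY_BACKENDS[capability]
    return online if state is Connectivity.ONLINE else offline
```

Centralizing the mapping keeps degradation behavior declarative: adding an offline mode for a new capability is a table entry, not scattered conditionals.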
5.7.2 Sync-on-Reconnect Protocol#
ALGORITHM: SyncOnReconnect(offline_store: OfflineStore, platform: AgentClient)
1. DETECT connectivity restored (via health check probe)
2. // Phase 1: Upload buffered telemetry
telemetry_batch := offline_store.drain_telemetry_buffer()
platform.telemetry.batch_upload(telemetry_batch)
3. // Phase 2: Replay write-ahead log for memory writes
wal_entries := offline_store.read_wal(order = CHRONOLOGICAL)
FOR EACH entry IN wal_entries:
result := platform.memory.write(entry.item)
IF result IS Ok:
offline_store.wal.mark_committed(entry.id)
ELSE IF result.error.class = ConflictError:
// Conflict resolution
resolved := conflict_resolver.resolve(entry, result.server_version)
platform.memory.write(resolved)
offline_store.wal.mark_committed(entry.id)
ELSE:
offline_store.wal.mark_retry(entry.id)
4. // Phase 3: Refresh cached tool schemas
stale_schemas := offline_store.tool_cache.get_stale(threshold = schema_ttl)
FOR EACH schema IN stale_schemas:
updated := platform.tools.get_schema(schema.tool_id)
IF updated.version > schema.version:
offline_store.tool_cache.update(updated)
5. // Phase 4: Sync local memory replica
last_sync := offline_store.memory_replica.last_sync_timestamp
delta := platform.memory.query(MemoryQuery { modified_after = last_sync })
offline_store.memory_replica.apply_delta(delta)
6. EMIT event: SYNC_COMPLETE {
wal_committed, wal_conflicted, schemas_refreshed, memory_items_synced
}
5.7.3 Local Inference Integration#
The offline SDK supports local model inference for degraded agent execution:
TYPE LocalInferenceConfig:
model_path: FilePath // GGUF, ONNX, CoreML, TFLite
model_format: ModelFormat // GGUF | ONNX | COREML | TFLITE
context_window: int // Typically 2048–8192 for edge models
max_output_tokens: int
quantization: QuantizationLevel // Q4_K_M, Q8_0, FP16, etc.
device: InferenceDevice // CPU | GPU | NPU | ANE
batch_size: int
temperature: float
Context Budget Adaptation for Edge:
Given a reduced context window W_edge (typically 2,048–8,192 tokens, far below typical hosted-model windows), the ContextBuilder applies aggressive compression. The budget allocation algorithm (§5.3.5) runs identically but with B_total = W_edge, automatically truncating lower-priority sections and applying more aggressive history compression.
5.7.4 Conflict Resolution for Offline Writes#
When offline writes conflict with server state during sync, resolution is delegated to a type-specific merge function: append-only logs use union merge; key-value stores use last-writer-wins with vector clocks; structured memory items use three-way merge with the common ancestor.
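Two of these merge strategies can be sketched concretely. In this illustrative snippet, scalar timestamps stand in for vector clocks, and the entry shapes are assumptions:

```python
def merge_append_only(local_log: list, server_log: list) -> list:
    """Union merge for append-only logs: keep every entry once, ordered by timestamp."""
    seen, merged = set(), []
    for entry in sorted(local_log + server_log, key=lambda e: e["ts"]):
        if entry["id"] not in seen:
            seen.add(entry["id"])
            merged.append(entry)
    return merged

def merge_lww(local: dict, server: dict) -> dict:
    """Last-writer-wins for key-value state: per key, the newer (value, ts) wins."""
    merged = dict(server)
    for key, (value, ts) in local.items():
        if key not in merged or ts > merged[key][1]:
            merged[key] = (value, ts)
    return merged
```

Three-way merge for structured items follows the same shape but takes a third argument, the common ancestor, to distinguish local edits from server edits.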
5.8 SDK Versioning, Backward Compatibility Guarantees, and Deprecation Policy#
5.8.1 Versioning Scheme#
The SDK follows Semantic Versioning 2.0.0 with agentic-specific extensions:
| Component | Increment Trigger |
|---|---|
| MAJOR | Breaking change in any L2 (Contract) or L3 (Service) public API; removal of a deprecated method; incompatible wire format change |
| MINOR | New public API (additive); new transport support; new interceptor; new optional parameter with backward-compatible default |
| PATCH | Bug fix; performance improvement; documentation correction; dependency update without API change |
5.8.2 Compatibility Matrix#
The SDK performs version negotiation during connection initialization:
ALGORITHM: VersionNegotiation(client_version: Version, server_capabilities: ServerCapabilities) → NegotiationResult
1. IF server_capabilities.min_sdk_version > client_version:
RETURN NegotiationResult.UNSUPPORTED(
"Server requires SDK >= " + server_capabilities.min_sdk_version
)
2. IF client_version.MAJOR != server_capabilities.current_major:
IF server_capabilities.compat_shims.contains(client_version.MAJOR):
RETURN NegotiationResult.DEGRADED(
active_shim = server_capabilities.compat_shims[client_version.MAJOR],
warnings = ["Operating in compatibility mode"]
)
ELSE:
RETURN NegotiationResult.UNSUPPORTED(...)
3. // Negotiate feature flags
agreed_features := INTERSECT(
client_version.supported_features,
server_capabilities.available_features
)
4. RETURN NegotiationResult.FULL(
protocol_version = server_capabilities.current_protocol,
features = agreed_features,
deprecated_methods = server_capabilities.deprecations.filter(
affects = client_version
)
)
5.8.3 Deprecation Policy#
| Phase | Duration | SDK Behavior |
|---|---|---|
| Announced | 1 minor release before removal | Compile-time/runtime deprecation warning; method remains fully functional |
| Soft Deprecated | 2 minor releases | Warning escalated to structured log at WARN level; telemetry emits deprecation usage count |
| Hard Deprecated | Next MAJOR release | Method removed from public API; internal redirect to replacement if feasible |
| Removed | MAJOR + 1 | No trace of deprecated method; compatibility shim dropped |
Deprecation Metadata:
TYPE DeprecationInfo:
method: MethodSignature
deprecated_since: Version
removal_target: Version
replacement: MethodSignature?
migration_guide_url: URI
reason: string
All deprecated methods emit telemetry with the sdk.deprecated_call metric, enabling platform operators to track migration progress before removal.
5.8.4 Wire Protocol Versioning#
The SDK supports content negotiation for wire protocol versions:
- JSON-RPC: `"jsonrpc": "2.0"` with the platform-specific extension header `X-Platform-Protocol-Version: 3`
- gRPC: service versioning via package namespacing (`platform.v3.AgentService`)
- MCP: protocol version negotiated during the `initialize` handshake
The SDK maintains a protocol adapter registry that maps protocol versions to serialization/deserialization implementations.
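A minimal sketch of such a registry, keyed by (protocol, version) and resolving to a codec. The codec class here is a placeholder, not the SDK's real implementation:

```python
import json

class JsonRpcV2Codec:
    """Placeholder codec: wraps payloads in a JSON-RPC 2.0 envelope."""
    def encode(self, method: str, params: dict) -> bytes:
        return json.dumps(
            {"jsonrpc": "2.0", "method": method, "params": params}
        ).encode()

    def decode(self, raw: bytes) -> dict:
        return json.loads(raw)

class ProtocolAdapterRegistry:
    """Maps (protocol, version) pairs to codec implementations."""
    def __init__(self):
        self._adapters = {}

    def register(self, protocol: str, version: int, codec) -> None:
        self._adapters[(protocol, version)] = codec

    def resolve(self, protocol: str, version: int):
        try:
            return self._adapters[(protocol, version)]
        except KeyError:
            raise LookupError(f"No adapter for {protocol} v{version}")

registry = ProtocolAdapterRegistry()
registry.register("jsonrpc", 2, JsonRpcV2Codec())
```

Version negotiation (§5.8.2) then reduces to a `resolve` call with the agreed protocol version; an unsupported version fails fast at connection time rather than mid-request.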
5.9 Code Generation from Proto/Schema Definitions: End-to-End Typed Client Pipelines#
5.9.1 Code Generation Architecture#
The code generation pipeline transforms platform schema definitions into language-specific SDK modules through a multi-stage compilation process:
Stage Descriptions:
| Stage | Input | Output | Key Operations |
|---|---|---|---|
| Parse | .proto, JSON Schema, OpenAPI, MCP manifest | Abstract Schema IR | Validation, normalization, reference resolution |
| Transform | Schema IR | Augmented IR | Add builder patterns, validation logic, doc comments, deprecation annotations, default values |
| Emit | Augmented IR | Raw source code per target language | Template-based generation with language-specific AST construction |
| Post-process | Raw source | Formatted, linted, tested module | Auto-formatting, lint enforcement, snapshot test generation |
5.9.2 Intermediate Representation (IR)#
TYPE SchemaIR:
messages: List<MessageDef>
services: List<ServiceDef>
enums: List<EnumDef>
errors: List<ErrorDef>
extensions: Map<string, Any>
TYPE MessageDef:
name: QualifiedName
fields: List<FieldDef>
oneofs: List<OneofDef>
nested: List<MessageDef>
options: MessageOptions // e.g., deprecated, experimental
TYPE FieldDef:
name: string
number: int
type: FieldType // Scalar, Message, Enum, Map, Repeated
optional: bool
default: Value?
constraints: List<Constraint> // min, max, pattern, custom validators
documentation: DocString
deprecation: DeprecationInfo?
TYPE ServiceDef:
name: QualifiedName
methods: List<MethodDef>
version: SemanticVersion
TYPE MethodDef:
name: string
input: MessageRef
output: MessageRef
streaming: StreamingMode // UNARY | SERVER_STREAM | CLIENT_STREAM | BIDI_STREAM
idempotency: IdempotencyLevel // UNKNOWN | IDEMPOTENT | NO_SIDE_EFFECTS
deadline_class: DeadlineClass // FAST (<100ms) | NORMAL (<5s) | SLOW (<60s) | LONG_RUNNING
auth_required: bool
approval_gated: bool // Requires human approval for execution
5.9.3 Pseudo-Algorithm: End-to-End Code Generation Pipeline#
ALGORITHM: GenerateSDK(
schemas: List<SchemaSource>,
targets: List<LanguageTarget>,
config: CodegenConfig
) → Map<LanguageTarget, GeneratedModule>
1. // Stage 1: Parse and validate
ir := SchemaIR.empty()
FOR EACH source IN schemas:
parsed := MATCH source.format:
PROTOBUF → parse_proto(source.path)
JSON_SCHEMA → parse_jsonschema(source.path)
OPENAPI → parse_openapi(source.path)
MCP_MANIFEST → parse_mcp(source.path)
validate_no_conflicts(ir, parsed)
ir.merge(parsed)
2. // Stage 2: Transform and augment
FOR EACH message IN ir.messages:
message.add_builder_pattern()
message.add_validation_methods(from = message.fields.constraints)
message.add_equality_and_hash()
message.add_serialization_annotations()
IF message.has_deprecated_fields():
message.add_deprecation_warnings()
FOR EACH service IN ir.services:
FOR EACH method IN service.methods:
method.add_retry_annotation(from = method.idempotency)
method.add_deadline_default(from = method.deadline_class)
method.add_interceptor_hooks()
3. // Stage 3: Emit per target language
results := Map.empty()
FOR EACH target IN targets:
emitter := LanguageEmitter.create(target, config)
raw_code := emitter.emit(ir)
// Stage 4: Post-process
formatted := target.formatter.format(raw_code)
linted := target.linter.check(formatted)
IF linted.has_errors():
RAISE CodegenError("Generated code has lint errors", linted.errors)
// Generate snapshot tests
tests := generate_serialization_round_trip_tests(ir, target)
tests += generate_validation_tests(ir, target)
tests += generate_builder_tests(ir, target)
results[target] := GeneratedModule {
source_files = formatted,
test_files = tests,
manifest = generate_package_manifest(target, config.version)
}
4. // Cross-language conformance: verify identical serialization output
reference_vectors := generate_conformance_vectors(ir)
FOR EACH target IN targets:
verify_conformance(results[target], reference_vectors)
5. RETURN results
5.9.4 Schema-to-Type Mapping Rules#
| Proto/Schema Type | Python | TypeScript | Rust | Go | Kotlin | C# | Swift |
|---|---|---|---|---|---|---|---|
string | str | string | String | string | String | string | String |
int32 | int | number | i32 | int32 | Int | int | Int32 |
int64 | int | bigint | i64 | int64 | Long | long | Int64 |
float | float | number | f32 | float32 | Float | float | Float |
double | float | number | f64 | float64 | Double | double | Double |
bool | bool | boolean | bool | bool | Boolean | bool | Bool |
bytes | bytes | Uint8Array | Vec<u8> | []byte | ByteArray | byte[] | Data |
repeated T | List[T] | T[] | Vec<T> | []T | List<T> | IReadOnlyList<T> | [T] |
map<K,V> | Dict[K,V] | Map<K,V> | HashMap<K,V> | map[K]V | Map<K,V> | IReadOnlyDictionary<K,V> | [K:V] |
optional T | Optional[T] | T \| undefined | Option<T> | *T | T? | T? | T?
oneof | Discriminated union (tagged) | Discriminated union | enum | Interface + impls | sealed class | Abstract record | enum with associated values |
Timestamp | datetime | Date | chrono::DateTime<Utc> | time.Time | Instant | DateTimeOffset | Date |
Duration | timedelta | number (ms) | std::time::Duration | time.Duration | Duration | TimeSpan | Duration |
5.9.5 Validation Code Generation#
For each field constraint in the schema, the codegen pipeline emits validation logic in the target language.
Example constraint types and their generated validators:
| Constraint | Schema Annotation | Generated Check |
|---|---|---|
| String length | min_length: 1, max_length: 256 | 1 ≤ len(value) ≤ 256 |
| Numeric range | minimum: 0, maximum: 1000 | 0 ≤ value ≤ 1000 |
| Pattern | pattern: "^[a-z][a-z0-9_]*$" | Regex match |
| Required | required: true | Non-null / non-default check |
| Enum membership | enum: [A, B, C] | Value ∈ {A, B, C} |
| Custom | x-validate: "token_budget" | User-defined validator function |
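For a Python target, the emitted validator for a field combining the string-length and pattern constraints above might look like the following sketch (the field name `tool_name` is illustrative):

```python
import re

def validate_tool_name(value):
    """Generated from: required, min_length=1, max_length=256,
    pattern=^[a-z][a-z0-9_]*$. Returns a list of violation messages."""
    errors = []
    if value is None:
        errors.append("tool_name: required")
        return errors                      # no further checks on a missing value
    if not (1 <= len(value) <= 256):
        errors.append("tool_name: length must be in [1, 256]")
    if not re.fullmatch(r"[a-z][a-z0-9_]*", value):
        errors.append("tool_name: must match ^[a-z][a-z0-9_]*$")
    return errors
```

Emitting accumulated violation lists rather than raising on the first failure lets callers report every problem in one round trip.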
5.10 SDK Testing Harnesses: Mock Servers, Recorded Sessions, Deterministic Replay#
5.10.1 Testing Strategy Architecture#
SDK testing operates at four levels, each with distinct objectives:
| Level | Scope | Mechanism | Purpose |
|---|---|---|---|
| Unit | Individual methods, serialization, validation | In-process mocks | Correctness of contract types and interceptors |
| Integration | Client ↔ mock server | Mock server (in-process or local) | Protocol compliance, error handling, streaming |
| Conformance | Cross-language parity | Shared test vectors + recorded sessions | Behavioral equivalence across SDKs |
| End-to-End | Client ↔ real platform | Staging environment | Full-stack validation |
5.10.2 Mock Server Architecture#
The mock server is a deterministic, programmable server that simulates platform behavior:
TYPE MockServer:
FUNC start(config: MockServerConfig) → MockServerHandle
FUNC stop() → void
// Programming interface
FUNC when(matcher: RequestMatcher) → ResponseStub
FUNC verify(matcher: RequestMatcher, times: InvocationCount) → VerificationResult
FUNC record() → RecordingSession
FUNC replay(session: RecordedSession) → void
TYPE RequestMatcher:
method: MethodPattern // Exact, regex, wildcard
body: BodyMatcher // JSON path assertions, schema match
headers: HeaderMatcher
metadata: MetadataMatcher
TYPE ResponseStub:
FUNC respond_with(response: Response) → void
FUNC respond_with_sequence(responses: List<Response>) → void
FUNC respond_with_stream(chunks: List<Chunk>, interval: Duration) → void
FUNC respond_with_error(error: SdkError) → void
FUNC respond_with_delay(response: Response, delay: Duration) → void
FUNC respond_with_function(handler: (Request) → Response) → void
5.10.3 Pseudo-Algorithm: Deterministic Replay Testing#
ALGORITHM: DeterministicReplayTest(
recorded_session: RecordedSession,
sdk_under_test: AgentClient
) → TestResult
1. // Phase 1: Load recorded interactions
interactions := recorded_session.load()
// Each interaction: (timestamp, request, response, metadata)
2. // Phase 2: Configure mock server with recorded responses
mock := MockServer.start(config = { transport = recorded_session.transport })
FOR EACH (i, interaction) IN enumerate(interactions):
mock.when(
RequestMatcher.exact(interaction.request)
).respond_with(interaction.response)
3. // Phase 3: Execute SDK operations against mock
sdk_under_test.connect(mock.endpoint)
actual_results := []
FOR EACH interaction IN interactions:
actual := sdk_under_test.execute(interaction.request, interaction.deadline)
actual_results.append(actual)
4. // Phase 4: Verify behavioral equivalence
mismatches := []
FOR EACH (i, (expected, actual)) IN enumerate(zip(interactions, actual_results)):
IF NOT deep_equals(expected.response, actual, config.comparison_rules):
mismatches.append(Mismatch {
index = i,
expected = expected.response,
actual = actual,
diff = compute_diff(expected.response, actual)
})
5. // Phase 5: Verify side effects
FOR EACH interaction IN interactions:
mock.verify(
RequestMatcher.exact(interaction.request),
times = EXACTLY_ONCE
)
6. mock.stop()
sdk_under_test.disconnect()
7. IF mismatches IS EMPTY:
RETURN TestResult.PASSED(interactions_verified = len(interactions))
ELSE:
RETURN TestResult.FAILED(mismatches)
5.10.4 Session Recording for Regression Testing#
ALGORITHM: RecordSession(client: AgentClient, operations: List<Operation>) → RecordedSession
1. recorder := SessionRecorder.start()
client.use(RecordingInterceptor(recorder))
// RecordingInterceptor captures all request/response pairs with timestamps
2. FOR EACH op IN operations:
EXECUTE op via client
// RecordingInterceptor automatically captures interaction
3. session := recorder.finalize()
4. // Sanitize sensitive data
session.redact(patterns = [
RedactPattern.BEARER_TOKEN,
RedactPattern.API_KEY,
RedactPattern.PII_FIELDS
])
5. // Compute determinism hash
session.hash := SHA256(canonical_serialize(session.interactions))
6. session.save(path = config.recording_dir / session.id + ".session.json")
7. RETURN session
5.10.5 Property-Based Testing for SDK Invariants#
Beyond example-based tests, the SDK testing harness supports property-based testing for verifying invariants:
| Property | Description | Generator |
|---|---|---|
| Serialization round-trip | deserialize(serialize(m)) = m for all valid messages m | Random valid message instances |
| Retry idempotency | Retried idempotent requests produce identical results | Random request + transient failure injection |
| Error classification stability | Same server error always maps to same SdkError class | Random HTTP status / gRPC status codes |
| Token budget honoring | Compiled context never exceeds budget | Random context sections with varying sizes |
| Cancellation safety | Cancelled operations release all resources | Random cancellation timing |
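The round-trip property can be exercised with randomly generated messages. A stdlib-only sketch is below; a production harness would use a property-testing framework such as Hypothesis, and JSON stands in for the SDK's actual serializer:

```python
import json
import random
import string

def random_message(depth=0):
    """Generate a random valid 'message': a JSON-compatible dict with
    scalar, list, and (bounded-depth) nested-message fields."""
    msg = {}
    for _ in range(random.randint(1, 4)):
        key = "".join(random.choices(string.ascii_lowercase, k=5))
        kind = random.choice(["int", "str", "list", "nested"]) if depth < 2 else "int"
        if kind == "int":
            msg[key] = random.randint(-1000, 1000)
        elif kind == "str":
            msg[key] = "".join(random.choices(string.printable, k=8))
        elif kind == "list":
            msg[key] = [random.randint(0, 9) for _ in range(3)]
        else:
            msg[key] = random_message(depth + 1)
    return msg

def check_round_trip(trials=200):
    """Property: deserialize(serialize(m)) == m for every generated m."""
    for _ in range(trials):
        m = random_message()
        assert json.loads(json.dumps(m)) == m
    return True
```

The same generator feeds the cross-language conformance vectors of §5.9.3: every SDK must reproduce byte-identical serialization for the generated corpus.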
5.11 Embedding SDKs in Hostile Environments: Browsers, Mobile, IoT, Serverless, Air-Gapped Systems#
5.11.1 Environment Constraint Taxonomy#
Each deployment environment imposes specific constraints that the SDK must accommodate:
| Environment | Constraints | SDK Adaptations |
|---|---|---|
| Browser | No raw TCP; limited fetch API; CORS restrictions; no filesystem; single-threaded JS main thread; bundle size limits | HTTP-only transport (JSON-RPC over fetch/SSE); Web Worker offloading; tree-shakeable modules; WASM for compute-intensive operations |
| Mobile (iOS/Android) | Battery/thermal constraints; background execution limits; app lifecycle interruptions; restricted networking | Batched telemetry; aggressive connection pooling; lifecycle-aware pause/resume; certificate pinning |
| IoT | Extreme memory/CPU limits (< 256KB RAM); intermittent connectivity; no TLS in some cases; real-time constraints | no_std Rust SDK; CBOR instead of JSON; pre-compiled tool schemas in ROM; minimal allocator |
| Serverless (Lambda, Cloud Functions) | Cold start latency; ephemeral filesystem; execution time limits; no persistent connections; pay-per-invocation cost | Lazy initialization; pre-warmed connection via init phase; stateless SDK mode; request-scoped lifecycle |
| Air-Gapped | No internet access; all dependencies bundled; no telemetry export; manual update cycles | Fully offline SDK variant; embedded model weights; local tool execution only; file-based telemetry export |
5.11.2 Browser SDK Architecture#
TYPE BrowserSdkConfig:
endpoint: URL // Must be HTTPS with valid CORS
transport: FETCH | SSE | WEBSOCKET
auth: BrowserAuthConfig // Cookie, bearer token, or OAuth PKCE flow
worker: boolean // Offload to Web Worker
bundle_budget: ByteSize // Max JS bundle size
storage: LOCAL_STORAGE | INDEXED_DB | MEMORY
wasm_modules: List<WasmModule> // Optional WASM for local inference
Bundle Size Optimization:
The shipped bundle must satisfy Σ_{f ∈ F} size(f) ≤ B_bundle, where F is the set of features selected via tree-shaking and B_bundle is the deployment budget (typically 50–150 KB gzipped for the SDK layer).
The SDK uses dead code elimination entry points:
// Tree-shakeable exports
export { AgentClient } from './core/client'
export { ToolRegistry } from './tools/registry' // Optional
export { MemoryStore } from './memory/store' // Optional
export { ContextBuilder } from './context/builder' // Optional
// Consumer only imports what they use; bundler eliminates the rest
5.11.3 Serverless SDK Lifecycle#
ALGORITHM: ServerlessSdkLifecycle(handler: RequestHandler) → LambdaHandler
1. // INIT PHASE (cold start, runs once)
GLOBAL client := AgentClient.create(ServerlessConfig {
pool_size = 1, // Minimal pooling
keepalive = true, // Reuse across invocations
lazy_tool_discovery = true, // Don't discover until needed
telemetry_mode = BATCHED, // Buffer and flush at end
memory_mode = STATELESS // No local memory persistence
})
GLOBAL initialized := false
2. // HANDLER (runs per invocation)
RETURN ASYNC FUNCTION(event, context):
IF NOT initialized:
AWAIT client.connect()
initialized := true
// Set invocation-scoped deadline from Lambda remaining time
deadline := context.remaining_time - SAFETY_MARGIN(500ms)
TRY:
result := AWAIT handler(client, event, deadline)
RETURN format_response(result)
CATCH error:
RETURN format_error(error)
FINALLY:
// Flush telemetry before freeze
AWAIT client.telemetry.flush(timeout = MIN(deadline * 0.1, 1000ms))
// Do NOT disconnect; connection persists across warm invocations
5.11.4 IoT/Embedded SDK Constraints#
For extremely resource-constrained environments, the SDK provides a minimal core whose memory footprint, code size, and initialization latency are held within fixed budgets. Typical targets:
| Resource | Target | Achieved By |
|---|---|---|
| RAM | 64 KB | Arena allocator; no dynamic allocation after init; pre-allocated buffers |
| Flash/ROM | 256 KB | no_std Rust; no string formatting; CBOR codec; compiled-in schemas |
| Init latency | 50 ms | No discovery at init; pre-configured tool list; no TLS handshake (pre-shared keys) |
5.11.5 Air-Gapped Deployment Model#
ALGORITHM: AirGappedSdkSetup(bundle: AirGapBundle) → AgentClient
1. // Validate bundle integrity
IF SHA256(bundle.content) != bundle.manifest.checksum:
RAISE SecurityError("Bundle integrity check failed")
2. // Extract components
model_weights := bundle.extract("model/")
tool_schemas := bundle.extract("tools/")
memory_snapshot := bundle.extract("memory/")
policy_config := bundle.extract("config/policy.json")
3. // Initialize local-only SDK
client := AgentClient.create(AirGapConfig {
inference = LocalInferenceConfig {
model_path = model_weights,
device = detect_available_device()
},
tools = StaticToolRegistry(tool_schemas),
memory = FileBackedMemoryStore(memory_snapshot),
telemetry = FileExporter(path = "/var/log/agent/telemetry/"),
connectivity = NEVER // No network operations attempted
})
4. RETURN client
5.12 Telemetry and Diagnostics: SDK-Level Trace Emission, Performance Profiling, Error Reporting#
5.12.1 Telemetry Architecture#
The SDK telemetry system implements the three pillars of observability—traces, metrics, and logs—plus a fourth pillar specific to agentic systems: decision provenance.
TYPE TelemetryConfig:
traces: TraceConfig
metrics: MetricsConfig
logs: LogConfig
provenance: ProvenanceConfig
export: ExportConfig // OTLP, stdout, file, custom
sampling: SamplingConfig
privacy: PrivacyConfig // PII redaction, content filtering
budget: TelemetryBudget // Max overhead: CPU%, memory, bandwidth
5.12.2 Trace Emission Model#
Every SDK operation emits OpenTelemetry-compatible distributed traces with agentic-specific semantic conventions:
Span Hierarchy for an Agent Execution:
[sdk.agent.execute] // Root span
├── [sdk.context.compile] // Context compilation
│ ├── [sdk.context.retrieve] // Evidence retrieval
│ ├── [sdk.memory.summarize] // Memory summarization
│ └── [sdk.context.budget_allocate] // Token budget allocation
├── [sdk.transport.request] // Wire-level request
│ ├── [sdk.middleware.auth] // Auth injection
│ ├── [sdk.middleware.retry] // Retry wrapper
│ │ ├── [sdk.transport.attempt.1] // First attempt
│ │ └── [sdk.transport.attempt.2] // Retry
│ └── [sdk.middleware.circuit_breaker] // Circuit breaker check
├── [sdk.tool.invoke] // Tool invocation (if streaming)
└── [sdk.response.deserialize] // Response processing
Span Attributes (Agentic Semantic Conventions):
| Attribute | Type | Description |
|---|---|---|
agent.task.id | string | Unique task identifier |
agent.loop.iteration | int | Current iteration in the plan-act-verify loop |
agent.loop.phase | string | Current phase of the loop: PLAN \| ACT \| VERIFY |
agent.context.tokens.budget | int | Token budget for this operation |
agent.context.tokens.used | int | Actual tokens consumed |
agent.tool.id | string | Tool being invoked |
agent.tool.approval_required | bool | Whether human approval was needed |
agent.memory.tier | string | Memory tier accessed |
agent.model.id | string | Model used for inference |
agent.retry.attempt | int | Retry attempt number |
agent.error.class | string | Canonical error classification |
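Applying these conventions to a tool-invocation span might look like the following sketch. The `Span` class is a minimal stand-in whose `set_attribute` mimics the OpenTelemetry span API; the helper name is hypothetical:

```python
class Span:
    """Minimal stand-in for an OpenTelemetry span."""
    def __init__(self, name: str):
        self.name = name
        self.attributes = {}

    def set_attribute(self, key: str, value) -> None:
        self.attributes[key] = value

def annotate_tool_span(span, task_id, iteration, tool_id, approval_required):
    """Apply the agentic semantic conventions to a tool-invocation span."""
    span.set_attribute("agent.task.id", task_id)
    span.set_attribute("agent.loop.iteration", iteration)
    span.set_attribute("agent.tool.id", tool_id)
    span.set_attribute("agent.tool.approval_required", approval_required)
    return span
```

Keeping the attribute keys in one helper (or better, generated constants) prevents the drift across SDKs that free-form string keys invite.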
5.12.3 Metrics Emission#
The SDK emits structured metrics with the following canonical set:
| Metric Name | Type | Unit | Description |
|---|---|---|---|
sdk.request.duration | Histogram | ms | End-to-end request latency |
sdk.request.count | Counter | 1 | Total requests by method, status |
sdk.request.active | UpDownCounter | 1 | Currently in-flight requests |
sdk.retry.count | Counter | 1 | Total retries by error class |
sdk.circuit_breaker.state | Gauge | enum | Current circuit breaker state |
sdk.connection_pool.size | Gauge | 1 | Current pool size |
sdk.connection_pool.wait | Histogram | ms | Time waiting for connection |
sdk.context.tokens.allocated | Histogram | tokens | Token budget allocation |
sdk.context.compile.duration | Histogram | ms | Context compilation time |
sdk.tool.invoke.duration | Histogram | ms | Tool invocation latency |
sdk.memory.write.duration | Histogram | ms | Memory write latency |
sdk.telemetry.overhead | Gauge | % | CPU overhead from telemetry |
Overhead Budget Constraint: the telemetry subsystem must satisfy overhead_telemetry / cost_total ≤ ε, where ε is typically 2–5%. When measured overhead exceeds the budget, the SDK activates adaptive sampling, dynamically reducing the trace/metric emission rate to stay within the overhead budget while maintaining minimum observability.
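The rate controller can be written in executable form; the constants below mirror the pseudo-algorithm of §5.12.4 (EMA smoothing, multiplicative back-off and recovery), while the class itself is an illustrative sketch:

```python
class AdaptiveSampler:
    """Adjust the telemetry sample rate to respect a CPU overhead budget."""
    def __init__(self, max_overhead=0.03, min_rate=0.01, initial_rate=1.0):
        self.max_overhead = max_overhead    # epsilon, e.g. 3% CPU
        self.min_rate = min_rate            # observability floor
        self.rate = initial_rate            # 1.0 = sample everything
        self.overhead_ema = 0.0             # exponential moving average

    def update(self, measured_overhead: float) -> float:
        """Called each adjustment interval with the measured telemetry CPU share."""
        self.overhead_ema = 0.3 * measured_overhead + 0.7 * self.overhead_ema
        if self.overhead_ema > self.max_overhead:
            self.rate = max(self.min_rate, self.rate * 0.8)   # back off
        elif self.overhead_ema < self.max_overhead * 0.5:
            self.rate = min(1.0, self.rate * 1.1)             # recover
        return self.rate
```

The asymmetric factors (0.8 down, 1.1 up) bias toward staying under budget: the rate drops faster than it recovers, so transient load spikes cannot push telemetry overhead past ε for long.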
5.12.4 Pseudo-Algorithm: Adaptive Telemetry Sampling#
ALGORITHM: AdaptiveSampler(config: SamplingConfig) → SamplingDecision
STATE:
window: SlidingWindow(size = 60s)
current_rate: float = config.initial_rate // e.g., 1.0 = 100%
overhead_ema: float = 0.0 // Exponential moving average
FUNCTION should_sample(span: SpanContext) → bool:
// Priority sampling: always sample errors and slow requests
IF span.has_error OR span.duration > config.slow_threshold:
RETURN true
// Always sample if trace is already being sampled (maintain trace integrity)
IF span.parent_sampled:
RETURN true
// Probabilistic sampling based on current rate
IF RANDOM() < current_rate:
RETURN true
RETURN false
FUNCTION update_rate():
// Called every adjustment_interval (e.g., 5 seconds)
measured_overhead := measure_telemetry_cpu_percent()
overhead_ema := 0.3 * measured_overhead + 0.7 * overhead_ema
IF overhead_ema > config.max_overhead:
current_rate := MAX(config.min_rate, current_rate * 0.8)
ELSE IF overhead_ema < config.max_overhead * 0.5:
current_rate := MIN(1.0, current_rate * 1.1)
EMIT metric: sdk.telemetry.sample_rate { value = current_rate }
5.12.5 Error Reporting and Diagnostics#
The SDK provides structured error reports for debugging complex agentic failures:
TYPE SdkDiagnosticReport:
// Error context
error: SdkError
error_chain: List<SdkError> // Causal chain
request_summary: RequestSummary // Redacted request details
// Execution context
trace_id: TraceId
span_id: SpanId
agent_loop_state: LoopState? // Phase, iteration, step
// Environmental context
sdk_version: Version
transport: TransportType
connection_health: HealthStatus
circuit_breaker_state: CircuitState
pool_utilization: float
// Timing breakdown
timing: TimingBreakdown {
total_ms: float,
queue_wait_ms: float,
connection_acquire_ms: float,
serialization_ms: float,
network_ms: float,
deserialization_ms: float,
middleware_ms: Map<InterceptorName, float>
}
// Suggested actions
suggestions: List<DiagnosticSuggestion>
Diagnostic Suggestion Engine:
ALGORITHM: GenerateDiagnosticSuggestions(report: SdkDiagnosticReport) → List<DiagnosticSuggestion>
1. suggestions := []
2. IF report.error.class = DeadlineExceeded:
IF report.timing.connection_acquire_ms > report.timing.total_ms * 0.5:
suggestions.append("Increase connection pool size or reduce concurrency")
IF report.timing.network_ms > report.timing.total_ms * 0.7:
suggestions.append("Server latency is dominant; consider increasing deadline or optimizing server-side processing")
IF report.timing.serialization_ms > 100:
suggestions.append("Large request payload; consider compression or chunked transmission")
3. IF report.error.class = RateLimited:
suggestions.append("Reduce request rate or increase rate limit quota")
suggestions.append("Enable request queuing with backpressure")
4. IF report.circuit_breaker_state = OPEN:
suggestions.append("Circuit breaker is open due to high failure rate; investigate server health")
suggestions.append("Check server-side logs for trace_id: " + report.trace_id)
5. IF report.pool_utilization > 0.9:
suggestions.append("Connection pool near exhaustion; consider scaling pool_max_size")
6. IF report.error_chain.length > 3:
suggestions.append("Deep error chain suggests cascading failure; review retry policy")
7. RETURN suggestions
5.12.6 Performance Profiling Interface#
The SDK exposes a profiling interface for development-time performance analysis:
TYPE SdkProfiler:
FUNC start_profile(config: ProfileConfig) → ProfileSession
FUNC stop_profile(session: ProfileSession) → ProfileReport
TYPE ProfileReport:
duration: Duration
operations: List<ProfiledOperation>
hotspots: List<Hotspot> // Sorted by cumulative time
allocation_summary: AllocationSummary
network_summary: NetworkSummary
token_usage_summary: TokenUsageSummary
FUNC export(format: FLAMEGRAPH | CHROME_TRACE | PPROF) → bytes
Token Cost Model:
The profiler tracks token usage as a first-class cost metric:
Cost_total = Σ_i ( c_in(m_i) · t_in(i) + c_out(m_i) · t_out(i) )
where c_in(m_i) and c_out(m_i) are the per-token input and output costs for the model m_i used in invocation i, and t_in(i) and t_out(i) are the input and output token counts respectively. The profiler aggregates this across operations, tools, and loop iterations to identify cost hotspots.
5.12.7 Telemetry Privacy and Redaction#
All telemetry passes through a redaction pipeline before export:
ALGORITHM: RedactTelemetry(item: TelemetryItem, policy: PrivacyPolicy) → TelemetryItem
1. // Redact known PII patterns
FOR EACH field IN item.string_fields():
FOR EACH pattern IN policy.pii_patterns:
field.value := regex_replace(field.value, pattern, "[REDACTED]")
2. // Remove content fields if policy forbids content export
IF policy.export_content = false:
item.remove_fields(["request.body", "response.body", "context.content"])
3. // Hash user identifiers for correlation without exposure
IF policy.hash_user_ids:
item.user_id := HMAC_SHA256(policy.salt, item.user_id)
4. // Enforce attribute allowlist
FOR EACH attr IN item.attributes:
IF attr.key NOT IN policy.allowed_attributes:
item.remove_attribute(attr.key)
5. RETURN item
Chapter Summary#
This chapter has defined the complete SDK architecture for a universal agent client library system. The key architectural decisions and their justifications are:
| Decision | Justification |
|---|---|
| Layered stack (L0–L5) with strict upward-only dependencies | Enables transport substitution, independent testing, and code generation at the contract layer without cascading changes |
| Five core abstractions (AgentClient, ToolRegistry, MemoryStore, ContextBuilder, OrchestratorHandle) | Maps 1:1 to the platform's domain model; prevents abstraction leaks; enables independent evolution |
| Middleware as composed interceptor chain with ordering invariants | Ensures cross-cutting concerns are applied consistently; ordering constraint prevents correctness bugs (e.g., retry before auth) |
| Structured concurrency with scoped lifetimes | Prevents resource leaks, unobserved failures, and dangling tasks in concurrent agent workflows |
| Offline-first with sync-on-reconnect | Enables edge, mobile, IoT, and air-gapped deployments without sacrificing eventual consistency |
| Code generation from schema IR | Single source of truth for all contract types; eliminates drift between SDKs; enables cross-language conformance testing |
| Deterministic replay testing | Provides regression safety for SDK behavior; enables CI validation without live platform dependency |
| Adaptive telemetry with overhead budgets | Maintains observability under load without degrading application performance |
| Canonical error taxonomy with diagnostic suggestions | Transforms opaque failures into actionable guidance; reduces mean-time-to-resolution |
The SDK is not peripheral infrastructure—it is the typed enforcement boundary through which all application code interacts with the agentic platform. Its correctness, ergonomics, and reliability directly determine the correctness, ergonomics, and reliability of every agent built upon it.
End of Chapter 5.