Python SDK
Complete documentation for hanzo-zt, the async-first Python SDK for Hanzo ZT zero-trust networking with ZAP transport, IAM authentication, and billing enforcement.
The Python SDK (hanzo-zt) provides async-first bindings for Hanzo ZT. It uses asyncio throughout, dataclasses for configuration, and full type hints across the entire API surface.
Repository: github.com/hanzozt/zt-sdk-py
Installation
Install from PyPI:
pip install hanzo-zt
Or with uv:
uv add hanzo-zt
Requirements
- Python 3.10 or later
- asyncio (standard library)
- httpx for async HTTP
- pydantic for validation (optional, used internally)
Package Structure
hanzozt/
    __init__.py        # Public API re-exports
    config.py          # Config dataclass, ConfigBuilder
    context.py         # ZtContext: auth, dial, listen, services
    connection.py      # ZtConnection: send/recv over ZT fabric
    listener.py        # ZtListener: accept incoming connections
    errors.py          # ZtError, AuthError, InsufficientBalanceError
    auth/
        __init__.py    # Credentials protocol
        hanzo.py       # HanzoJwtCredentials (HANZO_API_KEY, auth.json)
        api_key.py     # ApiKeyCredentials (direct token)
    zap/
        transport.py   # ZapTransport: 4-byte BE length-prefix framing
    billing/
        guard.py       # BillingGuard: balance check, usage recording
        types.py       # UsageRecord dataclass
Quick Start
A minimal asyncio example that authenticates, lists services, and dials an echo service:
import asyncio
from hanzozt import ZtContext, ConfigBuilder, HanzoJwtCredentials

async def main():
    # Resolve credentials from HANZO_API_KEY env or ~/.hanzo/auth.json
    creds = HanzoJwtCredentials.resolve()
    config = (
        ConfigBuilder()
        .controller_url("https://zt-api.hanzo.ai")
        .credentials(creds)
        .billing(True)
        .build()
    )
    ctx = await ZtContext.create(config)
    await ctx.authenticate()
    # List available services
    services = await ctx.services()
    for svc in services:
        print(f"Service: {svc.name} ({svc.id})")
    # Dial a service and exchange data
    conn = await ctx.dial("echo-service")
    await conn.send(b"Hello, ZT!")
    response = await conn.recv()
    print(f"Response: {response.decode()}")
    await conn.close()

asyncio.run(main())
Authentication
HanzoJwtCredentials
The primary credential type. Resolves a JWT from two sources, checked in order:
1. HANZO_API_KEY environment variable
2. ~/.hanzo/auth.json file (written by hanzo login)
from hanzozt.auth.hanzo import HanzoJwtCredentials
# Automatic resolution (env var, then auth file)
creds = HanzoJwtCredentials.resolve()
# From an explicit token string
creds = HanzoJwtCredentials.from_token("eyJhbGciOiJSUzI1NiIs...")
# Inspect
print(creds.auth_method()) # "ext-jwt"
print(creds.auth_payload()) # {"ext-jwt": {"access_token": "eyJ..."}}
print(creds.display()) # "Hanzo API key (...xxxxx)" or "Hanzo IAM (user@example.com)"
Credentials Protocol
All credential types follow the same protocol:
from typing import Protocol

class Credentials(Protocol):
    def auth_method(self) -> str:
        """Returns the auth method identifier, e.g. 'ext-jwt' or 'password'."""
        ...

    def auth_payload(self) -> dict:
        """Returns the authentication payload dict for the ZT controller."""
        ...

    def display(self) -> str:
        """Human-readable display string with masked secrets."""
        ...
ApiKeyCredentials
Alternative authentication with a direct controller API token:
from hanzozt.auth.api_key import ApiKeyCredentials
creds = ApiKeyCredentials("zt-controller-token")
assert creds.auth_method() == "password"
Credential Resolution Order
When HanzoJwtCredentials.resolve() is called:
| Step | Source | Example |
|---|---|---|
| 1 | HANZO_API_KEY env var | export HANZO_API_KEY=sk-hz-... |
| 2 | ~/.hanzo/auth.json | Written by hanzo login CLI |
If neither source contains a valid token, resolve() raises AuthError.
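The lookup order in the table can be sketched with the standard library alone. This is an illustration, not the SDK's implementation: `resolve_token` and the local `AuthError` are hypothetical stand-ins, and the `access_token` key is an assumption about the layout of auth.json, which this page does not document.

```python
import json
import os
from pathlib import Path
from typing import Mapping, Optional


class AuthError(Exception):
    """Local stand-in for hanzozt.errors.AuthError."""


def resolve_token(
    env: Mapping[str, str] = os.environ,
    auth_file: Optional[Path] = None,
) -> str:
    """Return a token from HANZO_API_KEY, else from the auth file."""
    # Step 1: the environment variable wins when it is set and non-empty.
    token = env.get("HANZO_API_KEY", "").strip()
    if token:
        return token

    # Step 2: fall back to the file written by `hanzo login`.
    path = auth_file if auth_file is not None else Path.home() / ".hanzo" / "auth.json"
    try:
        data = json.loads(path.read_text())
    except (OSError, json.JSONDecodeError):
        data = None

    # ASSUMPTION: the file is a JSON object with an "access_token" string.
    value = data.get("access_token") if isinstance(data, dict) else None
    if isinstance(value, str) and value.strip():
        return value.strip()

    # Neither source produced a usable token.
    raise AuthError("no credentials in HANZO_API_KEY or auth.json")
```

Passing `env` and `auth_file` explicitly keeps the function easy to test without touching the real environment or home directory.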
ZAP Transport
The ZapTransport class wraps a connection with ZAP framing: each message is prefixed with a 4-byte big-endian length header.
Framing Protocol
+----------------+-------------------+
| Length (4B BE) | Payload (N bytes) |
+----------------+-------------------+
Each frame is length-prefixed: 4 bytes encoding the payload size as an unsigned 32-bit big-endian integer, followed by exactly that many bytes of payload data.
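To make the byte layout concrete, here is a small standalone sketch that encodes payloads and then splits a received buffer back into frames. `split_frames` is a hypothetical helper written for this illustration, not part of hanzozt; it shows how a reader can tell a complete frame from a partially received one using only the length header.

```python
import struct
from typing import List, Tuple

HEADER = struct.Struct(">I")  # unsigned 32-bit big-endian length


def encode_frame(payload: bytes) -> bytes:
    """Prepend the 4-byte length header to the payload."""
    return HEADER.pack(len(payload)) + payload


def split_frames(buffer: bytes) -> Tuple[List[bytes], bytes]:
    """Return (complete payloads, leftover bytes of an incomplete frame)."""
    frames = []
    offset = 0
    while len(buffer) - offset >= HEADER.size:
        (length,) = HEADER.unpack_from(buffer, offset)
        end = offset + HEADER.size + length
        if end > len(buffer):
            break  # header is present but the payload has not fully arrived
        frames.append(buffer[offset + HEADER.size:end])
        offset = end
    return frames, buffer[offset:]


# Two complete frames (one empty) followed by a truncated third frame.
wire = encode_frame(b"ping") + encode_frame(b"") + encode_frame(b"pong")[:6]
frames, rest = split_frames(wire)
print(encode_frame(b"ping"))  # b'\x00\x00\x00\x04ping'
print(frames)                 # [b'ping', b'']
print(rest)                   # b'\x00\x00\x00\x04po'
```

The leftover bytes would be kept and prepended to the next chunk read from the connection.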
Usage
from hanzozt.zap.transport import ZapTransport
# Wrap an existing connection with ZAP framing
transport = ZapTransport(connection)
# Send a frame (automatically prepends 4-byte length prefix)
await transport.send_frame(b'{"jsonrpc":"2.0","method":"tools/list"}')
# Receive a frame (reads length prefix, then exact payload bytes)
response: bytes = await transport.recv_frame()
print(response.decode())
API Reference
| Method | Signature | Description |
|---|---|---|
| send_frame | async send_frame(payload: bytes) -> None | Encode payload with 4-byte BE length prefix and send |
| recv_frame | async recv_frame() -> bytes | Read 4-byte length prefix, then read exactly N payload bytes |
| close | async close() -> None | Close the underlying connection |
| is_connected | is_connected() -> bool | Check if the transport is still open |
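Because a receiver trusts the 4-byte prefix before reading the payload, a peer can declare a length of up to 4 GiB. One common precaution is to cap the declared length before allocating. The sketch below shows that idea over an asyncio.StreamReader; `recv_frame_bounded`, `FrameTooLargeError`, and the 16 MiB `MAX_FRAME_SIZE` are assumptions made for illustration, and this page does not say whether ZapTransport enforces any such limit.

```python
import asyncio
import struct

MAX_FRAME_SIZE = 16 * 1024 * 1024  # ASSUMPTION: 16 MiB cap, not an SDK constant


class FrameTooLargeError(Exception):
    """Hypothetical error for a declared length above the cap."""


async def recv_frame_bounded(
    reader: asyncio.StreamReader, limit: int = MAX_FRAME_SIZE
) -> bytes:
    """Read one length-prefixed frame, rejecting oversized declarations."""
    header = await reader.readexactly(4)
    (length,) = struct.unpack(">I", header)
    if length > limit:
        # Refuse before reading (and buffering) the payload.
        raise FrameTooLargeError(f"declared {length} bytes, limit is {limit}")
    return await reader.readexactly(length)


async def demo() -> bytes:
    # Feed a single well-formed frame into an in-memory reader.
    reader = asyncio.StreamReader()
    reader.feed_data(struct.pack(">I", 5) + b"hello")
    reader.feed_eof()
    return await recv_frame_bounded(reader)


print(asyncio.run(demo()))  # b'hello'
```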
Low-Level Framing Details
The framing functions can also be used standalone:
import struct

def encode_frame(payload: bytes) -> bytes:
    """Prepend 4-byte big-endian length prefix."""
    return struct.pack(">I", len(payload)) + payload

async def read_frame(reader) -> bytes:
    """Read a length-prefixed frame from an async reader."""
    header = await reader.readexactly(4)
    length = struct.unpack(">I", header)[0]
    return await reader.readexactly(length)
Billing
BillingGuard
Enforces billing checks before service access and records usage after sessions.
from hanzozt.billing.guard import BillingGuard
from hanzozt.billing.types import UsageRecord

guard = BillingGuard(
    commerce_url="https://api.hanzo.ai/commerce",
    token="your-jwt-token",
)
# Pre-dial balance check (called automatically by ZtContext.dial)
await guard.check_balance("my-service")
# Post-session usage recording
await guard.record_usage(UsageRecord(
    service="my-service",
    session_id="sess_abc123",
    bytes_sent=4096,
    bytes_received=8192,
    duration_ms=5000,
))
If the account balance is zero or negative, check_balance raises InsufficientBalanceError.
UsageRecord
A dataclass that captures session metrics for billing:
from dataclasses import dataclass

@dataclass
class UsageRecord:
    service: str         # Service name
    session_id: str      # Session identifier
    bytes_sent: int      # Total bytes sent during session
    bytes_received: int  # Total bytes received during session
    duration_ms: int     # Session duration in milliseconds
BillingGuard API
| Method | Signature | Description |
|---|---|---|
| check_balance | async check_balance(service: str) -> None | Verify sufficient balance; raises InsufficientBalanceError if not |
| record_usage | async record_usage(record: UsageRecord) -> None | Post usage metrics to Commerce API |
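Since record_usage expects byte counts and a duration, something has to accumulate them while the session runs. A minimal sketch of one way to do that follows. `UsageMeter` is a hypothetical helper, not an SDK class, and `UsageRecord` is redefined locally with the documented fields so the example runs without hanzozt installed.

```python
import time
from dataclasses import dataclass, field


@dataclass
class UsageRecord:
    # Mirrors the documented fields so the sketch runs standalone.
    service: str
    session_id: str
    bytes_sent: int
    bytes_received: int
    duration_ms: int


@dataclass
class UsageMeter:
    """Hypothetical helper: accumulates counters for one session."""
    service: str
    session_id: str
    bytes_sent: int = 0
    bytes_received: int = 0
    # Monotonic clock so wall-clock adjustments cannot skew the duration.
    _start: float = field(default_factory=time.monotonic, repr=False)

    def sent(self, data: bytes) -> None:
        self.bytes_sent += len(data)

    def received(self, data: bytes) -> None:
        self.bytes_received += len(data)

    def finish(self) -> UsageRecord:
        elapsed_ms = int((time.monotonic() - self._start) * 1000)
        return UsageRecord(self.service, self.session_id,
                           self.bytes_sent, self.bytes_received, elapsed_ms)


meter = UsageMeter(service="echo-service", session_id="sess_abc123")
meter.sent(b"ping")
meter.received(b"pong!")
record = meter.finish()
print(record.bytes_sent, record.bytes_received)  # 4 5
```

With the real SDK, the resulting record would be passed to `guard.record_usage(...)` after the connection closes.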
Configuration
Config Dataclass
from dataclasses import dataclass, field
from hanzozt.auth import Credentials

@dataclass
class Config:
    controller_url: str                                  # ZT controller URL (required)
    credentials: Credentials                             # Auth credentials (required)
    billing_enabled: bool = True                         # Enable billing checks
    commerce_url: str = "https://api.hanzo.ai/commerce"  # Commerce API URL
    connect_timeout: float = 30.0                        # Connection timeout in seconds
    request_timeout: float = 15.0                        # Controller API request timeout in seconds
ConfigBuilder
Fluent builder for constructing configuration:
from hanzozt import ConfigBuilder, HanzoJwtCredentials

config = (
    ConfigBuilder()
    .controller_url("https://zt-api.hanzo.ai")
    .credentials(HanzoJwtCredentials.resolve())
    .billing(True)
    .commerce_url("https://api.hanzo.ai/commerce")
    .connect_timeout(30.0)
    .request_timeout(15.0)
    .build()
)
Config fields:
| Field | Type | Default | Description |
|---|---|---|---|
| controller_url | str | Required | ZT controller URL |
| credentials | Credentials | Required | Auth credentials |
| billing_enabled | bool | True | Enable billing checks before dial |
| commerce_url | str | https://api.hanzo.ai/commerce | Commerce API URL |
| connect_timeout | float | 30.0 | Connection timeout (seconds) |
| request_timeout | float | 15.0 | Controller API request timeout (seconds) |
Calling .build() raises InvalidConfigError if either controller_url or credentials is missing.
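The pattern behind a fluent builder with required-field validation can be sketched in a few lines. The classes below (`SketchConfig`, `SketchConfigBuilder`, and a local `InvalidConfigError`) are hypothetical stand-ins that cover only three fields. They illustrate the technique and are not the SDK's source.

```python
from dataclasses import dataclass
from typing import Any, Optional


class InvalidConfigError(Exception):
    """Local stand-in for hanzozt.errors.InvalidConfigError."""


@dataclass(frozen=True)
class SketchConfig:
    controller_url: str
    credentials: Any
    billing_enabled: bool = True


class SketchConfigBuilder:
    def __init__(self) -> None:
        self._controller_url: Optional[str] = None
        self._credentials: Optional[Any] = None
        self._billing = True

    def controller_url(self, url: str) -> "SketchConfigBuilder":
        self._controller_url = url
        return self  # returning self is what enables chaining

    def credentials(self, creds: Any) -> "SketchConfigBuilder":
        self._credentials = creds
        return self

    def billing(self, enabled: bool) -> "SketchConfigBuilder":
        self._billing = enabled
        return self

    def build(self) -> SketchConfig:
        # Collect every missing required field so the error names them all.
        missing = [name for name, value in (
            ("controller_url", self._controller_url),
            ("credentials", self._credentials),
        ) if value is None]
        if missing:
            raise InvalidConfigError("missing required field(s): " + ", ".join(missing))
        return SketchConfig(self._controller_url, self._credentials, self._billing)


config = SketchConfigBuilder().controller_url("https://zt-api.hanzo.ai").credentials("token").build()
print(config.billing_enabled)  # True
```

Deferring validation to `build()` lets callers set fields in any order and still get a single, complete error.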
ZtContext
The main entry point. Manages authentication, service discovery, and connections.
from hanzozt import ZtContext, ConfigBuilder, HanzoJwtCredentials

config = (
    ConfigBuilder()
    .controller_url("https://zt-api.hanzo.ai")
    .credentials(HanzoJwtCredentials.resolve())
    .build()
)
ctx = await ZtContext.create(config)
await ctx.authenticate()
Methods:
| Method | Signature | Description |
|---|---|---|
| create | async create(config: Config) -> ZtContext | Create a new context (class method) |
| authenticate | async authenticate() -> None | Authenticate with the ZT controller |
| dial | async dial(service: str) -> ZtConnection | Dial a service, returns a connection |
| listen | async listen(service: str) -> ZtListener | Bind to a service, returns a listener |
| services | async services() -> list[Service] | List all services available to the identity |
| session | session() -> SessionInfo | Get the current API session info |
| close | async close() -> None | Close the context and all connections |
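Because close() is a coroutine, cleanup is easy to skip when an earlier call raises. This page does not say whether ZtContext supports `async with` directly, so the sketch below wraps any object that exposes an async close() in a small context manager. `closing` and `FakeContext` are hypothetical names used only for this illustration. The standard library's contextlib.aclosing is not a drop-in here because it awaits aclose() rather than close().

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def closing(resource: Any) -> AsyncIterator[Any]:
    """Yield resource and always await resource.close() on exit."""
    try:
        yield resource
    finally:
        await resource.close()


class FakeContext:
    """Stand-in for ZtContext so the sketch runs without a controller."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def demo() -> bool:
    ctx = FakeContext()
    try:
        async with closing(ctx):
            # Simulate a failure partway through the session.
            raise RuntimeError("dial failed")
    except RuntimeError:
        pass
    return ctx.closed


print(asyncio.run(demo()))  # True
```

The same wrapper would apply to a connection or listener, since the examples on this page close them with an awaited close() as well.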
ZtConnection
A bidirectional connection to a ZT service:
conn = await ctx.dial("my-service")
# Message-based API
await conn.send(b"hello")
response = await conn.recv()
# Metadata
print(conn.service_name) # "my-service"
print(conn.session_id) # "sess_abc123"
print(conn.is_connected) # True
# Close
await conn.close()
ZtListener
Accepts incoming connections on a bound service:
import asyncio

# Define the handler before the accept loop, which never returns
async def handle_connection(conn):
    data = await conn.recv()
    await conn.send(data) # Echo
    await conn.close()

listener = await ctx.listen("my-service")
while True:
    conn = await listener.accept()
    print(f"Accepted on {listener.service_name}")
    # Handle in a separate task
    asyncio.create_task(handle_connection(conn))
Error Handling
All errors inherit from ZtError. Import from hanzozt.errors:
from hanzozt.errors import (
    ZtError,
    AuthError,
    InsufficientBalanceError,
    ServiceNotFoundError,
    ConnectionClosedError,
    InvalidConfigError,
    ControllerError,
    BillingError,
    TimeoutError,
)
Exception Hierarchy
ZtError (base)
    AuthError                -- Authentication failed or no credentials
    InsufficientBalanceError -- Account balance is zero or negative
    ServiceNotFoundError     -- Named service does not exist
    ConnectionClosedError    -- Connection was closed unexpectedly
    InvalidConfigError       -- Configuration validation failed
    ControllerError          -- Controller API returned an error
    BillingError             -- Error contacting the Commerce API
    TimeoutError             -- Operation timed out
Example
from hanzozt.errors import (
    AuthError,
    InsufficientBalanceError,
    ServiceNotFoundError,
    ZtError,
)

try:
    conn = await ctx.dial("my-service")
    await conn.send(b"ping")
    response = await conn.recv()
except AuthError as e:
    print(f"Auth failed: {e}")
except InsufficientBalanceError as e:
    print(f"Add credits: {e}")
except ServiceNotFoundError as e:
    print(f"Service not found: {e}")
except ZtError as e:
    # Catch-all for any other SDK error; must come after the subclasses
    print(f"ZT error: {e}")
Complete Example
End-to-end: resolve credentials, configure, authenticate, dial a service over ZAP transport, exchange data, and record billing usage.
import asyncio
import time
from hanzozt import ZtContext, ConfigBuilder, HanzoJwtCredentials
from hanzozt.zap.transport import ZapTransport
from hanzozt.billing.guard import BillingGuard
from hanzozt.billing.types import UsageRecord
from hanzozt.errors import ZtError

async def main():
    # 1. Resolve credentials
    creds = HanzoJwtCredentials.resolve()
    print(f"Auth: {creds.display()}")
    # 2. Build configuration
    config = (
        ConfigBuilder()
        .controller_url("https://zt-api.hanzo.ai")
        .credentials(creds)
        .billing(True)
        .commerce_url("https://api.hanzo.ai/commerce")
        .connect_timeout(30.0)
        .build()
    )
    # 3. Create context and authenticate
    ctx = await ZtContext.create(config)
    await ctx.authenticate()
    print(f"Session: {ctx.session().identity_id}")
    # 4. List services
    services = await ctx.services()
    for svc in services:
        print(f"  {svc.name} ({svc.id})")
    # 5. Dial a service
    start = time.monotonic()
    conn = await ctx.dial("echo-service")
    # 6. Wrap with ZAP transport for length-prefixed framing
    transport = ZapTransport(conn)
    # 7. Exchange data
    request = b'{"jsonrpc":"2.0","method":"ping","id":1}'
    await transport.send_frame(request)
    response = await transport.recv_frame()
    print(f"Response: {response.decode()}")
    # 8. Close connection
    elapsed_ms = int((time.monotonic() - start) * 1000)
    await transport.close()
    # 9. Record usage for billing
    guard = BillingGuard(
        commerce_url="https://api.hanzo.ai/commerce",
        token=creds.auth_payload()["ext-jwt"]["access_token"],
    )
    await guard.record_usage(UsageRecord(
        service="echo-service",
        session_id=conn.session_id,
        bytes_sent=len(request),
        bytes_received=len(response),
        duration_ms=elapsed_ms,
    ))
    # 10. Cleanup
    await ctx.close()
    print("Done.")

if __name__ == "__main__":
    asyncio.run(main())
Testing
The SDK ships with 11 tests covering configuration, authentication, transport framing, and billing. Run them with pytest:
# Run all tests
pytest tests/ -v
# Run with coverage
pytest tests/ --cov=hanzozt --cov-report=term-missing
Test Breakdown
| Test | Area | Description |
|---|---|---|
| test_config_builder_success | Config | Builder produces valid Config with all fields |
| test_config_builder_missing_url | Config | Raises InvalidConfigError without controller_url |
| test_config_builder_missing_creds | Config | Raises InvalidConfigError without credentials |
| test_hanzo_jwt_resolve_env | Auth | Resolves token from HANZO_API_KEY env var |
| test_hanzo_jwt_resolve_file | Auth | Resolves token from ~/.hanzo/auth.json |
| test_hanzo_jwt_from_token | Auth | Constructs credentials from explicit token |
| test_hanzo_jwt_auth_method | Auth | auth_method() returns "ext-jwt" |
| test_hanzo_jwt_display_masked | Auth | Display masks all but last 5 characters |
| test_zap_frame_roundtrip | Transport | Encode then decode preserves payload bytes |
| test_billing_check_raises | Billing | check_balance raises InsufficientBalanceError on zero balance |
| test_usage_record_dataclass | Billing | UsageRecord fields are correct types and values |
Example Test
import struct

import pytest
from hanzozt import ConfigBuilder, HanzoJwtCredentials
from hanzozt.errors import InvalidConfigError

def test_config_builder_success():
    creds = HanzoJwtCredentials.from_token("test-token")
    config = (
        ConfigBuilder()
        .controller_url("https://zt-api.hanzo.ai")
        .credentials(creds)
        .billing(False)
        .build()
    )
    assert config.controller_url == "https://zt-api.hanzo.ai"
    assert config.billing_enabled is False

def test_config_builder_missing_url():
    creds = HanzoJwtCredentials.from_token("test-token")
    with pytest.raises(InvalidConfigError):
        ConfigBuilder().credentials(creds).build()

def test_hanzo_jwt_from_token():
    creds = HanzoJwtCredentials.from_token("test-token-123")
    assert creds.auth_method() == "ext-jwt"
    assert "...n-123" in creds.display()

@pytest.mark.asyncio
async def test_zap_frame_roundtrip():
    """Verify that encoding then decoding a frame preserves the payload."""
    payload = b"hello world"
    frame = struct.pack(">I", len(payload)) + payload
    # Decode
    length = struct.unpack(">I", frame[:4])[0]
    decoded = frame[4:4 + length]
    assert decoded == payload
Dependencies
| Package | Purpose |
|---|---|
| httpx | Async HTTP client for controller and billing APIs |
| pydantic | Optional validation for config and API responses |
The SDK relies on the Python standard library for asyncio, dataclasses, struct (ZAP framing), json, pathlib, and os.
Comparison with Other SDKs
| Feature | Python | Rust | Go | TypeScript |
|---|---|---|---|---|
| Async model | asyncio | Tokio | goroutines | Promises |
| Config style | dataclass | builder | struct | object literal |
| ZAP framing | struct.pack BE | bytes crate | encoding/binary | Buffer |
| HTTP client | httpx | reqwest | net/http | fetch |
| Error style | exception hierarchy | enum variants | error wrapping | typed errors |
| Tests | 11 | 9 | 5 | complete |