Hanzo ZT

Python SDK

Complete documentation for hanzo-zt, the async-first Python SDK for Hanzo ZT zero-trust networking with ZAP transport, IAM authentication, and billing enforcement.

The Python SDK (hanzo-zt) provides async-first bindings for Hanzo ZT. It uses asyncio throughout, dataclasses for configuration, and full type hints across the entire API surface.

Repository: github.com/hanzozt/zt-sdk-py

Installation

Install from PyPI:

pip install hanzo-zt

Or with uv:

uv add hanzo-zt

Requirements

  • Python 3.10 or later
  • asyncio (standard library)
  • httpx for async HTTP
  • pydantic for validation (optional, used internally)

Package Structure

hanzozt/
  __init__.py          # Public API re-exports
  config.py            # Config dataclass, ConfigBuilder
  context.py           # ZtContext: auth, dial, listen, services
  connection.py        # ZtConnection: send/recv over ZT fabric
  listener.py          # ZtListener: accept incoming connections
  errors.py            # ZtError, AuthError, InsufficientBalanceError
  auth/
    __init__.py        # Credentials protocol
    hanzo.py           # HanzoJwtCredentials (HANZO_API_KEY, auth.json)
    api_key.py         # ApiKeyCredentials (direct token)
  zap/
    transport.py       # ZapTransport: 4-byte BE length-prefix framing
  billing/
    guard.py           # BillingGuard: balance check, usage recording
    types.py           # UsageRecord dataclass

Quick Start

A minimal asyncio example that authenticates, lists services, and dials an echo service:

import asyncio
from hanzozt import ZtContext, ConfigBuilder, HanzoJwtCredentials

async def main():
    # Resolve credentials from HANZO_API_KEY env or ~/.hanzo/auth.json
    creds = HanzoJwtCredentials.resolve()

    config = (
        ConfigBuilder()
        .controller_url("https://zt-api.hanzo.ai")
        .credentials(creds)
        .billing(True)
        .build()
    )

    ctx = await ZtContext.create(config)
    await ctx.authenticate()

    # List available services
    services = await ctx.services()
    for svc in services:
        print(f"Service: {svc.name} ({svc.id})")

    # Dial a service and exchange data
    conn = await ctx.dial("echo-service")
    await conn.send(b"Hello, ZT!")
    response = await conn.recv()
    print(f"Response: {response.decode()}")
    await conn.close()

asyncio.run(main())

Authentication

HanzoJwtCredentials

The primary credential type. Resolves a JWT from two sources, checked in order:

  1. HANZO_API_KEY environment variable
  2. ~/.hanzo/auth.json file (written by hanzo login)

from hanzozt.auth.hanzo import HanzoJwtCredentials

# Automatic resolution (env var, then auth file)
creds = HanzoJwtCredentials.resolve()

# From an explicit token string
creds = HanzoJwtCredentials.from_token("eyJhbGciOiJSUzI1NiIs...")

# Inspect
print(creds.auth_method())   # "ext-jwt"
print(creds.auth_payload())  # {"ext-jwt": {"access_token": "eyJ..."}}
print(creds.display())       # "Hanzo API key (...xxxxx)" or "Hanzo IAM (user@example.com)"

Credentials Protocol

All credential types follow the same protocol:

from typing import Protocol

class Credentials(Protocol):
    def auth_method(self) -> str:
        """Returns the auth method identifier, e.g. 'ext-jwt' or 'password'."""
        ...

    def auth_payload(self) -> dict:
        """Returns the authentication payload dict for the ZT controller."""
        ...

    def display(self) -> str:
        """Human-readable display string with masked secrets."""
        ...
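
Because Credentials is a typing.Protocol, any class that provides these three methods satisfies it structurally, without inheriting from it. The following is a minimal sketch under stated assumptions: StaticTokenCredentials and mask_secret are hypothetical names that are not part of hanzo-zt, and the protocol is redeclared locally so the snippet runs without the SDK installed. The five-character mask mirrors the display() output described for HanzoJwtCredentials.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Credentials(Protocol):
    """Local copy of the protocol shape (not imported from the SDK)."""

    def auth_method(self) -> str: ...
    def auth_payload(self) -> dict: ...
    def display(self) -> str: ...


def mask_secret(secret: str, visible: int = 5) -> str:
    """Hypothetical helper: keep only the last `visible` characters."""
    return "..." + secret[-visible:]


class StaticTokenCredentials:
    """Hypothetical credential type; note that it does not subclass Credentials."""

    def __init__(self, token: str) -> None:
        self._token = token

    def auth_method(self) -> str:
        return "ext-jwt"

    def auth_payload(self) -> dict:
        return {"ext-jwt": {"access_token": self._token}}

    def display(self) -> str:
        return f"Static token ({mask_secret(self._token)})"


creds = StaticTokenCredentials("test-token-123")
# runtime_checkable only verifies that the methods exist, not their signatures.
print(isinstance(creds, Credentials))
print(creds.display())
```

A static type checker performs the stricter signature check; the isinstance test is only a coarse runtime guard.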

ApiKeyCredentials

Alternative authentication with a direct controller API token:

from hanzozt.auth.api_key import ApiKeyCredentials

creds = ApiKeyCredentials("zt-controller-token")
assert creds.auth_method() == "password"

Credential Resolution Order

When HanzoJwtCredentials.resolve() is called:

| Step | Source | Example |
|---|---|---|
| 1 | HANZO_API_KEY env var | export HANZO_API_KEY=sk-hz-... |
| 2 | ~/.hanzo/auth.json | Written by hanzo login CLI |

If neither source contains a valid token, resolve() raises AuthError.
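
The lookup order can be sketched as a small standalone function. This is an illustration, not the SDK's implementation: resolve_token is a hypothetical name, AuthError is a local stand-in, and the assumption that auth.json stores the token under a top-level "token" key is not confirmed by this document.

```python
import json
import tempfile
from pathlib import Path
from typing import Mapping


class AuthError(Exception):
    """Local stand-in for hanzozt.errors.AuthError."""


def resolve_token(env: Mapping[str, str], auth_file: Path) -> str:
    """Hypothetical resolver: environment variable first, then the auth file."""
    # Step 1: the environment variable wins when it is set and non-empty.
    token = env.get("HANZO_API_KEY", "").strip()
    if token:
        return token

    # Step 2: fall back to the auth file. The "token" key is an assumption;
    # the real auth.json schema is not documented here.
    try:
        data = json.loads(auth_file.read_text())
    except (OSError, json.JSONDecodeError):
        data = None
    value = data.get("token") if isinstance(data, dict) else None
    if isinstance(value, str) and value.strip():
        return value.strip()

    raise AuthError(f"no token in HANZO_API_KEY or {auth_file}")


with tempfile.TemporaryDirectory() as tmp:
    auth_file = Path(tmp) / "auth.json"
    auth_file.write_text(json.dumps({"token": "from-file"}))

    from_env = resolve_token({"HANZO_API_KEY": "from-env"}, auth_file)
    from_file = resolve_token({}, auth_file)
    try:
        resolve_token({}, Path(tmp) / "missing.json")
        missing_raises = False
    except AuthError:
        missing_raises = True

print(from_env, from_file, missing_raises)
```

Passing the environment as a mapping keeps the function easy to test without mutating os.environ.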

ZAP Transport

The ZapTransport class wraps a connection with ZAP framing: each message is prefixed with a 4-byte big-endian length header.

Framing Protocol

+----------------+-------------------+
| Length (4B BE) | Payload (N bytes) |
+----------------+-------------------+

Each frame is length-prefixed: 4 bytes encoding the payload size as an unsigned 32-bit big-endian integer, followed by exactly that many bytes of payload data.
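
As a worked example of this layout, the sketch below packs two frames into one byte string and splits them apart again. split_frames is a hypothetical helper written for illustration; it is not part of the SDK. It also shows why a receiver must keep any trailing bytes that do not yet form a complete frame.

```python
import struct

HEADER = struct.Struct(">I")  # unsigned 32-bit integer, big-endian


def split_frames(buffer: bytes) -> tuple[list[bytes], bytes]:
    """Hypothetical helper: return (complete payloads, unconsumed tail)."""
    frames = []
    offset = 0
    while len(buffer) - offset >= HEADER.size:
        (length,) = HEADER.unpack_from(buffer, offset)
        end = offset + HEADER.size + length
        if end > len(buffer):
            break  # payload not fully received yet; wait for more bytes
        frames.append(buffer[offset + HEADER.size:end])
        offset = end
    return frames, buffer[offset:]


# Two complete frames followed by two stray bytes of a third header.
wire = HEADER.pack(5) + b"hello" + HEADER.pack(2) + b"ok" + b"\x00\x00"
frames, tail = split_frames(wire)
print(wire[:9])   # b'\x00\x00\x00\x05hello'
print(frames, tail)
```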

Usage

from hanzozt.zap.transport import ZapTransport

# Wrap an existing connection with ZAP framing
transport = ZapTransport(connection)

# Send a frame (automatically prepends 4-byte length prefix)
await transport.send_frame(b'{"jsonrpc":"2.0","method":"tools/list"}')

# Receive a frame (reads length prefix, then exact payload bytes)
response: bytes = await transport.recv_frame()
print(response.decode())

API Reference

| Method | Signature | Description |
|---|---|---|
| send_frame | async send_frame(payload: bytes) -> None | Encode payload with 4-byte BE length prefix and send |
| recv_frame | async recv_frame() -> bytes | Read 4-byte length prefix, then read exactly N payload bytes |
| close | async close() -> None | Close the underlying connection |
| is_connected | is_connected() -> bool | Check if the transport is still open |

Low-Level Framing Details

The framing functions can also be used standalone:

import struct

def encode_frame(payload: bytes) -> bytes:
    """Prepend 4-byte big-endian length prefix."""
    return struct.pack(">I", len(payload)) + payload

async def read_frame(reader) -> bytes:
    """Read a length-prefixed frame from an async reader."""
    header = await reader.readexactly(4)
    length = struct.unpack(">I", header)[0]
    return await reader.readexactly(length)
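
A reader that trusts the header will try to read whatever length the peer announces, up to 2^32 - 1 bytes. The sketch below adds a size cap before the payload read. MAX_FRAME_SIZE and FrameTooLargeError are hypothetical names chosen for this example; the document does not say whether the SDK enforces a limit. An in-memory asyncio.StreamReader stands in for a real connection.

```python
import asyncio
import struct

MAX_FRAME_SIZE = 1 << 20  # hypothetical 1 MiB cap; not an SDK constant


class FrameTooLargeError(Exception):
    """Hypothetical error for a header that announces an oversized payload."""


async def read_frame_checked(
    reader: asyncio.StreamReader, limit: int = MAX_FRAME_SIZE
) -> bytes:
    header = await reader.readexactly(4)
    (length,) = struct.unpack(">I", header)
    if length > limit:
        # Reject before waiting for (or buffering) `length` bytes.
        raise FrameTooLargeError(f"frame of {length} bytes exceeds limit {limit}")
    return await reader.readexactly(length)


async def demo() -> tuple[bytes, bool, bool]:
    # Create the reader inside the coroutine so it binds to the running loop.
    reader = asyncio.StreamReader()
    reader.feed_data(struct.pack(">I", 4) + b"ping")
    reader.feed_data(struct.pack(">I", 0xFFFFFFFF))  # oversized header
    reader.feed_eof()

    first = await read_frame_checked(reader)
    try:
        await read_frame_checked(reader)
        rejected = False
    except FrameTooLargeError:
        rejected = True

    # A stream that ends before a full header arrives raises IncompleteReadError.
    try:
        await read_frame_checked(reader)
        truncated = False
    except asyncio.IncompleteReadError:
        truncated = True
    return first, rejected, truncated


result = asyncio.run(demo())
print(result)
```

After an oversized header the stream position is no longer aligned to a frame boundary, so in practice the connection would normally be closed rather than read further.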

Billing

BillingGuard

Enforces billing checks before service access and records usage after sessions.

from hanzozt.billing.guard import BillingGuard

guard = BillingGuard(
    commerce_url="https://api.hanzo.ai/commerce",
    token="your-jwt-token",
)

# Pre-dial balance check (called automatically by ZtContext.dial)
await guard.check_balance("my-service")

# Post-session usage recording
from hanzozt.billing.types import UsageRecord

await guard.record_usage(UsageRecord(
    service="my-service",
    session_id="sess_abc123",
    bytes_sent=4096,
    bytes_received=8192,
    duration_ms=5000,
))

If the account balance is zero or negative, check_balance raises InsufficientBalanceError.

UsageRecord

A dataclass that captures session metrics for billing:

from dataclasses import dataclass

@dataclass
class UsageRecord:
    service: str         # Service name
    session_id: str      # Session identifier
    bytes_sent: int      # Total bytes sent during session
    bytes_received: int  # Total bytes received during session
    duration_ms: int     # Session duration in milliseconds
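
One way to fill these fields is to measure the session with time.monotonic() and count payload bytes. The sketch below redeclares UsageRecord locally so that it runs without the SDK; build_record is a hypothetical helper, and the JSON body produced at the end is only an assumption about what a billing endpoint might accept.

```python
import json
import time
from dataclasses import asdict, dataclass


@dataclass
class UsageRecord:
    """Local copy of the documented fields."""
    service: str
    session_id: str
    bytes_sent: int
    bytes_received: int
    duration_ms: int


def build_record(
    service: str, session_id: str, sent: bytes, received: bytes, started: float
) -> UsageRecord:
    """Hypothetical helper: derive metrics from payloads and a monotonic start time."""
    elapsed_ms = int((time.monotonic() - started) * 1000)
    return UsageRecord(service, session_id, len(sent), len(received), elapsed_ms)


started = time.monotonic()
record = build_record("echo-service", "sess_abc123", b"ping", b"pong!", started)

# asdict() yields a plain dict; whether the Commerce API expects exactly this
# JSON shape is an assumption.
body = json.dumps(asdict(record), sort_keys=True)
print(body)
```

time.monotonic() is used instead of time.time() because it is unaffected by system clock adjustments during the session.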

BillingGuard API

| Method | Signature | Description |
|---|---|---|
| check_balance | async check_balance(service: str) -> None | Verify sufficient balance; raises InsufficientBalanceError if not |
| record_usage | async record_usage(record: UsageRecord) -> None | Post usage metrics to Commerce API |

Configuration

Config Dataclass

from dataclasses import dataclass, field
from hanzozt.auth import Credentials

@dataclass
class Config:
    controller_url: str                        # ZT controller URL (required)
    credentials: Credentials                   # Auth credentials (required)
    billing_enabled: bool = True               # Enable billing checks
    commerce_url: str = "https://api.hanzo.ai/commerce"  # Commerce API URL
    connect_timeout: float = 30.0              # Connection timeout in seconds
    request_timeout: float = 15.0              # Controller API request timeout in seconds

ConfigBuilder

Fluent builder for constructing configuration:

from hanzozt import ConfigBuilder, HanzoJwtCredentials

config = (
    ConfigBuilder()
    .controller_url("https://zt-api.hanzo.ai")
    .credentials(HanzoJwtCredentials.resolve())
    .billing(True)
    .commerce_url("https://api.hanzo.ai/commerce")
    .connect_timeout(30.0)
    .request_timeout(15.0)
    .build()
)

Config fields:

| Field | Type | Default | Description |
|---|---|---|---|
| controller_url | str | Required | ZT controller URL |
| credentials | Credentials | Required | Auth credentials |
| billing_enabled | bool | True | Enable billing checks before dial |
| commerce_url | str | https://api.hanzo.ai/commerce | Commerce API URL |
| connect_timeout | float | 30.0 | Connection timeout (seconds) |
| request_timeout | float | 15.0 | Controller API request timeout (seconds) |

Calling .build() raises InvalidConfigError if either controller_url or credentials is missing.
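
The build-time validation can be illustrated with a reduced builder. This is a simplified sketch, not the SDK source: the classes below are local stand-ins covering only three fields, and the error message format is invented for the example.

```python
from dataclasses import dataclass
from typing import Any, Optional


class InvalidConfigError(Exception):
    """Local stand-in for hanzozt.errors.InvalidConfigError."""


@dataclass
class Config:
    controller_url: str
    credentials: Any
    billing_enabled: bool = True


class ConfigBuilder:
    """Reduced sketch; the real builder's internals are not shown in this document."""

    def __init__(self) -> None:
        self._controller_url: Optional[str] = None
        self._credentials: Any = None
        self._billing_enabled = True

    def controller_url(self, url: str) -> "ConfigBuilder":
        self._controller_url = url
        return self  # returning self is what makes the calls chainable

    def credentials(self, creds: Any) -> "ConfigBuilder":
        self._credentials = creds
        return self

    def billing(self, enabled: bool) -> "ConfigBuilder":
        self._billing_enabled = enabled
        return self

    def build(self) -> Config:
        # Validate once, at build time, so a half-filled builder never yields a Config.
        required = (
            ("controller_url", self._controller_url),
            ("credentials", self._credentials),
        )
        missing = [name for name, value in required if value is None]
        if missing:
            raise InvalidConfigError("missing required field(s): " + ", ".join(missing))
        return Config(self._controller_url, self._credentials, self._billing_enabled)


config = ConfigBuilder().controller_url("https://zt-api.hanzo.ai").credentials("token").build()
try:
    ConfigBuilder().billing(False).build()
except InvalidConfigError as exc:
    error_message = str(exc)
print(config)
print(error_message)
```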

ZtContext

The main entry point. Manages authentication, service discovery, and connections.

from hanzozt import ZtContext, ConfigBuilder, HanzoJwtCredentials

config = (
    ConfigBuilder()
    .controller_url("https://zt-api.hanzo.ai")
    .credentials(HanzoJwtCredentials.resolve())
    .build()
)

ctx = await ZtContext.create(config)
await ctx.authenticate()

Methods:

| Method | Signature | Description |
|---|---|---|
| create | async create(config: Config) -> ZtContext | Create a new context (class method) |
| authenticate | async authenticate() -> None | Authenticate with the ZT controller |
| dial | async dial(service: str) -> ZtConnection | Dial a service, returns a connection |
| listen | async listen(service: str) -> ZtListener | Bind to a service, returns a listener |
| services | async services() -> list[Service] | List all services available to the identity |
| session | session() -> SessionInfo | Get the current API session info |
| close | async close() -> None | Close the context and all connections |
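
Since close() releases the context and its connections, it is worth guaranteeing that it runs even when the session body raises. This document does not state whether ZtContext supports async with, so the sketch below wraps the create/authenticate/close sequence in a hypothetical open_context helper built on contextlib.asynccontextmanager. FakeContext is a stand-in that only records calls.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeContext:
    """Stand-in with the same create/authenticate/close shape as ZtContext."""

    def __init__(self) -> None:
        self.events: list[str] = []

    @classmethod
    async def create(cls, config: dict) -> "FakeContext":
        return cls()

    async def authenticate(self) -> None:
        self.events.append("authenticate")

    async def close(self) -> None:
        self.events.append("close")


@asynccontextmanager
async def open_context(context_cls, config):
    """Hypothetical wrapper: create, authenticate, and always close."""
    ctx = await context_cls.create(config)
    try:
        await ctx.authenticate()
        yield ctx
    finally:
        await ctx.close()  # runs even if the body raises


async def demo() -> list[str]:
    seen = None
    try:
        async with open_context(FakeContext, {}) as ctx:
            seen = ctx
            raise RuntimeError("simulated failure mid-session")
    except RuntimeError:
        pass
    return seen.events


events = asyncio.run(demo())
print(events)
```

With the real SDK, the same wrapper would be called as open_context(ZtContext, config), assuming the method shapes in the table above.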

ZtConnection

A bidirectional connection to a ZT service:

conn = await ctx.dial("my-service")

# Message-based API
await conn.send(b"hello")
response = await conn.recv()

# Metadata
print(conn.service_name)  # "my-service"
print(conn.session_id)    # "sess_abc123"
print(conn.is_connected)  # True

# Close
await conn.close()

ZtListener

Accepts incoming connections on a bound service:

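import asyncio

async def handle_connection(conn):
    # Echo one message back, then close
    data = await conn.recv()
    await conn.send(data)
    await conn.close()

listener = await ctx.listen("my-service")
tasks = set()  # Keep references so running tasks are not garbage-collected

while True:
    conn = await listener.accept()
    print(f"Accepted on {listener.service_name}")

    # Handle each connection in its own task so accept() is not blocked
    task = asyncio.create_task(handle_connection(conn))
    tasks.add(task)
    task.add_done_callback(tasks.discard)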

Error Handling

All errors inherit from ZtError. Import from hanzozt.errors:

from hanzozt.errors import (
    ZtError,
    AuthError,
    InsufficientBalanceError,
    ServiceNotFoundError,
    ConnectionClosedError,
    InvalidConfigError,
    ControllerError,
    BillingError,
    TimeoutError,  # Note: shadows the built-in TimeoutError in the importing module
)

Exception Hierarchy

ZtError (base)
  AuthError                  -- Authentication failed or no credentials
  InsufficientBalanceError   -- Account balance is zero or negative
  ServiceNotFoundError       -- Named service does not exist
  ConnectionClosedError      -- Connection was closed unexpectedly
  InvalidConfigError         -- Configuration validation failed
  ControllerError            -- Controller API returned an error
  BillingError               -- Error contacting the Commerce API
  TimeoutError               -- Operation timed out
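
Because every SDK error derives from ZtError, the order of except clauses matters: Python picks the first clause that matches, so the base class has to come after the specific subclasses. The sketch below uses local stand-in classes (not the SDK's) and a hypothetical classify helper to show this.

```python
class ZtError(Exception):
    """Local stand-in for the SDK base class."""


class AuthError(ZtError):
    pass


class ServiceNotFoundError(ZtError):
    pass


def classify(exc: Exception) -> str:
    """Mimic an except chain: the first matching clause wins."""
    try:
        raise exc
    except AuthError:
        return "auth"
    except ZtError:
        # Catch-all for every other SDK error; must follow the subclasses.
        return "zt"
    except Exception:
        return "other"


labels = [
    classify(e)
    for e in (AuthError("x"), ServiceNotFoundError("y"), ValueError("z"))
]
print(labels)  # ['auth', 'zt', 'other']
```

If the ZtError clause were listed first, AuthError would never reach its own handler.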

Example

from hanzozt.errors import (
    AuthError,
    InsufficientBalanceError,
    ServiceNotFoundError,
    ZtError,  # needed for the final catch-all clause
)

try:
    conn = await ctx.dial("my-service")
    await conn.send(b"ping")
    response = await conn.recv()
except AuthError as e:
    print(f"Auth failed: {e}")
except InsufficientBalanceError as e:
    print(f"Add credits: {e}")
except ServiceNotFoundError as e:
    print(f"Service not found: {e}")
except ZtError as e:
    print(f"ZT error: {e}")

Complete Example

End-to-end: resolve credentials, configure, authenticate, dial a service over ZAP transport, exchange data, and record billing usage.

import asyncio
import time
from hanzozt import ZtContext, ConfigBuilder, HanzoJwtCredentials
from hanzozt.zap.transport import ZapTransport
from hanzozt.billing.guard import BillingGuard
from hanzozt.billing.types import UsageRecord
from hanzozt.errors import ZtError

async def main():
    # 1. Resolve credentials
    creds = HanzoJwtCredentials.resolve()
    print(f"Auth: {creds.display()}")

    # 2. Build configuration
    config = (
        ConfigBuilder()
        .controller_url("https://zt-api.hanzo.ai")
        .credentials(creds)
        .billing(True)
        .commerce_url("https://api.hanzo.ai/commerce")
        .connect_timeout(30.0)
        .build()
    )

    # 3. Create context and authenticate
    ctx = await ZtContext.create(config)
    await ctx.authenticate()
    print(f"Session: {ctx.session().identity_id}")

    # 4. List services
    services = await ctx.services()
    for svc in services:
        print(f"  {svc.name} ({svc.id})")

    # 5. Dial a service
    start = time.monotonic()
    conn = await ctx.dial("echo-service")

    # 6. Wrap with ZAP transport for length-prefixed framing
    transport = ZapTransport(conn)

    # 7. Exchange data
    request = b'{"jsonrpc":"2.0","method":"ping","id":1}'
    await transport.send_frame(request)
    response = await transport.recv_frame()
    print(f"Response: {response.decode()}")

    # 8. Close connection
    elapsed_ms = int((time.monotonic() - start) * 1000)
    await transport.close()

    # 9. Record usage for billing
    guard = BillingGuard(
        commerce_url="https://api.hanzo.ai/commerce",
        token=creds.auth_payload()["ext-jwt"]["access_token"],
    )
    await guard.record_usage(UsageRecord(
        service="echo-service",
        session_id=conn.session_id,
        bytes_sent=len(request),
        bytes_received=len(response),
        duration_ms=elapsed_ms,
    ))

    # 10. Cleanup
    await ctx.close()
    print("Done.")

if __name__ == "__main__":
    asyncio.run(main())

Testing

The SDK ships with 11 tests covering configuration, authentication, transport framing, and billing. Run them with pytest:

# Run all tests
pytest tests/ -v

# Run with coverage
pytest tests/ --cov=hanzozt --cov-report=term-missing

Test Breakdown

| Test | Area | Description |
|---|---|---|
| test_config_builder_success | Config | Builder produces valid Config with all fields |
| test_config_builder_missing_url | Config | Raises InvalidConfigError without controller_url |
| test_config_builder_missing_creds | Config | Raises InvalidConfigError without credentials |
| test_hanzo_jwt_resolve_env | Auth | Resolves token from HANZO_API_KEY env var |
| test_hanzo_jwt_resolve_file | Auth | Resolves token from ~/.hanzo/auth.json |
| test_hanzo_jwt_from_token | Auth | Constructs credentials from explicit token |
| test_hanzo_jwt_auth_method | Auth | auth_method() returns "ext-jwt" |
| test_hanzo_jwt_display_masked | Auth | Display masks all but last 5 characters |
| test_zap_frame_roundtrip | Transport | Encode then decode preserves payload bytes |
| test_billing_check_raises | Billing | check_balance raises InsufficientBalanceError on zero balance |
| test_usage_record_dataclass | Billing | UsageRecord fields are correct types and values |

Example Test

import pytest
from hanzozt import ConfigBuilder, HanzoJwtCredentials
from hanzozt.errors import InvalidConfigError

def test_config_builder_success():
    creds = HanzoJwtCredentials.from_token("test-token")
    config = (
        ConfigBuilder()
        .controller_url("https://zt-api.hanzo.ai")
        .credentials(creds)
        .billing(False)
        .build()
    )
    assert config.controller_url == "https://zt-api.hanzo.ai"
    assert config.billing_enabled is False

def test_config_builder_missing_url():
    creds = HanzoJwtCredentials.from_token("test-token")
    with pytest.raises(InvalidConfigError):
        ConfigBuilder().credentials(creds).build()

def test_hanzo_jwt_from_token():
    creds = HanzoJwtCredentials.from_token("test-token-123")
    assert creds.auth_method() == "ext-jwt"
    assert "...n-123" in creds.display()

@pytest.mark.asyncio
async def test_zap_frame_roundtrip():
    """Verify that encoding then decoding a frame preserves the payload."""
    import struct
    payload = b"hello world"
    frame = struct.pack(">I", len(payload)) + payload

    # Decode
    length = struct.unpack(">I", frame[:4])[0]
    decoded = frame[4:4 + length]
    assert decoded == payload

Dependencies

| Package | Purpose |
|---|---|
| httpx | Async HTTP client for controller and billing APIs |
| pydantic | Optional validation for config and API responses |

The SDK relies on the Python standard library for asyncio, dataclasses, struct (ZAP framing), json, pathlib, and os.

Comparison with Other SDKs

| Feature | Python | Rust | Go | TypeScript |
|---|---|---|---|---|
| Async model | asyncio | Tokio | goroutines | Promises |
| Config style | dataclass | builder | struct | object literal |
| ZAP framing | struct.pack BE | bytes crate | encoding/binary | Buffer |
| HTTP client | httpx | reqwest | net/http | fetch |
| Error style | exception hierarchy | enum variants | error wrapping | typed errors |
| Tests | 11 | 9 | 5 | complete |
