Python SDK Reference

This section is generated from the installed zelos_sdk package.

Use the sidebar to browse modules, or jump directly:

Package Overview

zelos_sdk

ActionInfo

Identifier carried by every entry from [PyAgentActions::list].

Currently exposes a single name field; reserved as a pyclass (rather than list[str]) so future fields (description, category, …) can ship without breaking notebooks that iterate with for info in agent.actions.list(): info.name.

ActionResult

Result of agent.actions.execute(...) — parsed value plus non-optional [PyActionStatus].

Missing proto status maps to [PyActionStatus::Done]. Fail/error statuses are returned, not raised.

ActionSchema

Schema from agent.actions.schema(...) with pre-parsed JSON objects.

Schema blobs (action_schema, ui_schema) are pre-parsed dicts.

ActionStatus

Typed action execution status (proto integers 0=PASS, 1=FAIL, 2=ERROR, 3=DONE).

Uses the classattr pattern (like PyExtensionState) because pyo3-stub-gen 0.6.2 panics on stub generation for pyclass enum.

Agent

Rust-side Agent facade: owns the catalog cache and delegates RPCs to AgentTransport.

__del__()

Best-effort cleanup when the handle is garbage-collected without an explicit close() or with block. close() is idempotent, so explicit cleanup still wins.

at(paths, time, *, min_time=..., producers=..., lookback=...)

Per-signal value at (or before) a specific timestamp.

Returns a dict mapping each resolved signal's path to its most recent LatestValue whose timestamp is <= time. time and min_time accept datetime, ISO / relative string ("-1m", "now"), or raw int epoch ns. Use min_time to floor the search. producers defaults to ("localhost",).

Args:
    paths: A str, Signal, or sequence thereof.
    time: Anchor timestamp (datetime, str, or epoch ns).
    min_time: Optional lower bound; values strictly older than this are skipped.
    producers: Producer addresses; pass () to fan out across all.
    lookback: Catalog cache freshness in seconds (default 60).

Returns: dict[str, LatestValue]: One entry per matched signal, keyed by path.

Raises: SignalNotFound: a wildcard expanded to zero matches. ValueError: min_time > time.
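
The at-or-before lookup rule described above can be sketched in pure Python. This is illustrative only: value_at and its in-memory sample list are hypothetical helpers, not SDK API; the real resolution happens agent-side over the catalog.

```python
from bisect import bisect_right

def value_at(samples, time_ns, min_time_ns=None):
    """samples: list of (timestamp_ns, value) tuples sorted ascending by time.

    Returns the value of the most recent sample whose timestamp is <= time_ns,
    optionally floored by min_time_ns; None when no sample qualifies.
    """
    if min_time_ns is not None and min_time_ns > time_ns:
        raise ValueError("min_time > time")
    timestamps = [t for t, _ in samples]
    i = bisect_right(timestamps, time_ns) - 1  # rightmost sample with t <= time
    if i < 0:
        return None  # nothing at or before `time`
    t, value = samples[i]
    if min_time_ns is not None and t < min_time_ns:
        return None  # the candidate is older than the floor, so it is skipped
    return value
```

The min_time floor is useful when a signal updates rarely and you do not want a stale value from long before the window of interest.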

close()

Close the transport and drop the catalog cache.

Idempotent. Subsequent RPCs on this Agent (or any Trace/TraceSet derived from it) raise AgentCancelled. Prefer with Agent.connect().

close_traces(paths=...)

Release one or more open trace files in a single RPC. Empty (default) closes all.

connect(target=..., timeout=...) staticmethod

Connect to an agent and return a handle.

export(path, *, start, end=..., paths=..., producers=..., lookback=..., overwrite=...)

Export signal data to a .trz file on disk.

Resolves paths against the live catalog and asks each owning producer to write its slice of the time window into the same output file. When paths is None (default) every signal in the live catalog is exported. The agent does the actual writing; on a localhost target with localhost producers an atomic tmp-rename strategy guards against partial files when overwrite=True and the destination already exists.

Cross-host targets reject overwrite=False outright (the gate has no way to atomically replace a file on a remote host); pass overwrite=True to opt in.

Args:
    path: Filesystem destination for the .trz file.
    start: Start of the time window.
    end: End of the window; defaults to "now".
    paths: Optional str, Signal, or sequence; None (default) exports the full catalog.
    producers: Producer addresses; defaults to ("localhost",). Pass () to fan out across all.
    lookback: Catalog cache freshness in seconds (default 60).
    overwrite: When True, replace an existing file. Required for cross-host writes regardless of whether the path exists.

Returns: ExportResult: On-disk path plus per-producer success bits. result.ok is only True if every producer succeeded.

Raises: ValueError: overwrite=False on a cross-host target. FileExistsError: overwrite=False and the destination already exists.

health()

Alias for [health_check].

health_check()

Round-trip Health RPC; returns the agent's status string (typically "SERVING").

info()

Composite identity + settings + memory + dirs.

The health check gates the call; the four sub-RPCs (settings/memory/logs/configs) fan out concurrently, and partial failures land in info.failures rather than raising.

latest(paths, *, lookback=..., producers=...)

Latest value(s): one path → LatestValue; sequence → dict[str, LatestValue] (missing keys omitted).

Single path: no catalog match → SignalNotFound; match but no rows in lookback → NoData; ambiguous pattern → AmbiguousSignal. Sequence: per-key ambiguity raises; misses omit keys. producers defaults to ("localhost",).

open_trace(path)

Open a trace file. Returns a Trace handle sharing this agent's transport.

Raises TraceNotFound if the agent doesn't have the file. Call trace.close() (or use the agent context manager) to release.

open_traces(paths)

Open multiple traces as one queryable set; routes per-signal to the owning file.

Raises ValueError on empty paths, TraceNotFound if any file is unknown.

query(paths, *, start, end=..., producers=..., lookback=..., max_rows=..., sort=..., sort_order=..., downsample=...)

Time-bounded query → SignalFrame (Arrow IPC). Empty frame when no rows match.

paths accepts a str, Signal, or sequence. start/end accept datetime, ISO 8601, or relative strings ("-1m", "now"). downsample = N runs the agent's M4 strategy with N buckets and is mutually exclusive with max_rows / sort_order. max_rows = 0 means no cap. producers defaults to ("localhost",). Convert via frame.to_pandas() or frame.series(name).
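
The relative time strings can be sketched with an illustrative-only parser. parse_time is a hypothetical helper, not SDK API; the real parsing lives in the Rust transport and also accepts datetime objects directly.

```python
import re
from datetime import datetime, timezone

# Nanosecond multipliers for the relative-offset suffixes sketched here.
_UNITS_NS = {"s": 10**9, "m": 60 * 10**9, "h": 3600 * 10**9, "d": 86400 * 10**9}

def parse_time(text, now_ns):
    """Resolve "now", relative offsets like "-1m", or ISO 8601 to epoch ns."""
    if text == "now":
        return now_ns
    m = re.fullmatch(r"(-?\d+)([smhd])", text)
    if m:
        return now_ns + int(m.group(1)) * _UNITS_NS[m.group(2)]
    # Fall back to ISO 8601; naive timestamps are assumed UTC in this sketch.
    dt = datetime.fromisoformat(text)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 10**9)
```

So query(paths, start="-1m", end="now") asks for the last minute of data relative to the moment the call is made.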

resolve_paths(paths, *, producers=..., lookback=...)

Expand paths against the live catalog and return the clean canonical paths.

Wildcards expand to every match; Signal objects flatten to their path. Useful for previewing what a query would match. Raises SignalNotFound when a wildcard has no match. producers defaults to ("localhost",).

segments(producers=...)

Live data segments. producers defaults to ("localhost",); pass () to fan out to every connected producer.

Per-producer warnings (an unreachable producer, a stale segment) are emitted as RuntimeWarning so partial results never silently hide failures. Catch with warnings.catch_warnings() to inspect them.

signals(producers=..., lookback=...)

Cached signal catalog. producers defaults to ("localhost",); pass () to fan out to every connected producer.

watch(paths=..., *, interval=..., lookback=..., producers=..., until=..., on_error=..., max_consecutive_errors=...)

Delegates to zelos_sdk.agent.watch._watch_impl (generator loop stays in Python). producers defaults to ("localhost",).

window(paths, *, start, duration, producers=..., lookback=..., display_fps=...)

Replay snapshot + change-stream over a fixed time window.

Returns a ReplayWindow with two parts: the snapshot (latest value per signal at start) and the changes (every subsequent update inside the window). start accepts datetime, str, or int ns; duration accepts int ns, float seconds, or datetime.timedelta. producers defaults to ("localhost",). display_fps = 0 means no thinning.

Args:
    paths: A str, Signal, or sequence thereof.
    start: Window start.
    duration: Window length.
    producers: Producer addresses; pass () to fan out across all.
    lookback: Catalog cache freshness in seconds (default 60).
    display_fps: Optional display-rate cap on the change stream.

Returns: ReplayWindow: Snapshot + changes for the window.

Raises: SignalNotFound: a wildcard expanded to zero matches. ValueError: duration <= 0 or display_fps < 0.
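
The snapshot/changes split can be sketched over an in-memory event list. replay_window and its (timestamp, path, value) tuples are hypothetical, for illustration only; the SDK returns typed LatestValue rows.

```python
def replay_window(events, start_ns, duration_ns):
    """events: list of (timestamp_ns, path, value) sorted ascending by time.

    Returns (snapshot, changes): the latest value per signal at start_ns,
    plus every subsequent update inside (start_ns, start_ns + duration_ns].
    """
    end_ns = start_ns + duration_ns
    snapshot = {}
    changes = []
    for t, path, value in events:
        if t <= start_ns:
            snapshot[path] = value            # keep overwriting: latest wins
        elif t <= end_ns:
            changes.append((t, path, value))  # updates inside the window
    return snapshot, changes
```

This mirrors the I-frame/P-frame framing used by ReplayWindow: apply the snapshot first, then replay the changes in order to reconstruct state at any instant inside the window.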

AgentActions

Agent action API (Agent.actions): list, schema, execute over gRPC (no caching).

Callers pass Python dicts; Rust handles JSON at the wire boundary.

Example:
    for info in agent.actions.list():
        print(info.name)
    schema = agent.actions.schema("battery_test")
    result = agent.actions.execute("battery_test", params={"target_soc": 80})

execute(action, params=..., timeout=...)

Run action with optional params. Status lives on ActionResult, not exceptions.

Args: timeout: Seconds; None uses the agent default (typically 30 s).

list()

Registered actions from actions_list (order as on the agent; not cached).

Returns: list[ActionInfo]: One entry per action, with at least a .name field.

Raises: AgentUnavailable: agent is unreachable.

schema(action, current_values=...)

JSON-Schema and ui-schema for action, pre-parsed as dicts.

Args: current_values: Partial form data for dynamic schemas; None omits from the wire.

AgentExtensions

Sub-namespace exposing the agent's extension lifecycle API.

Reached via Agent.extensions. The handle holds a transport clone plus a shared handle to the parent agent's signals catalog cache — start(), stop(), and restart() invalidate that cache so the next agent.signals() call sees any new producers an extension brought online (or saw a producer go away).

list() enumerates installed extensions; info(id) and readme(id) return descriptive metadata; config_schema(id) and last_config(id) return the JSON-Schema and saved config (decoded to dict). start(id, config={...}) and restart(id, config={...}) launch (or relaunch) the extension and return a typed ExtensionStart carrying the resolved version + pid.

Example:
    entries = agent.extensions.list()
    started = agent.extensions.start(
        "zeloscloud.zelos-extension-can",
        config={"buses": [{"interface": "demo"}]},
    )
    agent.extensions.stop("zeloscloud.zelos-extension-can")

config_schema(id, version=...)

Returns the extension's JSON-Schema config, decoded into a dict. None means "no schema declared"; an empty dict means "configured but with no fields" (proto schema_json = "{}").

info(id, version=...)

Fetch metadata for a specific extension.

Returns the extension's manifest data: id, version, description, any actions/signals it provides, and so on. Useful for building catalog UIs or verifying an extension is installed before start()-ing it.

Args: id: Extension identifier (e.g. "zeloscloud.zelos-extension-can"). version: Specific version to query; None resolves to the currently-installed version.

Returns: ExtensionInfo | None: Manifest, or None if the extension is not installed.

last_config(id, version=...)

Returns the last saved config, decoded into a dict. None means "never configured"; an empty dict means "configured but with no fields" (proto config_json = "{}").

list()

List every extension installed in the agent.

The returned ExtensionEntry objects carry id, version, current ExtensionState (Installed or Running), and pid (when running). The list is not cached, so it always reflects the agent's current view.

Returns: list[ExtensionEntry]: One entry per installed extension.

Raises: AgentUnavailable: agent is unreachable.

readme(id, version=...)

Fetch the extension's README markdown.

Useful for building marketplace UIs or showing inline help in a notebook. Returns the raw markdown verbatim, or an empty string when the extension shipped no README — render it directly with whatever tooling fits the surface.

Args: id: Extension identifier. version: Specific version; None resolves to the currently-installed version.

Returns: str: Markdown text, or "" when the extension shipped no README.

restart(id, config=...)

See [Self::start] for config = None vs config = {} semantics.

start(id, config=...)

config = None means "no config" (transport sends nothing on the wire). config = {} means "empty config" (transport sends "{}" on the wire). The returned [PyExtensionStart] carries the caller-known id plus the resolved version straight from the agent's StartReply — no follow-up RPC required.
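
The None-vs-empty-dict rule can be made concrete with a small sketch. encode_config is a hypothetical helper (the real serialization happens inside the Rust transport); it only illustrates the wire semantics stated above.

```python
import json

def encode_config(config):
    """Encode a start()/restart() config for the wire, per the rule above."""
    if config is None:
        return None            # nothing sent on the wire: "no config"
    return json.dumps(config)  # {} encodes to "{}": an explicit empty config
```

The distinction matters because an extension can treat "no config supplied" (fall back to its saved config) differently from "explicitly empty config" (reset to defaults).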

stop(id)

Stop a running extension.

Asks the agent to terminate the extension process; the signals-catalog cache is invalidated so the next agent.signals() call reflects the producer leaving. Idempotent — stopping an already-stopped extension is a no-op.

Args: id: Extension identifier.

Raises: ExtensionError: agent reported an error tearing down the extension process.

AgentInfo

Composite agent identity and runtime state.

Returned by Agent.info(). Bundles the connection target, latest health-check result, and best-effort introspection: the agent's AgentSettings, current memory usage, and the on-disk paths the agent uses for logs and configs.

info() runs four optional sub-RPCs concurrently behind the scenes (settings + memory + logs-dir + configs-dir); each of them may fail independently of the others. Failed sub-RPC names appear in failures (a stable list of strings — "SettingsGet", "SystemGetMemory", "SystemGetDirLogs", "SystemGetDirConfigs" — useful for grep-friendly notebook checks). Only the gating health-check is fatal; everything else is fail-soft.

Example:
    info = agent.info()
    print(info.target, info.health, info.memory_bytes)
    if info.failures:
        print("partial info:", info.failures)

AgentSettings

Raw settings snapshot from SettingsGet. Wire fields cross verbatim; the Python AgentSettings dataclass in agent/info.py re-exports the same data (memory/disk strings stay strings — they are human-readable like "25%" / "10GB" and are parsed against actual host capacity at runtime).

Plot UX defaults and subscription / version-migration fields are omitted (not part of the SDK's public surface).

memory_limit / disk_limit are strings (e.g. "25%", "10GB"), not u64 — the agent stores human-readable strings and parses them against actual host capacity at runtime.

ColumnMetadata

Per-column metadata attached to a query result.

Exposed verbatim — the renderer's ts-rs JSON view and the Python view share field names so notebook code matches the desktop UI. signal keeps its wire name (not name) because name collides with __name__-style lookups when used as a pandas/pyarrow column header.

DataSegment

Python wrapper for a data segment.

Represents a segment of trace data with metadata about its time range and producer.

ExitInfo

Exit information from a terminated extension process.

Both fields are optional because some platforms only report one (POSIX signal vs. exit code).

ExportProducerResult

Per-producer result from live_export_trace_multi. The producer identity is the parent results map key, stamped at the transport boundary.

ExportResult

Result of Agent.export(...) / Trace.export(...).

Bundles the on-disk path the agent (or local tmp-rename strategy) wrote to with a results dict keyed by producer address. Each entry is an ExportProducerResult carrying that producer's per-shard success bit and any error message.

ok is True only when every producer shard succeeded — partial failures surface as ok=False with per-producer detail in results rather than raising. This lets callers branch on partial-success outcomes without a try/except.

Example:
    result = agent.export("bms.trz", start="-1m", paths=["can/Battery.*"])
    if not result.ok:
        for producer, r in result.results.items():
            if not r.ok:
                print(f"{producer}: {r.error}")

ExtensionEntry

Catalog row returned by agent.extensions.list().

Python class is named ExtensionEntry, matching the Rust DTO and proto. host_type and app_contribution_kind are exposed as their canonical lowercase wire strings; the #[new] constructor validates them.

ExtensionInfo

Rich info returned by agent.extensions.info(id).

Compared to ExtensionEntry, this carries install_path, readme_path, etc. but not author or last_exit (those are list-time concerns).

ExtensionStart

Result of agent.extensions.start(id, ...) / restart(id, ...).

Built from the wire StartReply by transport::extension_start_from_reply, which stamps the caller-known id alongside the proto-supplied pid / version — no follow-up extensions_list() is needed.

ExtensionState

Runtime state of an extension.

Two-variant enum-like class. Wire form is the lowercase string ("installed" / "running"); access via state.value.

Uses the classattr pattern (like PyDataType) rather than a pyclass enum because pyo3-stub-gen 0.6.2 panics on stub generation for pyclass enum. Switch when upstream is upgraded.

Internal

Bases: RuntimeError

SDK invariant failure (Python decode bug, not an AgentError).

LatestValue

One row from latest() / at().

Optional Arrow bytes plus a typed scalar; decoding uses catalog metadata when signal is known. Clone only bumps refcounts / copies small buffers (no GIL).

formatted(digits=...)

Display string: enum label, numeric+unit, bool, hex bytes, em-dash for null, etc.
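
The display rules listed above can be sketched as a standalone function. format_value is hypothetical and its exact formatting choices (precision, casing) are assumptions, not the SDK's actual implementation.

```python
def format_value(value, unit=None, enum_labels=None, digits=3):
    """Render a scalar for display, per the rules sketched above."""
    if value is None:
        return "\u2014"                       # em-dash for null
    if enum_labels is not None and value in enum_labels:
        return enum_labels[value]             # enum label wins over the raw int
    if isinstance(value, bool):               # bool before numeric: bool is an int
        return "true" if value else "false"
    if isinstance(value, (bytes, bytearray)):
        return "0x" + value.hex()             # hex bytes
    if isinstance(value, float):
        text = f"{value:.{digits}g}"          # significant-digit rounding
    else:
        text = str(value)
    return f"{text} {unit}" if unit else text  # numeric + unit
```

Checking bool before the numeric branches is deliberate: in Python, True is an instance of int, so a naive numeric check would render it as "1".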

get()

Checkable contract — typed value, mirrors the value property.

NamedScalar

Bases: float

float carrying a signal name (Checkable contract).

QueryResult

Python wrapper for query results.

Contains the results of a trace data query, including field names, the raw Arrow data as Python bytes, and the SQL query that was executed.

to_arrow()

Convert the Arrow data to a Python object that can be read by PyArrow.

Returns: bytes: Arrow IPC stream data

Examples:
    >>> import pyarrow as pa
    >>> result = reader.query(...)
    >>> arrow_bytes = result.to_arrow()
    >>> reader = pa.ipc.open_stream(arrow_bytes)
    >>> table = reader.read_all()

QueryType

Raw / M4 — selects the agent-side query strategy. Wire encoding mirrors the proto's QueryType enum (Raw=0, M4=1); the proto's MinMax=2 is internal-only and not exposed through the SDK surface.
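
The M4 idea is that each time bucket keeps at most four points (first, min, max, last), which preserves a plot's visual envelope at a fraction of the row count. The sketch below is conceptual, with integer-nanosecond timestamps assumed; it is not the agent's actual implementation.

```python
def m4(points, buckets):
    """points: list of (t, v) tuples sorted by t, with integer timestamps.

    Returns a reduced point list: up to four representative points per bucket.
    """
    if not points:
        return []
    t0, t1 = points[0][0], points[-1][0]
    span = max(t1 - t0, 1)
    out = []
    for b in range(buckets):
        lo = t0 + span * b // buckets
        hi = t0 + span * (b + 1) // buckets
        # Half-open buckets; the final bucket also claims the last timestamp.
        bucket = [p for p in points
                  if lo <= p[0] < hi or (b == buckets - 1 and p[0] == t1)]
        if not bucket:
            continue
        keep = {bucket[0],                          # first
                min(bucket, key=lambda p: p[1]),    # min
                max(bucket, key=lambda p: p[1]),    # max
                bucket[-1]}                         # last
        out.extend(sorted(keep))
    return out
```

Because min/max survive in every bucket, spikes are never averaged away, which is why M4 output is safe to plot directly.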

ReplayWindow

Replay-window response: I-frame snapshot + P-frame changes + window bounds. Rows use the same typed LatestValue shape as latest() and at().

Segment

Metadata for a single data segment.

Both bounds are optional because a segment that is still recording does not have a final end time. connection is set for live segments, trace_path for trace-file segments — they're mutually exclusive in practice.

Signal

Selector + metadata for a single signal.

The Rust field signal is exposed as the Pythonic name (signal.signal is awkward in Python); data_segment_id is exposed as a string.

SignalCatalog

Immutable signal catalog with native search and exact-path lookup.

by_path(path)

Look up a signal by its exact canonical path.

Args: path: Canonical "{source}/{message}.{name}" path.

Returns: Signal: The unique catalog entry with that path.

Raises: SignalNotFound: no catalog entry has that path. AmbiguousSignal: multiple entries share the path (e.g. across producers); pass a Signal to the API directly to disambiguate.

search(query)

Filter the catalog by case-insensitive substring match.

Each signal is checked across five fields: full path, source, message, signal name, and unit. Matching is case-insensitive; the empty query returns the catalog unchanged.

Args: query: Substring to search for.

Returns: SignalCatalog: A new catalog containing only the matched signals; warnings are inherited from the parent.
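
The five-field match can be sketched per-signal (illustrative; the native implementation is in Rust, and the dict-shaped signal here is a stand-in for the real Signal object):

```python
def matches(signal, query):
    """signal: dict with path/source/message/name and optional unit fields."""
    q = query.lower()
    fields = (signal["path"], signal["source"], signal["message"],
              signal["name"], signal.get("unit") or "")
    # Empty query matches everything; otherwise any field may contain it.
    return q == "" or any(q in f.lower() for f in fields)
```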

to_list()

Materialize the catalog into a flat list[Signal].

Useful for passing the full catalog to a function that wants a concrete list (rather than the lazy iterator the catalog itself yields). The returned list is a fresh copy; mutating it does not affect the catalog.

Returns: list[Signal]: All signals in catalog order.

to_pandas()

Render the catalog as a pandas DataFrame (one row per signal).

Columns: path, source, message, name, data_type, unit, producer, trace_path, data_segment_id. Raises ImportError when pandas is not installed.

SignalFrame

Result of a tabular query.

Arrow IPC bytes cross verbatim and are decoded by to_arrow(). arrow_ipc_data is held as Arc<[u8]> so Clone is cheap on the data path; normalized_table_cache resets to empty on clone (it's a perf aid, not identity, and sliced frames need a fresh table anyway).

__contains__(name)

path in frame membership test against the non-time column names.

__getitem__(key)

frame[key] — alias for frame.series(key). Accepts a str or Signal.

__iter__()

Iterating a SignalFrame yields the resolved catalog Signal for each non-time column, in column order. Use frame.values() for SignalSeries objects.

head(n=...)

First n rows as a new SignalFrame (default 5).

items()

(name, series) tuples for every non-time column.

keys()

Column names excluding the leading time column. Pandas-style iteration.

series(key)

Project one column as a Python SignalSeries (pandas/pyarrow ergonomics).

key accepts a str (clean path or actual Arrow column name) or a Signal from the frame's catalog. Strings are matched against resolved signal paths and against the Arrow schema directly; ties across producers / segments raise [errors::AmbiguousSignal] with a hint to pass a Signal.

tail(n=...)

Last n rows as a new SignalFrame (default 5).

to_arrow()

Decode and normalize the Arrow IPC payload at the Rust/PyO3 boundary.

The returned object is a pyarrow.Table with a canonical time: timestamp[ns, tz=UTC] column. The name mirrors SignalSeries.to_arrow() (a column accessor) so notebook users have one symbol to reach for at both the frame and column level.

Cached after the first call (see normalized_table_cache); subsequent calls return a fresh Py handle to the same underlying Table via clone_ref. Callers that mutate the returned Table get their own pyarrow copy because Tables are immutable — mutation ops like rename_columns return new Tables without touching the cached one.

to_pandas()

Notebook one-liner: frame.to_pandas().

Raises ImportError with an actionable hint if pandas is not installed; we deliberately do not list it in the SDK's hard requirements so notebook users opt-in via zelos-sdk[analysis].

values()

One SignalSeries per non-time column (parallel to keys()).

SignalSeries(name, values, *, signal=None, unit=None, time=None)

One Arrow column with pyarrow/pandas helpers and a unit-aware math surface.

Unit composes through * and /; + and - require matching units. Scalar operations are unit-blind (the unit is preserved as-is).
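
The unit rules can be sketched in isolation. combine_units is a hypothetical helper that only models the string-level composition described above, not the SDK's SignalSeries arithmetic itself.

```python
def combine_units(op, left, right):
    """Combine the unit strings of two series under a binary operator."""
    if op in ("+", "-"):
        if left != right:
            raise ValueError(f"unit mismatch: {left!r} vs {right!r}")
        return left                 # matching units pass through unchanged
    if op == "*":
        return f"{left}*{right}"    # units compose through multiplication
    if op == "/":
        return f"{left}/{right}"    # and division
    raise ValueError(f"unsupported operator: {op!r}")
```

So voltage * current would carry "V*A", while voltage + current raises, catching a class of physical-dimension bugs before they reach a plot.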

legend: str property

Display label as "name (unit)" (or just name when unitless).

clip(lower=None, upper=None)

Clamp values to [lower, upper]. At least one bound is required.

derivative()

Numerical derivative over time (centered, np.gradient).

derive(values=None, *, label=None, unit=None)

Manually construct a derived series; unset fields inherit from self.

values is positional so notebook code reads naturally (voltage.derive(rolling, label="...", unit="V")); label is the new series name (kept as a kwarg to mirror the math-op output convention).

integrate()

Cumulative trapezoidal integration over time; first sample is 0.
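
The cumulative trapezoid rule, with the first sample pinned to 0, can be written out directly (pure-Python sketch over parallel time/value lists; the SDK works on Arrow-backed series):

```python
def integrate(times, values):
    """Cumulative trapezoidal integral of values over times; out[0] == 0."""
    out = [0.0]
    for i in range(1, len(values)):
        dt = times[i] - times[i - 1]
        # Each step adds the area of one trapezoid to the running total.
        out.append(out[-1] + 0.5 * (values[i] + values[i - 1]) * dt)
    return out
```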

to_pandas()

pandas.Series (with UTC DatetimeIndex if time is set).

where(mask)

Keep values where mask is True; the rest become NaN. Mask length must match.

SortOrder

Asc / Desc — drives row ordering for Agent.query() / Trace.query(). Wire encoding mirrors the proto's SortOrder enum (Asc=0, Desc=1).

TimeMode

Relative / Absolute — selects how trace-side query bounds are interpreted. Wire encoding mirrors MultiTraceTimeMode (Relative=0, Absolute=1).

TimeRangeMulti

Multi-trace time range response (overlay / absolute mode).

Trace

Handle to a trace file opened through Agent.open_trace.

Owns an Arc<AgentTransport> cloned from its parent Agent, so the trace shares the agent's connection lifecycle: calling agent.close() invalidates this handle too, and any subsequent method on it raises AgentCancelled.

The handle remembers the trace's filesystem path and the time_range reported by the agent at open time. Use trace.signals() to fetch the trace's catalog (always re-fetched — the live-signals 5 s TTL cache on Agent does not apply here); trace.query(...), trace.at(...), trace.window(...), and trace.export(...) all dispatch against this trace's contents.

Example:
    trace = agent.open_trace("/data/run.trz")
    df = trace.query(["can/Battery.*"]).to_pandas()
    trace.close()

at(paths, time, *, min_time=...)

Per-signal value at (or before) a specific absolute timestamp inside the trace.

Resolves against this trace's catalog. time and min_time accept datetime, str, or raw int epoch ns and are always treated as absolute (epoch ns). To anchor relative to the trace start, compute trace.time_range.start + timedelta(seconds=offset). Returns a dict keyed by signal path. Raises SignalNotFound on catalog miss; ValueError if min_time > time.

close()

Release the agent's hold on this trace file.

Idempotent. After close() the agent may garbage-collect any catalog and indices it loaded for this file. Subsequent methods on this handle either re-open the file (if the agent still has it on disk) or raise TraceNotFound. The per-handle catalog cache is preserved (it's still valid for read; the close just signals the agent it can drop server-side state).

export(path, *, overwrite=..., start=..., end=..., relative_start=..., relative_end=...)

Export this trace (or a slice of it) to a new .trz file.

Time bounds are optional and may be supplied as either absolute (start / end, accepting datetime, ISO 8601, "-1m", or "now") or relative (relative_start / relative_end, in seconds from this trace's start). Mixing them raises ValueError. Omitting all four exports the full trace.

Args:
    path: Destination path for the new .trz file.
    overwrite: Replace path if it exists.
    start: Absolute start; pair with end.
    end: Absolute end; pair with start.
    relative_start: Seconds from trace start; pair with relative_end.
    relative_end: Seconds from trace start; pair with relative_start.

Returns: ExportResult: On-disk path plus per-producer success bits.
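
The bound-mixing rule shared by export() and query() can be sketched as a small validator. resolve_bounds is a hypothetical helper illustrating only the mutual-exclusion and full-trace defaults described above.

```python
def resolve_bounds(start=None, end=None, relative_start=None, relative_end=None):
    """Classify a bound set as absolute, relative, or full-trace."""
    absolute = start is not None or end is not None
    relative = relative_start is not None or relative_end is not None
    if absolute and relative:
        raise ValueError("cannot mix absolute and relative bounds")
    if not absolute and not relative:
        return "full-trace"   # omitting all four covers the whole trace
    return "absolute" if absolute else "relative"
```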

query(paths, *, start=..., end=..., relative_start=..., relative_end=..., downsample=..., max_rows=..., sort=..., sort_order=...)

Time-bounded query against this trace file.

Resolves paths against the trace's catalog (trace mode is authoritative — literal-path misses raise SignalNotFound). Pass start/end for absolute (wall-clock) bounds or relative_start/relative_end for seconds-from-trace-start; mixing the two raises ValueError. Omitting all four queries the full trace. Use downsample for a plot-friendly bucket count.

Args:
    paths: A str, Signal, or sequence thereof.
    start: Absolute start (datetime, ISO 8601, "-1m", or "now"); paired with end.
    end: Absolute end; paired with start.
    relative_start: Seconds from trace start; paired with relative_end.
    relative_end: Seconds from trace start; paired with relative_start.
    downsample: Optional M4 bucket count; same semantics as Agent.query(downsample=N). The agent returns at most four points per bucket. The stub takes a float for forward compatibility, but it is interpreted as a count, not seconds.
    max_rows: Cap on returned rows; 0 (default) means no cap.
    sort_order: SortOrder.Asc (default) or SortOrder.Desc.

Returns: SignalFrame: Arrow-backed frame; empty when no rows match.

Raises: SignalNotFound: catalog miss for a literal path. ValueError: bound mixing or unparsable timestamp.

segments()

List the trace file's data segments.

A segment is a contiguous chunk inside the trace bounded by a TraceSegmentStart / TraceSegmentEnd pair. Use segments_with_warnings() to also receive the per-trace warning list.

segments_with_warnings()

Same as segments() but returns (segments, warnings).

signals()

Fetch the trace's signal catalog.

Cached per-handle: the first call hits the wire, every subsequent call returns the same SignalCatalog (trace files are immutable — the catalog cannot drift). Supports search(), by_path(), and slicing — same shape as the live agent catalog.

Returns: SignalCatalog: All signals present in this trace.

window(paths, *, start, duration, display_fps=...)

Replay snapshot + change-stream over an absolute window inside this trace.

Same semantics as Agent.window() scoped to this trace. start accepts datetime, str, or int ns and is treated as absolute (epoch ns); duration accepts int ns, float seconds, or datetime.timedelta. display_fps=0 means "no thinning". Raises ValueError on duration <= 0 or display_fps < 0.

TraceEventFieldMetadata

Metadata describing a field in a trace event schema.

This class defines the structure of a field within an event schema, including its name, data type, and optional unit of measurement.

Args:
    name (str): The field name.
    data_type (DataType): The data type for the field.
    unit (Optional[str]): Optional unit of measurement.

Examples:
    >>> # Define a field for HTTP status code
    >>> status_field = TraceEventFieldMetadata("status_code", DataType.Int32)
    >>>
    >>> # Define a field with a unit of measurement
    >>> duration_field = TraceEventFieldMetadata(
    ...     "duration_ms", DataType.Float64, "milliseconds")

TraceMetadata

Python wrapper for trace metadata.

Contains information about a complete trace including its time range, producer, and associated data segments.

TraceNamespace

A namespace that manages and organizes TraceSources.

TraceNamespace provides a centralized registry for TraceSources with an isolated router. Each namespace has its own router, allowing complete isolation between different namespaces for testing or multi-tenant scenarios.

Examples:
    >>> # Create an isolated namespace
    >>> ns = TraceNamespace("my_app")
    >>> source = TraceSource("service", namespace=ns)
    >>> with TraceWriter("data.trz", namespace=ns) as writer:
    ...     source.log("event", value=42)

__del__()

Python destructor — only shuts down non-global namespaces. The global namespace is cleaned up via atexit.

__repr__()

String representation of the namespace.

drain()

Flush all sources, signal the router to drain, and block until every queued event has been delivered to subscribed sinks.

Call this before tearing down a writer (or at the end of a transcode) when you need at-least-once delivery — __del__ / atexit drain are best-effort and don't block on the caller's thread. Releases the GIL while waiting.

Idempotent and safe to call multiple times. Must NOT be called from inside the tokio runtime (e.g. from an async-Python task) because it blocks on the router's drain ack.

source_count()

Get the number of registered sources.

Returns: int: Number of registered sources.

TraceReadEvent

Python wrapper for a trace read event.

Represents an event containing multiple fields.

TraceReadEventField

Python wrapper for a trace read event field.

Represents a single field within an event.

TraceReadSource

Python wrapper for a trace read source.

Represents a source (e.g., "can") containing multiple events.

TraceReader

Python wrapper for the TraceReader.

This reader provides read-only access to trace files, allowing you to query metadata and retrieve trace data programmatically. It supports listing data segments, querying time ranges, and retrieving raw or downsampled data.

The reader uses context management and should be used with a with statement to ensure proper resource cleanup.

Complete End-to-End Workflow

This example demonstrates opening a trace file, discovering available fields, and querying specific data:

import zelos_sdk
import pyarrow as pa

# Open trace file for reading
with zelos_sdk.TraceReader("recording.trz") as reader:
    # Discover available segments
    segments = reader.list_data_segments()
    assert len(segments) > 0

    # Discover available fields hierarchically
    sources = reader.list_fields()
    assert len(sources) > 0

    # Navigate hierarchy: source → event → field
    can_source = next(s for s in sources if s.name == "can")
    msg_event = next(e for e in can_source.events if e.name == "VehicleSpeed")
    speed_field = next(f for f in msg_event.fields if f.name == "speed")

    # Query discovered field
    time_range = reader.time_range()
    result = reader.query(
        data_segment_ids=[s.id for s in segments],
        fields=[speed_field.path],  # "*/can/VehicleSpeed.speed"
        start=time_range.start,
        end=time_range.end,
    )

    # Verify data received
    arrow_reader = pa.ipc.open_stream(result.to_arrow())
    table = arrow_reader.read_all()
    assert table.num_rows > 0
    assert speed_field.path in result.fields or any(speed_field.path in s for s in result.fields)

__repr__()

String representation of the reader.

close()

Close the trace reader.

This method closes the trace file and releases resources. It's automatically called when exiting the context manager.

Returns: None

get_value_table(data_segment_id, field_path)

Get the value table (enum mapping) for a specific field.

Args: data_segment_id (str): Data segment ID to query. field_path (str): Field path in format "source/event.field" (without the "*/" prefix).

Returns: dict: Mapping of integer keys to string values, or None if no value table exists.

Raises: RuntimeError: If the reader is not open or query fails.

Examples:
>>> with TraceReader("my_trace.trz") as reader:
...     segments = reader.list_data_segments()
...     # Get enum mapping for a status field
...     status_map = reader.get_value_table(
...         segments[0].id,
...         "controller/state.status"
...     )
...     if status_map:
...         print(status_map)  # {0: "IDLE", 1: "RUNNING", 2: "ERROR"}
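The returned mapping is typically used to decode raw integer samples into their labels. A minimal, self-contained sketch of that decoding step (the mapping and sample values here are hypothetical, standing in for a real `get_value_table` result and query column):

```python
# Hypothetical value table as returned by get_value_table(): int -> str.
status_map = {0: "IDLE", 1: "RUNNING", 2: "ERROR"}

# Raw integer samples for the field, e.g. one column of a query result.
raw_samples = [0, 1, 1, 2, 1]

# Decode each sample, falling back to the raw value for unmapped keys.
decoded = [status_map.get(v, str(v)) for v in raw_samples]
print(decoded)  # ['IDLE', 'RUNNING', 'RUNNING', 'ERROR', 'RUNNING']
```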

list_data_segments()

List all data segments in the trace.

Returns: List[DataSegment]: List of data segment metadata.

Raises: RuntimeError: If the reader is not open or query fails.

Examples:
>>> with TraceReader("my_trace.trz") as reader:
...     segments = reader.list_data_segments()
...     for seg in segments:
...         print(f"Segment {seg.id}: {seg.producer}")

list_data_segments_in_time_range(start, end)

List data segments within a specific time range.

Args: start (datetime): Start of time range (inclusive). end (datetime): End of time range (inclusive).

Returns: List[DataSegment]: List of data segments overlapping the time range.

Raises: RuntimeError: If the reader is not open or query fails.

Examples:
>>> from datetime import datetime, timezone
>>> start = datetime(2024, 1, 1, tzinfo=timezone.utc)
>>> end = datetime(2024, 1, 2, tzinfo=timezone.utc)
>>> with TraceReader("my_trace.trz") as reader:
...     segments = reader.list_data_segments_in_time_range(start, end)

list_fields(data_segment_id=...)

List all fields in the trace organized by source and event.

This method discovers all available fields in the trace by querying the database schema and organizing them hierarchically by source and event.

Args: data_segment_id (str, optional): Specific data segment ID to query. If None, queries all segments.

Returns: List[TraceReadSource]: List of sources, each containing events and fields.

Raises: RuntimeError: If the reader is not open or query fails.

Example: Discover and Query Fields
with TraceReader("recording.trz") as reader:
    # Discover all available fields
    sources = reader.list_fields()
    assert len(sources) > 0

    # Navigate the hierarchy
    for source in sources:
        for event in source.events:
            for field in event.fields:
                # field.path is the field path for queries (e.g., "*/can/VehicleSpeed.speed")
                assert field.path.startswith("*/") and "." in field.path

list_traces()

List all traces in the trace file.

Returns: List[TraceMetadata]: List of trace metadata.

Raises: RuntimeError: If the reader is not open or query fails.

Examples:
>>> with TraceReader("my_trace.trz") as reader:
...     traces = reader.list_traces()
...     for trace in traces:
...         print(f"Trace {trace.name}: {trace.start_date} to {trace.end_date}")

open()

Open the trace file for reading.

This method initializes the reader and opens the trace file in read-only mode. It's automatically called when entering the context manager (with statement).

Returns: None

Raises: RuntimeError: If the trace file cannot be opened.

query(data_segment_ids, fields, start, end)

Query data for specified fields within a time range.

This returns raw, unsampled data for the requested fields.

Args: data_segment_ids (List[str]): List of data segment IDs to query. fields (List[str]): List of field paths (e.g., "bus0/msg1/sig1"). start (datetime): Start of time range (inclusive). end (datetime): End of time range (inclusive).

Returns: QueryResult: Query results with Arrow data.

Raises: RuntimeError: If the reader is not open or query fails.

Examples:
>>> with TraceReader("my_trace.trz") as reader:
...     segments = reader.list_data_segments()
...     time_range = reader.time_range()
...     result = reader.query(
...         data_segment_ids=[s.id for s in segments],
...         fields=["bus0/msg1/sig1", "bus0/msg2/sig3"],
...         start=time_range.start,
...         end=time_range.end,
...     )
...     # Convert to PyArrow table
...     import pyarrow as pa
...     arrow_reader = pa.ipc.open_stream(result.to_arrow())
...     table = arrow_reader.read_all()
...     df = table.to_pandas()

time_range()

Get the time range covered by the trace.

Returns: TimeRange: Object containing start and end timestamps.

Raises: RuntimeError: If the reader is not open or query fails.

Examples:
>>> with TraceReader("my_trace.trz") as reader:
...     time_range = reader.time_range()
...     print(f"Start: {time_range.start}")
...     print(f"End: {time_range.end}")

TraceSender

Communication channel for sending trace events.

This class is typically obtained from a TracePublishClient and passed to a TraceSource during creation. It handles the underlying message transport.

Note: Users generally don't need to interact with this class directly; it's used internally to connect TraceSource to TracePublishClient.

TraceSet

Handle to a set of trace files opened together for multi-trace queries.

Returned by Agent.open_traces([...]). Like Trace, holds an Arc<AgentTransport> cloned from its parent Agent and shares the agent's connection lifecycle. The handle's paths is the input list preserved verbatim; time_range is a TimeRangeMulti that reports per-trace timing alongside the union range.

signals() returns the union catalog across every trace in the set (signals are tagged with their trace_path so resolved paths route to the right file). Queries fan out across whichever traces own matching signals.

Example:
ts = agent.open_traces(["/data/a.trz", "/data/b.trz"])
df = ts.query(["can/Motor.rpm"]).to_pandas()
ts.close()

at(paths, time, *, min_time=...)

Per-signal value at (or before) an absolute timestamp across all member traces.

Always treats time and min_time as absolute (epoch ns), accepting datetime, str, or int ns. Returns a dict keyed by signal path. Raises ValueError if min_time > time.

close()

Release the agent's hold on every trace in the set.

Idempotent. Same semantics as Trace.close() but issued for every member trace in one RPC.

export(path, *, overwrite=..., time_mode=..., start=..., end=..., relative_start=..., relative_end=...)

Export the union of all member traces to a single new .trz file.

Same time_mode contract as TraceSet.query(). The agent reads from every member file and merges the results into the output.

Args: path: Destination path for the merged .trz file. overwrite: Replace path if it exists. time_mode: "relative" (default) or "absolute". start: Absolute start; pair with end. Only valid when time_mode="absolute". end: Absolute end; pair with start. relative_start: Seconds from each trace's start; pair with relative_end. Only valid when time_mode="relative". relative_end: Seconds from each trace's start.

Returns: ExportResult: On-disk path plus per-producer success bits.

query(paths, *, time_mode=..., start=..., end=..., relative_start=..., relative_end=..., downsample=..., max_rows=..., sort=..., sort_order=...)

Time-bounded query joining results across every member trace.

time_mode="relative" (default) aligns each trace at t=0 and accepts relative_start/relative_end (seconds from each trace's start). time_mode="absolute" uses wall-clock bounds via start/end. The pair that doesn't match the chosen mode raises ValueError. Omitting all four bounds covers each member trace's full range.

Args: paths: A str, Signal, or sequence thereof. Wildcards expand against the union catalog. time_mode: "relative" (default) or "absolute". start: Absolute start; pair with end. Only valid when time_mode="absolute". end: Absolute end; pair with start. relative_start: Seconds from each trace's start; pair with relative_end. Only valid when time_mode="relative". relative_end: Seconds from each trace's start. downsample: Optional M4 bucket count — same semantics as Agent.query(downsample=N). Interpreted as a count, not seconds. max_rows: Cap on returned rows; 0 (default) means no cap. sort_order: SortOrder.Asc (default) or SortOrder.Desc.

Returns: SignalFrame: Arrow-backed frame; empty when no rows match.
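The mode/bounds contract described above can be sketched as a small validation helper. This is illustrative only, not the SDK's actual implementation, and `check_query_bounds` is a hypothetical name:

```python
def check_query_bounds(time_mode, start=None, end=None,
                       relative_start=None, relative_end=None):
    """Sketch of TraceSet.query's time_mode contract: the bound pair
    that doesn't match the chosen mode raises ValueError; omitting all
    four bounds covers each member trace's full range."""
    if time_mode == "relative":
        if start is not None or end is not None:
            raise ValueError("start/end require time_mode='absolute'")
    elif time_mode == "absolute":
        if relative_start is not None or relative_end is not None:
            raise ValueError(
                "relative_start/relative_end require time_mode='relative'")
    else:
        raise ValueError(f"unknown time_mode: {time_mode!r}")

check_query_bounds("relative")                                   # full range
check_query_bounds("relative", relative_start=0.0, relative_end=5.0)
try:
    check_query_bounds("relative", start=0)  # wrong pair for this mode
except ValueError as exc:
    print(exc)
```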

segments()

List per-trace segments across every member trace.

Segment entries carry their owning trace_path so callers can group them. Use segments_with_warnings() to also receive any warnings the agent reported while reading the files.

segments_with_warnings()

Same as segments() but returns (segments, warnings).

signals()

Fetch the union signal catalog across every member trace.

Cached per-handle (see [PyTrace::signals]). Each signal carries its source trace via signal.trace_path so later queries route to the right file.

Returns: SignalCatalog: Union of all member-trace catalogs.

window(paths, *, start, duration, display_fps=...)

Replay snapshot + change-stream over an absolute window across all member traces.

Always treats start as absolute (epoch ns), accepting datetime, str, or int ns. duration accepts int ns, float seconds, or datetime.timedelta. Raises ValueError on duration <= 0 or display_fps < 0.

TraceSourceCache(name, namespace=None)

A TraceSource wrapper that caches the last value of each field.

Uses a Rust core for cache storage, condition evaluation, and emit decisions. Python layer provides Pythonic attribute navigation.

Example:
source = TraceSourceCache("motor_controller")
source.add_event("motor_stats", [
    TraceEventFieldMetadata("rpm", DataType.Float64),
    TraceEventFieldMetadata("torque", DataType.Float64, "Nm"),
])
source.log("motor_stats", {"rpm": 3500.0, "torque": 42.8})
assert source.motor_stats.rpm.get() == 3500.0

add_event(name, schema, conditions=None)

Register an event schema.

add_value_table(name, field_name, data)

Register a value table (enum mapping).

event_field_names(name)

Get field names for a registered event.

field_data_type(event_name, field_name)

Get field data type.

get_source()

Not supported — use the cache API directly.

get_time_ns(event_name)

Get the wall-clock timestamp (ns since Unix epoch) of the last log for an event, or None if the event hasn't been logged or doesn't exist.
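For display, a nanosecond epoch timestamp like the one get_time_ns returns can be converted to an aware datetime. A short sketch (the sample value is arbitrary, not SDK output):

```python
from datetime import datetime, timezone

# Hypothetical return from get_time_ns(): ns since the Unix epoch.
time_ns = 1_625_097_600_000_000_000

# Scale nanoseconds to seconds and attach an explicit UTC timezone.
ts = datetime.fromtimestamp(time_ns / 1e9, tz=timezone.utc)
print(ts.isoformat())  # 2021-07-01T00:00:00+00:00
```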

get_value(event_name, field_name)

Get the cached value for a field, or None.

has_event(name)

Check if an event is registered.

log(name, data)

Log data and update cache. Auto-registers event if not yet registered.

log_at(time_ns, name, data)

Log data at a specific timestamp.

set_default_log_condition(condition=None)

Set default log condition. Pass a Python LogCondition or None.

TraceSourceCacheLastEvent(name, cache, source, conditions=None)

A cached event with attribute access to fields and submessages.

Example:
event = source.motor_stats
event.rpm.get()           # field access
event.thermal.temp.get()  # submessage access
event.log(rpm=3500)       # log via event

time_ns: Optional[int] property

Wall-clock timestamp of the last log call for this event, in nanoseconds since the Unix epoch. None if the event hasn't been logged yet. Symmetric with TraceSourceCacheLastField.get().

TraceSourceCacheLastField(name, full_path, data_type, cache, event_name, condition=None, uses_default=False)

A cached field that stores the last logged value.

Example:
field = event.rpm
field.get()  # Get cached value
field.name   # Full path: "motor_stats.rpm"

get()

Get the cached value.

set(value)

No-op — cache is updated atomically during log() calls in the Rust core. Retained for backward compatibility.

TraceSourceEvent

__repr__()

String representation of the event.

log(**kwargs)

Log an event with fields given as keyword arguments.

Args: kwargs: Field names and values to log.

Returns: None

Examples:
>>> event = source.add_event("motor_stats")
>>> event.log(rpm=3500, torque=42.8)

log_at(time_ns, **kwargs)

Log an event with fields given as keyword arguments at a specific time. Performs type checking based on the schema.

Args: time_ns (int): Timestamp in nanoseconds since Unix epoch. kwargs: Field names and values to log.

Raises: ValueError: If a field is not in the schema. TypeError: If a value's type doesn't match the schema. RuntimeError: If sending the event fails internally.

Examples:
>>> event = source.get_event("motor_stats")
>>> # Log with custom timestamp
>>> event.log_at(
...     1625097600000000000,
...     rpm=3500.0,
...     torque=42.8,
...     temperature=75.5,
... )

TraceStdout

Python wrapper for the stdout trace sink.

This sink outputs trace events to stdout with configurable log levels. It subscribes to all trace events from the router and formats them as structured log messages.

The sink uses context management and should be used with a with statement to ensure proper resource cleanup and automatic start/stop of trace capture.

Examples:
>>> # Basic usage with default settings (info level)
>>> with TraceStdout() as sink:
...     # Trace events will be logged to stdout
...     pass
>>>
>>> # Custom log level and batch configuration
>>> with TraceStdout(log_level="debug", batch_size=500, batch_timeout_ms=2000) as sink:
...     # Trace events will be logged with custom settings
...     pass

__repr__()

String representation of the sink.

close()

Stop the stdout sink and finalize trace capture.

This method gracefully shuts down the sink and cancels background tasks. It's automatically called when exiting the context manager.

Returns: None

open()

Start the stdout sink and begin capturing events.

This method subscribes to the trace router and starts a background task to process and output trace events to stdout. It's automatically called when entering the context manager (with statement).

Returns: None

Raises: RuntimeError: If the sink cannot be initialized.

TraceTiming

Per-trace timing info used for relative-mode alignment in multi-trace queries.

Rust stores start_s: f64 (epoch seconds) and duration_s: f64 to match the wire shape; computed start: datetime / duration: timedelta accessors keep user code type-safe.
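The computed accessors described above amount to the usual epoch-seconds conversions. A hedged sketch with arbitrary sample values (not a real TraceTiming instance):

```python
from datetime import datetime, timedelta, timezone

# Hypothetical wire-shape values: epoch seconds and duration in seconds.
start_s = 1_700_000_000.5
duration_s = 12.25

# What the type-safe accessors compute from the raw floats.
start = datetime.fromtimestamp(start_s, tz=timezone.utc)
duration = timedelta(seconds=duration_s)

print(start.isoformat(), duration.total_seconds())
```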

TraceWriter

Python wrapper for the TraceWriter.

This writer manages writing trace events to a local file, with support for batching and buffering. It can be used with a TraceSource to capture events for later analysis.

The writer uses context management and should be used with a with statement to ensure proper resource cleanup and automatic start/stop of trace capture.

Examples:
>>> # Basic usage with default settings
>>> with TraceWriter("my_trace.trz") as writer:
...     # Trace events will be captured automatically
...     pass
>>>
>>> # Custom batch configuration
>>> with TraceWriter("my_trace.trz", batch_size=500, batch_timeout_ms=2000) as writer:
...     # Trace events will be captured with custom batch settings
...     pass

__repr__()

String representation of the writer.

close()

Stop the trace writer and finalize trace capture.

This method gracefully shuts down the writer, cancels background tasks, and ensures all buffered events are written to the trace file. It's automatically called when exiting the context manager.

Returns: None

Note: This method is called automatically by __exit__ when using the context manager pattern.

open()

Start the trace writer and begin capturing events.

This method initializes the writer and starts background tasks for batching and writing trace events. It's automatically called when entering the context manager (with statement).

Returns: None

Raises: RuntimeError: If the writer cannot be initialized.

Note: This method is called automatically by __enter__ when using the context manager pattern.

get_global_router_sender()

Get the global default trace router sender (from the global namespace).

Returns: TraceSender: The global namespace's router sender.

Examples: >>> sender = get_global_router_sender()

init(name=None, *, url=None, client_config=None, log_level=None, trace=True, actions=False, block=False)

Initialize the Zelos SDK tracing and actions systems.

Args: name: Application identifier; defaults to "python". url: Agent endpoint (e.g. "http://host:port"). Forwarded to the trace publish client and the actions client. Falls back to ZELOS_AGENT_URL and finally http://localhost:2300. client_config: Configuration for the TracePublishClient (batch_size, batch_timeout_ms). log_level: Logging level to enable, None leaves logging untouched. trace: Initialize the trace system. Defaults to True. actions: Initialize the actions system. Defaults to False. block: Block the current thread until interrupted (useful for actions-only programs).

Examples:
>>> init()
>>> init("my_app", url="grpc://localhost:2300", log_level="debug")
>>> init(log_level="debug", trace=False)  # logging only
>>> init(actions=True, block=True)

parse_connect_target(target=...)

Normalize a connect-target string. Exposed for pre-validation without actually connecting (e.g. config validation in CI).

parse_duration(value, *, allow_zero=True)

Non-negative seconds. Rejects NaN/inf and (optionally) zero. bool is rejected explicitly.
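The validation rules above can be sketched in plain Python. This is an illustrative reimplementation, not the SDK's code, and `parse_duration_sketch` is a hypothetical name:

```python
import math

def parse_duration_sketch(value, *, allow_zero=True):
    """Sketch: non-negative finite seconds; bool rejected explicitly."""
    if isinstance(value, bool):  # bool is an int subclass, so check first
        raise TypeError("bool is not a valid duration")
    seconds = float(value)
    if math.isnan(seconds) or math.isinf(seconds):
        raise ValueError("duration must be finite")
    if seconds < 0 or (seconds == 0 and not allow_zero):
        raise ValueError("duration must be positive"
                         if seconds == 0 else "duration must be non-negative")
    return seconds

print(parse_duration_sketch(2.5))  # 2.5
```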

parse_time(value)

Normalize to a UTC-aware datetime.

Accepts:
* datetime (naive → assumed UTC).
* "now".
* ISO 8601 (including a trailing Z).
* Relative offset from now: "-30s", "-2m", "-1.5h", "-1d" (or the rare "+30s" for explicit future bounds). Mirrors the zelos CLI's --start -30s convention.
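The relative-offset form can be sketched as a small parser. This covers only the "-30s" / "+1.5h" shape for illustration (the real parse_time also handles datetimes, "now", and ISO 8601), and `parse_relative_sketch` is a hypothetical name:

```python
import re
from datetime import datetime, timedelta, timezone

_UNITS = {"s": 1.0, "m": 60.0, "h": 3600.0, "d": 86400.0}

def parse_relative_sketch(value, *, now=None):
    """Sketch: '-30s' -> 30 seconds before now; '+1.5h' -> 1.5 hours after."""
    now = now or datetime.now(timezone.utc)
    match = re.fullmatch(r"([+-])(\d+(?:\.\d+)?)([smhd])", value)
    if match is None:
        raise ValueError(f"not a relative offset: {value!r}")
    sign = 1 if match.group(1) == "+" else -1
    seconds = sign * float(match.group(2)) * _UNITS[match.group(3)]
    return now + timedelta(seconds=seconds)
```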

parse_until(value)

Absolute deadline from datetime/ISO-string, or now + seconds from a float.
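The deadline rule can be sketched as follows. Illustrative only; the SDK's actual handling of edge cases may differ, and `parse_until_sketch` is a hypothetical name:

```python
from datetime import datetime, timedelta, timezone

def parse_until_sketch(value, *, now=None):
    """Sketch: datetime passes through, a number means 'now + seconds',
    anything else is parsed as an ISO 8601 string."""
    now = now or datetime.now(timezone.utc)
    if isinstance(value, datetime):
        return value
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return now + timedelta(seconds=float(value))
    return datetime.fromisoformat(value)
```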