
Python bindings guide

tsink ships Python bindings via UniFFI. The tsink package gives you the full storage engine — writes, queries, aggregation, rollups, snapshots — from Python with no server process required. After installation, import it as tsink. UniFFI record types such as Label, Row, and MetricSeries use keyword-only constructors in Python.

Installation

pip install tsink
Requires Python 3.8+. The wheel includes the native Rust library; no Rust toolchain needed at runtime.

Building from source

If a pre-built wheel is not available for your platform:
pip install maturin
cd crates/tsink-uniffi
maturin develop --release

Quick start

from tsink import TsinkStorageBuilder, DataPoint, Label, Row, Value

builder = TsinkStorageBuilder()
builder.with_data_path("./tsink-data")
db = builder.build()

db.insert_rows([
    Row(
        metric="cpu_usage",
        labels=[Label(name="host", value="web-1")],
        data_point=DataPoint(timestamp=1_700_000_000_000, value=Value.F64(v=42.0)),
    ),
])

points = db.select(
    "cpu_usage",
    [Label(name="host", value="web-1")],
    0,
    2_000_000_000_000,
)
for p in points:
    print(f"ts={p.timestamp} value={p.value}")

db.close()

TsinkStorageBuilder

Create a builder, call configuration methods, then call build() once to get a TsinkDB handle. The builder is consumed by build() and cannot be reused.
from datetime import timedelta
from tsink import (
    TsinkStorageBuilder,
    TimestampPrecision,
    WalSyncMode,
    WalReplayMode,
    StorageRuntimeMode,
)

builder = TsinkStorageBuilder()
builder.with_data_path("/var/lib/tsink")
builder.with_retention(timedelta(days=30))
builder.with_timestamp_precision(TimestampPrecision.MILLISECONDS)
builder.with_memory_limit(512 * 1024 * 1024)       # 512 MiB
builder.with_cardinality_limit(1_000_000)
builder.with_wal_sync_mode(WalSyncMode.PERIODIC(interval=timedelta(seconds=1)))
db = builder.build()

Configuration methods

All methods return None and mutate the builder in place.

Data & persistence

| Method | Default | Description |
| --- | --- | --- |
| with_data_path(path) | none | Directory for WAL, segments, and metadata. |
| with_object_store_path(path) | none | Path (or object-store prefix) for warm/cold tier segments. |

Retention

| Method | Default | Description |
| --- | --- | --- |
| with_retention(duration) | 14 days | Global retention window. |
| with_retention_enforced(bool) | False | Reject writes outside the retention window. |
| with_tiered_retention_policy(hot, warm) | none | Separate retention for hot and warm tiers. |
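
When with_retention_enforced(True) is set, writes older than the retention window are rejected. The cutoff arithmetic is straightforward; here is a pure-Python sketch of it (a hypothetical helper, assuming millisecond timestamps, not part of the tsink API):

```python
import time
from datetime import timedelta
from typing import Optional

def retention_cutoff_ms(retention: timedelta, now_ms: Optional[int] = None) -> int:
    """Oldest timestamp (in ms) that still falls inside the retention window."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - int(retention.total_seconds() * 1000)

# A point 40 days old falls outside a 30-day window.
now = 1_700_000_000_000
cutoff = retention_cutoff_ms(timedelta(days=30), now_ms=now)
old_point = now - int(timedelta(days=40).total_seconds() * 1000)
print(old_point < cutoff)  # True: such a write would be rejected
```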

Timestamp precision

| Method | Default | Description |
| --- | --- | --- |
| with_timestamp_precision(precision) | Nanoseconds | One of Nanoseconds, Microseconds, Milliseconds, Seconds. |
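
Timestamps throughout the API are plain integers in the configured precision. A small conversion helper (hypothetical, not part of tsink; the string keys here simply mirror the TimestampPrecision variant names):

```python
from datetime import datetime, timezone

# Ticks per second for each precision.
_TICKS = {"SECONDS": 1, "MILLISECONDS": 10**3, "MICROSECONDS": 10**6, "NANOSECONDS": 10**9}

def to_timestamp(dt: datetime, precision: str = "MILLISECONDS") -> int:
    """Convert an aware datetime to an integer timestamp at the given precision."""
    return int(dt.timestamp() * _TICKS[precision])

dt = datetime(2023, 11, 14, 22, 13, 20, tzinfo=timezone.utc)
print(to_timestamp(dt))  # 1700000000000, the millisecond timestamp used in the examples
```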

Chunks & partitions

| Method | Default | Description |
| --- | --- | --- |
| with_chunk_points(n) | 2048 | Points per in-memory chunk before sealing. |
| with_partition_duration(duration) | 1 hour | Time range per partition. |
| with_max_active_partition_heads_per_series(n) | 8 | Active partition heads per series (out-of-order fanout). |

Memory & cardinality

| Method | Default | Description |
| --- | --- | --- |
| with_memory_limit(bytes) | unlimited | Memory budget; exceeding triggers backpressure. |
| with_cardinality_limit(series) | unlimited | Maximum unique series count. |

Concurrency

| Method | Default | Description |
| --- | --- | --- |
| with_max_writers(n) | CPU count | Parallel writer threads. |
| with_write_timeout(duration) | 30 s | Maximum wait for a writer slot. |

WAL

| Method | Default | Description |
| --- | --- | --- |
| with_wal_enabled(bool) | True | Enable/disable the write-ahead log. |
| with_wal_size_limit(bytes) | unlimited | Maximum WAL size on disk. |
| with_wal_buffer_size(size) | default | In-memory WAL buffer size. |
| with_wal_sync_mode(mode) | PerAppend | WalSyncMode.PER_APPEND (crash-safe) or WalSyncMode.PERIODIC(interval). |
| with_wal_replay_mode(mode) | Strict | WalReplayMode.STRICT or WalReplayMode.SALVAGE. |

Remote segments

| Method | Default | Description |
| --- | --- | --- |
| with_remote_segment_cache_policy(policy) | MetadataOnly | Caching strategy for remote tier segments. |
| with_remote_segment_refresh_interval(duration) | default | Refresh interval for the remote segment catalog. |
| with_mirror_hot_segments_to_object_store(bool) | False | Mirror hot segments to the object store. |

Runtime

| Method | Default | Description |
| --- | --- | --- |
| with_runtime_mode(mode) | ReadWrite | StorageRuntimeMode.READ_WRITE or COMPUTE_ONLY. |
| with_background_fail_fast(bool) | True | Halt on unrecoverable background errors. |
| with_metadata_shard_count(n) | auto | Number of metadata shards. |

TsinkDB

TsinkDB is the main database handle returned by builder.build(). It is thread-safe and can be shared across Python threads.
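
Because the handle is thread-safe, a common pattern is to fan writes out over a thread pool while sharing one TsinkDB. A sketch of the pattern, with a stand-in insert function in place of db.insert_rows so the example is self-contained:

```python
from concurrent.futures import ThreadPoolExecutor

def insert_rows(rows):
    # Stand-in for db.insert_rows; in real code, call the shared handle here.
    return len(rows)

# Stand-in row batches; each worker writes one batch.
batches = [[("cpu_usage", i)] for i in range(8)]

with ThreadPoolExecutor(max_workers=4) as pool:
    written = sum(pool.map(insert_rows, batches))
print(written)  # 8
```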

Writing data

Value types

Values are represented by the Value enum:
Value.F64(v=3.14)              # 64-bit float
Value.I64(v=-42)               # 64-bit signed integer
Value.U64(v=100)               # 64-bit unsigned integer
Value.BOOL(v=True)             # boolean
Value.BYTES(v=b"\xCA\xFE")     # raw bytes
Value.STR(v="hello")           # UTF-8 string
Value.HISTOGRAM(v=histogram)   # native Prometheus histogram

Insert rows

rows = [
    Row(
        metric="http_requests_total",
        labels=[
            Label(name="method", value="GET"),
            Label(name="status", value="200"),
        ],
        data_point=DataPoint(timestamp=1_700_000_000_000, value=Value.F64(v=1027.0)),
    ),
    Row(
        metric="memory_free_bytes",
        labels=[Label(name="host", value="web-1")],
        data_point=DataPoint(timestamp=1_700_000_000_000, value=Value.I64(v=8_589_934_592)),
    ),
]

db.insert_rows(rows)

Write acknowledgement

insert_rows_with_result returns a WriteResult so you can inspect the durability guarantee:
result = db.insert_rows_with_result(rows)
print(result.acknowledgement)
# WriteAcknowledgement.DURABLE   — fsync'd (PerAppend WAL mode)
# WriteAcknowledgement.APPENDED  — in WAL buffer (Periodic mode)
# WriteAcknowledgement.VOLATILE  — in memory only (WAL disabled)

Querying data

Simple select

points = db.select(
    "cpu_usage",
    [Label(name="host", value="web-1")],
    start=0,
    end=2_000_000_000_000,
)

for p in points:
    print(p.timestamp, p.value)
Pass an empty label list to match the bare metric (no labels):
points = db.select("cpu_usage", [], start=0, end=2_000_000_000_000)

Select all label combinations

all_series = db.select_all("http_requests_total", start=0, end=2_000_000_000_000)

for labeled in all_series:
    tag_str = ", ".join(f"{l.name}={l.value}" for l in labeled.labels)
    print(f"[{tag_str}]: {len(labeled.data_points)} points")
Returns a list of LabeledDataPoints, each carrying labels and data_points.

Select multiple series at once

from tsink import MetricSeries

series = [
    MetricSeries(name="cpu_usage", labels=[Label(name="host", value="web-1")]),
    MetricSeries(name="cpu_usage", labels=[Label(name="host", value="web-2")]),
]

results = db.select_many(series, start=0, end=2_000_000_000_000)
for sp in results:
    print(f"{sp.series.name} {sp.series.labels}: {len(sp.points)} points")

Advanced queries with QueryOptions

from tsink import Aggregation, DownsampleOptions, QueryOptions

options = QueryOptions(
    labels=[Label(name="host", value="web-1")],
    start=0,
    end=2_000_000_000_000,
    aggregation=Aggregation.AVG,
    downsample=DownsampleOptions(interval=60_000),  # 1-minute buckets
    limit=1000,
    offset=0,
)

points = db.select_with_options("cpu_usage", options)
Available aggregations:
| Aggregation variant | Description |
| --- | --- |
| NONE | No aggregation (default) |
| SUM | Sum of values |
| MIN / MAX | Minimum / maximum |
| AVG | Arithmetic mean |
| FIRST / LAST | First / last point in window |
| COUNT | Number of points |
| MEDIAN | Median value |
| RANGE | Max − Min |
| VARIANCE / STD_DEV | Statistical variance / standard deviation |
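
DownsampleOptions(interval=60_000) groups points into fixed windows before the aggregation is applied. Conceptually, AVG downsampling behaves like this pure-Python sketch (an illustration only, not the engine's implementation):

```python
from collections import defaultdict

def downsample_avg(points, interval):
    """Average (timestamp, value) pairs into fixed buckets of `interval` ticks."""
    buckets = defaultdict(list)
    for ts, v in points:
        buckets[ts - ts % interval].append(v)  # align to bucket start
    return [(start, sum(vs) / len(vs)) for start, vs in sorted(buckets.items())]

pts = [(0, 1.0), (30_000, 3.0), (60_000, 10.0)]
print(downsample_avg(pts, 60_000))  # [(0, 2.0), (60000, 10.0)]
```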

Paginated row scanning

For large result sets, scan in pages:
from tsink import QueryRowsScanOptions

offset = None
while True:
    page = db.scan_metric_rows(
        "cpu_usage",
        start=0,
        end=2_000_000_000_000,
        options=QueryRowsScanOptions(max_rows=10_000, row_offset=offset),
    )
    process(page.rows)

    if page.truncated:
        offset = page.next_row_offset
    else:
        break
scan_series_rows works the same way but accepts a list of MetricSeries instead of a metric name.
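
The pagination loop above generalizes into a small generator. A sketch in which `fetch(offset)` stands in for the scan_metric_rows call (a hypothetical helper, not part of tsink):

```python
def iter_rows(fetch):
    """Yield rows across all pages. `fetch(offset)` must return an object with
    .rows, .truncated, and .next_row_offset, like a scan_metric_rows page."""
    offset = None
    while True:
        page = fetch(offset)
        yield from page.rows
        if not page.truncated:
            return
        offset = page.next_row_offset

# Stand-in pages to demonstrate the flow.
class Page:
    def __init__(self, rows, truncated, next_row_offset=None):
        self.rows, self.truncated, self.next_row_offset = rows, truncated, next_row_offset

pages = {None: Page([1, 2], True, 2), 2: Page([3], False)}
print(list(iter_rows(lambda off: pages[off])))  # [1, 2, 3]
```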

Series discovery

List all metrics

all_metrics = db.list_metrics()
for m in all_metrics:
    print(m.name, m.labels)
list_metrics_with_wal() includes series that exist only in the WAL (not yet flushed).

Filter with matchers

from tsink import SeriesMatcher, SeriesMatcherOp, SeriesSelection

selection = SeriesSelection(
    metric="http_requests_total",
    matchers=[
        SeriesMatcher(name="method", op=SeriesMatcherOp.EQUAL, value="GET"),
        SeriesMatcher(name="status", op=SeriesMatcherOp.REGEX_MATCH, value="2.."),
    ],
    start=None,
    end=None,
)

matched = db.select_series(selection)
| SeriesMatcherOp | Equivalent | Example |
| --- | --- | --- |
| EQUAL | = | method="GET" |
| NOT_EQUAL | != | status!="500" |
| REGEX_MATCH | =~ | host=~"web-.*" |
| REGEX_NO_MATCH | !~ | env!~"staging\|dev" |
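
The four operators mirror Prometheus label matchers. A pure-Python sketch of their semantics, assuming (as in Prometheus, though not confirmed here for tsink) that regexes are anchored to the full label value:

```python
import re

def matches(op, label_value, pattern):
    """Evaluate one label matcher against a label value."""
    if op == "EQUAL":
        return label_value == pattern
    if op == "NOT_EQUAL":
        return label_value != pattern
    if op == "REGEX_MATCH":
        return re.fullmatch(pattern, label_value) is not None
    if op == "REGEX_NO_MATCH":
        return re.fullmatch(pattern, label_value) is None
    raise ValueError(op)

print(matches("REGEX_MATCH", "web-1", "web-.*"))         # True
print(matches("REGEX_NO_MATCH", "prod", "staging|dev"))  # True
```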

Deleting series

result = db.delete_series(
    SeriesSelection(metric="old_metric", matchers=[], start=None, end=None)
)
print(f"matched={result.matched_series}, tombstones={result.tombstones_applied}")
Deletion is tombstone-based. Tombstones are merged during compaction.

Rollup policies

Define materialized downsampled views:
from tsink import RollupPolicy

policies = [
    RollupPolicy(
        id="cpu_5m_avg",
        metric="cpu_usage",
        match_labels=[],
        interval=300_000,             # 5 minutes in ms
        aggregation=Aggregation.AVG,
        bucket_origin=0,
    ),
]

snapshot = db.apply_rollup_policies(policies)
for status in snapshot.policies:
    print(f"{status.policy.id}: materialized through {status.materialized_through}")
Trigger an immediate rollup run:
snapshot = db.trigger_rollup_run()
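
bucket_origin shifts where rollup windows start. One plausible reading of the alignment arithmetic, as a sketch (the engine's exact rule is not documented here):

```python
def bucket_start(ts, interval, origin=0):
    """Start of the rollup bucket containing `ts`, aligned to `origin`."""
    return origin + ((ts - origin) // interval) * interval

# 5-minute buckets (300_000 ms), default origin 0:
print(bucket_start(1_700_000_123_456, 300_000))  # 1700000100000
```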

Snapshots

db.snapshot("/backups/tsink-2026-03-12")

# Restore to a new data directory.
TsinkStorageBuilder.restore_from_snapshot("/backups/tsink-2026-03-12", "/var/lib/tsink")

Observability

Inspect engine internals at runtime:
snap = db.observability_snapshot()

print(f"memory: {snap.memory.active_and_sealed_bytes} / {snap.memory.budgeted_bytes} bytes")
print(f"WAL: {snap.wal.segment_count} segments, {snap.wal.size_bytes} bytes")
print(f"compaction: {snap.compaction.runs_total} runs, {snap.compaction.errors_total} errors")
print(f"queries: {snap.query.select_calls_total} selects")

if snap.health.degraded:
    print(f"engine degraded: {snap.health.last_background_error}")
The snapshot covers memory, WAL, retention, flush pipeline, compaction, queries, rollups, remote storage, and overall health.

Memory inspection

print(f"used:   {db.memory_used()} bytes")
print(f"budget: {db.memory_budget()} bytes")

Closing the database

db.close()
Flushes remaining data, syncs the WAL, and shuts down background workers. Any method called after close() raises TsinkUniFFIError.
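
To guarantee close() runs even when an exception escapes, contextlib.closing works with any object exposing a close() method (the bindings are not documented here as supporting the with statement directly). A self-contained sketch with a stand-in handle:

```python
from contextlib import closing

class FakeDB:
    """Stand-in with a close() method, playing the role of TsinkDB."""
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

db = FakeDB()
with closing(db):
    pass  # writes and queries go here
print(db.closed)  # True
```

In real code this would read `with closing(builder.build()) as db:`.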

Native histograms

Store Prometheus-style native histograms:
from tsink import (
    NativeHistogram,
    HistogramBucketSpan,
    HistogramCount,
    HistogramResetHint,
)

histogram = NativeHistogram(
    count=HistogramCount.INT(v=10),
    sum=55.0,
    schema=3,
    zero_threshold=1e-128,
    zero_count=HistogramCount.INT(v=1),
    negative_spans=[],
    negative_deltas=[],
    negative_counts=[],
    positive_spans=[HistogramBucketSpan(offset=0, length=3)],
    positive_deltas=[2, 1, -1],
    positive_counts=[],
    reset_hint=HistogramResetHint.NO,
    custom_values=[],
)

db.insert_rows([
    Row(
        metric="request_duration",
        labels=[],
        data_point=DataPoint(
            timestamp=1_700_000_000_000,
            value=Value.HISTOGRAM(v=histogram),
        ),
    ),
])
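
positive_deltas follows the Prometheus native-histogram convention: the first entry is the first bucket's count and each later entry is the difference from the previous bucket. Decoding to absolute counts is a running sum, as in this pure-Python sketch:

```python
from itertools import accumulate

def decode_deltas(deltas):
    """Turn delta-encoded bucket counts into absolute counts."""
    return list(accumulate(deltas))

print(decode_deltas([2, 1, -1]))  # [2, 3, 2], the three bucket counts above
```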

Shard window operations

These methods support distributed deployments where data is partitioned across nodes. They are used internally by the cluster layer but are available for advanced use cases.
from tsink import MetadataShardScope, ShardWindowScanOptions

# Compute a digest (fingerprint) for a shard window.
digest = db.compute_shard_window_digest(
    shard=0, shard_count=16,
    window_start=0, window_end=2_000_000_000_000,
)
print(f"series={digest.series_count} points={digest.point_count} fp={digest.fingerprint}")

# Scan rows for a shard window with pagination.
page = db.scan_shard_window_rows(
    shard=0, shard_count=16,
    window_start=0, window_end=2_000_000_000_000,
    options=ShardWindowScanOptions(max_series=100, max_rows=10_000, row_offset=None),
)

# List metrics limited to specific shards.
scope = MetadataShardScope(shard_count=16, shards=[0, 1, 2])
metrics = db.list_metrics_in_shards(scope)

Error handling

All methods raise TsinkUniFFIError on failure. The exception carries a string message and is one of these variants:
| Variant | When it occurs |
| --- | --- |
| NoDataPoints | No data found for the given metric/time range. |
| InvalidTimeRange | start > end or otherwise invalid range. |
| StorageClosed | Operation attempted after close(). |
| InvalidInput | Bad metric name, label, or unsupported operation. |
| IoError | File-system or disk I/O failure. |
| DataCorruption | Checksum mismatch or parse error. |
| ResourceExhausted | Memory budget, cardinality limit, or WAL size limit exceeded. |
| Other | Lock poisoning, channel errors, WAL issues, etc. |
from tsink import TsinkUniFFIError

try:
    db.insert_rows(rows)
except TsinkUniFFIError as e:
    print(f"write failed: {e}")
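
A transient ResourceExhausted can be worth retrying with backoff. A generic helper, shown with a stand-in exception type so the example is self-contained (substitute TsinkUniFFIError in real code):

```python
import time

def with_retries(fn, exc_type, attempts=3, base_delay=0.01):
    """Call fn(), retrying on exc_type with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except exc_type:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)

class FakeExhausted(Exception):
    """Stand-in for TsinkUniFFIError in this sketch."""

calls = []
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise FakeExhausted()
    return "ok"

result = with_retries(flaky, FakeExhausted)
print(result)  # "ok" after two retries
```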

Type reference

Records (dataclasses)

| Python type | Fields | Notes |
| --- | --- | --- |
| Label | name: str, value: str | Metric label key-value pair. |
| DataPoint | timestamp: int, value: Value | Single data point. |
| Row | metric: str, labels: list[Label], data_point: DataPoint | Complete write record. |
| MetricSeries | name: str, labels: list[Label] | Series identity. |
| SeriesPoints | series: MetricSeries, points: list[DataPoint] | Series with its data. |
| LabeledDataPoints | labels: list[Label], data_points: list[DataPoint] | Labels with points (from select_all). |
| QueryOptions | labels, start, end, aggregation, downsample, limit, offset | Advanced query configuration. |
| DownsampleOptions | interval: int | Downsample bucket width. |
| SeriesSelection | metric: Optional[str], matchers, start, end | Series filter. |
| SeriesMatcher | name: str, op: SeriesMatcherOp, value: str | Single label matcher. |
| RollupPolicy | id, metric, match_labels, interval, aggregation, bucket_origin | Rollup definition. |
| WriteResult | acknowledgement: WriteAcknowledgement | Write durability level. |
| DeleteSeriesResult | matched_series: int, tombstones_applied: int | Deletion outcome. |

Enums

| Python type | Variants |
| --- | --- |
| Value | F64, I64, U64, BOOL, BYTES, STR, HISTOGRAM |
| Aggregation | NONE, SUM, MIN, MAX, AVG, FIRST, LAST, COUNT, MEDIAN, RANGE, VARIANCE, STD_DEV |
| TimestampPrecision | NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS |
| StorageRuntimeMode | READ_WRITE, COMPUTE_ONLY |
| RemoteSegmentCachePolicy | METADATA_ONLY |
| WalSyncMode | PER_APPEND, PERIODIC(interval) |
| WalReplayMode | STRICT, SALVAGE |
| WriteAcknowledgement | VOLATILE, APPENDED, DURABLE |
| SeriesMatcherOp | EQUAL, NOT_EQUAL, REGEX_MATCH, REGEX_NO_MATCH |
| HistogramCount | INT(v), FLOAT(v) |
| HistogramResetHint | UNKNOWN, YES, NO, GAUGE |