API Client

Module: equser.api
Dependencies: [analysis] extra (requests, websocket-client)

REST and WebSocket clients for EQ Synapse gateways. Install with:

pip install equser[analysis]

SynapseClient

REST client for the EQ Synapse API (an Actix-web server on port 8080).

from equser.api import SynapseClient

client = SynapseClient('http://gateway:8080')

list_devices() -> list[dict]

List all registered devices.

devices = client.list_devices()
for d in devices:
    print(f"{d['id']}: {d.get('name', 'unnamed')}")

get_pmon_data(device_id, **params) -> pa.Table

Fetch PMon data as an Arrow Table.

table = client.get_pmon_data('wave-001')
print(f"Rows: {table.num_rows}, Columns: {table.column_names}")

Optional query parameters: start_time, end_time, metrics, limit.
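The optional parameters are passed as keyword arguments. This page does not state the timestamp format, so the following is only a sketch: it assumes start_time and end_time accept integer microseconds since the Unix epoch (suggested by the time_us column in the SQL example, but not confirmed). The helpers to_epoch_us and last_n_minutes are hypothetical and not part of equser.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_us(moment):
    """Convert a timezone-aware datetime to integer microseconds since the epoch."""
    return (moment - EPOCH) // timedelta(microseconds=1)


def last_n_minutes(minutes, now=None, limit=None):
    """Build keyword arguments covering the most recent `minutes` minutes.

    ASSUMPTION: start_time/end_time are epoch microseconds (UTC).
    Check the gateway documentation for the format it actually expects.
    """
    end = now if now is not None else datetime.now(timezone.utc)
    start = end - timedelta(minutes=minutes)
    params = {"start_time": to_epoch_us(start), "end_time": to_epoch_us(end)}
    if limit is not None:
        params["limit"] = limit
    return params
```

With a client in hand, the result can be unpacked into the call: client.get_pmon_data('wave-001', **last_n_minutes(10, limit=1000)).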

get_cpow_data(device_id, **params) -> pa.Table

Fetch CPOW waveform data as an Arrow Table.

table = client.get_cpow_data('wave-001')

Optional query parameters: start_time, end_time, limit.

get_events(device_id=None, limit=100) -> list[dict]

Fetch recent power quality events.

events = client.get_events(limit=10)
for e in events:
    print(f"{e['timestamp']}: {e['type']}")
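Because get_events returns plain dictionaries, the result can be summarized with the standard library. A small sketch that counts events per type; count_by_type is a hypothetical helper, and the sample records (including the type names and timestamp format) are made up for illustration.

```python
from collections import Counter


def count_by_type(events):
    """Count events per 'type' value; events without a type go under 'unknown'."""
    return Counter(e.get("type", "unknown") for e in events)


# Made-up sample shaped like the dictionaries in the example above.
sample = [
    {"timestamp": "2024-01-01T00:00:00Z", "type": "sag"},
    {"timestamp": "2024-01-01T00:05:00Z", "type": "swell"},
    {"timestamp": "2024-01-01T00:07:30Z", "type": "sag"},
]

for event_type, count in count_by_type(sample).most_common():
    print(f"{event_type}: {count}")
```

In real use, the sample list would be replaced by the return value of client.get_events(...).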

query_sql(query, device_id=None, limit=None) -> list[dict]

Execute a SELECT query via the SQL endpoint. Only SELECT statements are allowed by the server.

rows = client.query_sql(
    "SELECT time_us, FREQ, AVRMS FROM pmon ORDER BY time_us DESC",
    limit=10,
)
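Since the server rejects anything other than SELECT, a client-side check can catch obvious mistakes before a request is sent. The sketch below is a hypothetical helper, not part of equser, and it is deliberately conservative: it also rejects queries that contain a semicolon inside a string literal or that begin with a WITH clause, which the server may or may not accept.

```python
def looks_like_select(query):
    """Return True if the text is a single statement whose first word is SELECT."""
    # Allow one trailing semicolon, then refuse anything with another one.
    stripped = query.strip().rstrip(";").strip()
    if ";" in stripped:
        return False
    words = stripped.split(None, 1)
    return bool(words) and words[0].lower() == "select"


for candidate in (
    "SELECT time_us, FREQ FROM pmon",
    "DROP TABLE pmon",
    "SELECT 1; DELETE FROM pmon",
):
    print(f"{candidate!r}: {looks_like_select(candidate)}")
```

The server-side restriction remains the real safeguard; this check only gives faster feedback.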

WebSocket streaming

connect_cpow_stream(gateway_url) -> Generator

Connect to the CPOW waveform WebSocket and yield data in real time.

Each binary message is an Arrow IPC RecordBatch (~512 rows, 16 ms at 32 kHz). Text messages are JSON gap markers indicating dropped samples.

from equser.api import connect_cpow_stream

for item in connect_cpow_stream('http://gateway:8080'):
    if isinstance(item, dict):
        print(f"Gap: {item['skipped_samples']} samples")
    else:
        # item is a pyarrow.RecordBatch
        va = item.column('VA').to_numpy()
        print(f"Batch: {item.num_rows} rows, VA peak: {va.max()}")
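Gap markers make it possible to estimate how much of the stream was lost. The following sketch tallies received and skipped samples from any iterable of stream items; summarize_stream and FakeBatch are hypothetical names, and FakeBatch only stands in for a pyarrow.RecordBatch so the example runs without a gateway.

```python
SAMPLE_RATE_HZ = 32_000  # sample rate quoted in the description above


def summarize_stream(items):
    """Tally received and skipped samples from an iterable of stream items.

    Dict items are treated as gap markers with a 'skipped_samples' key;
    anything else is expected to expose `num_rows`, like a RecordBatch.
    """
    received = skipped = 0
    for item in items:
        if isinstance(item, dict):
            skipped += item["skipped_samples"]
        else:
            received += item.num_rows
    total = received + skipped
    return {
        "received": received,
        "skipped": skipped,
        "seconds": total / SAMPLE_RATE_HZ,
        "loss_ratio": skipped / total if total else 0.0,
    }


class FakeBatch:
    """Stand-in for a RecordBatch; only num_rows is needed here."""

    def __init__(self, num_rows):
        self.num_rows = num_rows


demo = [FakeBatch(512), {"skipped_samples": 512}, FakeBatch(512)]
print(summarize_stream(demo))
```

The real stream does not end on its own, so it would need to be bounded first, for example with itertools.islice(connect_cpow_stream('http://gateway:8080'), 100).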

connect_spectral_stream(device_id, ...) -> Generator

Connect to the spectral analysis WebSocket and yield JSON frames.

Args:

| Parameter | Default | Description |
| --- | --- | --- |
| device_id | (required) | Device identifier |
| phase | 'va' | Channel: va, vb, vc, ia, ib, ic |
| fft_size | 4096 | FFT window size (power of 2) |
| update_rate | 10.0 | Frames per second |
| freq_min | 0 | Minimum frequency (Hz) |
| freq_max | 3000 | Maximum frequency (Hz) |

from equser.api import connect_spectral_stream

for frame in connect_spectral_stream('wave-001', phase='va', fft_size=8192):
    print(f"Spectral frame: {len(frame.get('magnitudes', []))} bins")
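The choice of fft_size trades frequency resolution against window length. A rough calculation, assuming the spectral stream is computed from the same 32 kHz samples as the CPOW stream (this page does not state the spectral sample rate); fft_resolution is a hypothetical helper.

```python
def fft_resolution(fft_size, sample_rate_hz=32_000):
    """Return (bin width in Hz, window length in seconds) for an FFT size.

    ASSUMPTION: the default sample rate matches the 32 kHz CPOW stream.
    """
    bin_width_hz = sample_rate_hz / fft_size
    window_seconds = fft_size / sample_rate_hz
    return bin_width_hz, window_seconds


for size in (4096, 8192):
    bin_hz, window_s = fft_resolution(size)
    print(f"fft_size={size}: {bin_hz:.4f} Hz per bin, {window_s:.3f} s window")
```

Under that assumption, doubling fft_size halves the bin width but doubles the time span each frame covers, so at the default update_rate of 10 frames per second consecutive windows overlap.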

API endpoints

The EQ Synapse server (Actix-web) exposes these endpoints:

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/v1/devices | GET | List devices |
| /api/v1/devices/{id}/pmon/data | GET | PMon data (Arrow IPC) |
| /api/v1/devices/{id}/cpow/data | GET | CPOW data (Arrow IPC) |
| /api/v1/events | GET | Power quality events |
| /api/v1/events/stream | GET | Event stream (SSE) |
| /api/v1/query/sql | POST | SQL query (SELECT only) |
| /api/ws/cpow_stream | WS | Real-time waveform stream |
| /api/ws/spectral | WS | Real-time spectral stream |

Data endpoints return Arrow IPC binary format for efficient transfer.
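For callers that bypass the Python client, the endpoint paths can be combined with a gateway base URL using only the standard library. This is a sketch with hypothetical helper names (rest_url, ws_url, pmon_data_url). WebSocket libraries generally expect a ws:// or wss:// scheme; whether equser performs that conversion internally is not stated here.

```python
from urllib.parse import quote, urlsplit, urlunsplit


def rest_url(base_url, path):
    """Join a gateway base URL and an endpoint path from the table."""
    return base_url.rstrip("/") + path


def ws_url(base_url, path):
    """Like rest_url, but maps http -> ws and https -> wss."""
    parts = urlsplit(base_url)
    scheme = {"http": "ws", "https": "wss"}.get(parts.scheme, parts.scheme)
    full_path = parts.path.rstrip("/") + path
    return urlunsplit((scheme, parts.netloc, full_path, "", ""))


def pmon_data_url(base_url, device_id):
    """Fill the {id} placeholder, percent-encoding the device identifier."""
    return rest_url(base_url, f"/api/v1/devices/{quote(device_id, safe='')}/pmon/data")


print(rest_url("http://gateway:8080", "/api/v1/devices"))
print(ws_url("http://gateway:8080", "/api/ws/cpow_stream"))
print(pmon_data_url("http://gateway:8080", "wave-001"))
```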