API Client
Module: equser.api
Dependencies: [analysis] extra (requests, websocket-client)
REST and WebSocket clients for EQ Synapse gateways. Install with:
pip install "equser[analysis]"
SynapseClient
REST client for the EQ Synapse API (an Actix-web server listening on port 8080).
from equser.api import SynapseClient
client = SynapseClient('http://gateway:8080')
list_devices() -> list[dict]
List all registered devices.
devices = client.list_devices()
for d in devices:
    print(f"{d['id']}: {d.get('name', 'unnamed')}")
get_pmon_data(device_id, **params) -> pa.Table
Fetch PMon data as an Arrow Table.
table = client.get_pmon_data('wave-001')
print(f"Rows: {table.num_rows}, Columns: {table.column_names}")
Optional query parameters: start_time, end_time, metrics, limit.
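The optional parameters above can be collected in a dict and unpacked into the call. The following is a minimal sketch: the helper name `last_minutes_params` is hypothetical, and the timestamp format is an assumption (ISO 8601 in UTC), since this page does not state what the server expects.

```python
from datetime import datetime, timedelta, timezone


def last_minutes_params(minutes, now=None, limit=None):
    """Build start_time/end_time keyword arguments covering the last `minutes`.

    Hypothetical helper. The ISO 8601 UTC format is an assumption; adjust it
    if the gateway expects another representation (e.g. epoch microseconds).
    """
    now = now or datetime.now(timezone.utc)
    start = now - timedelta(minutes=minutes)
    params = {
        "start_time": start.isoformat(),
        "end_time": now.isoformat(),
    }
    # Only include limit when the caller asked for one.
    if limit is not None:
        params["limit"] = limit
    return params


# A fixed "now" keeps the example reproducible.
fixed_now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
params = last_minutes_params(15, now=fixed_now, limit=1000)
print(params)
```

The resulting dict would then be passed as `client.get_pmon_data('wave-001', **params)`.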
get_cpow_data(device_id, **params) -> pa.Table
Fetch CPOW waveform data as an Arrow Table.
table = client.get_cpow_data('wave-001')
Optional query parameters: start_time, end_time, limit.
get_events(device_id=None, limit=100) -> list[dict]
Fetch recent power quality events.
events = client.get_events(limit=10)
for e in events:
    print(f"{e['timestamp']}: {e['type']}")
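Since events come back as plain dicts, they can be summarized with the standard library. The sketch below tallies events by their `type` key; the sample events, including the type names `sag` and `swell` and the timestamp strings, are made-up placeholders rather than values documented for the server.

```python
from collections import Counter


def count_by_type(events):
    """Tally events by their 'type' key, tolerating events that lack one."""
    return Counter(e.get("type", "unknown") for e in events)


# Hypothetical sample data standing in for client.get_events(...)
sample_events = [
    {"timestamp": "2024-01-01T00:00:00Z", "type": "sag"},
    {"timestamp": "2024-01-01T00:05:00Z", "type": "swell"},
    {"timestamp": "2024-01-01T00:07:30Z", "type": "sag"},
]

counts = count_by_type(sample_events)
for event_type, n in counts.most_common():
    print(f"{event_type}: {n}")
```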
query_sql(query, device_id=None, limit=None) -> list[dict]
Execute a query via the SQL endpoint. The server accepts only SELECT statements.
rows = client.query_sql(
    "SELECT time_us, FREQ, AVRMS FROM pmon ORDER BY time_us DESC",
    limit=10,
)
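Because the server rejects anything other than SELECT, a caller may want to fail fast before making a request. The check below is a rough client-side sketch under that assumption: `looks_like_select` is a hypothetical helper, it only inspects the first keyword, and it does not reproduce whatever validation the server actually performs.

```python
import re

# Matches an optional run of whitespace followed by the keyword SELECT.
_SELECT_RE = re.compile(r"\s*select\b", re.IGNORECASE)


def looks_like_select(query: str) -> bool:
    """Return True if the query's first keyword is SELECT (case-insensitive)."""
    return _SELECT_RE.match(query) is not None


for q in ("SELECT time_us, FREQ FROM pmon", "  select 1", "DROP TABLE pmon"):
    print(f"{q!r}: {looks_like_select(q)}")
```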
WebSocket streaming
connect_cpow_stream(gateway_url) -> Generator
Connect to the CPOW waveform WebSocket and yield data in real time.
Each binary message is an Arrow IPC RecordBatch (~512 rows, 16 ms at 32 kHz). Text messages are JSON gap markers indicating dropped samples.
from equser.api import connect_cpow_stream
for item in connect_cpow_stream('http://gateway:8080'):
    if isinstance(item, dict):
        print(f"Gap: {item['skipped_samples']} samples")
    else:
        # item is a pyarrow.RecordBatch
        va = item.column('VA').to_numpy()
        print(f"Batch: {item.num_rows} rows, VA peak: {va.max()}")
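The loop above distinguishes gap markers from batches; the same pattern can be used to estimate how much of the stream was lost. This sketch relies only on the `skipped_samples` key and the `num_rows` attribute shown above, plus the 32 kHz rate quoted for CPOW data. `FakeBatch` is a hypothetical stand-in for `pyarrow.RecordBatch` so the example runs without a gateway.

```python
SAMPLE_RATE_HZ = 32_000  # CPOW sample rate quoted above


def summarize_stream(items, sample_rate_hz=SAMPLE_RATE_HZ):
    """Total up received and skipped samples from a finite run of stream items."""
    received = 0
    skipped = 0
    for item in items:
        if isinstance(item, dict):
            # JSON gap marker: samples the gateway reports as dropped
            skipped += item.get("skipped_samples", 0)
        else:
            # Arrow RecordBatch (or anything exposing num_rows)
            received += item.num_rows
    total = received + skipped
    return {
        "received_samples": received,
        "skipped_samples": skipped,
        "received_ms": 1000 * received / sample_rate_hz,
        "loss_fraction": skipped / total if total else 0.0,
    }


class FakeBatch:
    """Hypothetical stand-in for pyarrow.RecordBatch; only num_rows is used."""

    def __init__(self, num_rows):
        self.num_rows = num_rows


summary = summarize_stream([FakeBatch(512), {"skipped_samples": 512}, FakeBatch(512)])
print(summary)
```

Against a live stream, which never ends on its own, the generator would need to be bounded first, for example with `itertools.islice(connect_cpow_stream(url), 100)`.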
connect_spectral_stream(device_id, ...) -> Generator
Connect to the spectral analysis WebSocket and yield JSON frames.
Args:
| Parameter | Default | Description |
|---|---|---|
| device_id | (required) | Device identifier |
| phase | 'va' | Channel: va, vb, vc, ia, ib, ic |
| fft_size | 4096 | FFT window size (power of 2) |
| update_rate | 10.0 | Frames per second |
| freq_min | 0 | Minimum frequency (Hz) |
| freq_max | 3000 | Maximum frequency (Hz) |
from equser.api import connect_spectral_stream
for frame in connect_spectral_stream('wave-001', phase='va', fft_size=8192):
    print(f"Spectral frame: {len(frame.get('magnitudes', []))} bins")
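The choice of `fft_size` trades frequency resolution against window length. The arithmetic below is a sketch that assumes the spectral stream is computed from the same 32 kHz samples as the CPOW stream, which this page does not confirm. The helper names are hypothetical, and the server may count edge bins differently, so the results are estimates rather than the guaranteed length of `magnitudes`.

```python
import math


def is_power_of_two(n: int) -> bool:
    """True for 1, 2, 4, 8, ... using the single-set-bit test."""
    return n > 0 and (n & (n - 1)) == 0


def bin_width_hz(fft_size, sample_rate_hz=32_000):
    """Spacing between adjacent FFT bins: sample rate divided by window size."""
    if not is_power_of_two(fft_size):
        raise ValueError(f"fft_size must be a power of 2, got {fft_size}")
    return sample_rate_hz / fft_size


def bins_in_range(fft_size, freq_min=0, freq_max=3000, sample_rate_hz=32_000):
    """Count bins whose center frequency lies in [freq_min, freq_max]."""
    width = bin_width_hz(fft_size, sample_rate_hz)
    first = math.ceil(freq_min / width)
    last = math.floor(freq_max / width)
    return max(0, last - first + 1)


print(bin_width_hz(4096))   # 7.8125
print(bin_width_hz(8192))   # 3.90625
print(bins_in_range(4096))  # 385
print(bins_in_range(8192))  # 769
```

Under these assumptions, doubling `fft_size` halves the bin width and doubles the window duration (128 ms at 4096 samples, 256 ms at 8192).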
API endpoints
The EQ Synapse server (Actix-web) exposes these endpoints:
| Endpoint | Method | Description |
|---|---|---|
| /api/v1/devices | GET | List devices |
| /api/v1/devices/{id}/pmon/data | GET | PMon data (Arrow IPC) |
| /api/v1/devices/{id}/cpow/data | GET | CPOW data (Arrow IPC) |
| /api/v1/events | GET | Power quality events |
| /api/v1/events/stream | GET | Event stream (SSE) |
| /api/v1/query/sql | POST | SQL query (SELECT only) |
| /api/ws/cpow_stream | WS | Real-time waveform stream |
| /api/ws/spectral | WS | Real-time spectral stream |
Data endpoints return Arrow IPC binary format for efficient transfer.
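For callers that talk to the server without the bundled clients, the endpoint paths above can be assembled with the standard library. This is an illustrative sketch, not how `equser.api` builds its URLs: the helper names are hypothetical, and mapping `http` to `ws` (and `https` to `wss`) for the WebSocket endpoints is the usual convention rather than something this page states.

```python
from urllib.parse import quote, urlencode, urlsplit, urlunsplit


def rest_url(base, path, **params):
    """Join a base URL and an endpoint path, appending non-None query parameters."""
    query = urlencode({k: v for k, v in params.items() if v is not None})
    url = base.rstrip("/") + path
    return f"{url}?{query}" if query else url


def device_data_url(base, device_id, kind, **params):
    """URL for the pmon or cpow data endpoint of one device."""
    if kind not in ("pmon", "cpow"):
        raise ValueError(f"kind must be 'pmon' or 'cpow', got {kind!r}")
    # Percent-encode the device id so it is safe as a single path segment.
    path = f"/api/v1/devices/{quote(device_id, safe='')}/{kind}/data"
    return rest_url(base, path, **params)


def ws_url(base, path):
    """Swap http(s) for ws(s) and attach a WebSocket endpoint path (assumed mapping)."""
    parts = urlsplit(base)
    scheme = {"http": "ws", "https": "wss"}.get(parts.scheme, parts.scheme)
    return urlunsplit((scheme, parts.netloc, path, "", ""))


print(device_data_url("http://gateway:8080", "wave-001", "pmon", limit=10))
print(ws_url("http://gateway:8080", "/api/ws/cpow_stream"))
```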