Python Client Library
The Python client library provides an async gRPC interface to the WebSocketManager service.
Installation
From Gitea PyPI Registry
Set up authentication with a Gitea personal access token (scope: read:packages):
export PIP_USERNAME=<your-gitea-username>
export PIP_PASSWORD=<your-gitea-token>
pip install virtufin-websocketmanager \
--index-url https://pypi.haenerconsulting.com/api/packages/virtufin/pypi/simple/ \
--extra-index-url https://pypi.org/simple
From Local Source
pip install grpcio grpcio-reflection
export PYTHONPATH=/path/to/virtufin-websocketmanager/src/python
Quick Start
import asyncio

from virtufin.websocketmanager import WebSocketManagerClient


async def main():
    async with WebSocketManagerClient("localhost", 5002) as client:
        # Connect to a WebSocket server
        response = await client.connect("wss://echo.websocket.org", auto_reconnect=True)
        print(f"Connected: {response.id} - {response.status}")

        # List all connections
        list_resp = await client.list()
        for conn in list_resp.connections:
            print(f" {conn.id}: {conn.url} [{conn.status}]")

        # Send a message and wait for response
        send_resp = await client.send(response.id, '{"type": "ping"}', timeout_ms=5000)
        print(f"Response: {send_resp.response}")

        # Disconnect
        await client.disconnect(response.id)


asyncio.run(main())
WebSocketManagerClient
The main client class for interacting with the WebSocketManager service.
Constructor
def __init__(self, host: str = "localhost", port: int = 5002)
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | str | "localhost" | gRPC server hostname |
| port | int | 5002 | gRPC server port |
Context Manager
async with WebSocketManagerClient() as client:
    # client is connected
    pass
# Automatically closed
Connection Operations
connect()
Establish a new WebSocket connection.
response: ConnectResponse = await client.connect(url: str, auto_reconnect: bool = False)
| Parameter | Type | Description |
|---|---|---|
| url | str | WebSocket server URL (e.g., wss://example.com/socket) |
| auto_reconnect | bool | Enable automatic reconnection on disconnect |
Returns: ConnectResponse with id and status fields.
Example:
response = await client.connect("wss://echo.websocket.org", auto_reconnect=True)
print(f"Connected: {response.id}")
list()
List all managed WebSocket connections.
response: ListResponse = await client.list()
Returns: ListResponse with a connections list. Each connection has id, url, status, topic, and instance_id fields.
Example:
resp = await client.list()
for conn in resp.connections:
    print(f"{conn.id}: {conn.url} [{conn.status}]")
disconnect()
Disconnect and remove a WebSocket connection.
await client.disconnect(id: str)
| Parameter | Type | Description |
|---|---|---|
| id | str | Connection ID to disconnect |
Example:
await client.disconnect(connection_id)
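A connection that is opened with connect() stays managed until disconnect() is called, so it can help to pair the two calls. The following is a minimal sketch of that idea, not part of the library: `managed_connection` is a hypothetical helper, and `FakeClient` is a made-up offline stand-in for WebSocketManagerClient so the sketch runs without a server. It relies only on the connect() and disconnect() calls documented above.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace


@asynccontextmanager
async def managed_connection(client, url, auto_reconnect=False):
    """Connect, yield the connection ID, and always disconnect on exit."""
    response = await client.connect(url, auto_reconnect=auto_reconnect)
    try:
        yield response.id
    finally:
        # Runs on normal exit and when the body raises.
        await client.disconnect(response.id)


class FakeClient:
    """Hypothetical offline stand-in for WebSocketManagerClient (demo only)."""

    def __init__(self):
        self.calls = []

    async def connect(self, url, auto_reconnect=False):
        self.calls.append(("connect", url))
        return SimpleNamespace(id="conn-1")  # made-up connection ID

    async def disconnect(self, id):
        self.calls.append(("disconnect", id))


async def demo():
    """Show that disconnect() still runs when the body raises."""
    client = FakeClient()
    try:
        async with managed_connection(client, "wss://example.com/socket") as connection_id:
            raise RuntimeError(f"simulated failure while using {connection_id}")
    except RuntimeError:
        pass
    return client.calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a real client, the same helper would be used inside `async with WebSocketManagerClient(...) as client:` in place of `FakeClient`.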
Messaging Operations
send()
Send a message and wait for a correlated response.
response: SendResponse = await client.send(id: str, message: str, timeout_ms: int = 5000)
| Parameter | Type | Description |
|---|---|---|
| id | str | Connection ID |
| message | str | Message to send (JSON string) |
| timeout_ms | int | Timeout in milliseconds to wait for response |
Returns: SendResponse with response field containing the server reply.
Example:
resp = await client.send(connection_id, '{"type": "request", "id": "123"}', timeout_ms=5000)
print(f"Response: {resp.response}")
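Because send() takes the message as a JSON string, building it with the standard `json` module avoids quoting and escaping mistakes in hand-written literals. The sketch below is illustrative only: `build_request` and `parse_reply` are hypothetical helpers, the "type"/"id"/"data" envelope simply mirrors the example messages on this page, and it assumes the reply in `resp.response` is a string.

```python
import json


def build_request(request_id: str, payload: dict) -> str:
    """Serialize a request envelope (assumed shape, taken from the examples)."""
    return json.dumps({"type": "request", "id": request_id, "data": payload})


def parse_reply(raw: str):
    """Decode a reply; return None when the text is not valid JSON.

    Note that a literal JSON `null` reply also decodes to None.
    """
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return None


if __name__ == "__main__":
    message = build_request("123", {"text": 'quotes " are escaped'})
    print(message)
    print(parse_reply('{"ok": true}'))
```

The resulting string can be passed as the `message` argument of send() or send_raw().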
send_raw()
Send a message without waiting for a response.
await client.send_raw(id: str, message: str)
| Parameter | Type | Description |
|---|---|---|
| id | str | Connection ID |
| message | str | Message to send |
Example:
await client.send_raw(connection_id, '{"type": "notification"}')
Pub/Sub Operations
start_publish()
Start publishing WebSocket messages to a Dapr pub/sub topic.
await client.start_publish(id: str, topic: str)
| Parameter | Type | Description |
|---|---|---|
| id | str | Connection ID |
| topic | str | Dapr pub/sub topic name |
Example:
await client.start_publish(connection_id, "websocket-events")
stop_publish()
Stop publishing WebSocket messages to the topic.
await client.stop_publish(id: str)
| Parameter | Type | Description |
|---|---|---|
| id | str | Connection ID |
Example:
await client.stop_publish(connection_id)
Via the API Gateway
The WebSocketManagerClient connects directly to the websocketmanager's
gRPC port (5002 by default). Alternatively, you can invoke
websocketmanager methods through the Virtufin API gateway
using the gateway's dynamic-dispatch pattern:
from virtufin.api.client import ApiClient

async with ApiClient() as client:
    wsm = client.gateway.websocketmanager

    # List: no request fields; returns {"connections": [...]}
    connections = await wsm.List()
    for c in connections["connections"]:
        print(f" {c['id']}: {c['url']} ({c['status']})")

    # Connect: url, auto_reconnect
    conn = await wsm.Connect({
        "url": "wss://stream.example.com/feed",
        "auto_reconnect": True,
    })
    connection_id = conn["id"]

    # Disconnect: id field
    await wsm.Disconnect({"id": connection_id})
The gateway pattern is useful when:
- The websocketmanager's gRPC port is not directly reachable (e.g., behind a service mesh, or only the gateway is exposed)
- You want service discovery without configuring each backend's address explicitly
- You're writing a tool that needs to talk to any of multiple backend services (websocketmanager, workmanager, custom) through one entry point
See the virtufin-api Python client docs for the full gateway client reference, and the proto-to-client-mapping spec §Layer 4 for the dynamic-dispatch call chain (gRPC reflection, JSON marshaling).
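Gateway calls return plain dictionaries rather than typed response objects, so fields are read by key. As a small sketch of working with that shape, the hypothetical helper `find_connection_ids` below looks up connection IDs by URL in a List() result. The sample data is made up and only follows the `{"connections": [...]}` layout shown in the gateway example.

```python
def find_connection_ids(list_response: dict, url: str) -> list:
    """Return the IDs of all connections in a gateway List() result that match `url`."""
    connections = list_response.get("connections", [])
    return [c["id"] for c in connections if c.get("url") == url]


# Sample data in the {"connections": [...]} shape; all values are made up.
sample = {
    "connections": [
        {"id": "a1", "url": "wss://stream.example.com/feed", "status": "placeholder"},
        {"id": "b2", "url": "wss://other.example.com/feed", "status": "placeholder"},
    ]
}

print(find_connection_ids(sample, "wss://stream.example.com/feed"))  # prints ['a1']
```

Using `.get()` keeps the helper from raising if the result has no "connections" key. Whether the gateway omits that key for an empty list is not stated here, so treat that as a defensive assumption.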
Complete Example
#!/usr/bin/env python3
"""Example: WebSocket management with the Python client."""
import asyncio

from virtufin.websocketmanager import WebSocketManagerClient


async def main():
    async with WebSocketManagerClient("localhost", 5002) as client:
        # Connect
        print("Connecting to WebSocket server...")
        connect_resp = await client.connect(
            "wss://echo.websocket.org",
            auto_reconnect=False
        )
        print(f"Connected: {connect_resp.id} - {connect_resp.status}")

        # List connections
        print("\nAll connections:")
        list_resp = await client.list()
        for conn in list_resp.connections:
            print(f" {conn.id}: {conn.url} [{conn.status}]")
            if conn.topic:
                print(f" Publishing to: {conn.topic}")

        # Send a message
        print("\nSending message...")
        send_resp = await client.send(
            connect_resp.id,
            '{"type": "request", "id": "123", "data": "hello"}',
            timeout_ms=5000
        )
        print(f"Response: {send_resp.response}")

        # Start pub/sub
        print("\nStarting pub/sub...")
        await client.start_publish(connect_resp.id, "ws-events")
        print("Publishing started")

        # Stop pub/sub
        print("\nStopping pub/sub...")
        await client.stop_publish(connect_resp.id)
        print("Publishing stopped")

        # Disconnect
        print("\nDisconnecting...")
        await client.disconnect(connect_resp.id)
        print("Disconnected")


if __name__ == "__main__":
    asyncio.run(main())
Error Handling
import grpc

try:
    await client.connect("wss://invalid-host")
except grpc.aio.AioRpcError as e:
    print(f"gRPC error: {e.code()} - {e.details()}")
Common gRPC status codes:
- StatusCode.NOT_FOUND - Connection not found
- StatusCode.INVALID_ARGUMENT - Invalid URL or parameters
- StatusCode.DEADLINE_EXCEEDED - Send timed out
- StatusCode.UNAVAILABLE - Server not reachable
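Some of these codes, such as UNAVAILABLE, can be transient, while others, such as INVALID_ARGUMENT, will not succeed on a retry. One way to handle the difference is a small retry wrapper with exponential backoff. The sketch below is a generic illustration, not part of the client library: `retry_async` and `make_flaky` are hypothetical names, and a simulated `ConnectionError` stands in for a gRPC error so the example runs without a server.

```python
import asyncio


async def retry_async(operation, should_retry, attempts=3, base_delay=0.5):
    """Await operation() up to `attempts` times, doubling the delay between tries."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await operation()
        except Exception as exc:
            is_last = attempt == attempts - 1
            # Give up on the final attempt or on errors the caller deems permanent.
            if is_last or not should_retry(exc):
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)


def make_flaky(failures):
    """Demo helper: build a coroutine function that fails `failures` times, then returns "ok"."""
    state = {"calls": 0}

    async def operation():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("simulated transient failure")
        return "ok"

    operation.state = state  # expose the call counter for inspection
    return operation


if __name__ == "__main__":
    flaky = make_flaky(2)
    result = asyncio.run(
        retry_async(flaky, lambda e: isinstance(e, ConnectionError), base_delay=0.1)
    )
    print(result, flaky.state["calls"])
```

With the real client, a predicate along the lines of `lambda e: isinstance(e, grpc.aio.AioRpcError) and e.code() == grpc.StatusCode.UNAVAILABLE` could be passed as `should_retry`. Whether a given call is safe to repeat depends on server behavior that this page does not describe, so apply retries selectively.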