Skip to content

Python Client Library

The Python client library provides an async gRPC interface to the WebSocketManager service.

Installation

From Gitea PyPI Registry

Set up authentication with a Gitea personal access token (scope: read:packages):

export PIP_USERNAME=<your-gitea-username>
export PIP_PASSWORD=<your-gitea-token>

pip install virtufin-websocketmanager \
  --index-url https://pypi.haenerconsulting.com/api/packages/virtufin/pypi/simple/ \
  --extra-index-url https://pypi.org/simple

From Local Source

pip install grpcio grpcio-reflection
export PYTHONPATH=/path/to/virtufin-websocketmanager/src/python

Quick Start

import asyncio
from virtufin.websocketmanager import WebSocketManagerClient

async def main():
    async with WebSocketManagerClient("localhost", 5002) as client:
        # Connect to a WebSocket server
        response = await client.connect("wss://echo.websocket.org", auto_reconnect=True)
        print(f"Connected: {response.id} - {response.status}")

        # List all connections
        list_resp = await client.list()
        for conn in list_resp.connections:
            print(f"  {conn.id}: {conn.url} [{conn.status}]")

        # Send a message and wait for response
        send_resp = await client.send(response.id, '{"type": "ping"}', timeout_ms=5000)
        print(f"Response: {send_resp.response}")

        # Disconnect
        await client.disconnect(response.id)

asyncio.run(main())

WebSocketManagerClient

The main client class for interacting with the WebSocketManager service.

Constructor

def __init__(self, host: str = "localhost", port: int = 5002)
Parameter Type Default Description
host str "localhost" gRPC server hostname
port int 5002 gRPC server port

Context Manager

async with WebSocketManagerClient() as client:
    # client is connected
    pass
# Automatically closed

Connection Operations

connect()

Establish a new WebSocket connection.

response: ConnectResponse = await client.connect(url: str, auto_reconnect: bool = False)
Parameter Type Description
url str WebSocket server URL (e.g., wss://example.com/socket)
auto_reconnect bool Enable automatic reconnection on disconnect

Returns: ConnectResponse with id and status fields.

Example:

response = await client.connect("wss://echo.websocket.org", auto_reconnect=True)
print(f"Connected: {response.id}")

list()

List all managed WebSocket connections.

response: ListResponse = await client.list()

Returns: ListResponse with connections list. Each connection has id, url, status, topic, instance_id.

Example:

resp = await client.list()
for conn in resp.connections:
    print(f"{conn.id}: {conn.url} [{conn.status}]")

disconnect()

Disconnect and remove a WebSocket connection.

await client.disconnect(id: str)
Parameter Type Description
id str Connection ID to disconnect

Example:

await client.disconnect(connection_id)

Messaging Operations

send()

Send a message and wait for a correlated response.

response: SendResponse = await client.send(id: str, message: str, timeout_ms: int = 5000)
Parameter Type Description
id str Connection ID
message str Message to send (JSON string)
timeout_ms int Timeout in milliseconds to wait for response

Returns: SendResponse with response field containing the server reply.

Example:

resp = await client.send(connection_id, '{"type": "request", "id": "123"}', timeout_ms=5000)
print(f"Response: {resp.response}")

send_raw()

Send a message without waiting for a response.

await client.send_raw(id: str, message: str)
Parameter Type Description
id str Connection ID
message str Message to send

Example:

await client.send_raw(connection_id, '{"type": "notification"}")

Pub/Sub Operations

start_publish()

Start publishing WebSocket messages to a Dapr pub/sub topic.

await client.start_publish(id: str, topic: str)
Parameter Type Description
id str Connection ID
topic str Dapr pub/sub topic name

Example:

await client.start_publish(connection_id, "websocket-events")

stop_publish()

Stop publishing WebSocket messages to the topic.

await client.stop_publish(id: str)
Parameter Type Description
id str Connection ID

Example:

await client.stop_publish(connection_id)

Via the API Gateway

The WebSocketManagerClient connects directly to the websocketmanager's gRPC port (5002 by default). Alternatively, you can invoke websocketmanager methods through the Virtufin API gateway using the gateway's dynamic-dispatch pattern:

from virtufin.api.client import ApiClient

async with ApiClient() as client:
    wsm = client.gateway.websocketmanager

    # List — no request fields; returns {"connections": [...]}
    connections = await wsm.List()
    for c in connections["connections"]:
        print(f"  {c['id']}: {c['url']} ({c['status']})")

    # Connect — url, auto_reconnect
    conn = await wsm.Connect({
        "url": "wss://stream.example.com/feed",
        "auto_reconnect": True,
    })
    connection_id = conn["id"]

    # Disconnect — id field
    await wsm.Disconnect({"id": connection_id})

The gateway pattern is useful when: - The websocketmanager's gRPC port is not directly reachable (e.g., behind a service mesh, or only the gateway is exposed) - You want service discovery without configuring each backend's address explicitly - You're writing a tool that needs to talk to any of multiple backend services (websocketmanager, workmanager, custom) through one entry point

See the virtufin-api Python client docs for the full gateway client reference, and the proto-to-client-mapping spec §Layer 4 for the dynamic-dispatch call chain (gRPC reflection, JSON marshaling).

Complete Example

#!/usr/bin/env python3
"""Example: WebSocket management with the Python client."""

import asyncio
from virtufin.websocketmanager import WebSocketManagerClient


async def main():
    async with WebSocketManagerClient("localhost", 5002) as client:
        # Connect
        print("Connecting to WebSocket server...")
        connect_resp = await client.connect(
            "wss://echo.websocket.org",
            auto_reconnect=False
        )
        print(f"Connected: {connect_resp.id} - {connect_resp.status}")

        # List connections
        print("\nAll connections:")
        list_resp = await client.list()
        for conn in list_resp.connections:
            print(f"  {conn.id}: {conn.url} [{conn.status}]")
            if conn.topic:
                print(f"    Publishing to: {conn.topic}")

        # Send a message
        print("\nSending message...")
        send_resp = await client.send(
            connect_resp.id,
            '{"type": "request", "id": "123", "data": "hello"}',
            timeout_ms=5000
        )
        print(f"Response: {send_resp.response}")

        # Start pub/sub
        print("\nStarting pub/sub...")
        await client.start_publish(connect_resp.id, "ws-events")
        print("Publishing started")

        # Stop pub/sub
        print("\nStopping pub/sub...")
        await client.stop_publish(connect_resp.id)
        print("Publishing stopped")

        # Disconnect
        print("\nDisconnecting...")
        await client.disconnect(connect_resp.id)
        print("Disconnected")


if __name__ == "__main__":
    asyncio.run(main())

Error Handling

import grpc

try:
    await client.connect("wss://invalid-host")
except grpc.aio.AioRpcError as e:
    print(f"gRPC error: {e.code()} - {e.details()}")

Common gRPC status codes: - StatusCode.NOT_FOUND - Connection not found - StatusCode.INVALID_ARGUMENT - Invalid URL or parameters - StatusCode.DEADLINE_EXCEEDED - Send timed out - StatusCode.UNAVAILABLE - Server not reachable

SDK Reference