
Services

This document provides detailed documentation for all service classes in the WebSocket Manager.


IWebSocketService / WebSocketService

Namespace: Virtufin.WebSocketManager.Services

Purpose: Core service for managing WebSocket connections. Provides high-level operations for connecting, disconnecting, listing, and sending messages through WebSocket connections.

Interface Definition

public interface IWebSocketService
{
    Task<(WebSocketConnection? Connection, string? Error)> ConnectAsync(
        string url, bool autoReconnect, CancellationToken cancellationToken = default);

    Task<IEnumerable<WebSocketConnection>> ListConnectionsAsync(
        CancellationToken cancellationToken = default);

    Task<(bool Success, string? Error)> DisconnectAsync(
        string id, CancellationToken cancellationToken = default);

    Task<(bool Success, string? Error)> StartPublishAsync(
        string id, string topic, CancellationToken cancellationToken = default);

    Task<(bool Success, string? Error)> StopPublishAsync(
        string id, CancellationToken cancellationToken = default);

    Task<(byte[]? Response, string? Error)> SendAsync(
        string id, byte[] message, string contentType, int timeoutMs, CancellationToken cancellationToken = default);

    Task<(bool Success, string? Error)> SendRawAsync(
        string id, byte[] message, string contentType, CancellationToken cancellationToken = default);
}

ConnectAsync

Establishes a new WebSocket connection.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| url | string | The WebSocket server URL (must be non-empty) |
| autoReconnect | bool | Whether to enable auto-reconnect on failure |
| cancellationToken | CancellationToken | Optional cancellation token |

Returns: A tuple containing the created WebSocketConnection if successful, or null with an error message if unsuccessful.

Behavior:

1. Validates that url is non-empty
2. Creates a new connection entry via IWebSocketConnectionStore
3. Connects via IWebSocketClientWrapper
4. Starts the receive loop to handle incoming messages
5. If publishing is configured, publishes messages to the topic via IDaprPublisher
6. Persists the connection to the store
7. On failure, removes the connection from the store and returns the error
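The methods on IWebSocketService report expected failures through the returned tuple. A minimal sketch of consuming that shape is shown here. FakeConnectAsync, the reduced WebSocketConnection record, and the "URL is required" message are hypothetical stand-ins for illustration, not the service's actual implementation or error text.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, reduced stand-in for the real model: only the members used below.
public sealed record WebSocketConnection(string Id, string Url, bool AutoReconnect);

public static class ConnectExample
{
    // Hypothetical fake with the same signature as IWebSocketService.ConnectAsync.
    // It only validates the URL and never opens a socket.
    public static Task<(WebSocketConnection? Connection, string? Error)> FakeConnectAsync(
        string url, bool autoReconnect, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(url))
        {
            // Assumed error text; the real message is not documented.
            return Task.FromResult<(WebSocketConnection?, string?)>((null, "URL is required"));
        }

        var connection = new WebSocketConnection(Guid.NewGuid().ToString(), url, autoReconnect);
        return Task.FromResult<(WebSocketConnection?, string?)>((connection, null));
    }

    // Deconstructs the tuple and branches on whether a connection was returned.
    public static async Task<string> DescribeAsync(string url)
    {
        var (connection, error) = await FakeConnectAsync(url, autoReconnect: true);
        return connection is null
            ? $"failed: {error}"
            : $"connected: {connection.Url}";
    }
}
```

Checking the connection for null before reading the error keeps the caller correct even if a future implementation returns both values.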


ListConnectionsAsync

Retrieves all active WebSocket connections.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| cancellationToken | CancellationToken | Optional cancellation token |

Returns: An enumerable collection of all WebSocketConnection instances.


DisconnectAsync

Disconnects and removes a WebSocket connection.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| id | string | The connection ID to disconnect |
| cancellationToken | CancellationToken | Optional cancellation token |

Returns: A tuple indicating success and an error message if unsuccessful.

Behavior:

1. Retrieves the connection from the store
2. Validates ownership via CheckOwnership
3. Disconnects via IWebSocketClientWrapper (errors are logged but not thrown)
4. Removes the connection from the store


StartPublishAsync

Configures a connection to publish messages to a Dapr pub/sub topic.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| id | string | The connection ID |
| topic | string | The Dapr pub/sub topic name (must be non-empty) |
| cancellationToken | CancellationToken | Optional cancellation token |

Returns: A tuple indicating success and an error message if unsuccessful.

Errors:

  • "Connection not found" - Connection ID does not exist
  • Ownership errors from CheckOwnership
  • "WebSocket is not connected" - Connection status is not Connected
  • "Topic is required" - Topic parameter is empty


StopPublishAsync

Stops publishing messages for a connection.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| id | string | The connection ID |
| cancellationToken | CancellationToken | Optional cancellation token |

Returns: A tuple indicating success and an error message if unsuccessful.

Behavior: Sets connection.Topic to null and updates the store.


SendAsync

Sends a message and waits for a correlated response.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| id | string | The connection ID |
| message | byte[] | The message bytes to send |
| contentType | string | MIME content type of the message |
| timeoutMs | int | Timeout in milliseconds to wait for response |
| cancellationToken | CancellationToken | Optional cancellation token |

Returns: A tuple containing the response message if successful, or null with an error message.

Errors:

  • "Connection not found"
  • Ownership errors
  • "WebSocket is not connected"
  • "Request timeout" - Timeout exceeded

Implementation Detail: The message is wrapped with a correlation ID and sent via IWebSocketClientWrapper.SendAndWaitAsync. Responses are matched by correlation ID.

Remote-server contract (required): SendAsync only returns a response if the remote server cooperates with the correlation protocol. Specifically:

  • The wrapper constructs a WrappedMessage envelope:
    { "id": "<guid>", "payload": "<original message>" }
    
    and sends it over the WebSocket.
  • The remote server must parse this envelope and echo back a JSON object whose top-level "id" field is the same correlation GUID:
    { "id": "<same guid>", "payload": "<response>" }
    
  • The receive loop scans every incoming text/binary frame, extracts the "id" field (text JSON) or the 16-byte UUID prefix (binary), and matches it against pending requests.
  • Servers that don't echo the correlation ID will trigger a "Request timeout" after timeoutMs regardless of whether they actually sent a response.

This is the default correlation contract for text/* and application/json content types. Binary-frame correlation uses a 16-byte UUID binary prefix instead of a JSON envelope — see SendAndWaitAsync below for the binary variant.
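The JSON correlation contract above can be sketched as follows, using System.Text.Json. The CorrelationEnvelope class and its method names are hypothetical, and the real WrappedMessage type may serialize differently. The sketch only shows how an envelope with a top-level "id" can be built and how that ID can be read back from an incoming frame.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper illustrating the { "id", "payload" } envelope shape.
public static class CorrelationEnvelope
{
    // Builds {"id":"<guid>","payload":"<message>"}.
    public static string Wrap(Guid correlationId, string payload) =>
        JsonSerializer.Serialize(new { id = correlationId.ToString(), payload });

    // Returns the top-level "id" as a Guid, or null when the frame is not
    // a JSON object, has no "id" string, or the value is not a GUID.
    public static Guid? TryGetCorrelationId(string json)
    {
        try
        {
            using var doc = JsonDocument.Parse(json);
            if (doc.RootElement.ValueKind == JsonValueKind.Object &&
                doc.RootElement.TryGetProperty("id", out var id) &&
                id.ValueKind == JsonValueKind.String &&
                Guid.TryParse(id.GetString(), out var guid))
            {
                return guid;
            }
        }
        catch (JsonException)
        {
            // Not JSON at all: treat as an uncorrelated message.
        }
        return null;
    }
}
```

A frame for which TryGetCorrelationId returns null would fall through to the ordinary message callback, which is why a server that omits the ID leaves the pending request to time out.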


SendRawAsync

Sends a raw message without waiting for a response.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| id | string | The connection ID |
| message | byte[] | The message bytes to send |
| contentType | string | MIME content type of the message |
| cancellationToken | CancellationToken | Optional cancellation token |

Returns: A tuple indicating success and an error message if unsuccessful.


CheckOwnership (Private)

Validates that the current instance owns the connection.

private bool CheckOwnership(WebSocketConnection connection, out string? error)
{
    error = null;
    if (connection.InstanceId != _instanceIdProvider.GetInstanceId())
    {
        error = $"Connection owned by different instance: {connection.InstanceId}";
        return false;
    }
    return true;
}

Purpose: Prevents cross-instance modifications in distributed deployments.


IWebSocketConnectionStore / DistributedWebSocketConnectionStore

Namespace: Virtufin.WebSocketManager.Services

Purpose: Manages WebSocket connection storage with distributed persistence via Dapr state store.

Interface Definition

public interface IWebSocketConnectionStore
{
    Task<WebSocketConnection> CreateConnectionAsync(
        string url, bool autoReconnect, CancellationToken cancellationToken = default);

    Task<WebSocketConnection?> GetConnectionAsync(
        string id, CancellationToken cancellationToken = default);

    Task<IEnumerable<WebSocketConnection>> GetAllConnectionsAsync(
        CancellationToken cancellationToken = default);

    Task UpdateConnectionAsync(
        WebSocketConnection connection, CancellationToken cancellationToken = default);

    Task RemoveConnectionAsync(
        string id, CancellationToken cancellationToken = default);

    Task ClearAllConnectionsAsync(
        CancellationToken cancellationToken = default);
}

Implementation: DistributedWebSocketConnectionStore

Uses a two-tier storage approach:

  • Local cache: ConcurrentDictionary<string, WebSocketConnection> for fast in-memory access
  • Dapr state store: Via IDaprConnectionRepository for distributed persistence

CreateConnectionAsync

Creates a new connection with a generated ID and the current instance ID.

Returns: A new WebSocketConnection with:

  • Id = Guid.NewGuid().ToString()
  • Url = provided URL
  • AutoReconnect = provided value
  • Status = ConnectionStatus.Disconnected
  • CreatedAt = DateTime.UtcNow
  • InstanceId = from IInstanceIdProvider.GetInstanceId()

The connection is added to the local cache but NOT persisted until explicitly updated.


GetConnectionAsync

Retrieves a connection by ID.

Behavior:

1. First checks local cache (_localConnections)
2. If not found, retrieves from Dapr state store via IDaprConnectionRepository

Returns: The connection if found, otherwise null.


GetAllConnectionsAsync

Retrieves all connections from both local cache and Dapr state store.

Returns: Combined list of remote and local connections.

Merge Logic: Remote connections (from Dapr state store) are returned first. Local connections not present in the remote set are then appended. If a connection exists in both stores (e.g., updated locally but not yet persisted), the remote version takes precedence.

Deduplication key: WebSocketConnection.Id (the connection ID string assigned at Connect time). Two entries are considered the same connection iff local.Id == remote.Id, so the merge never produces duplicates for the same Id; when both sides have an entry for the same Id, the remote entry wins. Note: the local cache reflects the most recent in-memory state, but the remote store is the source of truth for distributed queries. A connection updated locally but not yet persisted therefore appears with the stale remote data until the next UpdateConnectionAsync call.
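The merge rule can be sketched as below. ConnectionInfo is a hypothetical, reduced record (its Source field exists only to show which side won), and Merge is an illustrative helper rather than the store's actual code.

```csharp
using System.Collections.Generic;

// Hypothetical reduced model: Id is the deduplication key,
// Source records where the entry came from ("remote" or "local").
public sealed record ConnectionInfo(string Id, string Source);

public static class ConnectionMerge
{
    // Remote entries come first; local entries are appended only when
    // their Id has not been seen, so the remote entry wins on conflict.
    public static List<ConnectionInfo> Merge(
        IEnumerable<ConnectionInfo> remote, IEnumerable<ConnectionInfo> local)
    {
        var merged = new List<ConnectionInfo>();
        var seen = new HashSet<string>();

        foreach (var connection in remote)
        {
            if (seen.Add(connection.Id)) merged.Add(connection);
        }
        foreach (var connection in local)
        {
            if (seen.Add(connection.Id)) merged.Add(connection);
        }
        return merged;
    }
}
```

With remote entries a and b and local entries b and c, the result is a, b (remote), c: the local copy of b is dropped.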


UpdateConnectionAsync

Updates both local cache and Dapr state store.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| connection | WebSocketConnection | The connection to update |

Behavior:

1. Updates local cache
2. Converts to DaprStateStoreEntry and saves via repository


RemoveConnectionAsync

Removes a connection from both local cache and Dapr state store.


ClearAllConnectionsAsync

Removes all connections from both storage tiers.


IDaprConnectionRepository / DaprConnectionRepository

Namespace: Virtufin.WebSocketManager.Services

Purpose: Handles Dapr state store operations for WebSocket connections.

Interface Definition

public interface IDaprConnectionRepository
{
    Task<IEnumerable<DaprStateStoreEntry>> GetAllAsync(
        CancellationToken cancellationToken = default);

    Task<DaprStateStoreEntry?> GetAsync(
        string id, CancellationToken cancellationToken = default);

    Task SaveAsync(
        DaprStateStoreEntry entry, CancellationToken cancellationToken = default);

    Task DeleteAsync(
        string id, CancellationToken cancellationToken = default);
}

Implementation: DaprConnectionRepository

Uses Dapr state store with an index pattern for efficient lookups.

State Store Structure

  • Connection entries: Key = websocket-{id}, Value = DaprStateStoreEntry
  • Index: Key = websocket-index, Value = HashSet<string> containing all connection IDs

GetAllAsync

Retrieves all connections via the index:

1. Gets the index (HashSet<string>) from state store
2. If index is null or empty, returns empty enumeration
3. Constructs bulk state keys and retrieves all entries in one call
4. Returns only non-null entries


GetAsync

Retrieves a single connection by ID.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| id | string | The connection ID |

Returns: DaprStateStoreEntry if found, otherwise null.


SaveAsync

Saves a connection entry and updates the index.

Behavior:

1. Saves the entry with key websocket-{entry.Id}
2. Calls UpdateIndexAsync with add: true


DeleteAsync

Deletes a connection entry and updates the index.

Behavior:

1. Deletes the entry with key websocket-{id}
2. Calls UpdateIndexAsync with add: false


UpdateIndexAsync (Private)

Updates the connection index in state store.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| id | string | The connection ID |
| add | bool | true to add to index, false to remove |

Behavior:

1. Gets current index (or creates new HashSet<string> if null)
2. Adds or removes the ID
3. Saves the updated index
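The index pattern used by SaveAsync, DeleteAsync, GetAllAsync, and UpdateIndexAsync can be illustrated with an in-memory dictionary standing in for the Dapr state store. InMemoryIndexedStore is a hypothetical class, entries are plain strings instead of DaprStateStoreEntry, and the sketch is synchronous and ignores concurrent writers, which a real state store would need to handle.

```csharp
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a key/value state store that
// keeps a separate index entry listing all connection IDs.
public sealed class InMemoryIndexedStore
{
    private const string IndexKey = "websocket-index";
    private readonly Dictionary<string, object> _state = new();

    private static string EntryKey(string id) => $"websocket-{id}";

    public void Save(string id, string entry)
    {
        _state[EntryKey(id)] = entry;
        UpdateIndex(id, add: true);
    }

    public void Delete(string id)
    {
        _state.Remove(EntryKey(id));
        UpdateIndex(id, add: false);
    }

    // Reads the index, then looks up each entry by its derived key,
    // skipping IDs whose entry is missing.
    public IReadOnlyList<string> GetAll()
    {
        var result = new List<string>();
        if (!_state.TryGetValue(IndexKey, out var raw)) return result;

        foreach (var id in (HashSet<string>)raw)
        {
            if (_state.TryGetValue(EntryKey(id), out var entry)) result.Add((string)entry);
        }
        return result;
    }

    // Read-modify-write of the index: load (or create), mutate, save back.
    private void UpdateIndex(string id, bool add)
    {
        var index = _state.TryGetValue(IndexKey, out var raw)
            ? (HashSet<string>)raw
            : new HashSet<string>();

        if (add) index.Add(id); else index.Remove(id);
        _state[IndexKey] = index;
    }
}
```

The index exists because a key/value store cannot list keys by prefix cheaply; keeping the ID set under one well-known key lets GetAll fetch everything without a scan.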


IWebSocketClientWrapper / WebSocketClientWrapper

Namespace: Virtufin.WebSocketManager.Services

Purpose: Provides low-level WebSocket connection operations. Manages external WebSocket connections including connection lifecycle, message sending/receiving, and auto-reconnect logic.

Interface Definition

public interface IWebSocketClientWrapper
{
    Task ConnectAsync(WebSocketConnection connection);

    Task DisconnectAsync(WebSocketConnection connection);

    Task SendAsync(WebSocketConnection connection, byte[] message, string contentType);

    Task<byte[]> SendAndWaitAsync(
        WebSocketConnection connection, byte[] message, int timeoutMs, string contentType);

    void StartReceiveLoop(
        WebSocketConnection connection, Func<byte[], string, Task> onMessageReceived);
}

Implementation: WebSocketClientWrapper

Manages System.Net.WebSockets.ClientWebSocket instances with a 16-byte UUID binary prefix for request-response correlation.

Internal State

  • _pendingRequests: ConcurrentDictionary<Guid, TaskCompletionSource<byte[]>> - Maps correlation UUIDs to pending response completers
  • Correlation envelope: [16 bytes UUID][payload bytes] — No JSON parsing, fast binary matching
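A pending-request registry of this shape can be sketched as follows. PendingRequests, WaitAsync, and TryComplete are hypothetical names, and the use of TimeoutException is an assumption; the sketch only illustrates how a ConcurrentDictionary of TaskCompletionSource<byte[]> lets a receive loop complete a waiting sender.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical registry mapping correlation IDs to waiting callers.
public sealed class PendingRequests
{
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<byte[]>> _pending = new();

    // Registers the ID synchronously (before the first await), then waits
    // until TryComplete is called for it or the timeout elapses.
    public async Task<byte[]> WaitAsync(Guid correlationId, int timeoutMs)
    {
        var completer = new TaskCompletionSource<byte[]>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[correlationId] = completer;

        using var timeout = new CancellationTokenSource(timeoutMs);
        using var registration = timeout.Token.Register(
            () => completer.TrySetException(new TimeoutException("Request timeout")));
        try
        {
            return await completer.Task;
        }
        finally
        {
            // Always clean up, whether completed or timed out.
            _pending.TryRemove(correlationId, out _);
        }
    }

    // Called by the receive loop; returns false when nobody is waiting,
    // in which case the message is an ordinary (uncorrelated) one.
    public bool TryComplete(Guid correlationId, byte[] response) =>
        _pending.TryRemove(correlationId, out var completer) &&
        completer.TrySetResult(response);
}
```

Registering the completer before the request is sent avoids a race in which a fast response arrives before anyone is listening for it.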

ConnectAsync

Establishes a WebSocket connection.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| connection | WebSocketConnection | The connection to connect |

Behavior:

1. Creates a new CancellationTokenSource and assigns to connection.CancellationTokenSource
2. Creates a new ClientWebSocket and assigns to connection.WebSocket
3. Connects to connection.Url
4. Sets connection.Status to Connected
5. On failure, sets status to Disconnected and re-throws

Throws: Exception if connection fails.


DisconnectAsync

Gracefully closes a WebSocket connection.

Behavior:

1. Cancels the CancellationTokenSource
2. If socket is open, sends close frame with NormalClosure
3. Disposes the WebSocket
4. Sets status to Disconnected


SendAsync

Sends bytes via WebSocket. Frame type (Text/Binary) is determined by the contentType parameter:

  • text/* or application/json → Text frame
  • Everything else → Binary frame

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| connection | WebSocketConnection | The connection to send on |
| message | byte[] | The raw bytes to send |
| contentType | string | MIME content type (e.g., "text/plain", "application/json", "application/octet-stream") |

Throws: InvalidOperationException if WebSocket is not connected.
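The content-type rule for choosing the frame type can be sketched as below. FrameTypeSelector is a hypothetical helper. Matching by case-insensitive prefix is an assumption, so that values with parameters such as "application/json; charset=utf-8" still map to Text; the actual implementation may compare differently.

```csharp
using System;
using System.Net.WebSockets;

// Hypothetical helper mapping a MIME content type to a WebSocket frame type.
public static class FrameTypeSelector
{
    public static WebSocketMessageType FromContentType(string contentType)
    {
        var type = (contentType ?? string.Empty).Trim();

        // Assumption: prefix match, ignoring case and any trailing parameters.
        bool isText =
            type.StartsWith("text/", StringComparison.OrdinalIgnoreCase) ||
            type.StartsWith("application/json", StringComparison.OrdinalIgnoreCase);

        return isText ? WebSocketMessageType.Text : WebSocketMessageType.Binary;
    }
}
```

For example, "text/plain" and "application/json" select a Text frame, while "application/octet-stream" or an empty content type selects Binary.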


SendAndWaitAsync

Sends bytes with a 16-byte UUID binary prefix for correlation, then waits for a matching response: one whose first 16 bytes equal that UUID. The prefix is stripped before the response is returned.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| connection | WebSocketConnection | The connection to send on |
| message | byte[] | The message to send (min 16 bytes) |
| timeoutMs | int | Timeout in milliseconds |
| contentType | string | MIME content type |

Behavior:

1. Generates a new correlation UUID
2. Creates a TaskCompletionSource<byte[]> for the response and registers it in _pendingRequests
3. Prepends the 16-byte UUID to the message: [16 bytes UUID][payload bytes] (the JSON WrappedMessage envelope described under IWebSocketService.SendAsync applies to text/* and application/json content types)
4. Sends the wrapped bytes via SendAsync with the given contentType
5. Registers a timeout cancellation
6. Waits for a response whose first 16 bytes match the UUID
7. Returns the response with the 16-byte UUID prefix stripped, or fails once timeoutMs elapses

Implementation Detail: The message is wrapped with a 16-byte GUID prefix. Responses are matched in StartReceiveLoop by comparing the first 16 bytes of received messages against pending correlation UUIDs.
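The binary framing can be sketched as follows. BinaryCorrelation and its method names are hypothetical. The sketch uses Guid.ToByteArray, whose byte layout is .NET-specific; whether the real wrapper uses this layout is not stated here, and a remote server would have to echo the 16 bytes unchanged for matching to work.

```csharp
using System;

// Hypothetical helper for the [16 bytes UUID][payload bytes] framing.
public static class BinaryCorrelation
{
    public const int PrefixLength = 16;

    // Prepends the correlation ID bytes to the payload.
    public static byte[] Wrap(Guid correlationId, byte[] payload)
    {
        var framed = new byte[PrefixLength + payload.Length];
        correlationId.ToByteArray().CopyTo(framed, 0);
        payload.CopyTo(framed, PrefixLength);
        return framed;
    }

    // Splits a received frame into correlation ID and payload.
    // Returns false when the frame is too short to carry a prefix.
    public static bool TryUnwrap(byte[] framed, out Guid correlationId, out byte[] payload)
    {
        if (framed.Length < PrefixLength)
        {
            correlationId = Guid.Empty;
            payload = Array.Empty<byte>();
            return false;
        }

        correlationId = new Guid(framed.AsSpan(0, PrefixLength));
        payload = framed.AsSpan(PrefixLength).ToArray();
        return true;
    }
}
```

Because the prefix is compared as raw bytes, no parsing of the payload is needed to route a response to its waiting request.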


StartReceiveLoop

Starts a background task to receive messages.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| connection | WebSocketConnection | The connection to receive from |
| onMessageReceived | Func<byte[], string, Task> | Callback for received messages (raw bytes + auto-detected ContentType) |

Behavior:

1. Cancels any existing receive operation
2. Starts a new Task running the receive loop
3. Assigns to connection.ReceiveTask

Receive Loop:

  • Uses 8192-byte buffer
  • Accumulates messages until EndOfMessage is true
  • Extracts the correlation ID: the "id" field of JSON text messages, or the 16-byte UUID prefix of binary messages
  • If correlation ID matches a pending request, completes that request
  • Otherwise, invokes onMessageReceived callback

Error Handling:

  • OperationCanceledException: Silently exits (expected on disconnect)
  • Other exceptions: If connection.AutoReconnect is true, attempts reconnection; otherwise marks as Disconnected


TryReconnectAsync (Private)

Attempts to reconnect with exponential backoff.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| connection | WebSocketConnection | The connection to reconnect |
| onMessageReceived | Func<byte[], string, Task> | Callback for received messages, passed on to the restarted receive loop |

Behavior:

1. Retries up to WebSocketOptions.ReconnectMaxAttempts times
2. Delay doubles each attempt: baseDelay * 2^(attempt-1), capped at 30 seconds
3. On successful reconnect, restarts the receive loop
4. On all failures, sets status to Disconnected
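The delay formula can be sketched as below. ReconnectBackoff and DelayFor are hypothetical names, and baseDelay is passed in as a parameter because its configured value is not documented here.

```csharp
using System;

// Hypothetical helper computing baseDelay * 2^(attempt-1), capped at 30 seconds.
public static class ReconnectBackoff
{
    public static readonly TimeSpan MaxDelay = TimeSpan.FromSeconds(30);

    // attempt is 1-based: the first retry waits exactly baseDelay.
    public static TimeSpan DelayFor(int attempt, TimeSpan baseDelay)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));

        double milliseconds = baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1);

        // Math.Min also handles very large attempts, where the product overflows to infinity.
        return TimeSpan.FromMilliseconds(Math.Min(milliseconds, MaxDelay.TotalMilliseconds));
    }
}
```

With a base delay of one second, attempts 1 through 5 wait 1, 2, 4, 8, and 16 seconds; attempt 6 would be 32 seconds and is capped at 30.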


IDaprPublisher / DaprPublisher

Namespace: Virtufin.WebSocketManager.Services

Purpose: Publishes WebSocket messages to Dapr pub/sub topics.

Interface Definition

public interface IDaprPublisher
{
    Task PublishAsync(
        string topic, string websocketId, string websocketUrl, byte[] payload, string? contentType = null);
}

Implementation: DaprPublisher

PublishAsync

Publishes a WebSocket message directly to a Dapr pub/sub topic. Metadata (websocketId, websocketUrl, timestamp, contentType) is sent as CloudEvent extensions rather than wrapped inside the data payload.

Parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| topic | string | The topic name to publish to |
| websocketId | string | The ID of the WebSocket connection |
| websocketUrl | string | The URL of the WebSocket connection |
| payload | byte[] | The raw WebSocket message bytes |
| contentType | string? | Optional content type from the WebSocket frame |

Behavior:

1. Decodes the payload bytes as UTF-8 text
2. Passes the text directly to DaprClient.PublishEventAsync<string> with the configured pub/sub component
3. Metadata fields (websocketId, websocketUrl, timestamp, contentType) are attached as CloudEvent extensions (passed via Dapr metadata dictionary) rather than wrapped inside the data payload

Dapr CloudEvent format (as stored in Redis):

{
  "data": "{\"stream\":\"btcusdt@depth@100ms\",\"data\":{\"e\":\"depthUpdate\",...}}",
  "datacontenttype": "application/json",
  "websocketId": "352eb904-...",
  "websocketUrl": "wss://stream.binance.com:9443/stream",
  "timestamp": "2026-06-12T18:48:16.939543Z",
  "contentType": "text/plain",
  "type": "com.dapr.event.sent"
}

The data field contains the raw WebSocket message as a JSON string (no base64 encoding). Consumers read data directly and JSON-parse it.
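On the consumer side, reading such an event means parsing twice: once for the CloudEvent envelope and once for the string held in data. A minimal sketch using System.Text.Json follows; CloudEventReader is a hypothetical helper, and it assumes the consumer receives the full CloudEvent JSON shown above and that the original WebSocket message was itself JSON.

```csharp
using System.Text.Json;

// Hypothetical consumer-side helper for the CloudEvent shape shown above.
public static class CloudEventReader
{
    // Extracts the "data" string from the envelope and parses it as JSON.
    public static JsonElement ReadData(string cloudEventJson)
    {
        using var envelope = JsonDocument.Parse(cloudEventJson);
        string raw = envelope.RootElement.GetProperty("data").GetString()
            ?? throw new JsonException("CloudEvent data is null");

        using var inner = JsonDocument.Parse(raw);

        // Clone so the element stays valid after the document is disposed.
        return inner.RootElement.Clone();
    }
}
```

Messages that are plain text rather than JSON would need to skip the second parse and use the data string as-is.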

Throws: Exception if publishing fails.


IInstanceIdProvider / DefaultInstanceIdProvider

Namespace: Virtufin.WebSocketManager.Services

Purpose: Provides a unique identifier for the current service instance.

Interface Definition

public interface IInstanceIdProvider
{
    string GetInstanceId();
}

Implementation: DefaultInstanceIdProvider

Returns the pod name from the HOSTNAME environment variable (set by Kubernetes). Falls back to a random GUID if HOSTNAME is not set.
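The fallback behavior can be sketched as below. HostnameInstanceIdProvider is a hypothetical class; resolving the ID once in the constructor is an assumption made here so that a GUID fallback stays stable for the lifetime of the process.

```csharp
using System;

// Hypothetical provider: HOSTNAME if set, otherwise a random GUID.
public sealed class HostnameInstanceIdProvider
{
    private readonly string _instanceId;

    public HostnameInstanceIdProvider()
    {
        var hostname = Environment.GetEnvironmentVariable("HOSTNAME");

        // Assumption: resolve once, so repeated calls return the same value
        // even when the fallback GUID is used.
        _instanceId = string.IsNullOrWhiteSpace(hostname)
            ? Guid.NewGuid().ToString()
            : hostname;
    }

    public string GetInstanceId() => _instanceId;
}
```

A provider that generated a new GUID on every call would make every ownership check fail, which is why the value is cached.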

Connection Ownership: Connections are tied to the instance that created them, and only the owning instance can disconnect or send messages on a connection. During rolling updates, a new pod receives a new HOSTNAME, so connections created by the old pod no longer match any running instance. The ConnectionReclaimerHostedService runs every 30 seconds and reclaims connections owned by instances that are no longer present. Set CLEAR_ALL_ON_START=TRUE to clear all connections from the state store on startup instead of reclaiming them.

Use Case: Used by WebSocketService to validate connection ownership and prevent cross-instance modifications.