If you’ve followed the previous articles in the series, you have a good foundation for a client application with draw tools and tactical symbols. We will now expand this to support a scenario where several clients can collaborate by sending feature updates in real time over a WebSocket connection using the popular library SignalR.
This is a two-part article where we begin by writing the streaming server and finish in the second part by connecting the WPF client to the server.
This archive contains the solution outlined in this and the next article.
Requirements
To have multiple clients share information we need to add a new component that will be responsible for receiving and sending information between them. To achieve this we will create a small server application that will mediate between the clients. Each client that will participate in the information sharing will have to connect to the server and synchronize to receive the latest information and then subscribe to the relevant streams to start receiving information from the other clients. The scenario could be visualized with the image below:
On the client side, we will implement a background service that continuously listens to the different streams (insert, update and delete) and also sends local updates to the server. Any processing such as serialization or deserialization will happen in the service without blocking the UI.
The server, on the other hand, will act as the source of truth, managing the shared state of the connected clients. It will consist of a thread-safe lookup mechanism with observables that will signal the insert, update and delete events that will be exposed to the clients.
Start by adding a new project to the existing solution by right-clicking the solution in Visual Studio and clicking Add > New Project… Then find the template called ASP.NET Core Empty, select that and click Next. Name it StreamingService and click Next again. In the next dialog, un-check the Enable Docker option for now and then click Create. This will set us up with a very bare-bones web application that we can extend.
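Next, add the following NuGet packages to the project. The code in this article needs at least System.Reactive (for the Subject types) and Microsoft.AspNetCore.SignalR.Protocols.MessagePack (for AddMessagePackProtocol).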
We will create a simple class for storing our features, backed by a thread-safe dictionary for fast lookup. It will also expose IObservable<T> properties which will be used to push updates to consumers. Add the following code to a class called StreamingRepository in the project root:
using System.Collections.Concurrent;
using System.Reactive.Subjects;

namespace StreamingService;

public class StreamingRepository<TKey, TValue> where TKey : notnull
{
    // Thread-safe store holding the latest value for each key.
    private readonly ConcurrentDictionary<TKey, TValue> _store = new();

    // One subject per event type; removals only carry the key.
    private readonly Subject<(TKey key, TValue value)> _addedSubject = new();
    private readonly Subject<(TKey key, TValue value)> _updatedSubject = new();
    private readonly Subject<TKey> _removedSubject = new();

    public IObservable<(TKey key, TValue value)> AddedStream => _addedSubject;
    public IObservable<(TKey key, TValue value)> UpdateStream => _updatedSubject;
    public IObservable<TKey> RemovedStream => _removedSubject;

    public void Add(TKey key, TValue value)
    {
        _store[key] = value;
        _addedSubject.OnNext((key, value));
    }

    public void Update(TKey key, TValue value)
    {
        // Updates for unknown keys are ignored.
        if (!_store.ContainsKey(key)) return;
        _store[key] = value;
        _updatedSubject.OnNext((key, value));
    }

    public void Remove(TKey key)
    {
        // Only signal a removal if the key was actually present.
        if (_store.TryRemove(key, out _))
            _removedSubject.OnNext(key);
    }

    // Snapshot of the current contents, used for the initial synchronization.
    public IEnumerable<(TKey, TValue value)> Objects => _store.Select(x => (x.Key, x.Value));
}
The generic StreamingRepository<TKey, TValue> can hold any combination of key and value types. When an entry is added, updated or removed, the change is published on the corresponding observable stream by calling OnNext on the matching Subject field. The _addedSubject and _updatedSubject fields expose observable streams with the (TKey, TValue) signature, since each client connected to the server needs to receive the new values to update its local state accordingly. Removing values is a bit simpler: we only have to publish the key of the removed entry on the RemovedStream, which reduces the data transfer because the value is not sent over the wire to the clients.
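To make the event flow concrete, here is a minimal usage sketch. It assumes the StreamingRepository class above and the System.Reactive package are available in the project; the console program itself, the `log` list and the sample payloads are hypothetical and not part of the tutorial solution.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using StreamingService; // the StreamingRepository class from this article

var repository = new StreamingRepository<Guid, byte[]>();
var log = new List<string>(); // hypothetical helper that records the observed events

// A subscription only receives events published after it was created.
using IDisposable added = repository.AddedStream.Subscribe(e => log.Add($"added {e.value.Length} bytes"));
using IDisposable updated = repository.UpdateStream.Subscribe(e => log.Add($"updated {e.value.Length} bytes"));
using IDisposable removed = repository.RemovedStream.Subscribe(_ => log.Add("removed"));

Guid id = Guid.NewGuid();
repository.Add(id, Encoding.UTF8.GetBytes("v1"));     // published on AddedStream
repository.Update(id, Encoding.UTF8.GetBytes("v2!")); // published on UpdateStream
repository.Update(Guid.NewGuid(), new byte[] { 1 });  // unknown key: ignored, no event
repository.Remove(id);                                // published on RemovedStream

Console.WriteLine(string.Join(", ", log));
```

Because Subject<T>.OnNext calls its subscribers synchronously, the events are recorded in the order the repository methods were called. Note that the update for the unknown key produces no event at all.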
With the StreamingRepository in place, let’s create the client-facing part of the service by adding a SignalR Hub implementation to the root of the StreamingService project:
using Microsoft.AspNetCore.SignalR;
using StreamingService.Utils;
using System.Threading.Channels;
using SerializedFeature = byte[];

namespace StreamingService;

public class CommunicationHub(StreamingRepository<Guid, SerializedFeature> repository) : Hub
{
    // Methods that clients invoke to change the shared state.
    public void UpdateFeature(Guid id, SerializedFeature value) => repository.Update(id, value);
    public void AddFeature(Guid id, SerializedFeature value) => repository.Add(id, value);
    public void RemoveFeature(Guid id) => repository.Remove(id);

    // Long-lived streams that clients subscribe to.
    public ChannelReader<(Guid Id, SerializedFeature Buffer)> AddedStream() => repository.AddedStream.AsChannelReader();
    public ChannelReader<(Guid, SerializedFeature)> UpdateStream() => repository.UpdateStream.AsChannelReader();
    public ChannelReader<Guid> RemoveStream() => repository.RemovedStream.AsChannelReader();

    // Snapshot of all features, used by a client for its initial synchronization.
    public List<(Guid id, SerializedFeature Buffer)> AllFeatures() => repository.Objects.ToList();
}
By inheriting from the Microsoft.AspNetCore.SignalR.Hub base class we expose the public methods to the outside world so that other processes can call them using different transports (WebSockets by default), and also subscribe to the event streams exposed by the methods with the generic ChannelReader<T> return type. We see that the constructor takes a parameter with the type StreamingRepository<Guid, byte[]> corresponding to the repository where our features will be stored. The Hub essentially wraps the repository methods and exposes them to remote clients.
Note how we’ve included the type alias using SerializedFeature = byte[]; to clarify the intended value of the key-value pairs in the repository.
To convert the repository’s IObservable<T> properties to ChannelReader<T> instances for the Hub to expose, add a new static class:
using System.Threading.Channels;

namespace StreamingService.Utils;

public static class Extensions
{
    public static ChannelReader<T> AsChannelReader<T>(this IObservable<T> observable)
    {
        var channel = Channel.CreateUnbounded<T>();

        // Forward values, errors and completion from the observable to the channel writer.
        IDisposable disposable = observable.Subscribe(
            value => channel.Writer.TryWrite(value),
            error => channel.Writer.TryComplete(error),
            () => channel.Writer.TryComplete());

        // Unsubscribe from the observable once the channel has completed.
        channel.Reader.Completion.ContinueWith(_ => disposable.Dispose());

        return channel.Reader;
    }
}
The AsChannelReader extension method converts the IObservable<T> stream into a ChannelReader<T> of the same type. It creates an unbounded (no length limit) Channel<T> read-write pair and returns the reader part, while every value emitted by the observable is written to the channel’s writer part by calling channel.Writer.TryWrite in the Subscribe callback. An error or completion of the observable completes the writer, and the subscription is disposed once the reader completes. Exposing a ChannelReader<T> in a hub is one way of achieving long-lived streams from a server application to one or several clients.
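The channel behaviour that AsChannelReader relies on can be tried in isolation. The following is a minimal sketch using only System.Threading.Channels; the loop stands in for the observable's callbacks, and the variable names are illustrative rather than taken from the tutorial solution.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Unbounded channel: TryWrite succeeds until the writer has been completed.
Channel<int> channel = Channel.CreateUnbounded<int>();

// Producer side, standing in for the OnNext and OnCompleted callbacks of an observable.
for (int i = 1; i <= 3; i++)
    channel.Writer.TryWrite(i);
channel.Writer.TryComplete();

// Writes after completion are rejected instead of throwing.
bool lateWrite = channel.Writer.TryWrite(4);

// Consumer side: buffered items can still be read after the writer has completed.
var received = new List<int>();
await foreach (int item in channel.Reader.ReadAllAsync())
    received.Add(item);

Console.WriteLine($"received: {string.Join(",", received)}, late write accepted: {lateWrite}");
```

One design consequence worth keeping in mind: since the channel is unbounded, items accumulate in memory if a consumer reads more slowly than the producer writes. For the small feature updates in this tutorial that is likely acceptable, but a bounded channel could be considered for heavier workloads.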
To make everything come together we need to add the necessary dependencies and set up our CommunicationHub by mapping it to a public endpoint. Let’s start by adding our services to Program.cs. The file should contain the following:
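using StreamingService;

WebApplicationBuilder builder = WebApplication.CreateBuilder(args);

// Register SignalR with MessagePack as an additional (binary) hub protocol.
builder.Services.AddSignalR().AddMessagePackProtocol();

// A single shared repository instance holds the state for all connected clients.
builder.Services.AddSingleton<StreamingRepository<Guid, byte[]>>();

WebApplication app = builder.Build();
app.UseHttpsRedirection();

// Expose the hub on a public endpoint.
app.MapHub<CommunicationHub>("/communicationHub");

app.Run();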
This will register the necessary dependencies for SignalR and configure SignalR to support binary serialization using MessagePack. We also add the streaming repository with a mapping from Guid to byte[], where the Guid represents a globally unique identifier for each feature and the byte array represents the serialized feature itself. Finally, before the call to app.Run(), we expose the CommunicationHub on the public endpoint /communicationHub. And with that, the server component is finished.
In this first part of the final article in our WPF plus MVVM tutorial series, we have coded a server for streaming and synchronizing arbitrary pairs of GUIDs and byte arrays between multiple clients. The server leverages SignalR to manage connections and push changes between clients.
The solution to this tutorial also includes the launch settings needed to run the streaming server as a Docker container directly from Visual Studio, so have a look if you’re interested in learning more about that.
Continue with the next article in the series to see how we will implement the client-side support and connect to the server.