Adding real-time communication between clients with SignalR – Step 2/2

Step 2/2 – Client-side development

Note: This is a two-part article where we began by writing the streaming server and finish in this second part by connecting the WPF client to the server. If you haven’t finished the first part yet, please do so now.

This archive contains the solution outlined in this and the previous article.

Adding client-side streaming support

With the server-side implementation in place, we need to implement the client-side handling as well. We will do this by creating a new component that will run as a background service and continuously listen for server-side events on the streams exposed by the CommunicationHub, as well as sending updates to the server when the local state is modified.

To achieve this, we need to make a few changes and additions to our client application: create a serializer that converts our features to a binary format, add support for background services, and add the logic necessary to connect to and communicate with the CommunicationHub.

Adding support for long-running background services

There are many ways of adding background services to your application. The simplest is probably a fire-and-forget Task that runs for the lifetime of the application. This does, however, come with a few problems, such as difficulties with error and exception handling, lifetime management, and dependency management, just to name a few. A better option is to use the .NET Generic Host, which supports many useful features in addition to background services.

To get started, add the Microsoft.Extensions.Hosting NuGet package to the client project and make the following changes to App.xaml.cs:

App.xaml.cs

...
using Microsoft.Extensions.Hosting;
...

namespace CarmentaWpfClient
{
    /// <summary>
    /// Interaction logic for App.xaml
    /// </summary>
    public partial class App : Application
    {
        private readonly IHost _host;

        public App()
        {
            Runtime.Initialize();

            _host = new HostBuilder()
                .ConfigureServices((_, services) => ConfigureServices(services))
                .Build();

            Services = _host.Services;
        }

        ...

        private static ServiceProvider ConfigureServices(IServiceCollection services)
        {
            // Remove the construction of `services`
            ...
        }

        protected override async void OnStartup(StartupEventArgs e)
        {
            await _host.StartAsync();
            base.OnStartup(e);
        }

        protected override async void OnExit(ExitEventArgs e)
        {
            using (_host)
            {
                await _host.StopAsync();
            }
            base.OnExit(e);
        }
    }
}

In the constructor, the generic host is configured using the builder pattern. The lambda passed to ConfigureServices receives a HostBuilderContext as its first parameter. We do not need it, so we use a discard (_) to make this clear.

We also made a slight change by injecting the service collection from the host into the ConfigureServices method to make sure we gather all dependencies in one place. Finally, the OnStartup and OnExit method overrides make sure the host is started and stopped gracefully.
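To see the host lifecycle in isolation, here is a minimal console sketch, assuming the Microsoft.Extensions.Hosting package is referenced. TickerService is a hypothetical name used only for illustration. It is not part of the article's solution.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Build a host that owns a single background service.
using IHost host = new HostBuilder()
    .ConfigureServices((_, services) => services.AddHostedService<TickerService>())
    .Build();

await host.StartAsync();                    // starts every registered hosted service
await Task.Delay(TimeSpan.FromSeconds(1));  // the application would do its work here
await host.StopAsync();                     // signals the stopping token and waits for the services

// Hypothetical background service that ticks until the host shuts down.
public sealed class TickerService : BackgroundService
{
    private int _ticks;

    public int Ticks => Volatile.Read(ref _ticks);

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                int n = Interlocked.Increment(ref _ticks);
                Console.WriteLine($"tick {n}");
                await Task.Delay(TimeSpan.FromMilliseconds(250), stoppingToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the host shuts down.
        }
    }
}
```

The point of the sketch is that the service itself contains no start or stop logic. The host decides when it runs, which is what the OnStartup and OnExit overrides delegate to in the WPF application.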

Creating a binary feature serializer

Let’s create a serializer that we can use to serialize and deserialize our features upon sending and receiving them over the wire. Create an interface called IFeatureSerializer in Utils\IFeatureSerializer.cs:

Utils\IFeatureSerializer.cs

using Carmenta.Engine;

namespace CarmentaWpfClient.Utils;

public interface IFeatureSerializer
{
    byte[] Serialize(Feature feature);

    Feature Deserialize(byte[] buffer);
}

The definition is simple: Serialize takes a Feature and returns a byte array, and Deserialize does the reverse. Create the implementation in Utils\BinaryFeatureSerializer.cs:

Utils\BinaryFeatureSerializer.cs

using Carmenta.Engine;
using System;
using System.IO;

namespace CarmentaWpfClient.Utils;

public class BinaryFeatureSerializer : IFeatureSerializer
{
    public Feature Deserialize(byte[] buffer)
    {
        using var stream = new MemoryStream(buffer);
        stream.Seek(0, SeekOrigin.Begin);

        return Feature.FromStream(stream);
    }

    public byte[] Serialize(Feature feature)
    {
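        // The stream was declared nullable, so fail loudly instead of dereferencing null.
        using Stream stream = feature.Save(FeatureSaveMode.XYZ)
            ?? throw new InvalidOperationException("Feature.Save returned no stream.");

        // Stream.Read may return fewer bytes than requested, so use
        // ReadExactly (.NET 7 or later) to fill the whole buffer.
        var buffer = new byte[stream.Length];
        stream.ReadExactly(buffer, 0, buffer.Length);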

        return buffer;
    }
}

The implementation uses the Feature.Save instance method to save the feature to a stream, whose contents are then read into a byte array. To deserialize a buffer back into a feature, we wrap the buffer in a MemoryStream and pass it to the static Feature.FromStream method.
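The stream-to-buffer round trip can be tried without Carmenta Engine. The following is a sketch only: SamplePoint and PointCodec are hypothetical stand-ins for Feature and the serializer, and BinaryWriter and BinaryReader take the place of Feature.Save and Feature.FromStream. It assumes .NET 7 or later for Stream.ReadExactly.

```csharp
using System;
using System.IO;
using System.Text;

// Round trip: value -> stream -> byte[] -> stream -> value.
var original = new SamplePoint(18.07, 59.33);
byte[] wire = PointCodec.Serialize(original);
SamplePoint copy = PointCodec.Deserialize(wire);
Console.WriteLine($"{wire.Length} bytes, round trip ok: {copy == original}");

// Hypothetical stand-in for a Feature.
public readonly record struct SamplePoint(double X, double Y);

public static class PointCodec
{
    public static byte[] Serialize(SamplePoint point)
    {
        using var stream = new MemoryStream();
        using (var writer = new BinaryWriter(stream, Encoding.UTF8, leaveOpen: true))
        {
            writer.Write(point.X);
            writer.Write(point.Y);
        }

        // A stream that was just written to is positioned at its end,
        // so rewind before copying its contents into the buffer.
        stream.Position = 0;
        var buffer = new byte[stream.Length];
        stream.ReadExactly(buffer, 0, buffer.Length);
        return buffer;
    }

    public static SamplePoint Deserialize(byte[] buffer)
    {
        using var stream = new MemoryStream(buffer);
        using var reader = new BinaryReader(stream);
        double x = reader.ReadDouble();
        double y = reader.ReadDouble();
        return new SamplePoint(x, y);
    }
}
```

Two details carry over to any stream-based serializer: check where the stream is positioned before reading from it, and prefer a read call that guarantees the whole buffer is filled.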

With that in place, add the serializer to the dependencies by adding the following line to the ConfigureServices method in App.xaml.cs:

App.xaml.cs

private static ServiceProvider ConfigureServices(IServiceCollection services)
{
    ...
    services.AddTransient<IFeatureSerializer, BinaryFeatureSerializer>();
    ...
}

Creating the stream communicator

Remember, we wanted to create the service responsible for server communication as a long-running background service. We can do this either by inheriting from BackgroundService or by implementing the IHostedService interface, both from Microsoft.Extensions.Hosting. In this case, we will inherit from BackgroundService. Create the class StreamCommunicator in Utils\StreamCommunicator.cs:

Utils\StreamCommunicator.cs

using Carmenta.Engine;
using CarmentaWpfClient.Messages;
using CommunityToolkit.Mvvm.Messaging;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace CarmentaWpfClient.Utils;

public class StreamCommunicator : BackgroundService
{
    private readonly MemoryDataSet _memoryDataSet;
    private readonly IFeatureSerializer _featureSerializer;
    private readonly HubConnection _hubConnection;

    private readonly ConcurrentDictionary<Guid, Id> _idMap = new();
    private readonly ConcurrentDictionary<Id, Guid> _reverseIdMap = new();

    public StreamCommunicator(MemoryDataSet dataSet, IFeatureSerializer featureSerializer)
    {
        _featureSerializer = featureSerializer;
        _memoryDataSet = dataSet;

        _memoryDataSet.EnableEvents = true;
        _memoryDataSet.FeatureInserted += FeatureInserted;
        _memoryDataSet.FeatureChanged += FeatureChanged;
        _memoryDataSet.FeatureRemoved += FeatureRemoved;

        _hubConnection = new HubConnectionBuilder()
            .WithUrl("https://localhost:5001/communicationHub")
            .WithAutomaticReconnect()
            .AddMessagePackProtocol()
            .Build();

        _hubConnection.Closed += OnHubConnectionClosed;
        _hubConnection.Reconnected += OnHubReconnected;
    }

    private Task OnHubReconnected(string? arg)
    {
        IsConnected = true;
        return Task.CompletedTask;
    }

    private Task OnHubConnectionClosed(Exception? _)
    {
        IsConnected = false;

        return Task.CompletedTask;
    }

    public bool IsConnected { get; private set; }

    public async Task InitializeAsync()
    {
        try
        {
            await _hubConnection.StartAsync();

            var existingFeatures = await _hubConnection.InvokeAsync<List<(Guid, byte[])>>("AllFeatures");

            foreach ((Guid id, byte[] buffer) in existingFeatures)
                HandleStreamAdded(id, buffer);

            await SetupStreamListeners();
        }
        catch(Exception)
        {
            //Log error etc..
        }
    }

    private Task SetupStreamListeners()
    {
        Task addedStreamTask = SetupAddedStream();
        Task editStreamTask = SetupStreamEdited();
        Task removeStreamTask = SetupRemoveStream();

        return Task.WhenAll(addedStreamTask, editStreamTask, removeStreamTask);
    }

    private async Task SetupAddedStream()
    {
        IAsyncEnumerable<(Guid Id, byte[] Buffer)> addedStream = _hubConnection.StreamAsync<(Guid Id, byte[] Buffer)>("AddedStream");

        await foreach ((Guid id, byte[] buffer) in addedStream)
            HandleStreamAdded(id, buffer);
    }

    private void HandleStreamAdded(Guid id, byte[] buffer)
    {
        if (_idMap.ContainsKey(id))
            return;

        using var guard = new Guard(_memoryDataSet);

        Feature feature = _featureSerializer.Deserialize(buffer);

        // Temporarily turn off events to avoid triggering another
        // inserted event which could cause recursive behaviour.
        _memoryDataSet.EnableEvents = false;
        Id featureId = _memoryDataSet.Insert(feature);
        _memoryDataSet.EnableEvents = true;

        _idMap[id] = featureId;
        _reverseIdMap[featureId] = id;

        WeakReferenceMessenger.Default.Send(new UpdateViewMessage());
    }

    private async Task SetupStreamEdited()
    {
        IAsyncEnumerable<(Guid Id, byte[] Buffer)> editedStream = _hubConnection.StreamAsync<(Guid Id, byte[] Buffer)>("UpdateStream");

        await foreach((Guid id, byte[] buffer) in editedStream)
            HandleStreamEdited(id, buffer);
    }

    private void HandleStreamEdited(Guid id, byte[] buffer)
    {
        if (!_idMap.ContainsKey(id))
        {
            HandleStreamAdded(id, buffer);
            return;
        }

        Feature feature = _featureSerializer.Deserialize(buffer);
        Id existingFeatureId = _idMap[id];

        // Optimize by replacing with dictionary lookup
        Feature? existingFeature = _memoryDataSet.GetFeature(existingFeatureId);

        UpdateFeature(existingFeature, feature.Geometry, feature.Attributes);
    }

    private async Task SetupRemoveStream()
    {
        IAsyncEnumerable<Guid> removeStream = _hubConnection.StreamAsync<Guid>("RemoveStream");

        await foreach(Guid id in removeStream)
        {
            // TryRemove checks for and removes the mapping in one atomic step.
            if (!_idMap.TryRemove(id, out Id idToRemove))
                continue;

            _reverseIdMap.TryRemove(idToRemove, out _);
            using var guard = new Guard(_memoryDataSet);

            _memoryDataSet.EnableEvents = false;
            _memoryDataSet.Remove(idToRemove);
            _memoryDataSet.EnableEvents = true;
        }
    }

    private void UpdateFeature(Feature feature, Geometry geometry, AttributeSet attributes)
    {
        switch (geometry)
        {
            case PointGeometry point:
                feature.GetGeometryAsPoint().Point = point.Point;
                break;
            case LineGeometry line:
                {
                    LineGeometry? geom = feature.GetGeometryAsLine();
                    geom.Points.Clear();
                    geom.Points.AddRange(line.Points);
                    break;
                }
            case PolygonGeometry polygon:
                {
                    PolygonGeometry? geom = feature.GetGeometryAsPolygon();
                    geom.Rings.Clear();
                    geom.Rings.AddRange(polygon.Rings.Clone() as LineGeometryCollection);
                    break;
                }
            default:
                return;
        }

        feature.Attributes.Merge(attributes.Clone() as AttributeSet);

        _memoryDataSet.RefreshFeaturePresentation(feature, false);
        WeakReferenceMessenger.Default.Send(new UpdateViewMessage());
    }

    private async void FeatureChanged(object sender, MemoryDataSetEventArgs args)
    {
        Guid corrId = _reverseIdMap[args.Feature.Id];

        byte[] buffer = _featureSerializer.Serialize(args.Feature);
        await _hubConnection.InvokeAsync("UpdateFeature", corrId, buffer);
    }

    private async void FeatureInserted(object sender, MemoryDataSetEventArgs args)
    {
        Feature? feature = args.Feature;
        var corrId = Guid.NewGuid();

        _idMap[corrId] = feature.Id;
        _reverseIdMap[feature.Id] = corrId;

        byte[] buffer = _featureSerializer.Serialize(feature);
        await _hubConnection.InvokeAsync("AddFeature", corrId, buffer);
        WeakReferenceMessenger.Default.Send(new UpdateViewMessage());
    }

    private async void FeatureRemoved(object sender, MemoryDataSetEventArgs args)
    {
        Guid corrId = _reverseIdMap[args.Feature.Id];
        await _hubConnection.InvokeAsync("RemoveFeature", corrId);

        WeakReferenceMessenger.Default.Send(new UpdateViewMessage());
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_hubConnection.State == HubConnectionState.Connected)
            await _hubConnection.StopAsync(cancellationToken);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await InitializeAsync();
    }
}

This is a rather lengthy piece of code since a lot is happening, so we will break down the most important parts and look at them in detail.

Let’s start by looking at the constructor. We inject a MemoryDataSet and an instance of the newly created IFeatureSerializer. The MemoryDataSet holds our features in memory, and the StreamCommunicator subscribes to its FeatureInserted, FeatureChanged and FeatureRemoved events so that it can send an update to the server whenever a feature is inserted, changed or removed. The subscriptions are set up in the constructor using the += syntax for adding event handlers in C#.

Next, we use the HubConnectionBuilder class to set up the HubConnection which manages the SignalR connection to the server. Note that the URL might be different in your environment. We also subscribe to the Closed and Reconnected events on the HubConnection so that we can notify our application if the connection changes.

The next important method to look into is InitializeAsync, which is called from our override of the base class ExecuteAsync method. Once we’ve added the background service to the list of dependencies, the host calls ExecuteAsync when the application starts, and thus InitializeAsync is called as well. Looking into InitializeAsync, we see that the HubConnection is started and that we retrieve the entire set of features available on the server by invoking the AllFeatures method on the CommunicationHub, adding each feature in the result by calling the HandleStreamAdded method. This is the step where we synchronize our state with all other clients before we start subscribing to events and sending updates ourselves.

The HandleStreamAdded method is used in several places to add features coming from the server together with their globally unique id. If we look at the code snippet for HandleStreamAdded below, we see that a feature whose id is already mapped is skipped. Otherwise the received feature is deserialized, and we temporarily turn off the events on the MemoryDataSet to avoid recursive behavior when adding features internally. Next, we add a mapping between the internal feature id created by the MemoryDataSet and the external id, so that we can correlate between the two. This enables different client applications to apply their own internal schemes for feature identifiers. Lastly, we send an UpdateViewMessage to signal that the view should be updated to reflect the newly added feature.

Utils\StreamCommunicator.cs

private void HandleStreamAdded(Guid id, byte[] buffer)
{
    if (_idMap.ContainsKey(id))
        return;

    using var guard = new Guard(_memoryDataSet);

    Feature feature = _featureSerializer.Deserialize(buffer);

    // Temporarily turn off events to avoid triggering another
    // inserted event which could cause recursive behaviour.
    _memoryDataSet.EnableEvents = false;
    Id featureId = _memoryDataSet.Insert(feature);
    _memoryDataSet.EnableEvents = true;

    _idMap[id] = featureId;
    _reverseIdMap[featureId] = id;

    WeakReferenceMessenger.Default.Send(new UpdateViewMessage());
}
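The two ideas in this method, suppressing events while applying a remote change and keeping a two-way id map, can be sketched without Carmenta Engine. SampleStore and SampleSync below are hypothetical stand-ins for MemoryDataSet and StreamCommunicator, with an int playing the role of the local feature id.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

var store = new SampleStore();
var sync = new SampleSync(store);

store.Insert("created locally");                        // raises Inserted, so it gets published
sync.ApplyRemoteInsert(Guid.NewGuid(), "from server");  // inserted silently, nothing is echoed back
Console.WriteLine($"local inserts published: {sync.Published.Count}");

// Hypothetical stand-in for MemoryDataSet: raises an event on insert unless events are disabled.
public sealed class SampleStore
{
    private int _nextId;

    public bool EnableEvents { get; set; } = true;
    public event Action<int, string>? Inserted;

    public int Insert(string value)
    {
        int id = ++_nextId;
        if (EnableEvents) Inserted?.Invoke(id, value);
        return id;
    }
}

// Hypothetical stand-in for StreamCommunicator.
public sealed class SampleSync
{
    private readonly SampleStore _store;
    private readonly ConcurrentDictionary<Guid, int> _idMap = new();
    private readonly ConcurrentDictionary<int, Guid> _reverseIdMap = new();

    public List<Guid> Published { get; } = new();

    public SampleSync(SampleStore store)
    {
        _store = store;
        _store.Inserted += OnLocalInsert;
    }

    public void ApplyRemoteInsert(Guid remoteId, string value)
    {
        if (_idMap.ContainsKey(remoteId)) return; // already known, nothing to do

        int localId;
        _store.EnableEvents = false;
        try { localId = _store.Insert(value); }
        finally { _store.EnableEvents = true; }   // re-enable even if Insert throws

        _idMap[remoteId] = localId;
        _reverseIdMap[localId] = remoteId;
    }

    private void OnLocalInsert(int localId, string value)
    {
        var remoteId = Guid.NewGuid();
        _idMap[remoteId] = localId;
        _reverseIdMap[localId] = remoteId;
        Published.Add(remoteId); // a real client would invoke the hub here
    }
}
```

Wrapping the insert in try/finally is a design choice of this sketch rather than something the article's code does. It ensures that events are switched back on even when the insert fails.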

Once we’ve added the features from the server, we start subscribing to the server events to receive updates on features that are added, removed or updated. This is done by calling the respective setup method for each event, such as SetupAddedStream, and awaiting them. Digging into the SetupAddedStream method a little, we see that it’s quite simple: we call the HubConnection.StreamAsync instance method with the argument "AddedStream" to subscribe to that stream on the CommunicationHub. This method returns an IAsyncEnumerable that we can iterate asynchronously to receive incoming events. Once we receive a newly added id-feature pair, we once again handle it using the HandleStreamAdded method. The logic is similar for subscribing to the update stream and the remove stream.
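The listener pattern can be tried without a server. In the sketch below, the hypothetical SampleStream.Events method stands in for HubConnection.StreamAsync, and two listeners run side by side in the same way SetupStreamListeners combines its tasks with Task.WhenAll.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

var log = new ConcurrentQueue<string>();

// Start both listeners first, then await them together.
Task addedTask = SampleStream.ListenAsync("added", 3, log);
Task removedTask = SampleStream.ListenAsync("removed", 2, log);
await Task.WhenAll(addedTask, removedTask);

Console.WriteLine($"{log.Count} events handled");

public static class SampleStream
{
    // Hypothetical stand-in for a server stream: yields items as they "arrive".
    public static async IAsyncEnumerable<int> Events(
        int count,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (int i = 1; i <= count; i++)
        {
            await Task.Delay(10, cancellationToken);
            yield return i;
        }
    }

    // Each loop iteration resumes only when the next item is available,
    // so no thread is blocked while waiting.
    public static async Task ListenAsync(string name, int count, ConcurrentQueue<string> log)
    {
        await foreach (int n in Events(count))
            log.Enqueue($"{name} #{n}");
    }
}
```

Each await foreach loop only completes when its stream ends, which is why the listeners are started before any of them is awaited.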

Handling Feature Updates

There are two types of updates we have to consider – local feature updates that have to be published to the server, and server updates that have to be synchronized locally. Let’s start by looking at the first case.

Synchronizing local feature updates

To handle local feature updates, we subscribe to the MemoryDataSet.FeatureChanged event and handle updates in the FeatureChanged event handler.

Utils\StreamCommunicator.cs

private async void FeatureChanged(object sender, MemoryDataSetEventArgs args)
{
    Guid corrId = _reverseIdMap[args.Feature.Id];

    byte[] buffer = _featureSerializer.Serialize(args.Feature);
    await _hubConnection.InvokeAsync("UpdateFeature", corrId, buffer);
}

Looking at the FeatureChanged method we see it’s rather simple – we start by looking up the globally unique id in the _reverseIdMap field, we then serialize the feature and send it to the server by invoking the UpdateFeature method on the CommunicationHub.

Handling server updates

SetupStreamEdited subscribes to the UpdateStream on the communication hub by asynchronously awaiting updates and handling them in the HandleStreamEdited method. Looking at HandleStreamEdited, we see that we first check whether we already have a mapping for the incoming feature. If, for whatever reason, we don’t, we first have to add the feature, which we do by once again invoking the HandleStreamAdded method. If, however, we do have a mapping, we simply look up the existing feature and handle the actual updating in the UpdateFeature method:

Utils\StreamCommunicator.cs

private void UpdateFeature(Feature feature, Geometry geometry, AttributeSet attributes)
{
    switch (geometry)
    {
        case PointGeometry point:
            feature.GetGeometryAsPoint().Point = point.Point;
            break;
        case LineGeometry line:
            {
                LineGeometry? geom = feature.GetGeometryAsLine();
                geom.Points.Clear();
                geom.Points.AddRange(line.Points);
                break;
            }
        case PolygonGeometry polygon:
            {
                PolygonGeometry? geom = feature.GetGeometryAsPolygon();
                geom.Rings.Clear();
                geom.Rings.AddRange(polygon.Rings.Clone() as LineGeometryCollection);
                break;
            }
        default:
            return;
    }

    feature.Attributes.Merge(attributes.Clone() as AttributeSet);

    _memoryDataSet.RefreshFeaturePresentation(feature, false);
    WeakReferenceMessenger.Default.Send(new UpdateViewMessage());
}

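The method uses a switch statement with type patterns to handle the different geometry types and update the local geometry accordingly. Unsupported geometry types are ignored. It then merges the incoming AttributeSet into the existing set of attributes, refreshes the feature presentation and signals the view to update.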

For brevity, we will not cover the process for removing or handling removed features since it’s fairly similar to the previously explained cases. Please refer to the source code for details.

Adding the StreamCommunicator background service

Finally, we want to add the StreamCommunicator to the list of dependencies. We will do this by using the AddHostedService extension method for service registration.

Add the following lines to App.ConfigureServices:

App.xaml.cs

private static ServiceProvider ConfigureServices(IServiceCollection services)
{
    ...

    services.AddHostedService<StreamCommunicator>(x =>
    {
        if (!x.GetService<MapProvider>().TryGetMap("View0", out MapModel mapModel))
            throw new Exception("Could not load map configuration.");

        MemoryDataSet memoryDataSet = mapModel.GetMemoryDataSet("TacticalSymbolsDataSet") 
                                        ?? throw new Exception("Could not find TacticalSymbolsDataSet.");

        IFeatureSerializer serializer = x.GetService<IFeatureSerializer>()
                                        ?? throw new Exception("Could not get feature serializer implementation.");

        return new StreamCommunicator(memoryDataSet, serializer);
    });

    ...
}

We use the AddHostedService overload that takes a factory delegate. Inside the factory, we can use the service provider to look up the MapProvider, fetch the MemoryDataSet we want to inject into the StreamCommunicator, and resolve the IFeatureSerializer in the same way.
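The factory overload can be tried in a small console sketch, assuming the Microsoft.Extensions.Hosting package is referenced. ISampleSerializer, SampleSerializer and SampleListener are hypothetical types used only for illustration. GetRequiredService is used here in place of GetService combined with a null check, since it throws an InvalidOperationException when the service is not registered.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

using IHost host = new HostBuilder()
    .ConfigureServices((_, services) =>
    {
        services.AddTransient<ISampleSerializer, SampleSerializer>();

        // Factory overload: resolve some constructor arguments from the
        // container and supply others (here a plain string) by hand.
        services.AddHostedService(provider => new SampleListener(
            provider.GetRequiredService<ISampleSerializer>(),
            "TacticalSymbolsDataSet"));
    })
    .Build();

await host.StartAsync();
await host.StopAsync();

public interface ISampleSerializer
{
    string Describe();
}

public sealed class SampleSerializer : ISampleSerializer
{
    public string Describe() => "sample serializer";
}

public sealed class SampleListener : BackgroundService
{
    private readonly ISampleSerializer _serializer;

    public SampleListener(ISampleSerializer serializer, string dataSetName)
    {
        _serializer = serializer;
        DataSetName = dataSetName;
    }

    public string DataSetName { get; }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        Console.WriteLine($"Listening on {DataSetName} with {_serializer.Describe()}");
        return Task.CompletedTask;
    }
}
```

The factory runs when the host resolves its hosted services, so an exception thrown inside it is reported when the host starts and not when the registration is made.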

For this to work, we also need to add a way to get a MemoryDataSet from a MapModel:

Models\MapModel.cs

public partial class MapModel
{
    ...
    public MemoryDataSet? GetMemoryDataSet(string name) => View.GetTypedObject<MemoryDataSet>(name);
}

Testing the application

With everything in place, set the startup project to the StartService project, then build and run the application. Next, locate the build directory for the client application and start two instances. Start adding tactical features in either instance of the client application and you should see the two clients stay synchronized.

What’s more, if you close the client applications and start a new instance, it will resume the previous state as well!

Tip: The solution to this article contains a launch profile definition which launches the streaming service first and then the client when starting a default debug session in Visual Studio. If using this, all you need to do after that is start a second instance of the client.

Conclusion

This concludes the WPF plus MVVM article series, where we’ve built a small .NET 8 application with progressively more complex functionality using an established architectural pattern and modern code constructs.

In this final part of the series, we’ve shown how to serialize Features to a binary format suitable for streaming. We’ve connected to a SignalR Hub and hooked up handlers for push events from the server.

We’ve also seen how to handle feature events from the local MemoryDataSet as well as disable events from the MemoryDataSet when interacting with its Features in the event handlers, to avoid infinite recursion.

Finally, we’ve had a look at a bit more involved dependency management using a long-lived HostedService.