Next Post
Getting started with Vertical Profiles
Learn how the VerticalProfileOperator in Carmenta Engine can be applied to provide new perspectives to strengthen situational awareness.
View PostNote: This is a two-part article where we began by writing the streaming server and finish in this second part by connecting the WPF client to the server. If you haven’t finished the first part yet, please do so now.
This archive contains the solution outlined in this and the previous article.
With the server-side implementation in place, we need to implement the client-side handling as well. We will do this by creating a new component that will run as a background service and continuously listen for server-side events on the streams exposed by the CommunicationHub
, as well as sending updates to the server when the local state is modified.
To achieve this we will have to make a few additional changes and additions to our client application; we need to create a serializer for our features to a binary format, we need to add support for background services and add the logic necessary to connect and communicate with the CommunicationHub
.
There are many ways of adding background services to your application, the simplest way probably being just having a fire-and-forget Task
running along the application lifetime. But this does, however, come with a few problems, such as difficulties with error and exception handling, lifetime management, and dependency management just to name a few. A better option is to use the Generic Host from the framework which supports many nice features in addition to background services.
To get started, add the Microsoft.Extensions.Hosting
NuGet package to the client project and making the following changes to App.xaml.cs
:
... using Microsoft.Extensions.Hosting; ... namespace CarmentaWpfClient { /// <summary> /// Interaction logic for App.xaml /// </summary> public partial class App : Application { private readonly IHost _host; public App() { Runtime.Initialize(); _host = new HostBuilder() .ConfigureServices((_, services) => ConfigureServices(services)) .Build(); Services = _host.Services; } ... private static ServiceProvider ConfigureServices(IServiceCollection services) { // Remove the construction of `services` ... } protected override async void OnStartup(StartupEventArgs e) { await _host.StartAsync(); base.OnStartup(e); } protected override async void OnExit(ExitEventArgs e) { using (_host) { await _host.StopAsync(); } base.OnExit(e); } } }
In the constructor, the generic host is configured using the builder pattern. Note that we do not care about the parameter, so we’ve used a discard pattern to make this clear.
We also made a slight change by injecting the service collection from the host into the ConfigureServices
method to make sure we gather all dependencies in one place. Finally, the OnStartup
and OnExit
method overrides make sure the host is started and stopped gracefully.
Let’s create a serializer that we can use to serialize and deserialize our features upon sending and receiving them over the wire. Create an interface called IFeatureSerializer
in Utils\IFeatureSerializer.cs
:
using Carmenta.Engine; namespace CarmentaWpfClient.Utils; public interface IFeatureSerializer { byte[] Serialize(Feature feature); Feature Deserialize(byte[] buffer); }
The definition is simple – we want to serialize a Feature
to a byte array and vice-versa when deserializing, therefore we pass a Feature
as a parameter and expect a byte array from the Serialize
method, and vice-versa for the Deserialize
method. Create the implementation in Utils\BinaryFeatureSerializer.cs
:
using Carmenta.Engine; using System; using System.IO; namespace CarmentaWpfClient.Utils; public class BinaryFeatureSerializer : IFeatureSerializer { public Feature Deserialize(byte[] buffer) { using var stream = new MemoryStream(buffer); stream.Seek(0, SeekOrigin.Begin); return Feature.FromStream(stream); } public byte[] Serialize(Feature feature) { using Stream? stream = feature.Save(FeatureSaveMode.XYZ); var buffer = new byte[stream.Length]; stream.Read(buffer, 0, buffer.Length); return buffer; } }
The definition is simple – we want to serialize a Feature
to a byte array and vice-versa when deserializing, therefore we pass a Feature
as a parameter and expect a byte array from the Serialize
method, and vice-versa for the Deserialize
method. Create the implementation in Utils\BinaryFeatureSerializer.cs
:
using Carmenta.Engine; using System; using System.IO; namespace CarmentaWpfClient.Utils; public class BinaryFeatureSerializer : IFeatureSerializer { public Feature Deserialize(byte[] buffer) { using var stream = new MemoryStream(buffer); stream.Seek(0, SeekOrigin.Begin); return Feature.FromStream(stream); } public byte[] Serialize(Feature feature) { using Stream? stream = feature.Save(FeatureSaveMode.XYZ); var buffer = new byte[stream.Length]; stream.Read(buffer, 0, buffer.Length); return buffer; } }
The implementation uses the Feature.Save
instance method to save the feature to a stream representation which can be written to a buffer in the form of a byte array. For deserializing a buffer to a feature we can use the static Feature.FromStream
method by creating a MemoryStream
of the buffer and inputting the stream to the Feature.FromStream
static method.
With that in place, add the serializer to the dependencies by adding the following line to the ConfigureServices
method in App.xaml.cs
:
private static ServiceProvider ConfigureServices(IServiceCollection services) { ... services.AddTransient<IFeatureSerializer, BinaryFeatureSerializer>(); ... }
Remember, we wanted to create the service responsible for server communication as a long-running background service, we can do this by inheriting from either BackgroundService
or by implementing the IHostedService
interface from Microsoft.Extensions.Hosting
. In this case, we will inherit from BackgroundService
. Create the class StreamCommunicator
in Utils\StreamCommunicator.cs
:
using Carmenta.Engine; using CarmentaWpfClient.Messages; using CommunityToolkit.Mvvm.Messaging; using Microsoft.AspNetCore.SignalR.Client; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; namespace CarmentaWpfClient.Utils; public class StreamCommunicator : BackgroundService { private readonly MemoryDataSet _memoryDataSet; private readonly IFeatureSerializer _featureSerializer; private readonly HubConnection _hubConnection; private readonly ConcurrentDictionary<Guid, Id> _idMap = new(); private readonly ConcurrentDictionary<Id, Guid> _reverseIdMap = new(); public StreamCommunicator(MemoryDataSet dataSet, IFeatureSerializer featureSerializer) { _featureSerializer = featureSerializer; _memoryDataSet = dataSet; _memoryDataSet.EnableEvents = true; _memoryDataSet.FeatureInserted += FeatureInserted; _memoryDataSet.FeatureChanged += FeatureChanged; _memoryDataSet.FeatureRemoved += FeatureRemoved; _hubConnection = new HubConnectionBuilder() .WithUrl("https://localhost:5001/communicationHub") .WithAutomaticReconnect() .AddMessagePackProtocol() .Build(); _hubConnection.Closed += OnHubConnectionClosed; _hubConnection.Reconnected += OnHubReconnected; } private Task OnHubReconnected(string? arg) { IsConnected = true; return Task.CompletedTask; } private Task OnHubConnectionClosed(Exception? _) { IsConnected = false; return Task.CompletedTask; } public bool IsConnected { get; private set; } public async Task InitializeAsync() { try { await _hubConnection.StartAsync(); var existingFeatures = await _hubConnection.InvokeAsync<List<(Guid, byte[])>>("AllFeatures"); foreach ((Guid id, byte[] buffer) in existingFeatures) HandleStreamAdded(id, buffer); await SetupStreamListeners(); } catch(Exception) { //Log error etc.. } } private Task SetupStreamListeners() { Task addedStreamTask = SetupAddedStream(); Task editStreamTask = SetupStreamEdited(); Task removeStreamTask = SetupRemoveStream(); return Task.WhenAll(addedStreamTask, editStreamTask, removeStreamTask); } private async Task SetupAddedStream() { IAsyncEnumerable<(Guid Id, byte[] Buffer)> addedStream = _hubConnection.StreamAsync<(Guid Id, byte[] Buffer)>("AddedStream"); await foreach ((Guid id, byte[] buffer) in addedStream) HandleStreamAdded(id, buffer); } private void HandleStreamAdded(Guid id, byte[] buffer) { if (_idMap.ContainsKey(id)) return; using var guard = new Guard(_memoryDataSet); Feature feature = _featureSerializer.Deserialize(buffer); // Temporarily turn off events to avoid triggering another // inserted event which could cause recursive behaviour. _memoryDataSet.EnableEvents = false; Id featureId = _memoryDataSet.Insert(feature); _memoryDataSet.EnableEvents = true; _idMap[id] = featureId; _reverseIdMap[featureId] = id; WeakReferenceMessenger.Default.Send(new UpdateViewMessage()); } private async Task SetupStreamEdited() { IAsyncEnumerable<(Guid Id, byte[] Buffer)> editedStream = _hubConnection.StreamAsync<(Guid Id, byte[] Buffer)>("UpdateStream"); await foreach((Guid id, byte[] buffer) in editedStream) HandleStreamEdited(id, buffer); } private void HandleStreamEdited(Guid id, byte[] buffer) { if (!_idMap.ContainsKey(id)) { HandleStreamAdded(id, buffer); return; } Feature feature = _featureSerializer.Deserialize(buffer); Id existingFeatureId = _idMap[id]; // Optimize by replacing with dictionary lookup Feature? existingFeature = _memoryDataSet.GetFeature(existingFeatureId); UpdateFeature(existingFeature, feature.Geometry, feature.Attributes); } private async Task SetupRemoveStream() { IAsyncEnumerable<Guid> removeStream = _hubConnection.StreamAsync<Guid>("RemoveStream"); await foreach(Guid id in removeStream) { if (!_idMap.ContainsKey(id)) continue; _idMap.Remove(id, out Id idToRemove); _reverseIdMap.Remove(idToRemove, out _); using var guard = new Guard(_memoryDataSet); _memoryDataSet.EnableEvents = false; _memoryDataSet.Remove(idToRemove); _memoryDataSet.EnableEvents = true; } } private void UpdateFeature(Feature feature, Geometry geometry, AttributeSet attributes) { switch (geometry) { case PointGeometry point: feature.GetGeometryAsPoint().Point = point.Point; break; case LineGeometry line: { LineGeometry? geom = feature.GetGeometryAsLine(); geom.Points.Clear(); geom.Points.AddRange(line.Points); break; } case PolygonGeometry polygon: { PolygonGeometry? geom = feature.GetGeometryAsPolygon(); geom.Rings.Clear(); geom.Rings.AddRange(polygon.Rings.Clone() as LineGeometryCollection); break; } default: return; } feature.Attributes.Merge(attributes.Clone() as AttributeSet); _memoryDataSet.RefreshFeaturePresentation(feature, false); WeakReferenceMessenger.Default.Send(new UpdateViewMessage()); } private async void FeatureChanged(object sender, MemoryDataSetEventArgs args) { Guid corrId = _reverseIdMap[args.Feature.Id]; byte[] buffer = _featureSerializer.Serialize(args.Feature); await _hubConnection.InvokeAsync("UpdateFeature", corrId, buffer); } private async void FeatureInserted(object sender, MemoryDataSetEventArgs args) { Feature? feature = args.Feature; var corrId = Guid.NewGuid(); _idMap[corrId] = feature.Id; _reverseIdMap[feature.Id] = corrId; byte[] buffer = _featureSerializer.Serialize(feature); await _hubConnection.InvokeAsync("AddFeature", corrId, buffer); WeakReferenceMessenger.Default.Send(new UpdateViewMessage()); } private async void FeatureRemoved(object sender, MemoryDataSetEventArgs args) { Guid corrId = _reverseIdMap[args.Feature.Id]; await _hubConnection.InvokeAsync("RemoveFeature", corrId); WeakReferenceMessenger.Default.Send(new UpdateViewMessage()); } public override async Task StopAsync(CancellationToken cancellationToken) { if (_hubConnection.State == HubConnectionState.Connected) await _hubConnection.StopAsync(cancellationToken); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await InitializeAsync(); } }
This is a rather lengthy piece of code since a lot is happening, so we will break down a few things of importance and see what happens in detail.
Let’s start by looking at the constructor; we inject a MemoryDataSet
and an instance of the newly created IFeatureSerializer
. The MemoryDataSet
will hold our features in memory and the StreamCommunicator
will subscribe to the events of the MemoryDataSet
to handle sending updates to the server when features are changed, inserted or removed by handling the events for each action. This is done in the following lines in the constructor using the +=
syntax for adding event handlers in C#.
Next, we use the HubConnectionBuilder
class to set up the HubConnection
which manages the SignalR connection to the server. Note that the URL might be different in your environment. We also subscribe to the Closed
and Reconnected
events on the HubConnection
so that we can notify our application if the connection changes.
The next important method to look into is InitializeAsync
which is called from the base class ExecuteAsync
method. ExecuteAsync
will be called by the framework when the application starts once we’ve added the background service to the list of dependencies, and thus, InitializeAsync
will be called as well. If we look into InitializeAsync
, we see that the HubConnection
is started and we retrieve the entire set of available features on the server by invoking the AllFeatures
method on the CommunicationHub
and consequently adding each feature in the result set by calling the HandleStreamAdded
method. This is the step where we synchronize the state with all other clients before we start subscribing to events and send updates ourselves.
The HandleStreamAdded
method is used in several places to add new features coming from the server when the unique id is known. If we look into the code snippet for HandleStreamAdded
below, we see that the received feature is deserialized, then we turn off the events temporarily on the MemoryDataSet
to avoid recursive behaviors when adding features internally. Next, we add a mapping between the internal feature id created by the MemoryDataSet
and the external id, so that we can correlate between the two. This enables different client applications to apply their own internal schemes for feature identifiers. Lastly, we send a UpdateViewMessage
to signal that the view should be updated to reflect the newly added feature.
private void HandleStreamAdded(Guid id, byte[] buffer) { if (_idMap.ContainsKey(id)) return; using var guard = new Guard(_memoryDataSet); Feature feature = _featureSerializer.Deserialize(buffer); // Temporarily turn off events to avoid triggering another // inserted event which could cause recursive behaviour. _memoryDataSet.EnableEvents = false; Id featureId = _memoryDataSet.Insert(feature); _memoryDataSet.EnableEvents = true; _idMap[id] = featureId; _reverseIdMap[featureId] = id; WeakReferenceMessenger.Default.Send(new UpdateViewMessage()); }
Once we’ve added the features from the server we will start subscribing to the server events to receive updates on features added, removed och updated. This is done by calling the respective setup method for the events, such as SetupAddedStream
and awaiting them. Digging into the SetupAddedStream
method a little bit we see that it’s quite simple; we call the HubConnection.StreamAsync
instance method with the argument "AddedStream"
to subscribe to that stream on the CommunicationHub
. This method will return an IAsyncEnumerable
which we can asynchronously await to receive incoming events. Once we receive a newly added id-feature pair we will once again handle it using the HandleStreamEdited
method. The logic is similar for subscribing to the update stream and to remove streams.
There are two types of updates we have to consider – local feature updates that have to be published to the server, and server updates that have to be synchronized locally. Let’s start by looking at the first case.
To handle local feature updates, we subscribe to the MemoryDataSet.FeatureChanged
event and handle updates in the FeatureChanged
event handler.
private async void FeatureChanged(object sender, MemoryDataSetEventArgs args) { Guid corrId = _reverseIdMap[args.Feature.Id]; byte[] buffer = _featureSerializer.Serialize(args.Feature); await _hubConnection.InvokeAsync("UpdateFeature", corrId, buffer); }
Looking at the FeatureChanged
method we see it’s rather simple – we start by looking up the globally unique id in the _reverseIdMap
field, we then serialize the feature and send it to the server by invoking the UpdateFeature
method on the CommunicationHub
.
SetupStreamEdited
handles subscribing to the UpdateStream
on the communication hub by asynchronously awaiting updates and handling them in the HandleStreamEdited
method. Looking at HandleStreamEdited
we see that we’re first doing a check to see whether we already have a mapping with the incoming feature. If for whatever reason we don’t have that, we first have to add it. We do this by once again invoking the HandleStreamAdded
method. If, however, we do have a mapping, we simply do a lookup and handle the actual updating in the UpdateFeature
method:
private void UpdateFeature(Feature feature, Geometry geometry, AttributeSet attributes) { switch (geometry) { case PointGeometry point: feature.GetGeometryAsPoint().Point = point.Point; break; case LineGeometry line: { LineGeometry? geom = feature.GetGeometryAsLine(); geom.Points.Clear(); geom.Points.AddRange(line.Points); break; } case PolygonGeometry polygon: { PolygonGeometry? geom = feature.GetGeometryAsPolygon(); geom.Rings.Clear(); geom.Rings.AddRange(polygon.Rings.Clone() as LineGeometryCollection); break; } default: return; } feature.Attributes.Merge(attributes.Clone() as AttributeSet); _memoryDataSet.RefreshFeaturePresentation(feature, false); WeakReferenceMessenger.Default.Send(new UpdateViewMessage()); }
The method uses a switch case to handle different geometries and update the local geometry accordingly. It also merges the existing set of attributes with the incoming AttributeSet
.
For brevity, we will not cover the process for removing or handling removed features since it’s fairly similar to the previously explained cases. Please refer to the source code for details.
Finally, we want to add the StreamCommunicator
to the list of dependencies. We will do this by using the AddHostedService
extension method for service registration.
Add the following lines to App.ConfigureServices
:
private static ServiceProvider ConfigureServices(IServiceCollection services) { ... services.AddHostedService<StreamCommunicator>(x => { if (!x.GetService<MapProvider>().TryGetMap("View0", out MapModel mapModel)) throw new Exception("Could not load map configuration."); MemoryDataSet memoryDataSet = mapModel.GetMemoryDataSet("TacticalSymbolsDataSet") ?? throw new Exception("Could not find TacticalSymbolsDataSet."); IFeatureSerializer serializer = x.GetService<IFeatureSerializer>() ?? throw new Exception("Could not get feature serializer implementation."); return new StreamCommunicator(memoryDataSet, serializer); }); ... }
We use the method overload with the factory method argument. With that, we can do a service lookup inside of the registration method to fetch the MemoryDataSet
we want to inject into the StreamCommunicator
. The same procedure goes for the IFeatureSerializer
.
For this to work, we also need to add a way to get a MemoryDataSet
from a MapModel
:
public partial class MapModel { ... public MemoryDataSet? GetMemoryDataSet(string name) => View.GetTypedObject<MemoryDataSet>(name); }
With everything in place, set the start application to the StartService
project and then build and run the application. Then, locate the build directory for the client application and start two instances. Start adding tactical features in either instance of the client application and you should see how the two clients stay synchronized:
What’s more, is that if you close the client applications and start a new instance they will resume their previous state as well!
Tip: The solution to this article contains a launch profile definition which launches the streaming service first and then the client when starting a default debug session in Visual Studio. If using this, all you need to do after that is start a second instance of the client.
This concludes the WPF plus MVVM article series, where we’ve built a small .NET 8 application with progressively more complex functionality using an established architectural pattern and modern code constructs.
In this final part of the series, we’ve shown how to serialize Feature
s to a binary format suitable for streaming. We’ve connected to a SignalR Hub and hooked up handlers for push events from the server.
We’ve also seen how to handle feature events from the local MemoryDataSet
as well as disable events from the MemoryDataSet
when interacting with its Feature
s in the event handlers, to avoid infinite recursion.
Finally, we’ve had a look at a bit more involved dependency management using a long-lived HostedService
.
Learn how the VerticalProfileOperator in Carmenta Engine can be applied to provide new perspectives to strengthen situational awareness.
View Post