use managedMqtt client to better handle reconnecting #16

pull/34/head
sleevezipper 4 years ago
parent 46b391aed2
commit ea82113451

@ -14,6 +14,7 @@ using MQTTnet.Adapter;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Exceptions; using MQTTnet.Exceptions;
using MQTTnet.Extensions.ManagedClient;
using Serilog; using Serilog;
namespace hass_workstation_service.Communication namespace hass_workstation_service.Communication
@ -21,11 +22,12 @@ namespace hass_workstation_service.Communication
public class MqttPublisher public class MqttPublisher
{ {
private readonly IMqttClient _mqttClient; private readonly IManagedMqttClient _mqttClient;
private readonly ILogger<MqttPublisher> _logger; private readonly ILogger<MqttPublisher> _logger;
private readonly IConfigurationService _configurationService; private readonly IConfigurationService _configurationService;
private string _mqttClientMessage { get; set; } private string _mqttClientMessage { get; set; }
public DateTime LastConfigAnnounce { get; private set; } public DateTime LastConfigAnnounce { get; private set; }
public DateTime LastAvailabilityAnnounce { get; private set; }
public DeviceConfigModel DeviceConfigModel { get; private set; } public DeviceConfigModel DeviceConfigModel { get; private set; }
public ICollection<AbstractCommand> Subscribers { get; private set; } public ICollection<AbstractCommand> Subscribers { get; private set; }
public bool IsConnected public bool IsConnected
@ -57,12 +59,11 @@ namespace hass_workstation_service.Communication
_configurationService.MqqtConfigChangedHandler = this.ReplaceMqttClient; _configurationService.MqqtConfigChangedHandler = this.ReplaceMqttClient;
var factory = new MqttFactory(); var factory = new MqttFactory();
this._mqttClient = factory.CreateMqttClient(); this._mqttClient = factory.CreateManagedMqttClient();
if (options != null) if (options != null)
{ {
options.WillMessage.Topic = $"homeassistant/sensor/{this.DeviceConfigModel.Name}/availability"; this._mqttClient.StartAsync(options);
this._mqttClient.ConnectAsync(options);
this._mqttClientMessage = "Connecting..."; this._mqttClientMessage = "Connecting...";
} }
else else
@ -76,23 +77,10 @@ namespace hass_workstation_service.Communication
this._mqttClient.UseApplicationMessageReceivedHandler(e => this.HandleMessageReceived(e.ApplicationMessage)); this._mqttClient.UseApplicationMessageReceivedHandler(e => this.HandleMessageReceived(e.ApplicationMessage));
// configure what happens on disconnect // configure what happens on disconnect
this._mqttClient.UseDisconnectedHandler(async e => this._mqttClient.UseDisconnectedHandler(e =>
{ {
this._mqttClientMessage = e.ReasonCode.ToString(); this._mqttClientMessage = e.ReasonCode.ToString();
if (e.ReasonCode != MQTTnet.Client.Disconnecting.MqttClientDisconnectReason.NormalDisconnection)
{
_logger.LogWarning("Disconnected from server");
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await this._mqttClient.ConnectAsync(options, CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, "Reconnecting failed");
}
}
}); });
} }
@ -130,13 +118,13 @@ namespace hass_workstation_service.Communication
} }
} }
public async void ReplaceMqttClient(IMqttClientOptions options) public async void ReplaceMqttClient(IManagedMqttClientOptions options)
{ {
this._logger.LogInformation($"Replacing Mqtt client with new config"); this._logger.LogInformation($"Replacing Mqtt client with new config");
await _mqttClient.DisconnectAsync(); await _mqttClient.StopAsync();
try try
{ {
await _mqttClient.ConnectAsync(options); await _mqttClient.StartAsync(options);
} }
catch (MqttConnectingFailedException ex) catch (MqttConnectingFailedException ex)
{ {
@ -165,6 +153,7 @@ namespace hass_workstation_service.Communication
.WithPayload(offline ? "offline" : "online") .WithPayload(offline ? "offline" : "online")
.Build() .Build()
); );
this.LastAvailabilityAnnounce = DateTime.UtcNow;
} }
else else
{ {
@ -176,7 +165,7 @@ namespace hass_workstation_service.Communication
{ {
if (this._mqttClient.IsConnected) if (this._mqttClient.IsConnected)
{ {
await this._mqttClient.DisconnectAsync(); await this._mqttClient.InternalClient.DisconnectAsync();
} }
else else
{ {

@ -17,6 +17,7 @@ using Microsoft.Win32;
using MQTTnet; using MQTTnet;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Extensions.ManagedClient;
using Serilog; using Serilog;
namespace hass_workstation_service.Data namespace hass_workstation_service.Data
@ -25,7 +26,7 @@ namespace hass_workstation_service.Data
{ {
public ICollection<AbstractSensor> ConfiguredSensors { get; private set; } public ICollection<AbstractSensor> ConfiguredSensors { get; private set; }
public ICollection<AbstractCommand> ConfiguredCommands { get; private set; } public ICollection<AbstractCommand> ConfiguredCommands { get; private set; }
public Action<IMqttClientOptions> MqqtConfigChangedHandler { get; set; } public Action<IManagedMqttClientOptions> MqqtConfigChangedHandler { get; set; }
private readonly DeviceConfigModel _deviceConfigModel; private readonly DeviceConfigModel _deviceConfigModel;
private bool BrokerSettingsFileLocked { get; set; } private bool BrokerSettingsFileLocked { get; set; }
@ -181,7 +182,7 @@ namespace hass_workstation_service.Data
} }
} }
public async Task<IMqttClientOptions> GetMqttClientOptionsAsync() public async Task<IManagedMqttClientOptions> GetMqttClientOptionsAsync()
{ {
ConfiguredMqttBroker configuredBroker = await ReadMqttSettingsAsync(); ConfiguredMqttBroker configuredBroker = await ReadMqttSettingsAsync();
if (configuredBroker != null && configuredBroker.Host != null) if (configuredBroker != null && configuredBroker.Host != null)
@ -202,7 +203,7 @@ namespace hass_workstation_service.Data
.WithPayload("offline") .WithPayload("offline")
.Build()) .Build())
.Build(); .Build();
return mqttClientOptions; return new ManagedMqttClientOptionsBuilder().WithClientOptions(mqttClientOptions).Build();
} }
else else
{ {

@ -3,6 +3,7 @@ using hass_workstation_service.Communication.InterProcesCommunication.Models;
using hass_workstation_service.Domain.Commands; using hass_workstation_service.Domain.Commands;
using hass_workstation_service.Domain.Sensors; using hass_workstation_service.Domain.Sensors;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Extensions.ManagedClient;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Security; using System.Security;
@ -13,13 +14,13 @@ namespace hass_workstation_service.Data
public interface IConfigurationService public interface IConfigurationService
{ {
ICollection<AbstractSensor> ConfiguredSensors { get; } ICollection<AbstractSensor> ConfiguredSensors { get; }
Action<IMqttClientOptions> MqqtConfigChangedHandler { get; set; } Action<IManagedMqttClientOptions> MqqtConfigChangedHandler { get; set; }
ICollection<AbstractCommand> ConfiguredCommands { get; } ICollection<AbstractCommand> ConfiguredCommands { get; }
void AddConfiguredCommand(AbstractCommand command); void AddConfiguredCommand(AbstractCommand command);
void AddConfiguredSensor(AbstractSensor sensor); void AddConfiguredSensor(AbstractSensor sensor);
void AddConfiguredSensors(List<AbstractSensor> sensors); void AddConfiguredSensors(List<AbstractSensor> sensors);
Task<IMqttClientOptions> GetMqttClientOptionsAsync(); Task<IManagedMqttClientOptions> GetMqttClientOptionsAsync();
void ReadSensorSettings(MqttPublisher publisher); void ReadSensorSettings(MqttPublisher publisher);
void WriteMqttBrokerSettingsAsync(MqttSettings settings); void WriteMqttBrokerSettingsAsync(MqttSettings settings);
void WriteSensorSettingsAsync(); void WriteSensorSettingsAsync();

@ -56,6 +56,12 @@ namespace hass_workstation_service
{ {
_logger.LogDebug("Worker running at: {time}", DateTimeOffset.Now); _logger.LogDebug("Worker running at: {time}", DateTimeOffset.Now);
// announce autodiscovery every 30 seconds
if (_mqttPublisher.LastAvailabilityAnnounce < DateTime.UtcNow.AddSeconds(-10))
{
_mqttPublisher.AnnounceAvailability("sensor");
}
foreach (AbstractSensor sensor in sensors) foreach (AbstractSensor sensor in sensors)
{ {
try try
@ -91,7 +97,6 @@ namespace hass_workstation_service
{ {
command.PublishAutoDiscoveryConfigAsync(); command.PublishAutoDiscoveryConfigAsync();
} }
_mqttPublisher.AnnounceAvailability("sensor");
} }
await Task.Delay(1000, stoppingToken); await Task.Delay(1000, stoppingToken);
} }
@ -100,7 +105,7 @@ namespace hass_workstation_service
public override async Task StopAsync(CancellationToken stoppingToken) public override async Task StopAsync(CancellationToken stoppingToken)
{ {
_mqttPublisher.AnnounceAvailability("sensor", true); _mqttPublisher.AnnounceAvailability("sensor", true);
await _mqttPublisher.DisconnectAsync(); await _mqttPublisher.DisconnectAsync();
} }

@ -49,6 +49,7 @@
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />
<PackageReference Include="Microsoft.Win32.Registry" Version="5.0.0" /> <PackageReference Include="Microsoft.Win32.Registry" Version="5.0.0" />
<PackageReference Include="MQTTnet" Version="3.0.13" /> <PackageReference Include="MQTTnet" Version="3.0.13" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="3.0.13" />
<PackageReference Include="Serilog.Extensions.Logging.File" Version="2.0.0" /> <PackageReference Include="Serilog.Extensions.Logging.File" Version="2.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" /> <PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="Serilog.Sinks.File" Version="4.1.0" /> <PackageReference Include="Serilog.Sinks.File" Version="4.1.0" />

Loading…
Cancel
Save