提交 5c75b9b0 编写于 作者: 麦壳饼's avatar 麦壳饼

Subscribe to attribute updates from the server

上级 cdc4b193
......@@ -330,3 +330,4 @@ ASALocalRun/
.mfractor/
/healthchecksdb
/.vscode
/Clients/MQTTClient/.vscode
......@@ -19,6 +19,11 @@ namespace MQTTClient
.WithTcpServer("localhost")
.WithCredentials("3cb97cd31fbc40b08d12ec47a6fad622")//token
.Build();
client.UseApplicationMessageReceivedHandler(ax=>
{
Console.WriteLine($"ClientId{ ax.ClientId},msg={ax.ApplicationMessage.ConvertPayloadToString()}");
});
Task.Run(async () =>
{
await client.ConnectAsync(options);
......@@ -35,6 +40,8 @@ namespace MQTTClient
Console.WriteLine(message.ConvertPayloadToString());
await client.PublishAsync(message);
await Task.Delay(TimeSpan.FromSeconds(10));
await client.SubscribeAsync("/devices/me/attributes/response/+");
await client.PublishAsync("/devices/me/attributes/request/1", "{\"anySide\":\"Doublevalue,longvalue,Doublevalue,longvalue\"}");
} while (Console.ReadKey().Key != ConsoleKey.Escape);
await client.DisconnectAsync();
}).Wait();
......
......@@ -48,96 +48,180 @@ namespace IoTSharp.Handlers
}
Dictionary<string, int> lstTopics = new Dictionary<string, int>();
long received = 0;
internal void Server_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
_logger.LogInformation($"Server received {e.ClientId}'s message: Topic=[{e.ApplicationMessage.Topic }],Retain=[{e.ApplicationMessage.Retain}],QualityOfServiceLevel=[{e.ApplicationMessage.QualityOfServiceLevel}]");
if (!lstTopics.ContainsKey(e.ApplicationMessage.Topic))
if (string.IsNullOrEmpty(e.ClientId))
{
lstTopics.Add(e.ApplicationMessage.Topic, 1);
Task.Run(() => _serverEx.PublishAsync("$SYS/broker/subscriptions/count", lstTopics.Count.ToString()));
_logger.LogInformation($"Message: Topic=[{e.ApplicationMessage.Topic }]");
}
else
{
lstTopics[e.ApplicationMessage.Topic]++;
}
if (e.ApplicationMessage.Payload != null)
{
received += e.ApplicationMessage.Payload.Length;
}
string topic = e.ApplicationMessage.Topic;
var tpary = topic.Split('/', StringSplitOptions.RemoveEmptyEntries);
if (tpary.Length >= 3 && tpary[0] == "devices" && Devices.ContainsKey(e.ClientId))
{
Device device = JudgeOrCreateNewDevice(tpary, Devices[e.ClientId]);
if (device != null)
_logger.LogInformation($"Server received {e.ClientId}'s message: Topic=[{e.ApplicationMessage.Topic }],Retain=[{e.ApplicationMessage.Retain}],QualityOfServiceLevel=[{e.ApplicationMessage.QualityOfServiceLevel}]");
if (!lstTopics.ContainsKey(e.ApplicationMessage.Topic))
{
lstTopics.Add(e.ApplicationMessage.Topic, 1);
Task.Run(() => _serverEx.PublishAsync("$SYS/broker/subscriptions/count", lstTopics.Count.ToString()));
}
else
{
lstTopics[e.ApplicationMessage.Topic]++;
}
if (e.ApplicationMessage.Payload != null)
{
Dictionary<string, object> keyValues = new Dictionary<string, object>();
if (tpary.Length >= 4)
received += e.ApplicationMessage.Payload.Length;
}
string topic = e.ApplicationMessage.Topic;
var tpary = topic.Split('/', StringSplitOptions.RemoveEmptyEntries);
if (tpary.Length >= 3 && tpary[0] == "devices" && Devices.ContainsKey(e.ClientId))
{
Device device = JudgeOrCreateNewDevice(tpary, Devices[e.ClientId]);
if (device != null)
{
string keyname = tpary.Length >= 5 ? tpary[4] : tpary[3];
if (tpary[3].ToLower() == "xml")
Dictionary<string, object> keyValues = new Dictionary<string, object>();
if (tpary.Length >= 4)
{
try
string keyname = tpary.Length >= 5 ? tpary[4] : tpary[3];
if (tpary[3].ToLower() == "xml")
{
var xml = new System.Xml.XmlDocument();
xml.LoadXml(e.ApplicationMessage.ConvertPayloadToString());
keyValues.Add(keyname, xml);
try
{
var xml = new System.Xml.XmlDocument();
xml.LoadXml(e.ApplicationMessage.ConvertPayloadToString());
keyValues.Add(keyname, xml);
}
catch (Exception ex)
{
_logger.LogWarning(ex, $"xml data error {topic},{ex.Message}");
}
}
catch (Exception ex)
else if (tpary[3].ToLower() == "binary")
{
_logger.LogWarning(ex, $"xml data error {topic},{ex.Message}");
keyValues.Add(keyname, e.ApplicationMessage.Payload);
}
}
else if (tpary[3].ToLower() == "binary")
{
keyValues.Add(keyname, e.ApplicationMessage.Payload);
}
}
else
{
try
{
keyValues = e.ApplicationMessage.ConvertPayloadToDictionary();
}
catch (Exception ex)
{
_logger.LogWarning(ex, $"ConvertPayloadToDictionary Error {topic},{ex.Message}");
}
}
if (tpary[2] == "telemetry")
{
Task.Run(async () =>
else
{
try
{
var result = await _dbContext.SaveAsync<TelemetryLatest, TelemetryData>(keyValues, device, DataSide.ClientSide);
keyValues = e.ApplicationMessage.ConvertPayloadToDictionary();
}
catch (Exception ex)
{
_logger.LogError(ex, $"Can't upload telemetry to device {device.Name}({device.Id}).the payload is {e.ApplicationMessage.ConvertPayloadToString()}");
_logger.LogWarning(ex, $"ConvertPayloadToDictionary Error {topic},{ex.Message}");
}
});
}
else if (tpary[2] == "attributes")
{
Task.Run(async () =>
}
if (tpary[2] == "telemetry")
{
try
Task.Run(async () =>
{
var result = await _dbContext.SaveAsync<AttributeLatest, AttributeData>(keyValues, device, DataSide.ClientSide);
try
{
var result = await _dbContext.SaveAsync<TelemetryLatest, TelemetryData>(keyValues, device, DataSide.ClientSide);
}
catch (Exception ex)
{
_logger.LogError(ex, $"Can't upload telemetry to device {device.Name}({device.Id}).the payload is {e.ApplicationMessage.ConvertPayloadToString()}");
}
});
}
else if (tpary[2] == "attributes")
{
if (tpary.Length > 3 && tpary[3] == "request")
{
Task.Run(async () =>
{
await RequestAttributes(tpary, e.ApplicationMessage.ConvertPayloadToDictionary(), device);
});
}
catch (Exception ex)
else
{
_logger.LogError(ex, $"Can't upload attributes to device {device.Name}({device.Id}).the payload is \"{e.ApplicationMessage.ConvertPayloadToString()}\"");
Task.Run(async () =>
{
try
{
var result = await _dbContext.SaveAsync<AttributeLatest, AttributeData>(keyValues, device, DataSide.ClientSide);
}
catch (Exception ex)
{
_logger.LogError(ex, $"Can't upload attributes to device {device.Name}({device.Id}).the payload is \"{e.ApplicationMessage.ConvertPayloadToString()}\"");
}
});
}
});
}
}
}
}
}
private async Task RequestAttributes(string[] tpary, Dictionary<string, object> keyValues,Device device)
{
if (tpary.Length>5 && tpary[4] == "xml" )
{
var qf = from at in _dbContext.AttributeData where at.Type == DataType.XML && at.KeyName == tpary[5] select at;
await _serverEx.PublishAsync($"/devices/me/attributes/response/{tpary[5]}", qf.FirstOrDefault()?.Value_XML);
}
else if (tpary.Length > 5 && tpary[4] == "binary")
{
var qf = from at in _dbContext.AttributeData where at.Type == DataType.Binary && at.KeyName == tpary[5] select at;
await _serverEx.PublishAsync(new MqttApplicationMessage() { Topic = $"/devices/me/attributes/response/{tpary[5]}", Payload = qf.FirstOrDefault()?.Value_Binary });
}
else
{
Dictionary<string, object> reps = new Dictionary<string, object>();
var reqid = tpary.Length > 4 ? tpary[4] : Guid.NewGuid().ToString();
List<AttributeData> datas = new List<AttributeData>();
foreach (var kx in keyValues)
{
var keys = kx.Value?.ToString().Split(',');
if (keys != null && keys.Length > 0)
{
if (Enum.TryParse(kx.Key, true, out DataSide ds))
{
var qf = from at in _dbContext.AttributeData where at.Device== device && at.DataSide == ds && keys.Contains(at.KeyName) select at;
datas.AddRange(await qf.ToArrayAsync());
}
}
}
foreach (var item in datas)
{
switch (item.Type)
{
case DataType.Boolean:
reps.Add(item.KeyName, item.Value_Boolean);
break;
case DataType.String:
reps.Add(item.KeyName, item.Value_String);
break;
case DataType.Long:
reps.Add(item.KeyName, item.Value_Long);
break;
case DataType.Double:
reps.Add(item.KeyName, item.Value_Double);
break;
case DataType.Json:
reps.Add(item.KeyName, Newtonsoft.Json.Linq.JToken.Parse(item.Value_Json));
break;
case DataType.XML:
reps.Add(item.KeyName, item.Value_XML);
break;
case DataType.Binary:
reps.Add(item.KeyName, item.Value_Binary);
break;
case DataType.DateTime:
reps.Add(item.KeyName, item.Value_DateTime);
break;
default:
reps.Add(item.KeyName, item.Value_Json);
break;
}
}
await _serverEx.PublishAsync($"/devices/me/attributes/response/{reqid}", Newtonsoft.Json.JsonConvert.SerializeObject(reps));
}
}
internal void Server_ClientDisconnected(IMqttServerEx server, MqttServerClientDisconnectedEventArgs args)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册