提交 86201049 编写于 作者: 麦壳饼's avatar 麦壳饼

调整类型转换, 将处理后的数据交给规则链。 这样保持跟入库的数据一致 。

上级 c83eab03
......@@ -81,6 +81,7 @@ namespace IoTSharp
public ZMQOption ZMQOption { get; set; } = null;
public int SucceedMessageExpiredAfter { get; set; } = 3600 * 6;
public DataBaseType DataBase { get; set; } = DataBaseType.PostgreSql;
public int RuleCachingExpiration { get; set; } = 60;
}
public class ShardingSetting
......
......@@ -71,7 +71,7 @@ namespace IoTSharp.FlowRuleEngine
}
}
return (rule, _allFlows: allFlows);
}, TimeSpan.FromMinutes(5));
}, TimeSpan.FromSeconds(_setting.RuleCachingExpiration));
if (cacheRule.HasValue)
{
......@@ -510,7 +510,7 @@ namespace IoTSharp.FlowRuleEngine
}
}
return (rule, _allFlows);
}, TimeSpan.FromMinutes(5));
}, TimeSpan.FromSeconds(_setting.RuleCachingExpiration));
if (cacheRule.HasValue)
{
var flow = _allFlows.FirstOrDefault(c => c.FlowId == flowId);
......@@ -635,7 +635,7 @@ namespace IoTSharp.FlowRuleEngine
}
}
return (rule, _allFlows);
}, TimeSpan.FromMinutes(5));
}, TimeSpan.FromSeconds(_setting.RuleCachingExpiration));
if (cacheRule.HasValue)
{
_allFlows = cacheRule.Value._allFlows;
......
......@@ -42,7 +42,7 @@ namespace IoTSharp.Handlers
_flowRuleProcessor = flowRuleProcessor;
_caching = factory.GetCachingProvider("iotsharp");
}
Dictionary<Guid, DateTime> _check_device_status = new Dictionary<Guid, DateTime>();
Dictionary<Guid, DateTime> _check_device_status = new();
[CapSubscribe("iotsharp.services.datastream.attributedata")]
public async void StoreAttributeData(RawMsg msg)
{
......@@ -94,6 +94,7 @@ namespace IoTSharp.Handlers
{
dc.Add(kp.Key, kp.Value);
}
});
var result2 = await _dbContext.SaveAsync<AttributeLatest>(dc, device.Id, msg.DataSide);
result2.exceptions?.ToList().ForEach(ex =>
......@@ -101,15 +102,21 @@ namespace IoTSharp.Handlers
_logger.LogError($"{ex.Key} {ex.Value} {Newtonsoft.Json.JsonConvert.SerializeObject(msg.MsgBody[ex.Key])}");
});
_logger.LogInformation($"更新{device.Name}({device.Id})属性数据结果{result2.ret}");
ExpandoObject obj = new ExpandoObject();
dc.ToList().ForEach(kv =>
{
obj.TryAdd(kv.Key, kv.Value);
});
await RunRules(msg.DeviceId, obj, MountType.Telemetry);
}
}
}
await RunRules(msg, MountType.Telemetry);
}
[CapSubscribe("iotsharp.services.platform.addnewdevice")]
public async void AddedNewDevice(Device msg)
public void AddedNewDevice(Device msg)
{
using (var _scope = _scopeFactor.CreateScope())
......@@ -126,7 +133,7 @@ namespace IoTSharp.Handlers
public async void StoreTelemetryData(RawMsg msg)
{
await _storage.StoreTelemetryAsync(msg);
var result= await _storage.StoreTelemetryAsync(msg);
if (!_check_device_status.ContainsKey(msg.DeviceId))
{
_check_device_status.Add(msg.DeviceId, DateTime.Now);
......@@ -145,13 +152,17 @@ namespace IoTSharp.Handlers
}
}
}
await RunRules(msg, MountType.Telemetry);
ExpandoObject exps = new ExpandoObject();
result.telemetries.ForEach(td =>
{
exps.TryAdd(td.KeyName, td.ToObject());
});
await RunRules(msg.DeviceId,(dynamic)exps, MountType.Telemetry);
}
private async Task RunRules(RawMsg msg, MountType mountType)
private async Task RunRules(Guid devid, object obj, MountType mountType)
{
var devid = msg.DeviceId;
var rules = await _caching.GetAsync($"ruleid_{devid}_{Enum.GetName(mountType)}", async () =>
{
using (var scope = _scopeFactor.CreateScope())
......@@ -160,10 +171,9 @@ namespace IoTSharp.Handlers
var guids = await _dbContext.GerDeviceRulesIdList(devid, mountType);
return guids;
}
}, TimeSpan.FromMinutes(5));
}, TimeSpan.FromSeconds(_appSettings.RuleCachingExpiration));
if (rules.HasValue)
{
var obj = msg.MsgBody;
rules.Value.ToList().ForEach(async g =>
{
await _flowRuleProcessor.RunFlowRules(g, obj, devid, EventType.Normal, null);
......
......@@ -34,11 +34,14 @@ namespace IoTSharp.Handlers
private readonly FlowRuleProcessor _flowRuleProcessor;
private readonly IEasyCachingProvider _caching;
readonly MqttClientSetting _mcsetting;
private readonly AppSettings _settings;
public MQTTServerHandler(ILogger<MQTTServerHandler> logger, IServiceScopeFactory scopeFactor, IMqttServerEx serverEx
, IOptions<AppSettings> options, ICapPublisher queue, IEasyCachingProviderFactory factory, FlowRuleProcessor flowRuleProcessor
)
{
_mcsetting = options.Value.MqttClient;
_settings = options.Value;
_logger = logger;
_scopeFactor = scopeFactor;
_factory = factory;
......@@ -72,7 +75,7 @@ namespace IoTSharp.Handlers
{
if (string.IsNullOrEmpty(e.ClientId))
{
_logger.LogInformation($"Message: Topic=[{e.ApplicationMessage.Topic }]");
_logger.LogInformation($"ClientId为空,无法进一步获取设备信息 Topic=[{e.ApplicationMessage.Topic }]");
}
else
{
......@@ -164,7 +167,7 @@ namespace IoTSharp.Handlers
return guids;
}
}
, TimeSpan.FromMinutes(5));
, TimeSpan.FromSeconds(_settings.RuleCachingExpiration));
if (rules.HasValue)
{
var obj = new { e.ApplicationMessage.Topic, Payload = Convert.ToBase64String(e.ApplicationMessage.Payload), e.ClientId };
......@@ -189,7 +192,38 @@ namespace IoTSharp.Handlers
}
else
{
_logger.LogInformation($"{e.ClientId}的数据{e.ApplicationMessage.Topic}未能识别");
_logger.LogInformation($"{e.ClientId}的数据{e.ApplicationMessage.Topic}未能识别,分段:{tpary.Length} 前缀?{tpary[0]} 设备:{_dev?.Id} ,终端状态未找到。");
var ss = await _serverEx.GetClientStatusAsync();
var status= ss.FirstOrDefault(s => s.ClientId == e.ClientId);
if (status != null)
{
_logger.LogInformation($"{e.ClientId}的数据{e.ApplicationMessage.Topic}未能识别,分段:{tpary.Length} 前缀?{tpary[0]} 设备:{_dev?.Id} {status.ConnectedTimestamp} {status.Endpoint} ");
if (!status.Session.Items.ContainsKey("iotsharp_count"))
{
status.Session.Items.Add("iotsharp_count", 1);
}
else
{
status.Session.Items["iotsharp_count"] = 1 + (int)status.Session.Items["iotsharp_count"];
}
if (status.Session.Items.TryGetValue("iotsharp_count", out object count))
{
int _count = (int)count;
if (_count > 10)
{
await status.DisconnectAsync();
_logger.LogInformation($"未识别次数太多{_count}");
}
}
else
{
_logger.LogInformation("识别次数获取错误");
}
}
else
{
_logger.LogInformation("设备状态未能获取");
}
}
}
......
......@@ -81,9 +81,10 @@ namespace IoTSharp.Storage
return kv.AsNoTracking().ToListAsync();
}
public virtual async Task<bool> StoreTelemetryAsync(RawMsg msg)
public virtual async Task<(bool result, List<TelemetryData> telemetries)> StoreTelemetryAsync(RawMsg msg)
{
bool result = false;
List<TelemetryData> telemetries = new List<TelemetryData>();
try
{
using (var _scope = _scopeFactor.CreateScope())
......@@ -97,6 +98,7 @@ namespace IoTSharp.Storage
var tdata = new TelemetryData() { DateTime = DateTime.Now, DeviceId = msg.DeviceId, KeyName = kp.Key };
tdata.FillKVToMe(kp);
_dbContext.Set<TelemetryData>().Add(tdata);
telemetries.Add(tdata);
}
});
var result1 = await _dbContext.SaveAsync<TelemetryLatest>(msg.MsgBody, msg.DeviceId, msg.DataSide);
......@@ -112,7 +114,9 @@ namespace IoTSharp.Storage
{
_logger.LogError(ex, $"{msg.DeviceId}数据处理失败{ex.Message} {ex.InnerException?.Message} ");
}
return result;
return (result, telemetries);
}
}
}
\ No newline at end of file
......@@ -10,7 +10,7 @@ namespace IoTSharp.Storage
{
public interface IStorage
{
Task<bool> StoreTelemetryAsync(RawMsg msg);
Task<(bool result, List<TelemetryData> telemetries)> StoreTelemetryAsync(RawMsg msg);
Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId);
Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId, string keys);
Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keyName, DateTime begin);
......
......@@ -177,9 +177,10 @@ from(bucket: ""{_bucket}"")
return FluxToDtoAsync(v);
}
public async Task<bool> StoreTelemetryAsync(RawMsg msg)
public async Task<(bool result, List<TelemetryData> telemetries)> StoreTelemetryAsync(RawMsg msg )
{
bool result = false;
List<TelemetryData> telemetries = new List<TelemetryData>(); ;
try
{
......@@ -227,6 +228,7 @@ from(bucket: ""{_bucket}"")
{
point = point.Timestamp(DateTime.UtcNow, WritePrecision.Ns);
lst.Add(point);
telemetries.Add(tdata);
}
}
});
......@@ -242,7 +244,7 @@ from(bucket: ""{_bucket}"")
{
_logger.LogError(ex, $"{msg.DeviceId}数据处理失败{ex.Message} {ex.InnerException?.Message} ");
}
return result;
return (result, telemetries);
}
}
}
\ No newline at end of file
......@@ -175,9 +175,10 @@ namespace IoTSharp.Storage
});
return dt;
}
public Task<bool> StoreTelemetryAsync(RawMsg msg)
public Task<(bool result, List<TelemetryData> telemetries)> StoreTelemetryAsync(RawMsg msg)
{
bool result = false;
List<TelemetryData> telemetries = new List<TelemetryData>();
PinusConnection _pinus = _pinuspool.Get();
try
{
......@@ -253,7 +254,9 @@ namespace IoTSharp.Storage
{
_logger.LogError(ex, $"{msg.DeviceId}数据处理失败{ex.Message} {ex.InnerException?.Message}");
}
telemetries.Add(tdata);
}
});
}
......@@ -265,7 +268,7 @@ namespace IoTSharp.Storage
{
_pinuspool.Return(_pinus);
}
return Task.FromResult(result);
return Task.FromResult((result, telemetries));
}
private static string GetDevTableName(RawMsg msg) => $"telemetrydata_{msg.DeviceId:N}";
......
......@@ -134,9 +134,10 @@ namespace IoTSharp.Storage
});
}
public async Task<bool> StoreTelemetryAsync(RawMsg msg)
public async Task<(bool result, List<TelemetryData> telemetries)> StoreTelemetryAsync(RawMsg msg)
{
bool result = false;
List<TelemetryData> telemetries = new List<TelemetryData>();
try
{
......@@ -150,6 +151,7 @@ namespace IoTSharp.Storage
var tdata = new TelemetryData() { DateTime = DateTime.Now, DeviceId = msg.DeviceId, KeyName = kp.Key};
tdata.FillKVToMe(kp);
lst.Add(tdata);
telemetries.Add(tdata);
}
});
int ret = await db.InsertAsync(lst);
......@@ -182,7 +184,7 @@ namespace IoTSharp.Storage
{
_logger.LogError(ex, $"{msg.DeviceId}数据处理失败{ex.Message} {ex.InnerException?.Message} ");
}
return result;
return (result,telemetries);
}
}
}
\ No newline at end of file
......@@ -208,9 +208,11 @@ namespace IoTSharp.Storage
return Task.FromResult(dt);
}
public async Task<bool> StoreTelemetryAsync(RawMsg msg)
public async Task<(bool result, List<TelemetryData> telemetries)> StoreTelemetryAsync(RawMsg msg)
{
bool result = false;
List<TelemetryData> telemetries = new List<TelemetryData>();
try
{
CheckDataBase();
......@@ -270,6 +272,7 @@ namespace IoTSharp.Storage
{
string vals = $"device_{tdata.DeviceId:N}_{ Pinyin4Net.GetPinyin(tdata.KeyName, PinyinFormat.WITHOUT_TONE).Replace(" ", string.Empty).Replace("@", string.Empty)} USING telemetrydata TAGS('{tdata.DeviceId:N}','{tdata.KeyName}') (ts,value_type,{_type}) values (now,{(int)tdata.Type},{_value})";
lst.Add(vals);
telemetries.Add(tdata);
}
}
});
......@@ -291,7 +294,7 @@ namespace IoTSharp.Storage
{
_logger.LogError(ex, $"{msg.DeviceId}数据处理失败{ex.Message} {ex.InnerException?.Message} ");
}
return result;
return (result,telemetries);
}
}
}
\ No newline at end of file
......@@ -3,6 +3,7 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Silkier.EFCore;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace IoTSharp.Storage
......@@ -21,7 +22,7 @@ namespace IoTSharp.Storage
private bool _needcrtate = false;
public override async Task<bool> StoreTelemetryAsync(RawMsg msg)
public override async Task<(bool result, List<TelemetryData> telemetries)> StoreTelemetryAsync(RawMsg msg)
{
if (!_needcrtate)
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册