提交 0398c371 编写于 作者: 麦壳饼's avatar 麦壳饼

重构了遥测数据存储, 为分离存储遥测数据做准备

上级 4d508436
......@@ -21,7 +21,8 @@ namespace IoTSharp.Test
}
[TestMethod]
public void TestSimpleQueue() => TestIMsgQueue<SimpleQueue>();
......@@ -31,8 +32,7 @@ namespace IoTSharp.Test
[TestMethod]
public void TestDiskQueue() => TestIMsgQueue<LiteDBQueue>();
[TestMethod]
public void TestMemoryQueue() => TestIMsgQueue<MemoryQueue>();
......
using CoAP;
using CoAP.Server;
using EFCore.Sharding;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using System;
using System.Collections.Generic;
using System.Linq;
......@@ -8,6 +11,12 @@ using System.Threading.Tasks;
namespace IoTSharp
{
[JsonConverter(typeof(StringEnumConverter))]
public enum TelemetryStorage
{
SingleTable,
Sharding
}
public class AppSettings
{
public string JwtKey { get; set; }
......@@ -27,7 +36,14 @@ namespace IoTSharp
public ModBusServerSetting ModBusServer { get; set; } = new ModBusServerSetting();
public TelemetryStorage TelemetryStorage { get; set; } = TelemetryStorage.SingleTable;
public ShardingSetting Sharding { get; set; } = new ShardingSetting();
}
public class ShardingSetting
{
public DatabaseType DatabaseType { get; set; } = DatabaseType.PostgreSql;
public ExpandByDateMode ExpandByDateMode { get; set; } = ExpandByDateMode.PerMonth;
}
public class ModBusServerSetting
{
......@@ -39,10 +55,10 @@ namespace IoTSharp
/// <summary>
/// built-in or IP、HostName
/// </summary>
public string MqttBroker { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
public int Port { get; set; }
public string MqttBroker { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
public int Port { get; set; }
}
public class MqttBrokerSetting
{
......@@ -51,6 +67,6 @@ namespace IoTSharp
public bool EnableTls { get; set; } = false;
public string Certificate { get; set; }
public SslProtocols SslProtocol { get; set; } = SslProtocols.None;
public bool PersistRetainedMessages { get; set; }
public bool PersistRetainedMessages { get; set; }
}
}
......@@ -18,6 +18,7 @@ using MQTTnet.Exceptions;
using MQTTnet.Client.Options;
using Microsoft.AspNetCore.Identity;
using Microsoft.Extensions.Logging;
using IoTSharp.Storage;
namespace IoTSharp.Controllers
{
......@@ -31,15 +32,17 @@ namespace IoTSharp.Controllers
private readonly UserManager<IdentityUser> _userManager;
private readonly SignInManager<IdentityUser> _signInManager;
private readonly ILogger _logger;
private readonly IStorage _storage;
public DevicesController(UserManager<IdentityUser> userManager,
SignInManager<IdentityUser> signInManager, ILogger<DevicesController> logger, ApplicationDbContext context, IMqttClientOptions mqtt)
SignInManager<IdentityUser> signInManager, ILogger<DevicesController> logger, ApplicationDbContext context, IMqttClientOptions mqtt,IStorage storage)
{
_context = context;
_mqtt = mqtt;
_userManager = userManager;
_signInManager = signInManager;
_logger = logger;
_storage = storage;
}
/// <summary>
......@@ -131,14 +134,14 @@ namespace IoTSharp.Controllers
/// Request telemetry values from the server
/// </summary>
/// <param name="deviceId">Which device do you read?</param>
/// <param name="keyName">Specify key name</param>
/// <param name="keys">Specify key name list , use , or space or ; to split </param>
/// <returns></returns>
[Authorize(Roles = nameof(UserRole.NormalUser))]
[HttpGet("{deviceId}/TelemetryLatest/{keyName}")]
[HttpGet("{deviceId}/TelemetryLatest/{keys}")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(typeof(ApiResult), StatusCodes.Status404NotFound)]
[ProducesDefaultResponseType]
public async Task<ActionResult<object>> GetTelemetryLatest(Guid deviceId, string keyName)
public async Task<ActionResult<object>> GetTelemetryLatest(Guid deviceId, string keys)
{
var dev = _context.Device.Find(deviceId);
if (dev == null)
......@@ -147,7 +150,7 @@ namespace IoTSharp.Controllers
}
else
{
var kv = from t in _context.TelemetryLatest where t.DeviceId == dev.Id && t.KeyName == keyName select t;
var kv = from t in _context.TelemetryLatest where t.DeviceId == dev.Id && keys.Split(',',' ',';').Contains(t.KeyName) select t;
return (await kv.FirstOrDefaultAsync())?.ToObject();
}
}
......@@ -155,14 +158,14 @@ namespace IoTSharp.Controllers
/// Request telemetry values from the server
/// </summary>
/// <param name="deviceId">Which device do you read?</param>
/// <param name="keyName">Specify key name</param>
/// <param name="keys">Specify key name list , use , or space or ; to split </param>
/// <returns></returns>
[Authorize(Roles = nameof(UserRole.NormalUser))]
[HttpGet("{deviceId}/AttributeLatest/{keyName}")]
[HttpGet("{deviceId}/AttributeLatest/{keys}")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(typeof(ApiResult), StatusCodes.Status404NotFound)]
[ProducesDefaultResponseType]
public async Task<ActionResult<object>> GetAttributeLatest(Guid deviceId,string keyName)
public async Task<ActionResult<object>> GetAttributeLatest(Guid deviceId,string keys)
{
var dev = _context.Device.Find(deviceId);
if (dev == null)
......@@ -171,7 +174,7 @@ namespace IoTSharp.Controllers
}
else
{
var kv = from t in _context.AttributeLatest where t.DeviceId == dev.Id && t.KeyName == keyName select t;
var kv = from t in _context.AttributeLatest where t.DeviceId == dev.Id && keys.Split(',', ' ', ';').Contains(t.KeyName) select t;
return (await kv.FirstOrDefaultAsync())?.ToObject();
}
}
......@@ -180,7 +183,7 @@ namespace IoTSharp.Controllers
/// Request telemetry values from the server
/// </summary>
/// <param name="deviceId">Which device do you read?</param>
/// <param name="keyName">Specify key name</param>
/// <param name="keys">Specify key name list , use , or space or ; to split </param>
/// <param name="begin">For example: 2019-06-06 12:24</param>
/// <returns></returns>
[Authorize(Roles = nameof(UserRole.NormalUser))]
......@@ -188,7 +191,7 @@ namespace IoTSharp.Controllers
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(typeof(ApiResult), StatusCodes.Status404NotFound)]
[ProducesDefaultResponseType]
public async Task<ActionResult<object[]>> GetTelemetryLatest(Guid deviceId, string keyName, DateTime begin)
public async Task<ActionResult<List<TelemetryDataDto>>> GetTelemetryData(Guid deviceId, string keys, DateTime begin)
{
var dev = _context.Device.Find(deviceId);
if (dev == null)
......@@ -197,24 +200,24 @@ namespace IoTSharp.Controllers
}
else
{
var kv = from t in _context.TelemetryLatest where t.DeviceId == dev.Id && t.KeyName == keyName && t.DateTime >= begin select t.ToObject();
return await kv.ToArrayAsync();
return keys=="all" ? await _storage.LoadTelemetryAsync(deviceId, begin): await _storage.LoadTelemetryAsync(deviceId,keys,begin);
}
}
/// <summary>
/// Request telemetry values from the server
/// </summary>
/// <param name="deviceId">Which device do you read?</param>
/// <param name="keyName">Specify key name</param>
/// <param name="keys">Specify key name list , use , or space or ; to split </param>
/// <param name="begin">For example: 2019-06-06 12:24</param>
/// <param name="end">For example: 2019-06-06 12:24</param>
/// <returns></returns>
[Authorize(Roles = nameof(UserRole.NormalUser))]
[HttpGet("{deviceId}/TelemetryLatest/{keyName}/{begin}/{end}")]
[HttpGet("{deviceId}/TelemetryData/{keyName}/{begin}/{end}")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(typeof(ApiResult), StatusCodes.Status404NotFound)]
[ProducesDefaultResponseType]
public async Task<ActionResult<object>> GetTelemetryLatest(Guid deviceId, string keyName, DateTime begin,DateTime end )
public async Task<ActionResult<List<TelemetryDataDto>>> GetTelemetryData(Guid deviceId, string keys, DateTime begin,DateTime end )
{
var dev = _context.Device.Find(deviceId);
if (dev == null)
......@@ -223,8 +226,8 @@ namespace IoTSharp.Controllers
}
else
{
var kv = from t in _context.TelemetryLatest where t.DeviceId == dev.Id && t.KeyName == keyName && t.DateTime>=begin && t.DateTime <end select t.ToObject() ;
return await kv.ToArrayAsync();
return keys == "all" ? await _storage.LoadTelemetryAsync(deviceId, begin, end): await _storage.LoadTelemetryAsync(deviceId, keys, begin, end);
}
}
......
......@@ -108,4 +108,11 @@ namespace IoTSharp.Data
Attributes,
Telemetry,
}
[JsonConverter(typeof(StringEnumConverter))]
public enum RuleType
{
RuleNode,
RuleSwitcher
}
}
\ No newline at end of file
using CoAP;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.Data
{
public class FlowRule
{
public Guid Id { get; set; }
public RuleType RuleType { get; set; }
public string Name { get; set; }
public string Describes { get; set; }
public string Runner { get; set; }
public string ExecutableCode {get;set;}
}
}
using IoTSharp.Data;
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.Dtos
{
public class TelemetryDataDto
{
public string KeyName { get; set; }
public DateTime DateTime { get; set; }
public object Value{ get; set; }
}
}
using IoTSharp.Data;
using IoTSharp.Queue;
using Microsoft.AspNetCore.Routing.Matching;
using Microsoft.CodeAnalysis.CSharp.Syntax;
using Microsoft.EntityFrameworkCore;
using MQTTnet;
using Newtonsoft.Json.Linq;
using Org.BouncyCastle.Crypto.Modes.Gcm;
using System;
using System.Collections.Generic;
using System.Linq;
......@@ -14,7 +18,7 @@ namespace IoTSharp.Extensions
{
internal static (bool ok, Device device) GetDeviceByToken(this ApplicationDbContext _context, string access_token)
{
var deviceIdentity = from id in _context.DeviceIdentities.Include(di=>di.Device) where id.IdentityId == access_token && id.IdentityType == IdentityType.AccessToken select id;
var deviceIdentity = from id in _context.DeviceIdentities.Include(di => di.Device) where id.IdentityId == access_token && id.IdentityType == IdentityType.AccessToken select id;
var devices = from dev in _context.Device where deviceIdentity.Any() && dev.Id == deviceIdentity.FirstOrDefault().Device.Id select dev;
bool ok = deviceIdentity == null || !devices.Any();
return (ok, devices.FirstOrDefault());
......@@ -28,9 +32,9 @@ namespace IoTSharp.Extensions
/// <param name="dataSide"></param>
/// <param name="_context"></param>
/// <returns></returns>
internal static async Task<(int ret, Dic exceptions)> SaveAsync<L>(this ApplicationDbContext _context, Dictionary<string, object> data, Device device, DataSide dataSide) where L : DataStorage, new()
internal static async Task<(int ret, Dic exceptions)> SaveAsync<L>(this ApplicationDbContext _context, Dictionary<string, object> data, Device device, DataSide dataSide) where L : DataStorage, new()
{
Dic exceptions = _context.PreparingData<L>( data, device, dataSide);
Dic exceptions = _context.PreparingData<L>(data, device, dataSide);
int ret = await _context.SaveChangesAsync();
return (ret, exceptions);
}
......@@ -46,34 +50,29 @@ namespace IoTSharp.Extensions
internal static Dic PreparingData<L>(this ApplicationDbContext _context, Dictionary<string, object> data, Device device, DataSide dataSide)
where L : DataStorage, new()
{
Dic exceptions = new Dic();
var devdata = from tx in _context.Set<L>() where tx.DeviceId == device.Id select tx;
data.ToList().ForEach(kp =>
{
try
{
if (kp.Key != null)
{
if (typeof(L) == typeof(TelemetryLatest))
var tl = from tx in devdata where tx.KeyName == kp.Key && tx.DataSide == dataSide select tx;
if (tl.Any())
{
var tdata = new TelemetryData() { DateTime = DateTime.Now, DeviceId = device.Id, KeyName = kp.Key, Value_DateTime = new DateTime(1970, 1, 1) };
tdata.FillKVToMe(kp);
_context.Set<TelemetryData>().Add(tdata);
tl.First().FillKVToMe(kp);
}
var tl = _context.Set<L>().Where(tx => tx.DeviceId == device.Id && tx.KeyName == kp.Key && tx.DataSide == dataSide);
if (tl != null)
else
{
_context.Set<L>().RemoveRange(tl.ToList());
var t2 = new L() { DateTime = DateTime.Now, DeviceId = device.Id, KeyName = kp.Key, DataSide = dataSide };
t2.Catalog = (typeof(L) == typeof(AttributeLatest) ? DataCatalog.AttributeLatest
: ((typeof(L) == typeof(TelemetryLatest) ? DataCatalog.TelemetryLatest
: 0)));
_context.Set<L>().Add(t2);
t2.FillKVToMe(kp);
}
var t2 = new L() { DateTime = DateTime.Now, DeviceId = device.Id, KeyName = kp.Key, DataSide = dataSide };
t2.Catalog= (typeof(L) == typeof(AttributeLatest) ? DataCatalog.AttributeLatest
: ((typeof(L) == typeof(TelemetryLatest) ? DataCatalog.TelemetryLatest
: 0)));
t2.FillKVToMe(kp);
_context.Set<L>().Add(t2);
}
}
catch (Exception ex)
......@@ -125,7 +124,7 @@ namespace IoTSharp.Extensions
internal static void FillKVToMe<T>(this T tdata, KeyValuePair<string, object> kp) where T : IDataStorage
{
switch (Type.GetTypeCode(kp.Value.GetType()))
{
case TypeCode.Boolean:
......@@ -151,7 +150,7 @@ namespace IoTSharp.Extensions
case TypeCode.Byte:
case TypeCode.SByte:
tdata.Type = DataType.Long;
tdata.Value_Long =(Int64)Convert.ChangeType( kp.Value, TypeCode.Int64);
tdata.Value_Long = (Int64)Convert.ChangeType(kp.Value, TypeCode.Int64);
break;
case TypeCode.String:
case TypeCode.Char:
......@@ -160,7 +159,7 @@ namespace IoTSharp.Extensions
break;
case TypeCode.DateTime:
tdata.Type = DataType.DateTime;
tdata.Value_DateTime =( (DateTime)kp.Value);
tdata.Value_DateTime = ((DateTime)kp.Value);
break;
case TypeCode.DBNull:
case TypeCode.Empty:
......@@ -203,9 +202,9 @@ namespace IoTSharp.Extensions
}
return keyValues;
}
public static object ToObject(this DataStorage kxv)
public static object ToObject(this IDataStorage kxv)
{
object obj = null;
if (kxv != null)
......@@ -240,8 +239,8 @@ namespace IoTSharp.Extensions
break;
}
}
return obj;
}
}
}
......@@ -41,6 +41,8 @@
<PackageReference Include="AspNetCore.HealthChecks.Uris" Version="3.1.2" />
<PackageReference Include="AspNetCore.HealthChecks.Network" Version="3.1.1" />
<PackageReference Include="AspNetCore.HealthChecks.NpgSql" Version="3.1.1" />
<PackageReference Include="EFCore.Sharding" Version="3.1.6.8" />
<PackageReference Include="EFCore.Sharding.PostgreSql" Version="3.1.6.8" />
<PackageReference Include="IoTSharp.CoAP.NET" Version="2.0.8" />
<PackageReference Include="IoTSharp.X509Extensions" Version="1.4.9" />
<PackageReference Include="kimbus" Version="2.0.1" />
......
......@@ -93,7 +93,7 @@
Request telemetry values from the server
</summary>
<param name="deviceId">Which device do you read?</param>
<param name="keyName">Specify key name</param>
<param name="keys">Specify key name list , use , or space or ; to split </param>
<returns></returns>
</member>
<member name="M:IoTSharp.Controllers.DevicesController.GetAttributeLatest(System.Guid,System.String)">
......@@ -101,24 +101,24 @@
Request telemetry values from the server
</summary>
<param name="deviceId">Which device do you read?</param>
<param name="keyName">Specify key name</param>
<param name="keys">Specify key name list , use , or space or ; to split </param>
<returns></returns>
</member>
<member name="M:IoTSharp.Controllers.DevicesController.GetTelemetryLatest(System.Guid,System.String,System.DateTime)">
<member name="M:IoTSharp.Controllers.DevicesController.GetTelemetryData(System.Guid,System.String,System.DateTime)">
<summary>
Request telemetry values from the server
</summary>
<param name="deviceId">Which device do you read?</param>
<param name="keyName">Specify key name</param>
<param name="keys">Specify key name list , use , or space or ; to split </param>
<param name="begin">For example: 2019-06-06 12:24</param>
<returns></returns>
</member>
<member name="M:IoTSharp.Controllers.DevicesController.GetTelemetryLatest(System.Guid,System.String,System.DateTime,System.DateTime)">
<member name="M:IoTSharp.Controllers.DevicesController.GetTelemetryData(System.Guid,System.String,System.DateTime,System.DateTime)">
<summary>
Request telemetry values from the server
</summary>
<param name="deviceId">Which device do you read?</param>
<param name="keyName">Specify key name</param>
<param name="keys">Specify key name list , use , or space or ; to split </param>
<param name="begin">For example: 2019-06-06 12:24</param>
<param name="end">For example: 2019-06-06 12:24</param>
<returns></returns>
......
using IoTSharp.Data;
using IoTSharp.Extensions;
using IoTSharp.Queue;
using IoTSharp.Storage;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
......@@ -23,19 +24,21 @@ namespace IoTSharp.Jobs
[SilkierQuartz(0, "PushData", "Push Iot Message Data to DataBase ", TriggerGroup = "Data")]
public class PushData : IJob
{
private readonly AppSettings _appSettings;
readonly ILogger _logger;
readonly IServiceScope scope;
readonly MqttClientSetting _mcsetting;
readonly IMsgQueue _queue;
private readonly IStorage _storage;
public PushData(ILogger<PushData> logger, IServiceScopeFactory scopeFactor
, IOptions<AppSettings> options, IMsgQueue queue
, IOptions<AppSettings> options, IMsgQueue queue, IStorage storage
)
{
_mcsetting = options.Value.MqttClient;
_appSettings = options.Value;
_logger = logger;
scope = scopeFactor.CreateScope();
_queue = queue;
_storage = storage;
}
public Task Execute(IJobExecutionContext context)
{
......@@ -55,20 +58,36 @@ namespace IoTSharp.Jobs
case DataCatalog.AttributeData:
var result2 = await _dbContext.SaveAsync<AttributeLatest>(msg.MsgBody, device, msg.DataSide);
if (result2.exceptions?.Count>0)
if (result2.exceptions?.Count > 0)
{
_logger.LogError(Newtonsoft.Json.JsonConvert.SerializeObject(msg.MsgBody));
}
_logger.LogInformation(Newtonsoft.Json.JsonConvert.SerializeObject(result2));
else
{
_logger.LogInformation(Newtonsoft.Json.JsonConvert.SerializeObject(result2));
}
break;
case DataCatalog.TelemetryData:
switch (_appSettings.TelemetryStorage)
{
case TelemetryStorage.SingleTable:
await _storage.StoreTelemetryAsync(msg);
break;
case TelemetryStorage.Sharding:
break;
default:
break;
}
var result1 = await _dbContext.SaveAsync<TelemetryLatest>(msg.MsgBody, device, msg.DataSide);
if (result1.exceptions?.Count > 0)
{
_logger.LogError(Newtonsoft.Json.JsonConvert.SerializeObject(msg.MsgBody));
}
_logger.LogInformation(Newtonsoft.Json.JsonConvert.SerializeObject(result1));
else
{
_logger.LogInformation(Newtonsoft.Json.JsonConvert.SerializeObject(result1));
}
break;
default:
break;
......@@ -79,6 +98,7 @@ namespace IoTSharp.Jobs
});
}
}
}
......@@ -40,6 +40,8 @@ using NSwag;
using NSwag.Generation.Processors.Security;
using IoTSharp.Queue;
using Npgsql;
using EFCore.Sharding;
using IoTSharp.Storage;
namespace IoTSharp
{
......@@ -151,7 +153,23 @@ namespace IoTSharp
return new LiteDBQueue(System.IO.Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "DiskQuue.iotsharp"));
});
services.AddMemoryCache();
switch (settings.TelemetryStorage)
{
case TelemetryStorage.Sharding:
services.AddEFCoreSharding(config =>
{
config.AddDataSource(Configuration.GetConnectionString("TelemetryStorage"), ReadWriteType.Read | ReadWriteType.Write, settings.Sharding.DatabaseType)
.SetDateSharding<TelemetryData>(nameof(TelemetryData.DateTime), settings.Sharding.ExpandByDateMode, DateTime.MinValue);
});
break;
case TelemetryStorage.SingleTable:
services.AddSingleton<IStorage, EFStorage>();
break;
default:
break;
}
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
......
using IoTSharp.Data;
using IoTSharp.Dtos;
using IoTSharp.Extensions;
using IoTSharp.Queue;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.Storage
{
public class EFStorage : IStorage
{
private readonly AppSettings _appSettings;
readonly ILogger _logger;
readonly IServiceScope scope;
public EFStorage(ILogger<EFStorage> logger, IServiceScopeFactory scopeFactor
, IOptions<AppSettings> options
)
{
_appSettings = options.Value;
_logger = logger;
scope = scopeFactor.CreateScope();
}
public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin)
{
using (var _context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var kv = from t in _context.TelemetryData
where t.DeviceId == deviceId && keys.Split(',', ' ', ';').Contains(t.KeyName) && t.DateTime >= begin
select new TelemetryDataDto() { DateTime=t.DateTime, KeyName=t.KeyName, Value= t.ToObject() };
return kv.ToListAsync();
}
}
public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin, DateTime end)
{
using (var _context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var kv = from t in _context.TelemetryData
where t.DeviceId == deviceId && keys.Split(',', ' ', ';').Contains(t.KeyName) && t.DateTime >= begin && t.DateTime < end
select new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() };
return kv.ToListAsync();
}
}
public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, DateTime begin)
{
using (var _context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var kv = from t in _context.TelemetryData where t.DeviceId == deviceId && t.DateTime >= begin
select new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() };
return kv.ToListAsync();
}
}
public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, DateTime begin, DateTime end)
{
using (var _context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var kv = from t in _context.TelemetryData
where t.DeviceId == deviceId && t.DateTime >= begin && t.DateTime < end
select new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() };
return kv.ToListAsync();
}
}
public async Task<bool> StoreTelemetryAsync(RawMsg msg)
{
bool result = false;
try
{
using (var _dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
msg.MsgBody.ToList().ForEach(kp =>
{
var tdata = new TelemetryData() { DateTime = DateTime.Now, DeviceId = msg.DeviceId, KeyName = kp.Key, Value_DateTime = new DateTime(1970, 1, 1) };
tdata.FillKVToMe(kp);
_dbContext.Set<TelemetryData>().Add(tdata);
});
await _dbContext.SaveChangesAsync();
result = true;
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"{msg.DeviceId}数据处理失败{ex.Message} {ex.InnerException?.Message} ");
}
return result;
}
}
}
using IoTSharp.Data;
using IoTSharp.Dtos;
using IoTSharp.Queue;
using Microsoft.AspNetCore.Mvc;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.Storage
{
public interface IStorage
{
Task<bool> StoreTelemetryAsync(RawMsg msg);
Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keyName, DateTime begin);
Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keyName, DateTime begin, DateTime end);
Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, DateTime begin);
Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, DateTime begin, DateTime end);
}
}
......@@ -5,7 +5,8 @@
}
},
"ConnectionStrings": {
"IoTSharp": "Server=pgsql;Database=IoTSharp;Username=postgres;Password=future;"
"IoTSharp": "Server=pgsql;Database=IoTSharp;Username=postgres;Password=future;",
"TelemetryStorage": "Server=pgsql;Database=IoTSharp;Username=postgres;Password=future;"
},
"JwtKey": "iotsharpiotsharpiotsharpiotsharpiotsharp",
"JwtExpireHours": 24,
......@@ -29,5 +30,10 @@
"EvaluationTimeOnSeconds": 10,
"MinimumSecondsBetweenFailureNotifications": 60
},
"AllowedHosts": "*"
"AllowedHosts": "*",
"TelemetryStorage": "SingleTable",
"Sharding": {
"DatabaseType": "PostgreSql",
"ExpandByDateMode": "PerMonth"
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册