From 20e0de5fc19c847620a5b05aa9a9567bfdd68dba Mon Sep 17 00:00:00 2001 From: MysticBoy Date: Tue, 28 Jul 2020 00:44:27 +0800 Subject: [PATCH] Use Queue to save datas --- .gitignore | 1 + IoTSharp.Test/IoTSharp.Test.csproj | 2 +- IoTSharp/Extensions/LiteQueue.cs | 291 ++++++++++++++++++++++++ IoTSharp/FlowRuleEngine/IRuleNode.cs | 15 ++ IoTSharp/FlowRuleEngine/ISwitcher.cs | 14 ++ IoTSharp/FlowRuleEngine/IoTSharpRoot.cs | 18 ++ IoTSharp/Handlers/CoAPResource.cs | 13 +- IoTSharp/Handlers/MQTTServerHandler.cs | 30 +-- IoTSharp/IoTSharp.csproj | 32 +-- IoTSharp/IoTSharp.xml | 108 +++++++++ IoTSharp/Jobs/PushData.cs | 76 +++++++ IoTSharp/Queue/DiskQueue.cs | 38 ++++ IoTSharp/Queue/Enums.cs | 14 ++ IoTSharp/Queue/IMsgQueue.cs | 14 ++ IoTSharp/Queue/MemoryQueue.cs | 27 +++ IoTSharp/Queue/RawMsg.cs | 20 ++ IoTSharp/Services/CoAPService.cs | 5 +- IoTSharp/Startup.cs | 6 + appsettings.Development.json | 16 ++ docker-compose.dcproj | 5 + docker-compose.yml | 3 - 21 files changed, 698 insertions(+), 50 deletions(-) create mode 100644 IoTSharp/Extensions/LiteQueue.cs create mode 100644 IoTSharp/FlowRuleEngine/IRuleNode.cs create mode 100644 IoTSharp/FlowRuleEngine/ISwitcher.cs create mode 100644 IoTSharp/FlowRuleEngine/IoTSharpRoot.cs create mode 100644 IoTSharp/Jobs/PushData.cs create mode 100644 IoTSharp/Queue/DiskQueue.cs create mode 100644 IoTSharp/Queue/Enums.cs create mode 100644 IoTSharp/Queue/IMsgQueue.cs create mode 100644 IoTSharp/Queue/MemoryQueue.cs create mode 100644 IoTSharp/Queue/RawMsg.cs create mode 100644 appsettings.Development.json diff --git a/.gitignore b/.gitignore index 004559f1..c6a3b1f3 100644 --- a/.gitignore +++ b/.gitignore @@ -335,3 +335,4 @@ ASALocalRun/ /IoTSharp/ClientApp/package-lock.json /IoTSharp/healthchecksdb /IoTSharp/healthchecksdb-shm +/IoTSharp/DiskQuue.iotsharp diff --git a/IoTSharp.Test/IoTSharp.Test.csproj b/IoTSharp.Test/IoTSharp.Test.csproj index 452ee7b0..9fa5ada8 100644 --- a/IoTSharp.Test/IoTSharp.Test.csproj +++ b/IoTSharp.Test/IoTSharp.Test.csproj @@ -7,7 +7,7 @@ - + diff --git a/IoTSharp/Extensions/LiteQueue.cs b/IoTSharp/Extensions/LiteQueue.cs new file mode 100644 index 00000000..9243a969 --- /dev/null +++ b/IoTSharp/Extensions/LiteQueue.cs @@ -0,0 +1,291 @@ +/* Copyright 2018 by Nomadeon Software LLC. Licensed uinder MIT: https://opensource.org/licenses/MIT */ +using System; +using System.Collections.Generic; +using System.Text; +using LiteDB; +using System.Linq; +using System.Threading.Tasks; +#pragma warning disable CS1584,CS0419,CS1570 +namespace LiteQueue +{ + public class QueueEntry + { + public long Id { get; set; } + public T Payload { get; set; } + public bool IsCheckedOut { get; set; } + + public QueueEntry() + { + + } + + public QueueEntry(T payload) + { + Payload = payload; + } + } + /// + /// Uses LiteDB to provide a persisted, thread safe, (optionally) transactional, FIFO queue. + /// + /// Suitable for use on clients as a lightweight, portable alternative to MSMQ. Not recommended for use + /// on large server side applications due to performance limitations of LiteDB. + /// + public class LiteQueue + { + ILiteCollection> _collection; + bool _transactional = true; + readonly object _dequeueLock = new object(); + + /// + /// Impacts operation of method. Can only be set once in constructor. + /// + public bool IsTransactional + { + get + { + return _transactional; + } + } + + /// + /// Creates a collection for you in the database + /// + /// The LiteDB database. You are responsible for its lifecycle (using/dispose) + /// Name of the collection to create + /// Whether the queue should use transaction logic, default true + public LiteQueue(LiteDatabase db, string collectionName, bool transactional = true) + { + _collection = db.GetCollection>(collectionName); + _transactional = transactional; + } + + /// + /// Uses the provided database collection + /// + /// A LiteDB collection. + /// Whether the queue should use transaction logic, default true + public LiteQueue(ILiteCollection> collection, bool transactional = true) + { + _collection = collection; + _transactional = transactional; + _collection.EnsureIndex(x => x.Id); + _collection.EnsureIndex(x => x.IsCheckedOut); + } + /// + /// Creates a collection for you in the database, collection's name is + /// + /// The LiteDB database. You are responsible for its lifecycle (using/dispose) + /// Whether the queue should use transaction logic, default true + public LiteQueue(LiteDatabase db, bool transactional = true) + { + _collection = db.GetCollection>(typeof(T).Name); + _transactional = transactional; + } + + /// + /// Adds a single item to queue. See for adding a batch. + /// + /// + public void Enqueue(T item) + { + if (item == null) + { + throw new ArgumentNullException(nameof(item)); + } + + QueueEntry insert = new QueueEntry(item); + + _collection.Insert(insert); + } + + /// + /// Adds a batch of items to the queue. See for adding a single item. + /// + /// + public void Enqueue(IEnumerable items) + { + List> inserts = new List>(); + foreach (var item in items) + { + inserts.Add(new QueueEntry(item)); + } + + _collection.InsertBulk(inserts); + } + + /// + /// Transactional queues: + /// Marks item as checked out but does not remove from queue. You are expected to later call or + /// Non-transactional queues: + /// Removes item from queue with no need to call or + /// + /// An item if found or null + public QueueEntry Dequeue() + { + var result = Dequeue(1); + if (result.Count == 0) + { + return null; + } + else + { + return result[0]; + } + } + + /// + /// Batch equivalent of + /// + /// The maximum number of items to dequeue + /// The items found or an empty collection (never null) + public List> Dequeue(int batchSize) + { + if (_transactional) + { + lock (_dequeueLock) + { + var items = _collection.Find(x => !x.IsCheckedOut, 0, batchSize); + + // Capture the result before changing IsCheckedOut, otherwise items is being changed + var result = new List>(items); + + foreach (var item in result) + { + item.IsCheckedOut = true; + _collection.Update(item); + } + + return result; + } + } + else + { + var items = _collection.Find(x => true, 0, batchSize); + var result = new List>(items); + + foreach (var item in items) + { + _collection.Delete(new BsonValue(item.Id)); + } + + return result; + } + } + + /// + /// Obtains list of items currently checked out (but not yet commited or aborted) as a result of Dequeue calls on a transactional queue + /// + /// Thrown when queue is not transactional + /// Items found or empty collection (never null) + public List> CurrentCheckouts() + { + if (!_transactional) + { + throw new InvalidOperationException("Cannot call " + nameof(CurrentCheckouts) + " unless queue is transactional"); + } + + var records = _collection.Find(item => item.IsCheckedOut); + return new List>(records); + } + + /// + /// Aborts all currently checked out items. Equivalent of calling followed by + /// + /// Thrown when queue is not transactional + public void ResetOrphans() + { + if (!_transactional) + { + throw new InvalidOperationException("Cannot call " + nameof(ResetOrphans) + " unless queue is transactional"); + } + + var checkouts = CurrentCheckouts(); + Abort(checkouts); + } + + /// + /// Aborts a transaction on a single item. See for batches. + /// + /// An item that was obtained from a call + /// Thrown when queue is not transactional + public void Abort(QueueEntry item) + { + if (!_transactional) + { + throw new InvalidOperationException("Cannot call " + nameof(Abort) + " unless queue is transactional"); + } + else if (item == null) + { + throw new ArgumentNullException(nameof(item)); + } + + item.IsCheckedOut = false; + _collection.Update(item); + } + + /// + /// Aborts a transation on a group of items. See for a single item. + /// + /// Items that were obtained from a call + /// Thrown when queue is not transactional + public void Abort(IEnumerable> items) + { + foreach (var item in items) + { + Abort(item); + } + } + + /// + /// Commits a transaction on a single item. See for batches. + /// + /// An item that was obtained from a call + /// Thrown when queue is not transactional + public void Commit(QueueEntry item) + { + if (item == null) + { + throw new ArgumentNullException(nameof(item)); + } + + if (!_transactional) + { + throw new InvalidOperationException("Cannot call " + nameof(Commit) + " unless queue is transactional"); + } + + BsonValue id = new BsonValue(item.Id); + _collection.Delete(id); + } + + + /// + /// Commits a transation on a group of items. See Items that were obtained from a call + /// Thrown when queue is not transactional + public void Commit(IEnumerable> items) + { + foreach (var item in items) + { + Commit(item); + } + } + + /// + /// Number of items in queue, including those that have been checked out. + /// + public int Count() + { + return _collection.Count(); + } + + /// + /// Removes all items from queue, including any that have been checked out. + /// + public void Clear() + { + _collection.DeleteAll(); + } + } +} +#pragma warning restore CS1584, CS0419, CS1570 \ No newline at end of file diff --git a/IoTSharp/FlowRuleEngine/IRuleNode.cs b/IoTSharp/FlowRuleEngine/IRuleNode.cs new file mode 100644 index 00000000..66abaae3 --- /dev/null +++ b/IoTSharp/FlowRuleEngine/IRuleNode.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace IoTSharp.FlowRuleEngine +{ + public interface IRuleNode + { + string name { get; set; } + string InputData { get; set; } + public void Run(); + string OuputData { get; set; } + } +} diff --git a/IoTSharp/FlowRuleEngine/ISwitcher.cs b/IoTSharp/FlowRuleEngine/ISwitcher.cs new file mode 100644 index 00000000..de638389 --- /dev/null +++ b/IoTSharp/FlowRuleEngine/ISwitcher.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace IoTSharp.FlowRuleEngine +{ + public interface ISwitcher + { + string Switcher { get; } + IRuleNode InputNode { get; set; } + List OuputNodes { get; set; } + } +} diff --git a/IoTSharp/FlowRuleEngine/IoTSharpRoot.cs b/IoTSharp/FlowRuleEngine/IoTSharpRoot.cs new file mode 100644 index 00000000..23419435 --- /dev/null +++ b/IoTSharp/FlowRuleEngine/IoTSharpRoot.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace IoTSharp.FlowRuleEngine +{ + public class IoTSharpRoot : ISwitcher + { + public IoTSharpRoot(string json) + { + + } + public string Switcher => nameof(IoTSharpRoot); + public IRuleNode InputNode { get; set; } + public List OuputNodes { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } + } +} diff --git a/IoTSharp/Handlers/CoAPResource.cs b/IoTSharp/Handlers/CoAPResource.cs index 21f5aac7..307b9db6 100644 --- a/IoTSharp/Handlers/CoAPResource.cs +++ b/IoTSharp/Handlers/CoAPResource.cs @@ -2,6 +2,7 @@ using CoAP.Server.Resources; using IoTSharp.Data; using IoTSharp.Extensions; +using IoTSharp.Queue; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; @@ -23,8 +24,9 @@ namespace IoTSharp.Handlers MediaType.ApplicationOctetStream }; private readonly ILogger _logger; + private readonly IMsgQueue _queue; - public CoApResource(string name, ApplicationDbContext dbContext, ILogger logger) + public CoApResource(string name, ApplicationDbContext dbContext, ILogger logger, IMsgQueue queue) : base(name) { Attributes.Title = name; @@ -35,6 +37,7 @@ namespace IoTSharp.Handlers Attributes.AddContentType(item); } _logger = logger; + _queue = queue; _logger.LogInformation($"CoApResource {name} is created."); } protected override void DoPost(CoapExchange exchange) @@ -110,12 +113,12 @@ namespace IoTSharp.Handlers switch (_res) { case CoApRes.Attributes: - var result1 = await _dbContext.SaveAsync(keyValues, dev, DataSide.ClientSide); - exchange.Respond(StatusCode.Changed, $"{result1.ret}"); + _queue.Enqueue(new RawMsg() { MsgType = MsgType.CoAP, MsgBody = keyValues, DataCatalog = DataCatalog.AttributeData, DataSide = DataSide.ClientSide, DeviceId = dev.Id }); + exchange.Respond(StatusCode.Changed, $"OK"); break; case CoApRes.Telemetry: - var result2 = await _dbContext.SaveAsync(keyValues, dev, DataSide.ClientSide); - exchange.Respond(StatusCode.Created, $"{result2.ret}"); + _queue.Enqueue(new RawMsg() { MsgType = MsgType.CoAP, MsgBody = keyValues, DataCatalog = DataCatalog.AttributeData, DataSide = DataSide.ClientSide, DeviceId = dev.Id }); + exchange.Respond(StatusCode.Created, $"OK"); break; default: break; diff --git a/IoTSharp/Handlers/MQTTServerHandler.cs b/IoTSharp/Handlers/MQTTServerHandler.cs index 67ceaf2f..cae537d9 100644 --- a/IoTSharp/Handlers/MQTTServerHandler.cs +++ b/IoTSharp/Handlers/MQTTServerHandler.cs @@ -1,6 +1,7 @@ using IoTSharp.Data; using IoTSharp.Extensions; using IoTSharp.Handlers; +using IoTSharp.Queue; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -26,10 +27,11 @@ namespace IoTSharp.Handlers readonly ILogger _logger; readonly ApplicationDbContext _dbContext; readonly IMqttServerEx _serverEx; + private readonly IMsgQueue _queue; readonly IServiceScope scope; readonly MqttClientSetting _mcsetting; public MQTTServerHandler(ILogger logger, IServiceScopeFactory scopeFactor,IMqttServerEx serverEx - , IOptions options + , IOptions options,IMsgQueue queue ) { _mcsetting = options.Value.MqttClient; @@ -37,6 +39,7 @@ namespace IoTSharp.Handlers scope = scopeFactor.CreateScope(); _dbContext = scope.ServiceProvider.GetRequiredService(); _serverEx = serverEx; + _queue = queue; } static long clients = 0; @@ -123,17 +126,7 @@ namespace IoTSharp.Handlers } if (tpary[2] == "telemetry") { - Task.Run(async () => - { - try - { - var result = await _dbContext.SaveAsync(keyValues, device, DataSide.ClientSide); - } - catch (Exception ex) - { - _logger.LogError(ex, $"Can't upload telemetry to device {device.Name}({device.Id}).the payload is {e.ApplicationMessage.ConvertPayloadToString()}"); - } - }); + _queue.Enqueue(new RawMsg() { DeviceId = device.Id, MsgBody = keyValues, DataSide = DataSide.ClientSide, DataCatalog = DataCatalog.TelemetryData }); } else if (tpary[2] == "attributes") { @@ -146,18 +139,7 @@ namespace IoTSharp.Handlers } else { - Task.Run(async () => - { - try - { - - var result = await _dbContext.SaveAsync(keyValues, device, DataSide.ClientSide); - } - catch (Exception ex) - { - _logger.LogError(ex, $"Can't upload attributes to device {device.Name}({device.Id}).the payload is \"{e.ApplicationMessage.ConvertPayloadToString()}\""); - } - }); + _queue.Enqueue(new RawMsg() { DeviceId = device.Id, MsgBody = keyValues, DataSide = DataSide.ClientSide, DataCatalog = DataCatalog.AttributeData }); } } diff --git a/IoTSharp/IoTSharp.csproj b/IoTSharp/IoTSharp.csproj index 5edb4223..7ad03c08 100644 --- a/IoTSharp/IoTSharp.csproj +++ b/IoTSharp/IoTSharp.csproj @@ -44,6 +44,8 @@ + + @@ -54,11 +56,12 @@ - - + + all runtime; build; native; contentfiles; analyzers; buildtransitive + @@ -75,19 +78,18 @@ - - - - - - - - - - - - - + + + + + + + + + + + + diff --git a/IoTSharp/IoTSharp.xml b/IoTSharp/IoTSharp.xml index 1e191152..4079a971 100644 --- a/IoTSharp/IoTSharp.xml +++ b/IoTSharp/IoTSharp.xml @@ -235,5 +235,113 @@ 查找类似 The certificate is installed 的本地化字符串。 + + + Uses LiteDB to provide a persisted, thread safe, (optionally) transactional, FIFO queue. + + Suitable for use on clients as a lightweight, portable alternative to MSMQ. Not recommended for use + on large server side applications due to performance limitations of LiteDB. + + + + + Impacts operation of method. Can only be set once in constructor. + + + + + Creates a collection for you in the database + + The LiteDB database. You are responsible for its lifecycle (using/dispose) + Name of the collection to create + Whether the queue should use transaction logic, default true + + + + Uses the provided database collection + + A LiteDB collection. + Whether the queue should use transaction logic, default true + + + + Creates a collection for you in the database, collection's name is + + The LiteDB database. You are responsible for its lifecycle (using/dispose) + Whether the queue should use transaction logic, default true + + + + Adds a single item to queue. See for adding a batch. + + + + + + Adds a batch of items to the queue. See for adding a single item. + + + + + + Transactional queues: + Marks item as checked out but does not remove from queue. You are expected to later call or + Non-transactional queues: + Removes item from queue with no need to call or + + An item if found or null + + + + Batch equivalent of + + The maximum number of items to dequeue + The items found or an empty collection (never null) + + + + Obtains list of items currently checked out (but not yet commited or aborted) as a result of Dequeue calls on a transactional queue + + Thrown when queue is not transactional + Items found or empty collection (never null) + + + + Aborts all currently checked out items. Equivalent of calling followed by + + Thrown when queue is not transactional + + + + Aborts a transaction on a single item. See for batches. + + An item that was obtained from a call + Thrown when queue is not transactional + + + + Aborts a transation on a group of items. See for a single item. + + Items that were obtained from a call + Thrown when queue is not transactional + + + + Commits a transaction on a single item. See for batches. + + An item that was obtained from a call + Thrown when queue is not transactional + + + + + Number of items in queue, including those that have been checked out. + + + + + Removes all items from queue, including any that have been checked out. + + diff --git a/IoTSharp/Jobs/PushData.cs b/IoTSharp/Jobs/PushData.cs new file mode 100644 index 00000000..1b53cf1c --- /dev/null +++ b/IoTSharp/Jobs/PushData.cs @@ -0,0 +1,76 @@ +using IoTSharp.Data; +using IoTSharp.Extensions; +using IoTSharp.Queue; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using MQTTnet; +using MQTTnet.AspNetCoreEx; +using MQTTnet.Server; +using Quartz; +using SilkierQuartz; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace IoTSharp.Jobs +{ + + [SilkierQuartz(0, "PushData", "Push Iot Message Data to DataBase ", TriggerGroup = "Data")] + public class PushData : IJob + { + readonly ILogger _logger; + readonly IServiceScope scope; + readonly MqttClientSetting _mcsetting; + readonly IMsgQueue _queue; + + public PushData(ILogger logger, IServiceScopeFactory scopeFactor + , IOptions options, IMsgQueue queue + ) + { + _mcsetting = options.Value.MqttClient; + _logger = logger; + scope = scopeFactor.CreateScope(); + _queue = queue; + } + public Task Execute(IJobExecutionContext context) + { + return Task.Run(async () => + { + var msg = _queue.Dequeue(); + if (msg != null) + { + using (var _dbContext = scope.ServiceProvider.GetRequiredService()) + { + var device = _dbContext.Device.FirstOrDefault(d => d.Id == msg.DeviceId); + + if (device != null) + { + switch (msg.DataCatalog) + { + + case DataCatalog.AttributeData: + var result2 = await _dbContext.SaveAsync(msg.MsgBody, device, msg.DataSide); + _logger.LogInformation(Newtonsoft.Json.JsonConvert.SerializeObject(result2)); + break; + + case DataCatalog.TelemetryData: + var result1 = await _dbContext.SaveAsync(msg.MsgBody, device, msg.DataSide); + _logger.LogInformation(Newtonsoft.Json.JsonConvert.SerializeObject(result1)); + break; + default: + break; + } + } + } + } + }); + } + + } + +} diff --git a/IoTSharp/Queue/DiskQueue.cs b/IoTSharp/Queue/DiskQueue.cs new file mode 100644 index 00000000..84ac426b --- /dev/null +++ b/IoTSharp/Queue/DiskQueue.cs @@ -0,0 +1,38 @@ +using LiteDB; +using LiteQueue; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace IoTSharp.Queue +{ + public class DiskQueue : LiteQueue,IMsgQueue + { + public DiskQueue(string ConnectionString):base(new LiteDatabase(ConnectionString)) + { + + } + public new void Enqueue(RawMsg msg) + { + base.Enqueue(msg); + } + public new RawMsg Dequeue() + { + var msg = base.Dequeue(); + var result = msg.Payload; + try + { + base.Commit(msg); + } + catch (Exception) + { + base.Abort(msg); + result = null; + } + return result; + } + + } +} diff --git a/IoTSharp/Queue/Enums.cs b/IoTSharp/Queue/Enums.cs new file mode 100644 index 00000000..fa28adea --- /dev/null +++ b/IoTSharp/Queue/Enums.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace IoTSharp.Queue +{ + public enum MsgType + { + MQTT, + CoAP, + HTTP + } +} diff --git a/IoTSharp/Queue/IMsgQueue.cs b/IoTSharp/Queue/IMsgQueue.cs new file mode 100644 index 00000000..49053ee1 --- /dev/null +++ b/IoTSharp/Queue/IMsgQueue.cs @@ -0,0 +1,14 @@ +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace IoTSharp.Queue +{ + public interface IMsgQueue + { + public void Enqueue(RawMsg msg); + public RawMsg Dequeue(); + } +} diff --git a/IoTSharp/Queue/MemoryQueue.cs b/IoTSharp/Queue/MemoryQueue.cs new file mode 100644 index 00000000..1906059f --- /dev/null +++ b/IoTSharp/Queue/MemoryQueue.cs @@ -0,0 +1,27 @@ +using Org.BouncyCastle.Asn1; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Security.Cryptography.X509Certificates; +using System.Threading.Tasks; + +namespace IoTSharp.Queue +{ + public class MemoryQueue : IMsgQueue + { + Queue _raws; + public MemoryQueue() + { + _raws = new Queue(); + } + public void Enqueue(RawMsg msg) + { + _raws.Enqueue(msg); + } + public RawMsg Dequeue() + { + return _raws.TryDequeue(out RawMsg raw) ? raw : null; + } + + } +} diff --git a/IoTSharp/Queue/RawMsg.cs b/IoTSharp/Queue/RawMsg.cs new file mode 100644 index 00000000..d0bc0f8d --- /dev/null +++ b/IoTSharp/Queue/RawMsg.cs @@ -0,0 +1,20 @@ +using IoTSharp.Data; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace IoTSharp.Queue +{ + + + public class RawMsg + { + public MsgType MsgType { get; set; } + + public Guid DeviceId { get; set; } + public Dictionary MsgBody { get; internal set; } + public DataSide DataSide { get; internal set; } + public DataCatalog DataCatalog { get; internal set; } + } +} diff --git a/IoTSharp/Services/CoAPService.cs b/IoTSharp/Services/CoAPService.cs index ed11fb70..c4faf442 100644 --- a/IoTSharp/Services/CoAPService.cs +++ b/IoTSharp/Services/CoAPService.cs @@ -1,6 +1,7 @@ using CoAP.Server; using IoTSharp.Data; using IoTSharp.Handlers; +using IoTSharp.Queue; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -26,14 +27,14 @@ namespace IoTSharp.Services private IServiceScope _serviceScope; private CoapServer server; private readonly AppSettings _settings; - public CoAPService(ILogger logger, IServiceScopeFactory scopeFactor, IOptions options) + public CoAPService(ILogger logger, IServiceScopeFactory scopeFactor, IOptions options,IMsgQueue queue) { _settings = options.Value; server = new CoapServer(_settings.CoapServer); _logger = logger; _serviceScope = scopeFactor.CreateScope(); _dbContext = _serviceScope.ServiceProvider.GetRequiredService(); - Enum.GetNames(typeof(CoApRes)).ToList().ForEach(n => server.Add(new CoApResource(n, _serviceScope.ServiceProvider.GetRequiredService(), _logger))); + Enum.GetNames(typeof(CoApRes)).ToList().ForEach(n => server.Add(new CoApResource(n, _serviceScope.ServiceProvider.GetRequiredService(), _logger,queue))); } diff --git a/IoTSharp/Startup.cs b/IoTSharp/Startup.cs index af496c24..2df7c222 100644 --- a/IoTSharp/Startup.cs +++ b/IoTSharp/Startup.cs @@ -38,6 +38,7 @@ using SilkierQuartz; using HealthChecks.UI.Client; using NSwag; using NSwag.Generation.Processors.Security; +using IoTSharp.Queue; namespace IoTSharp { @@ -142,6 +143,11 @@ namespace IoTSharp }, name: "Disk Storage"); services.AddHealthChecksUI().AddPostgreSqlStorage(Configuration.GetConnectionString("IoTSharp")); services.AddSilkierQuartz(); + services.AddSingleton(o => + { + return new DiskQueue(System.IO.Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "DiskQuue.iotsharp")); + }); + services.AddMemoryCache(); } diff --git a/appsettings.Development.json b/appsettings.Development.json new file mode 100644 index 00000000..148162b6 --- /dev/null +++ b/appsettings.Development.json @@ -0,0 +1,16 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Debug", + "System": "Information", + "Microsoft": "Information" + } + }, + "ConnectionStrings": { + "IoTSharp": "Server=pgsql;Database=IoTSharp;Username=postgres;Password=future;" + }, + "JwtKey": "kissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissme", + "JwtExpireHours": 24, + "JwtIssuer": "IoTSharp.Net", + "JwtAudience": "IoTSharp.Net" +} \ No newline at end of file diff --git a/docker-compose.dcproj b/docker-compose.dcproj index 11c368cc..3f325740 100644 --- a/docker-compose.dcproj +++ b/docker-compose.dcproj @@ -9,6 +9,11 @@ iotsharp + + PreserveNewest + true + PreserveNewest + docker-compose.yml diff --git a/docker-compose.yml b/docker-compose.yml index 74c70720..1db5fc78 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -54,9 +54,6 @@ services: - 5683:5683 - 5684:5684 - 502:502 - volumes: - - "/var/iotsharp/Microsoft/UserSecrets:/root/.microsoft/usersecrets:ro" - - "/etc/iotsharp/appsettings.Production.json:/app/appsettings.Production.json" networks: - iotsharp-network -- GitLab