提交 20e0de5f 编写于 作者: 麦壳饼's avatar 麦壳饼

Use Queue to save datas

上级 b0b01765
......@@ -335,3 +335,4 @@ ASALocalRun/
/IoTSharp/ClientApp/package-lock.json
/IoTSharp/healthchecksdb
/IoTSharp/healthchecksdb-shm
/IoTSharp/DiskQuue.iotsharp
......@@ -7,7 +7,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.6" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.6.1" />
<PackageReference Include="MSTest.TestAdapter" Version="2.1.2" />
<PackageReference Include="MSTest.TestFramework" Version="2.1.2" />
......
/* Copyright 2018 by Nomadeon Software LLC. Licensed uinder MIT: https://opensource.org/licenses/MIT */
using System;
using System.Collections.Generic;
using System.Text;
using LiteDB;
using System.Linq;
using System.Threading.Tasks;
#pragma warning disable CS1584,CS0419,CS1570
namespace LiteQueue
{
public class QueueEntry<T>
{
public long Id { get; set; }
public T Payload { get; set; }
public bool IsCheckedOut { get; set; }
public QueueEntry()
{
}
public QueueEntry(T payload)
{
Payload = payload;
}
}
/// <summary>
/// Uses LiteDB to provide a persisted, thread safe, (optionally) transactional, FIFO queue.
///
/// Suitable for use on clients as a lightweight, portable alternative to MSMQ. Not recommended for use
/// on large server side applications due to performance limitations of LiteDB.
/// </summary>
public class LiteQueue<T>
{
ILiteCollection<QueueEntry<T>> _collection;
bool _transactional = true;
readonly object _dequeueLock = new object();
/// <summary>
/// Impacts operation of <see cref="Dequeue"/> method. Can only be set once in constructor.
/// </summary>
public bool IsTransactional
{
get
{
return _transactional;
}
}
/// <summary>
/// Creates a collection for you in the database
/// </summary>
/// <param name="db">The LiteDB database. You are responsible for its lifecycle (using/dispose)</param>
/// <param name="collectionName">Name of the collection to create</param>
/// <param name="transactional">Whether the queue should use transaction logic, default true</param>
public LiteQueue(LiteDatabase db, string collectionName, bool transactional = true)
{
_collection = db.GetCollection<QueueEntry<T>>(collectionName);
_transactional = transactional;
}
/// <summary>
/// Uses the provided database collection
/// </summary>
/// <param name="collection">A LiteDB collection.</param>
/// <param name="transactional">Whether the queue should use transaction logic, default true</param>
public LiteQueue(ILiteCollection<QueueEntry<T>> collection, bool transactional = true)
{
_collection = collection;
_transactional = transactional;
_collection.EnsureIndex(x => x.Id);
_collection.EnsureIndex(x => x.IsCheckedOut);
}
/// <summary>
/// Creates a collection for you in the database, collection's name is <typeparamref name="T"/>
/// </summary>
/// <param name="db">The LiteDB database. You are responsible for its lifecycle (using/dispose)</param>
/// <param name="transactional">Whether the queue should use transaction logic, default true</param>
public LiteQueue(LiteDatabase db, bool transactional = true)
{
_collection = db.GetCollection<QueueEntry<T>>(typeof(T).Name);
_transactional = transactional;
}
/// <summary>
/// Adds a single item to queue. See <see cref="Enqueue(IEnumerable{T})"/> for adding a batch.
/// </summary>
/// <param name="item"></param>
public void Enqueue(T item)
{
if (item == null)
{
throw new ArgumentNullException(nameof(item));
}
QueueEntry<T> insert = new QueueEntry<T>(item);
_collection.Insert(insert);
}
/// <summary>
/// Adds a batch of items to the queue. See <see cref="Enqueue(T)"/> for adding a single item.
/// </summary>
/// <param name="items"></param>
public void Enqueue(IEnumerable<T> items)
{
List<QueueEntry<T>> inserts = new List<QueueEntry<T>>();
foreach (var item in items)
{
inserts.Add(new QueueEntry<T>(item));
}
_collection.InsertBulk(inserts);
}
/// <summary>
/// Transactional queues:
/// Marks item as checked out but does not remove from queue. You are expected to later call <see cref="Commit(QueueEntry{T})"/> or <see cref="Abort(QueueEntry{T})"/>
/// Non-transactional queues:
/// Removes item from queue with no need to call <see cref="Commit(QueueEntry{T})"/> or <see cref="Abort(QueueEntry{T})"/>
/// </summary>
/// <returns>An item if found or null</returns>
public QueueEntry<T> Dequeue()
{
var result = Dequeue(1);
if (result.Count == 0)
{
return null;
}
else
{
return result[0];
}
}
/// <summary>
/// Batch equivalent of <see cref="Dequeue"/>
/// </summary>
/// <param name="batchSize">The maximum number of items to dequeue</param>
/// <returns>The items found or an empty collection (never null)</returns>
public List<QueueEntry<T>> Dequeue(int batchSize)
{
if (_transactional)
{
lock (_dequeueLock)
{
var items = _collection.Find(x => !x.IsCheckedOut, 0, batchSize);
// Capture the result before changing IsCheckedOut, otherwise items is being changed
var result = new List<QueueEntry<T>>(items);
foreach (var item in result)
{
item.IsCheckedOut = true;
_collection.Update(item);
}
return result;
}
}
else
{
var items = _collection.Find(x => true, 0, batchSize);
var result = new List<QueueEntry<T>>(items);
foreach (var item in items)
{
_collection.Delete(new BsonValue(item.Id));
}
return result;
}
}
/// <summary>
/// Obtains list of items currently checked out (but not yet commited or aborted) as a result of Dequeue calls on a transactional queue
/// </summary>
/// <exception cref="InvalidOperationException">Thrown when queue is not transactional</exception>
/// <returns>Items found or empty collection (never null)</returns>
public List<QueueEntry<T>> CurrentCheckouts()
{
if (!_transactional)
{
throw new InvalidOperationException("Cannot call " + nameof(CurrentCheckouts) + " unless queue is transactional");
}
var records = _collection.Find(item => item.IsCheckedOut);
return new List<QueueEntry<T>>(records);
}
/// <summary>
/// Aborts all currently checked out items. Equivalent of calling <see cref="CurrentCheckouts"/> followed by <see cref="Abort(IEnumerable{QueueEntry{T}})"/>
/// </summary>
/// <exception cref="InvalidOperationException">Thrown when queue is not transactional</exception>
public void ResetOrphans()
{
if (!_transactional)
{
throw new InvalidOperationException("Cannot call " + nameof(ResetOrphans) + " unless queue is transactional");
}
var checkouts = CurrentCheckouts();
Abort(checkouts);
}
/// <summary>
/// Aborts a transaction on a single item. See <see cref="Abort(IEnumerable{QueueEntry{T}})"/> for batches.
/// </summary>
/// <param name="item">An item that was obtained from a <see cref="Dequeue"/> call</param>
/// <exception cref="InvalidOperationException">Thrown when queue is not transactional</exception>
public void Abort(QueueEntry<T> item)
{
if (!_transactional)
{
throw new InvalidOperationException("Cannot call " + nameof(Abort) + " unless queue is transactional");
}
else if (item == null)
{
throw new ArgumentNullException(nameof(item));
}
item.IsCheckedOut = false;
_collection.Update(item);
}
/// <summary>
/// Aborts a transation on a group of items. See <see cref="Abort(QueueEntry{T})"/> for a single item.
/// </summary>
/// <param name="items">Items that were obtained from a <see cref="Dequeue"/> call</param>
/// <exception cref="InvalidOperationException">Thrown when queue is not transactional</exception>
public void Abort(IEnumerable<QueueEntry<T>> items)
{
foreach (var item in items)
{
Abort(item);
}
}
/// <summary>
/// Commits a transaction on a single item. See <see cref="Commit(IEnumerable{QueueEntry{T}})"/> for batches.
/// </summary>
/// <param name="item">An item that was obtained from a <see cref="Dequeue"/> call</param>
/// <exception cref="InvalidOperationException">Thrown when queue is not transactional</exception>
public void Commit(QueueEntry<T> item)
{
if (item == null)
{
throw new ArgumentNullException(nameof(item));
}
if (!_transactional)
{
throw new InvalidOperationException("Cannot call " + nameof(Commit) + " unless queue is transactional");
}
BsonValue id = new BsonValue(item.Id);
_collection.Delete(id);
}
/// <summary>
/// Commits a transation on a group of items. See <see cref="Commit(QueueEntry{T})/> for a single item.
/// </summary>
/// <param name="items">Items that were obtained from a <see cref="Dequeue"/> call</param>
/// <exception cref="InvalidOperationException">Thrown when queue is not transactional</exception>
public void Commit(IEnumerable<QueueEntry<T>> items)
{
foreach (var item in items)
{
Commit(item);
}
}
/// <summary>
/// Number of items in queue, including those that have been checked out.
/// </summary>
public int Count()
{
return _collection.Count();
}
/// <summary>
/// Removes all items from queue, including any that have been checked out.
/// </summary>
public void Clear()
{
_collection.DeleteAll();
}
}
}
#pragma warning restore CS1584, CS0419, CS1570
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.FlowRuleEngine
{
public interface IRuleNode
{
string name { get; set; }
string InputData { get; set; }
public void Run();
string OuputData { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.FlowRuleEngine
{
public interface ISwitcher
{
string Switcher { get; }
IRuleNode InputNode { get; set; }
List<IRuleNode> OuputNodes { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.FlowRuleEngine
{
public class IoTSharpRoot : ISwitcher
{
public IoTSharpRoot(string json)
{
}
public string Switcher => nameof(IoTSharpRoot);
public IRuleNode InputNode { get; set; }
public List<IRuleNode> OuputNodes { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
}
}
......@@ -2,6 +2,7 @@
using CoAP.Server.Resources;
using IoTSharp.Data;
using IoTSharp.Extensions;
using IoTSharp.Queue;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
......@@ -23,8 +24,9 @@ namespace IoTSharp.Handlers
MediaType.ApplicationOctetStream
};
private readonly ILogger _logger;
private readonly IMsgQueue _queue;
public CoApResource(string name, ApplicationDbContext dbContext, ILogger logger)
public CoApResource(string name, ApplicationDbContext dbContext, ILogger logger, IMsgQueue queue)
: base(name)
{
Attributes.Title = name;
......@@ -35,6 +37,7 @@ namespace IoTSharp.Handlers
Attributes.AddContentType(item);
}
_logger = logger;
_queue = queue;
_logger.LogInformation($"CoApResource {name} is created.");
}
protected override void DoPost(CoapExchange exchange)
......@@ -110,12 +113,12 @@ namespace IoTSharp.Handlers
switch (_res)
{
case CoApRes.Attributes:
var result1 = await _dbContext.SaveAsync<AttributeLatest, AttributeData>(keyValues, dev, DataSide.ClientSide);
exchange.Respond(StatusCode.Changed, $"{result1.ret}");
_queue.Enqueue(new RawMsg() { MsgType = MsgType.CoAP, MsgBody = keyValues, DataCatalog = DataCatalog.AttributeData, DataSide = DataSide.ClientSide, DeviceId = dev.Id });
exchange.Respond(StatusCode.Changed, $"OK");
break;
case CoApRes.Telemetry:
var result2 = await _dbContext.SaveAsync<TelemetryLatest, TelemetryData>(keyValues, dev, DataSide.ClientSide);
exchange.Respond(StatusCode.Created, $"{result2.ret}");
_queue.Enqueue(new RawMsg() { MsgType = MsgType.CoAP, MsgBody = keyValues, DataCatalog = DataCatalog.AttributeData, DataSide = DataSide.ClientSide, DeviceId = dev.Id });
exchange.Respond(StatusCode.Created, $"OK");
break;
default:
break;
......
using IoTSharp.Data;
using IoTSharp.Extensions;
using IoTSharp.Handlers;
using IoTSharp.Queue;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
......@@ -26,10 +27,11 @@ namespace IoTSharp.Handlers
readonly ILogger<MQTTServerHandler> _logger;
readonly ApplicationDbContext _dbContext;
readonly IMqttServerEx _serverEx;
private readonly IMsgQueue _queue;
readonly IServiceScope scope;
readonly MqttClientSetting _mcsetting;
public MQTTServerHandler(ILogger<MQTTServerHandler> logger, IServiceScopeFactory scopeFactor,IMqttServerEx serverEx
, IOptions <AppSettings> options
, IOptions <AppSettings> options,IMsgQueue queue
)
{
_mcsetting = options.Value.MqttClient;
......@@ -37,6 +39,7 @@ namespace IoTSharp.Handlers
scope = scopeFactor.CreateScope();
_dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
_serverEx = serverEx;
_queue = queue;
}
static long clients = 0;
......@@ -123,17 +126,7 @@ namespace IoTSharp.Handlers
}
if (tpary[2] == "telemetry")
{
Task.Run(async () =>
{
try
{
var result = await _dbContext.SaveAsync<TelemetryLatest, TelemetryData>(keyValues, device, DataSide.ClientSide);
}
catch (Exception ex)
{
_logger.LogError(ex, $"Can't upload telemetry to device {device.Name}({device.Id}).the payload is {e.ApplicationMessage.ConvertPayloadToString()}");
}
});
_queue.Enqueue(new RawMsg() { DeviceId = device.Id, MsgBody = keyValues, DataSide = DataSide.ClientSide, DataCatalog = DataCatalog.TelemetryData });
}
else if (tpary[2] == "attributes")
{
......@@ -146,18 +139,7 @@ namespace IoTSharp.Handlers
}
else
{
Task.Run(async () =>
{
try
{
var result = await _dbContext.SaveAsync<AttributeLatest, AttributeData>(keyValues, device, DataSide.ClientSide);
}
catch (Exception ex)
{
_logger.LogError(ex, $"Can't upload attributes to device {device.Name}({device.Id}).the payload is \"{e.ApplicationMessage.ConvertPayloadToString()}\"");
}
});
_queue.Enqueue(new RawMsg() { DeviceId = device.Id, MsgBody = keyValues, DataSide = DataSide.ClientSide, DataCatalog = DataCatalog.AttributeData });
}
}
......
......@@ -44,6 +44,8 @@
<PackageReference Include="AspNetCore.HealthChecks.NpgSql" Version="3.1.1" />
<PackageReference Include="IoTSharp.CoAP.NET" Version="2.0.8" />
......@@ -54,11 +56,12 @@
<PackageReference Include="kimbus" Version="2.0.1" />
<PackageReference Include="LiteDB" Version="5.0.8" />
<PackageReference Include="Microsoft.AspNetCore.Http.Features" Version="3.1.5" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="3.1.5">
<PackageReference Include="Microsoft.AspNetCore.Http.Features" Version="3.1.6" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="3.1.6">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.6" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.8" />
......@@ -75,19 +78,18 @@
<PackageReference Include="SilkierQuartz" Version="1.0.46" />
<PackageReference Include="System.ServiceModel.Primitives" Version="4.7.0" />
<PackageReference Include="System.Text.Encoding.CodePages" Version="4.7.1" />
<PackageReference Include="CoAP.NET.Core" Version="1.1.0" />
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="3.1.5" />
<PackageReference Include="Microsoft.AspNetCore.Diagnostics.EntityFrameworkCore" Version="3.1.5" />
<PackageReference Include="Microsoft.AspNetCore.Hosting.WindowsServices" Version="3.1.5" />
<PackageReference Include="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="3.1.5" />
<PackageReference Include="Microsoft.AspNetCore.SpaServices" Version="3.1.5" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.1.5" />
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="3.1.5" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="3.1.5" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Hosting.WindowsServices" Version="3.1.5" />
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="3.1.6" />
<PackageReference Include="Microsoft.AspNetCore.Diagnostics.EntityFrameworkCore" Version="3.1.6" />
<PackageReference Include="Microsoft.AspNetCore.Hosting.WindowsServices" Version="3.1.6" />
<PackageReference Include="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="3.1.6" />
<PackageReference Include="Microsoft.AspNetCore.SpaServices" Version="3.1.6" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.1.6" />
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="3.1.6" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="3.1.6" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="3.1.6" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.6" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.6" />
<PackageReference Include="Microsoft.Extensions.Hosting.WindowsServices" Version="3.1.6" />
<PackageReference Include="MQTTnet.AspNetCoreEx" Version="3.0.11" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="3.0.12-rc1" />
......
......@@ -235,5 +235,113 @@
查找类似 The certificate is installed 的本地化字符串。
</summary>
</member>
<member name="T:LiteQueue.LiteQueue`1">
<summary>
Uses LiteDB to provide a persisted, thread safe, (optionally) transactional, FIFO queue.
Suitable for use on clients as a lightweight, portable alternative to MSMQ. Not recommended for use
on large server side applications due to performance limitations of LiteDB.
</summary>
</member>
<member name="P:LiteQueue.LiteQueue`1.IsTransactional">
<summary>
Impacts operation of <see cref="M:LiteQueue.LiteQueue`1.Dequeue"/> method. Can only be set once in constructor.
</summary>
</member>
<member name="M:LiteQueue.LiteQueue`1.#ctor(LiteDB.LiteDatabase,System.String,System.Boolean)">
<summary>
Creates a collection for you in the database
</summary>
<param name="db">The LiteDB database. You are responsible for its lifecycle (using/dispose)</param>
<param name="collectionName">Name of the collection to create</param>
<param name="transactional">Whether the queue should use transaction logic, default true</param>
</member>
<member name="M:LiteQueue.LiteQueue`1.#ctor(LiteDB.ILiteCollection{LiteQueue.QueueEntry{`0}},System.Boolean)">
<summary>
Uses the provided database collection
</summary>
<param name="collection">A LiteDB collection.</param>
<param name="transactional">Whether the queue should use transaction logic, default true</param>
</member>
<member name="M:LiteQueue.LiteQueue`1.#ctor(LiteDB.LiteDatabase,System.Boolean)">
<summary>
Creates a collection for you in the database, collection's name is <typeparamref name="T"/>
</summary>
<param name="db">The LiteDB database. You are responsible for its lifecycle (using/dispose)</param>
<param name="transactional">Whether the queue should use transaction logic, default true</param>
</member>
<member name="M:LiteQueue.LiteQueue`1.Enqueue(`0)">
<summary>
Adds a single item to queue. See <see cref="M:LiteQueue.LiteQueue`1.Enqueue(System.Collections.Generic.IEnumerable{`0})"/> for adding a batch.
</summary>
<param name="item"></param>
</member>
<member name="M:LiteQueue.LiteQueue`1.Enqueue(System.Collections.Generic.IEnumerable{`0})">
<summary>
Adds a batch of items to the queue. See <see cref="M:LiteQueue.LiteQueue`1.Enqueue(`0)"/> for adding a single item.
</summary>
<param name="items"></param>
</member>
<member name="M:LiteQueue.LiteQueue`1.Dequeue">
<summary>
Transactional queues:
Marks item as checked out but does not remove from queue. You are expected to later call <see cref="M:LiteQueue.LiteQueue`1.Commit(LiteQueue.QueueEntry{`0})"/> or <see cref="M:LiteQueue.LiteQueue`1.Abort(LiteQueue.QueueEntry{`0})"/>
Non-transactional queues:
Removes item from queue with no need to call <see cref="M:LiteQueue.LiteQueue`1.Commit(LiteQueue.QueueEntry{`0})"/> or <see cref="M:LiteQueue.LiteQueue`1.Abort(LiteQueue.QueueEntry{`0})"/>
</summary>
<returns>An item if found or null</returns>
</member>
<member name="M:LiteQueue.LiteQueue`1.Dequeue(System.Int32)">
<summary>
Batch equivalent of <see cref="M:LiteQueue.LiteQueue`1.Dequeue"/>
</summary>
<param name="batchSize">The maximum number of items to dequeue</param>
<returns>The items found or an empty collection (never null)</returns>
</member>
<member name="M:LiteQueue.LiteQueue`1.CurrentCheckouts">
<summary>
Obtains list of items currently checked out (but not yet commited or aborted) as a result of Dequeue calls on a transactional queue
</summary>
<exception cref="T:System.InvalidOperationException">Thrown when queue is not transactional</exception>
<returns>Items found or empty collection (never null)</returns>
</member>
<member name="M:LiteQueue.LiteQueue`1.ResetOrphans">
<summary>
Aborts all currently checked out items. Equivalent of calling <see cref="M:LiteQueue.LiteQueue`1.CurrentCheckouts"/> followed by <see cref="M:LiteQueue.LiteQueue`1.Abort(System.Collections.Generic.IEnumerable{LiteQueue.QueueEntry{`0}})"/>
</summary>
<exception cref="T:System.InvalidOperationException">Thrown when queue is not transactional</exception>
</member>
<member name="M:LiteQueue.LiteQueue`1.Abort(LiteQueue.QueueEntry{`0})">
<summary>
Aborts a transaction on a single item. See <see cref="M:LiteQueue.LiteQueue`1.Abort(System.Collections.Generic.IEnumerable{LiteQueue.QueueEntry{`0}})"/> for batches.
</summary>
<param name="item">An item that was obtained from a <see cref="M:LiteQueue.LiteQueue`1.Dequeue"/> call</param>
<exception cref="T:System.InvalidOperationException">Thrown when queue is not transactional</exception>
</member>
<member name="M:LiteQueue.LiteQueue`1.Abort(System.Collections.Generic.IEnumerable{LiteQueue.QueueEntry{`0}})">
<summary>
Aborts a transation on a group of items. See <see cref="M:LiteQueue.LiteQueue`1.Abort(LiteQueue.QueueEntry{`0})"/> for a single item.
</summary>
<param name="items">Items that were obtained from a <see cref="M:LiteQueue.LiteQueue`1.Dequeue"/> call</param>
<exception cref="T:System.InvalidOperationException">Thrown when queue is not transactional</exception>
</member>
<member name="M:LiteQueue.LiteQueue`1.Commit(LiteQueue.QueueEntry{`0})">
<summary>
Commits a transaction on a single item. See <see cref="M:LiteQueue.LiteQueue`1.Commit(System.Collections.Generic.IEnumerable{LiteQueue.QueueEntry{`0}})"/> for batches.
</summary>
<param name="item">An item that was obtained from a <see cref="M:LiteQueue.LiteQueue`1.Dequeue"/> call</param>
<exception cref="T:System.InvalidOperationException">Thrown when queue is not transactional</exception>
</member>
<!-- Badly formed XML comment ignored for member "M:LiteQueue.LiteQueue`1.Commit(System.Collections.Generic.IEnumerable{LiteQueue.QueueEntry{`0}})" -->
<member name="M:LiteQueue.LiteQueue`1.Count">
<summary>
Number of items in queue, including those that have been checked out.
</summary>
</member>
<member name="M:LiteQueue.LiteQueue`1.Clear">
<summary>
Removes all items from queue, including any that have been checked out.
</summary>
</member>
</members>
</doc>
using IoTSharp.Data;
using IoTSharp.Extensions;
using IoTSharp.Queue;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MQTTnet;
using MQTTnet.AspNetCoreEx;
using MQTTnet.Server;
using Quartz;
using SilkierQuartz;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace IoTSharp.Jobs
{
[SilkierQuartz(0, "PushData", "Push Iot Message Data to DataBase ", TriggerGroup = "Data")]
public class PushData : IJob
{
readonly ILogger _logger;
readonly IServiceScope scope;
readonly MqttClientSetting _mcsetting;
readonly IMsgQueue _queue;
public PushData(ILogger<PushData> logger, IServiceScopeFactory scopeFactor
, IOptions<AppSettings> options, IMsgQueue queue
)
{
_mcsetting = options.Value.MqttClient;
_logger = logger;
scope = scopeFactor.CreateScope();
_queue = queue;
}
public Task Execute(IJobExecutionContext context)
{
return Task.Run(async () =>
{
var msg = _queue.Dequeue();
if (msg != null)
{
using (var _dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var device = _dbContext.Device.FirstOrDefault(d => d.Id == msg.DeviceId);
if (device != null)
{
switch (msg.DataCatalog)
{
case DataCatalog.AttributeData:
var result2 = await _dbContext.SaveAsync<AttributeLatest, AttributeData>(msg.MsgBody, device, msg.DataSide);
_logger.LogInformation(Newtonsoft.Json.JsonConvert.SerializeObject(result2));
break;
case DataCatalog.TelemetryData:
var result1 = await _dbContext.SaveAsync<TelemetryLatest, TelemetryData>(msg.MsgBody, device, msg.DataSide);
_logger.LogInformation(Newtonsoft.Json.JsonConvert.SerializeObject(result1));
break;
default:
break;
}
}
}
}
});
}
}
}
using LiteDB;
using LiteQueue;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.Queue
{
public class DiskQueue : LiteQueue<RawMsg>,IMsgQueue
{
public DiskQueue(string ConnectionString):base(new LiteDatabase(ConnectionString))
{
}
public new void Enqueue(RawMsg msg)
{
base.Enqueue(msg);
}
public new RawMsg Dequeue()
{
var msg = base.Dequeue();
var result = msg.Payload;
try
{
base.Commit(msg);
}
catch (Exception)
{
base.Abort(msg);
result = null;
}
return result;
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.Queue
{
public enum MsgType
{
MQTT,
CoAP,
HTTP
}
}
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.Queue
{
public interface IMsgQueue
{
public void Enqueue(RawMsg msg);
public RawMsg Dequeue();
}
}
using Org.BouncyCastle.Asn1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
namespace IoTSharp.Queue
{
public class MemoryQueue : IMsgQueue
{
Queue<RawMsg> _raws;
public MemoryQueue()
{
_raws = new Queue<RawMsg>();
}
public void Enqueue(RawMsg msg)
{
_raws.Enqueue(msg);
}
public RawMsg Dequeue()
{
return _raws.TryDequeue(out RawMsg raw) ? raw : null;
}
}
}
using IoTSharp.Data;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.Queue
{
public class RawMsg
{
public MsgType MsgType { get; set; }
public Guid DeviceId { get; set; }
public Dictionary<string, object> MsgBody { get; internal set; }
public DataSide DataSide { get; internal set; }
public DataCatalog DataCatalog { get; internal set; }
}
}
using CoAP.Server;
using IoTSharp.Data;
using IoTSharp.Handlers;
using IoTSharp.Queue;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
......@@ -26,14 +27,14 @@ namespace IoTSharp.Services
private IServiceScope _serviceScope;
private CoapServer server;
private readonly AppSettings _settings;
public CoAPService(ILogger<CoAPService> logger, IServiceScopeFactory scopeFactor, IOptions<AppSettings> options)
public CoAPService(ILogger<CoAPService> logger, IServiceScopeFactory scopeFactor, IOptions<AppSettings> options,IMsgQueue queue)
{
_settings = options.Value;
server = new CoapServer(_settings.CoapServer);
_logger = logger;
_serviceScope = scopeFactor.CreateScope();
_dbContext = _serviceScope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
Enum.GetNames(typeof(CoApRes)).ToList().ForEach(n => server.Add(new CoApResource(n, _serviceScope.ServiceProvider.GetRequiredService<ApplicationDbContext>(), _logger)));
Enum.GetNames(typeof(CoApRes)).ToList().ForEach(n => server.Add(new CoApResource(n, _serviceScope.ServiceProvider.GetRequiredService<ApplicationDbContext>(), _logger,queue)));
}
......
......@@ -38,6 +38,7 @@ using SilkierQuartz;
using HealthChecks.UI.Client;
using NSwag;
using NSwag.Generation.Processors.Security;
using IoTSharp.Queue;
namespace IoTSharp
{
......@@ -142,6 +143,11 @@ namespace IoTSharp
}, name: "Disk Storage");
services.AddHealthChecksUI().AddPostgreSqlStorage(Configuration.GetConnectionString("IoTSharp"));
services.AddSilkierQuartz();
services.AddSingleton<IMsgQueue>(o =>
{
return new DiskQueue(System.IO.Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "DiskQuue.iotsharp"));
});
services.AddMemoryCache();
}
......
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"System": "Information",
"Microsoft": "Information"
}
},
"ConnectionStrings": {
"IoTSharp": "Server=pgsql;Database=IoTSharp;Username=postgres;Password=future;"
},
"JwtKey": "kissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissme",
"JwtExpireHours": 24,
"JwtIssuer": "IoTSharp.Net",
"JwtAudience": "IoTSharp.Net"
}
\ No newline at end of file
......@@ -9,6 +9,11 @@
<DockerServiceName>iotsharp</DockerServiceName>
</PropertyGroup>
<ItemGroup>
<None Include="appsettings.Development.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<ExcludeFromSingleFile>true</ExcludeFromSingleFile>
<CopyToPublishDirectory>PreserveNewest</CopyToPublishDirectory>
</None>
<None Include="docker-compose.override.yml">
<DependentUpon>docker-compose.yml</DependentUpon>
</None>
......
......@@ -54,9 +54,6 @@ services:
- 5683:5683
- 5684:5684
- 502:502
volumes:
- "/var/iotsharp/Microsoft/UserSecrets:/root/.microsoft/usersecrets:ro"
- "/etc/iotsharp/appsettings.Production.json:/app/appsettings.Production.json"
networks:
- iotsharp-network
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册