Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
jobily
IoTSharp
提交
826c28c7
I
IoTSharp
项目概览
jobily
/
IoTSharp
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
I
IoTSharp
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
826c28c7
编写于
1月 28, 2022
作者:
麦壳饼
提交者:
GitHub
1月 28, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #588 from IoTSharp/mqttnet4
Mqttnet4
上级
5d6eee86
f8660edc
变更
16
展开全部
隐藏空白更改
内联
并排
Showing
16 changed file
with
208 addition
and
306 deletion
+208
-306
IoTSharp.Data.MySQL/IoTSharp.Data.MySQL.csproj
IoTSharp.Data.MySQL/IoTSharp.Data.MySQL.csproj
+2
-2
IoTSharp.TaskAction/DeviceActionExcutor.cs
IoTSharp.TaskAction/DeviceActionExcutor.cs
+1
-1
IoTSharp.TaskAction/IoTSharp.TaskAction.csproj
IoTSharp.TaskAction/IoTSharp.TaskAction.csproj
+1
-1
IoTSharp.TaskAction/MessagePullExcutor.cs
IoTSharp.TaskAction/MessagePullExcutor.cs
+2
-2
IoTSharp/Clients/RpcClient.cs
IoTSharp/Clients/RpcClient.cs
+10
-13
IoTSharp/Controllers/DevicesController.cs
IoTSharp/Controllers/DevicesController.cs
+8
-12
IoTSharp/Controllers/SubscriptionController.cs
IoTSharp/Controllers/SubscriptionController.cs
+0
-2
IoTSharp/Dockerfile
IoTSharp/Dockerfile
+2
-4
IoTSharp/Extensions/MqttExtension.cs
IoTSharp/Extensions/MqttExtension.cs
+30
-15
IoTSharp/Handlers/EventBusHandler.cs
IoTSharp/Handlers/EventBusHandler.cs
+81
-65
IoTSharp/Handlers/MQTTServerHandler.cs
IoTSharp/Handlers/MQTTServerHandler.cs
+55
-92
IoTSharp/Handlers/RetainedMessageHandler.cs
IoTSharp/Handlers/RetainedMessageHandler.cs
+0
-75
IoTSharp/IoTSharp.csproj
IoTSharp/IoTSharp.csproj
+10
-11
IoTSharp/Jobs/CheckDevices.cs
IoTSharp/Jobs/CheckDevices.cs
+6
-6
IoTSharp/Services/CoAPService.cs
IoTSharp/Services/CoAPService.cs
+0
-4
IoTSharp/Startup.cs
IoTSharp/Startup.cs
+0
-1
未找到文件。
IoTSharp.Data.MySQL/IoTSharp.Data.MySQL.csproj
浏览文件 @
826c28c7
...
...
@@ -14,9 +14,9 @@
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="6.0.1" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="6.0.
0
" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="6.0.
1
" />
<PackageReference Include="EFCore.Sharding.MySql" Version="6.0.0" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql.NetTopologySuite" Version="6.0.
0
" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql.NetTopologySuite" Version="6.0.
1
" />
</ItemGroup>
<ItemGroup>
...
...
IoTSharp.TaskAction/DeviceActionExcutor.cs
浏览文件 @
826c28c7
...
...
@@ -26,7 +26,7 @@ namespace IoTSharp.TaskAction
string
contentType
=
"application/json"
;
var
restclient
=
new
RestClient
(
config
.
BaseUrl
);
var
request
=
new
RestRequest
(
config
.
Url
+
(
input
.
DeviceId
==
Guid
.
Empty
?
""
:
"/"
+
input
.
DeviceId
),
Method
.
P
ost
);
var
request
=
new
RestRequest
(
config
.
Url
+
(
input
.
DeviceId
==
Guid
.
Empty
?
""
:
"/"
+
input
.
DeviceId
),
Method
.
P
OST
);
request
.
AddHeader
(
"X-Access-Token"
,
config
.
Token
);
request
.
RequestFormat
=
DataFormat
.
Json
;
...
...
IoTSharp.TaskAction/IoTSharp.TaskAction.csproj
浏览文件 @
826c28c7
...
...
@@ -5,7 +5,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="RestSharp" Version="10
7.1.1
" />
<PackageReference Include="RestSharp" Version="10
6.15.0
" />
</ItemGroup>
<ItemGroup>
...
...
IoTSharp.TaskAction/MessagePullExcutor.cs
浏览文件 @
826c28c7
...
...
@@ -64,7 +64,7 @@ namespace IoTSharp.TaskAction
var
dd
=
o
.
Properties
().
Select
(
c
=>
new
ParamObject
{
keyName
=
c
.
Name
,
value
=
JPropertyToObject
(
c
.
Value
.
First
as
JProperty
)
}).
ToList
();
string
contentType
=
"application/json"
;
var
restclient
=
new
RestClient
(
config
.
BaseUrl
);
var
request
=
new
RestRequest
(
config
.
Url
+
(
input
.
DeviceId
==
Guid
.
Empty
?
""
:
"/"
+
input
.
DeviceId
),
Method
.
P
ost
);
var
request
=
new
RestRequest
(
config
.
Url
+
(
input
.
DeviceId
==
Guid
.
Empty
?
""
:
"/"
+
input
.
DeviceId
),
Method
.
P
OST
);
request
.
AddHeader
(
"X-Access-Token"
,
config
.
Token
);
request
.
RequestFormat
=
DataFormat
.
Json
;
...
...
@@ -98,7 +98,7 @@ namespace IoTSharp.TaskAction
var
dd
=
o
.
Properties
().
Select
(
c
=>
new
ParamObject
{
keyName
=
c
.
Name
,
value
=
JPropertyToObject
(
c
)
}).
ToList
();
string
contentType
=
"application/json"
;
var
restclient
=
new
RestClient
(
config
.
BaseUrl
);
var
request
=
new
RestRequest
(
config
.
Url
+
(
input
.
DeviceId
==
Guid
.
Empty
?
""
:
"/"
+
input
.
DeviceId
),
Method
.
P
ost
);
var
request
=
new
RestRequest
(
config
.
Url
+
(
input
.
DeviceId
==
Guid
.
Empty
?
""
:
"/"
+
input
.
DeviceId
),
Method
.
P
OST
);
request
.
AddHeader
(
"X-Access-Token"
,
config
.
Token
);
request
.
RequestFormat
=
DataFormat
.
Json
;
...
...
IoTSharp/Clients/RpcClient.cs
浏览文件 @
826c28c7
using
Microsoft.Extensions.Logging
;
using
MQTTnet
;
using
MQTTnet.Client
;
using
MQTTnet.Client.Options
;
using
MQTTnet.Client.Receiving
;
using
MQTTnet.Exceptions
;
using
MQTTnet.Protocol
;
using
System
;
...
...
@@ -30,9 +28,11 @@ namespace IoTSharp.Extensions
{
_mqttClient
=
mqttClient
??
throw
new
ArgumentNullException
(
nameof
(
mqttClient
));
_logger
=
logger
;
_mqttClient
.
ApplicationMessageReceived
Handler
=
new
MqttApplicationMessageReceivedHandlerDelegate
(
args
=>
OnApplicationMessageReceived
(
mqttClient
,
args
)
)
;
_mqttClient
.
ApplicationMessageReceived
Async
+=
OnApplicationMessageReceived
;
}
public
RpcClient
(
IMqttClientOptions
mqtt
,
Microsoft
.
Extensions
.
Logging
.
ILogger
_logger
)
:
this
(
new
MQTTnet
.
MqttFactory
().
CreateMqttClient
(),
_logger
)
{
_mqtt
=
mqtt
;
...
...
@@ -121,19 +121,16 @@ namespace IoTSharp.Extensions
}
}
private
void
OnApplicationMessageReceived
(
object
sender
,
MqttApplicationMessageReceivedEventArgs
eventArgs
)
private
Task
OnApplicationMessageReceived
(
MqttApplicationMessageReceivedEventArgs
eventArgs
)
{
if
(!
_waitingCalls
.
TryRemove
(
eventArgs
.
ApplicationMessage
.
Topic
,
out
var
tcs
))
{
return
;
}
if
(
tcs
.
Task
.
IsCompleted
||
tcs
.
Task
.
IsCanceled
)
if
(
_waitingCalls
.
TryRemove
(
eventArgs
.
ApplicationMessage
.
Topic
,
out
var
tcs
))
{
return
;
if
(!
tcs
.
Task
.
IsCompleted
&&
!
tcs
.
Task
.
IsCanceled
)
{
tcs
.
TrySetResult
(
eventArgs
.
ApplicationMessage
.
Payload
);
}
}
tcs
.
TrySetResult
(
eventArgs
.
ApplicationMessage
.
Payload
);
return
Task
.
CompletedTask
;
}
...
...
IoTSharp/Controllers/DevicesController.cs
浏览文件 @
826c28c7
...
...
@@ -19,14 +19,9 @@ using MQTTnet.Protocol;
using
IoTSharp.Extensions
;
using
IoTSharp.Models
;
using
MQTTnet.Exceptions
;
using
MQTTnet.Client.Options
;
using
Microsoft.AspNetCore.Identity
;
using
Microsoft.Extensions.Logging
;
using
IoTSharp.Storage
;
using
k8s.Models
;
using
Newtonsoft.Json.Linq
;
using
MQTTnet.AspNetCoreEx
;
using
MQTTnet.Server.Status
;
using
System.Security.Cryptography.X509Certificates
;
using
Microsoft.Extensions.Options
;
using
IoTSharp.X509Extensions
;
...
...
@@ -34,6 +29,7 @@ using System.IO;
using
System.IO.Compression
;
using
DotNetCore.CAP
;
using
LinqKit
;
using
MQTTnet.Server
;
namespace
IoTSharp.Controllers
{
...
...
@@ -51,12 +47,12 @@ namespace IoTSharp.Controllers
private
readonly
SignInManager
<
IdentityUser
>
_signInManager
;
private
readonly
ILogger
_logger
;
private
readonly
IStorage
_storage
;
private
readonly
IMqttServerEx
_serverEx
;
private
readonly
MqttServer
_serverEx
;
private
readonly
AppSettings
_setting
;
private
readonly
ICapPublisher
_queue
;
public
DevicesController
(
UserManager
<
IdentityUser
>
userManager
,
SignInManager
<
IdentityUser
>
signInManager
,
ILogger
<
DevicesController
>
logger
,
IMqttServerEx
serverEx
,
ApplicationDbContext
context
,
IMqttClientOptions
mqtt
,
IStorage
storage
,
IOptions
<
AppSettings
>
options
,
ICapPublisher
queue
)
SignInManager
<
IdentityUser
>
signInManager
,
ILogger
<
DevicesController
>
logger
,
MqttServer
serverEx
,
ApplicationDbContext
context
,
IMqttClientOptions
mqtt
,
IStorage
storage
,
IOptions
<
AppSettings
>
options
,
ICapPublisher
queue
)
{
_context
=
context
;
_mqtt
=
mqtt
;
...
...
@@ -944,9 +940,9 @@ namespace IoTSharp.Controllers
[
HttpGet
(
"SessionStatus"
)]
[
ProducesResponseType
(
StatusCodes
.
Status200OK
)]
[
ProducesDefaultResponseType
]
public
async
Task
<
ApiResult
<
IList
<
I
MqttSessionStatus
>>>
GetSessionStatus
()
public
async
Task
<
ApiResult
<
IList
<
MqttSessionStatus
>>>
GetSessionStatus
()
{
return
new
ApiResult
<
IList
<
IMqttSessionStatus
>>(
ApiCode
.
Success
,
"OK"
,
await
_serverEx
.
GetSessionStatu
sAsync
());
return
new
ApiResult
<
IList
<
MqttSessionStatus
>>(
ApiCode
.
Success
,
"OK"
,
await
_serverEx
.
GetSession
sAsync
());
}
/// <summary>
/// SessionStatus
...
...
@@ -956,9 +952,9 @@ namespace IoTSharp.Controllers
[
HttpGet
(
"ClientStatus"
)]
[
ProducesResponseType
(
StatusCodes
.
Status200OK
)]
[
ProducesDefaultResponseType
]
public
async
Task
<
ApiResult
<
IList
<
I
MqttClientStatus
>>>
GetClientStatus
()
public
async
Task
<
ApiResult
<
IList
<
MqttClientStatus
>>>
GetClientStatus
()
{
return
new
ApiResult
<
IList
<
IMqttClientStatus
>>(
ApiCode
.
Success
,
"OK"
,
await
_serverEx
.
GetClientStatu
sAsync
());
return
new
ApiResult
<
IList
<
MqttClientStatus
>>(
ApiCode
.
Success
,
"OK"
,
await
_serverEx
.
GetClient
sAsync
());
}
[
Authorize
(
Roles
=
nameof
(
UserRole
.
NormalUser
))]
...
...
@@ -967,7 +963,7 @@ namespace IoTSharp.Controllers
[
ProducesDefaultResponseType
]
public
async
Task
<
ApiResult
<
int
>>
GetSessionsCount
()
{
return
new
ApiResult
<
int
>(
ApiCode
.
Success
,
"OK"
,
(
await
_serverEx
.
GetClient
Statu
sAsync
()).
Count
);
return
new
ApiResult
<
int
>(
ApiCode
.
Success
,
"OK"
,
(
await
_serverEx
.
GetClientsAsync
()).
Count
);
}
}
}
\ No newline at end of file
IoTSharp/Controllers/SubscriptionController.cs
浏览文件 @
826c28c7
...
...
@@ -23,8 +23,6 @@ namespace IoTSharp.Controllers
public
class
SubscriptionEventController
:
Controller
{
private
ApplicationDbContext
_context
;
private
readonly
FlowRuleProcessor
_flowRuleProcessor
;
private
readonly
TaskExecutorHelper
_helper
;
private
UserManager
<
IdentityUser
>
_userManager
;
// GET: SubscriptionEventController
...
...
IoTSharp/Dockerfile
浏览文件 @
826c28c7
...
...
@@ -48,6 +48,8 @@ RUN KEYRING=/usr/share/keyrings/nodesource.gpg && curl -fsSL https://deb.nodesou
WORKDIR
/src
COPY
["IoTSharp/ClientApp/package.json", "IoTSharp/ClientApp/package.json"]
RUN
npm
install
--prefix
./IoTSharp/ClientApp/
COPY
["IoTSharp/IoTSharp.csproj", "IoTSharp/"]
COPY
["IoTSharp.Data/IoTSharp.Data.csproj", "IoTSharp.Data/"]
COPY
["IoTSharp.Interpreter/IoTSharp.Interpreter.csproj", "IoTSharp.Interpreter/"]
...
...
@@ -59,15 +61,11 @@ COPY ["IoTSharp.Data.Oracle/IoTSharp.Data.Oracle.csproj", "IoTSharp.Data.Oracle/
COPY
["IoTSharp.Data.PostgreSQL/IoTSharp.Data.PostgreSQL.csproj", "IoTSharp.Data.PostgreSQL/"]
COPY
["IoTSharp.Data.MySQL/IoTSharp.Data.MySQL.csproj", "IoTSharp.Data.MySQL/"]
RUN
dotnet restore
"IoTSharp/IoTSharp.csproj"
COPY
["IoTSharp/ClientApp/package.json", "IoTSharp/ClientApp/package.json"]
RUN
npm
install
--prefix
./IoTSharp/ClientApp/
COPY
. .
WORKDIR
"/src/IoTSharp"
RUN
dotnet build
"IoTSharp.csproj"
-c
Release
-o
/app/build
FROM
build AS publish
RUN
dotnet publish
"IoTSharp.csproj"
-c
Release
-o
/app/publish
...
...
IoTSharp/Extensions/MqttExtension.cs
浏览文件 @
826c28c7
...
...
@@ -8,18 +8,15 @@ using Microsoft.Extensions.DependencyInjection;
using
Microsoft.AspNetCore.Builder
;
using
MQTTnet.AspNetCore
;
using
MQTTnet.Diagnostics
;
using
MQTTnet.AspNetCoreEx
;
using
IoTSharp.Handlers
;
using
IoTSharp.Services
;
using
MQTTnet.Server
;
using
MQTTnet.Client.Receiving
;
using
MQTTnet.Client.Options
;
using
System.Diagnostics
;
using
System.Runtime.InteropServices
;
using
System.Threading
;
using
MQTTnet.AspNetCore.Extensions
;
using
MQTTnet.Diagnostics.Logger
;
using
System.Security.Cryptography.X509Certificates
;
using
MQTTnet
;
namespace
IoTSharp
{
...
...
@@ -29,7 +26,7 @@ namespace IoTSharp
public
static
void
AddIoTSharpMqttServer
(
this
IServiceCollection
services
,
MqttBrokerSetting
broker
)
{
services
.
AddMqttTcpServerAdapter
();
services
.
AddHostedMqttServer
Ex
(
options
=>
services
.
AddHostedMqttServer
(
options
=>
{
options
.
WithDefaultEndpointPort
(
broker
.
Port
).
WithDefaultEndpoint
();
if
(
broker
.
EnableTls
)
...
...
@@ -59,19 +56,37 @@ namespace IoTSharp
public
static
void
UseIotSharpMqttServer
(
this
IApplicationBuilder
app
)
{
var
mqttEvents
=
app
.
ApplicationServices
.
CreateScope
().
ServiceProvider
.
GetService
<
MQTTServerHandler
>();
IMqttServerStorage
storage
=
app
.
ApplicationServices
.
CreateScope
().
ServiceProvider
.
GetService
<
IMqttServerStorage
>();
app
.
UseMqttServerEx
(
server
=>
app
.
UseMqttServer
(
server
=>
{
server
.
ClientConnected
Handler
=
new
MqttServerClientConnectedHandlerDelegate
(
args
=>
mqttEvents
.
Server_ClientConnected
(
server
,
args
))
;
server
.
Started
Handler
=
new
MqttServerStartedHandlerDelegate
(
args
=>
mqttEvents
.
Server_Started
(
server
,
args
))
;
server
.
Stopped
Handler
=
new
MqttServerStoppedHandlerDelegate
(
args
=>
mqttEvents
.
Server_Stopped
(
server
,
args
))
;
server
.
ApplicationMessage
ReceivedHandler
=
new
MqttApplicationMessageReceivedHandlerDelegate
(
args
=>
mqttEvents
.
Server_ApplicationMessageReceived
(
server
,
args
))
;
server
.
ClientSubscribedTopicHandler
=
new
MqttServerClientSubscribedTopicHandlerDelegate
(
args
=>
mqttEvents
.
Server_ClientSubscribedTopic
(
server
,
args
))
;
server
.
ClientUnsubscribedTopic
Handler
=
new
MqttServerClientUnsubscribedTopicHandlerDelegate
(
args
=>
mqttEvents
.
Server_ClientUnsubscribedTopic
(
server
,
args
))
;
server
.
ClientConnectionValidatorHandler
=
new
MqttServerClientConnectionValidatorHandlerDelegate
(
args
=>
mqttEvents
.
Server_ClientConnectionValidator
(
server
,
args
))
;
server
.
ClientDisconnected
Handler
=
new
MqttServerClientDisconnectedHandlerDelegate
(
args
=>
mqttEvents
.
Server_ClientDisconnected
(
server
,
args
))
;
server
.
ClientConnected
Async
+=
mqttEvents
.
Server_ClientConnectedAsync
;
server
.
Started
Async
+=
mqttEvents
.
Server_Started
;
server
.
Stopped
Async
+=
mqttEvents
.
Server_Stopped
;
server
.
ApplicationMessage
NotConsumedAsync
+=
mqttEvents
.
Server_ApplicationMessageReceived
;
server
.
ClientSubscribedTopicAsync
+=
mqttEvents
.
Server_ClientSubscribedTopic
;
server
.
ClientUnsubscribedTopic
Async
+=
mqttEvents
.
Server_ClientUnsubscribedTopic
;
server
.
ValidatingConnectionAsync
+=
mqttEvents
.
Server_ClientConnectionValidator
;
server
.
ClientDisconnected
Async
+=
mqttEvents
.
Server_ClientDisconnected
;
});
}
public
static
async
Task
PublishAsync
<
T
>(
this
MqttServer
mqtt
,
string
SenderClientId
,
string
topic
,
T
_payload
)
where
T
:
class
{
await
mqtt
.
PublishAsync
(
SenderClientId
,
new
MqttApplicationMessage
()
{
Topic
=
topic
,
Payload
=
System
.
Text
.
Json
.
JsonSerializer
.
SerializeToUtf8Bytes
(
_payload
)
});
}
public
static
async
Task
PublishAsync
(
this
MqttServer
mqtt
,
string
SenderClientId
,
string
topic
,
string
_payload
)
{
await
mqtt
.
PublishAsync
(
SenderClientId
,
new
MqttApplicationMessage
()
{
Topic
=
topic
,
Payload
=
System
.
Text
.
Encoding
.
Default
.
GetBytes
(
_payload
)
});
}
public
static
async
Task
PublishAsync
(
this
MqttServer
mqtt
,
string
SenderClientId
,
string
topic
,
byte
[]
_payload
)
{
await
mqtt
.
PublishAsync
(
SenderClientId
,
new
MqttApplicationMessage
()
{
Topic
=
topic
,
Payload
=
_payload
});
}
public
static
async
Task
PublishAsync
(
this
MqttServer
mqtt
,
string
SenderClientId
,
MqttApplicationMessage
message
)
{
var
clients
=
await
mqtt
.
GetClientsAsync
();
var
client
=
clients
.
FirstOrDefault
(
c
=>
c
.
Id
==
SenderClientId
);
await
client
.
Session
.
EnqueueApplicationMessageAsync
(
message
);
}
public
static
void
AddMqttClient
(
this
IServiceCollection
services
,
MqttClientSetting
setting
)
...
...
IoTSharp/Handlers/EventBusHandler.cs
浏览文件 @
826c28c7
...
...
@@ -46,69 +46,77 @@ namespace IoTSharp.Handlers
[
CapSubscribe
(
"iotsharp.services.datastream.attributedata"
)]
public
async
void
StoreAttributeData
(
RawMsg
msg
)
{
using
(
var
_scope
=
_scopeFactor
.
CreateScope
())
try
{
using
(
var
_
dbContext
=
_scope
.
ServiceProvider
.
GetRequiredService
<
ApplicationDbContext
>
())
using
(
var
_
scope
=
_scopeFactor
.
CreateScope
())
{
var
device
=
_dbContext
.
Device
.
FirstOrDefault
(
d
=>
d
.
Id
==
msg
.
DeviceId
);
if
(
device
!=
null
)
using
(
var
_dbContext
=
_scope
.
ServiceProvider
.
GetRequiredService
<
ApplicationDbContext
>())
{
var
mb
=
msg
.
MsgBody
;
Dictionary
<
string
,
object
>
dc
=
new
Dictionary
<
string
,
object
>();
mb
.
ToList
().
ForEach
(
kp
=>
var
device
=
_dbContext
.
Device
.
FirstOrDefault
(
d
=>
d
.
Id
==
msg
.
DeviceId
);
if
(
device
!=
null
)
{
if
(
kp
.
Value
.
GetType
()
==
typeof
(
System
.
Text
.
Json
.
JsonElement
))
var
mb
=
msg
.
MsgBody
;
Dictionary
<
string
,
object
>
dc
=
new
Dictionary
<
string
,
object
>();
mb
.
ToList
().
ForEach
(
kp
=>
{
var
je
=
(
System
.
Text
.
Json
.
JsonElement
)
kp
.
Value
;
switch
(
je
.
ValueKind
)
if
(
kp
.
Value
.
GetType
()
==
typeof
(
System
.
Text
.
Json
.
JsonElement
))
{
case
System
.
Text
.
Json
.
JsonValueKind
.
Undefined
:
case
System
.
Text
.
Json
.
JsonValueKind
.
Object
:
case
System
.
Text
.
Json
.
JsonValueKind
.
Array
:
dc
.
Add
(
kp
.
Key
,
je
.
GetRawText
());
break
;
case
System
.
Text
.
Json
.
JsonValueKind
.
String
:
dc
.
Add
(
kp
.
Key
,
je
.
GetString
());
break
;
case
System
.
Text
.
Json
.
JsonValueKind
.
Number
:
dc
.
Add
(
kp
.
Key
,
je
.
GetDouble
());
break
;
case
System
.
Text
.
Json
.
JsonValueKind
.
True
:
case
System
.
Text
.
Json
.
JsonValueKind
.
False
:
dc
.
Add
(
kp
.
Key
,
je
.
GetBoolean
());
break
;
case
System
.
Text
.
Json
.
JsonValueKind
.
Null
:
break
;
default
:
break
;
var
je
=
(
System
.
Text
.
Json
.
JsonElement
)
kp
.
Value
;
switch
(
je
.
ValueKind
)
{
case
System
.
Text
.
Json
.
JsonValueKind
.
Undefined
:
case
System
.
Text
.
Json
.
JsonValueKind
.
Object
:
case
System
.
Text
.
Json
.
JsonValueKind
.
Array
:
dc
.
Add
(
kp
.
Key
,
je
.
GetRawText
());
break
;
case
System
.
Text
.
Json
.
JsonValueKind
.
String
:
dc
.
Add
(
kp
.
Key
,
je
.
GetString
());
break
;
case
System
.
Text
.
Json
.
JsonValueKind
.
Number
:
dc
.
Add
(
kp
.
Key
,
je
.
GetDouble
());
break
;
case
System
.
Text
.
Json
.
JsonValueKind
.
True
:
case
System
.
Text
.
Json
.
JsonValueKind
.
False
:
dc
.
Add
(
kp
.
Key
,
je
.
GetBoolean
());
break
;
case
System
.
Text
.
Json
.
JsonValueKind
.
Null
:
break
;
default
:
break
;
}
}
else
{
dc
.
Add
(
kp
.
Key
,
kp
.
Value
);
}
}
else
{
dc
.
Add
(
kp
.
Key
,
kp
.
Value
);
}
});
var
result2
=
await
_dbContext
.
SaveAsync
<
AttributeLatest
>(
dc
,
device
.
Id
,
msg
.
DataSide
);
result2
.
exceptions
?.
ToList
().
ForEach
(
ex
=>
{
_logger
.
LogError
(
$"
{
ex
.
Key
}
{
ex
.
Value
}
{
Newtonsoft
.
Json
.
JsonConvert
.
SerializeObject
(
msg
.
MsgBody
[
ex
.
Key
])}
"
);
});
_logger
.
LogInformation
(
$"更新
{
device
.
Name
}
(
{
device
.
Id
}
)属性数据结果
{
result2
.
ret
}
"
);
ExpandoObject
obj
=
new
ExpandoObject
();
dc
.
ToList
().
ForEach
(
kv
=>
{
obj
.
TryAdd
(
kv
.
Key
,
kv
.
Value
);
});
await
RunRules
(
msg
.
DeviceId
,
obj
,
MountType
.
Telemetry
);
});
var
result2
=
await
_dbContext
.
SaveAsync
<
AttributeLatest
>(
dc
,
device
.
Id
,
msg
.
DataSide
);
result2
.
exceptions
?.
ToList
().
ForEach
(
ex
=>
{
_logger
.
LogError
(
$"
{
ex
.
Key
}
{
ex
.
Value
}
{
Newtonsoft
.
Json
.
JsonConvert
.
SerializeObject
(
msg
.
MsgBody
[
ex
.
Key
])}
"
);
});
_logger
.
LogInformation
(
$"更新
{
device
.
Name
}
(
{
device
.
Id
}
)属性数据结果
{
result2
.
ret
}
"
);
ExpandoObject
obj
=
new
ExpandoObject
();
dc
.
ToList
().
ForEach
(
kv
=>
{
obj
.
TryAdd
(
kv
.
Key
,
kv
.
Value
);
});
await
RunRules
(
msg
.
DeviceId
,
obj
,
MountType
.
Telemetry
);
}
}
}
}
catch
(
Exception
ex
)
{
_logger
.
LogError
(
ex
,
"StoreAttributeData:"
+
ex
.
Message
);
}
}
...
...
@@ -184,25 +192,33 @@ namespace IoTSharp.Handlers
private
async
Task
RunRules
(
Guid
devid
,
object
obj
,
MountType
mountType
)
{
var
rules
=
await
_caching
.
GetAsync
(
$"ruleid_
{
devid
}
_
{
Enum
.
GetName
(
mountType
)}
"
,
async
()
=>
try
{
using
(
var
scope
=
_scopeFactor
.
CreateScope
())
using
(
var
_dbContext
=
scope
.
ServiceProvider
.
GetRequiredService
<
ApplicationDbContext
>())
var
rules
=
await
_caching
.
GetAsync
(
$"ruleid_
{
devid
}
_
{
Enum
.
GetName
(
mountType
)}
"
,
async
()
=>
{
using
(
var
scope
=
_scopeFactor
.
CreateScope
())
using
(
var
_dbContext
=
scope
.
ServiceProvider
.
GetRequiredService
<
ApplicationDbContext
>())
{
var
guids
=
await
_dbContext
.
GerDeviceRulesIdList
(
devid
,
mountType
);
return
guids
;
}
},
TimeSpan
.
FromSeconds
(
_appSettings
.
RuleCachingExpiration
));
if
(
rules
.
HasValue
)
{
var
guids
=
await
_dbContext
.
GerDeviceRulesIdList
(
devid
,
mountType
);
return
guids
;
rules
.
Value
.
ToList
().
ForEach
(
async
g
=>
{
await
_flowRuleProcessor
.
RunFlowRules
(
g
,
obj
,
devid
,
EventType
.
Normal
,
null
);
});
}
},
TimeSpan
.
FromSeconds
(
_appSettings
.
RuleCachingExpiration
));
if
(
rules
.
HasValue
)
{
rules
.
Value
.
ToList
().
ForEach
(
async
g
=>
else
{
await
_flowRuleProcessor
.
RunFlowRules
(
g
,
obj
,
devid
,
EventType
.
Normal
,
null
);
}
);
_logger
.
LogInformation
(
$"
{
devid
}
的数据无相关规则链处理。"
);
}
}
else
catch
(
Exception
ex
)
{
_logger
.
LogInformation
(
$"
{
devid
}
的数据无相关规则链处理。"
);
_logger
.
LogError
(
ex
,
$"
{
devid
}
的数据无相关规则链处理。"
);
}
}
}
...
...
IoTSharp/Handlers/MQTTServerHandler.cs
浏览文件 @
826c28c7
此差异已折叠。
点击以展开。
IoTSharp/Handlers/RetainedMessageHandler.cs
已删除
100644 → 0
浏览文件 @
5d6eee86
using
IoTSharp.Data
;
using
Microsoft.EntityFrameworkCore
;
using
Microsoft.Extensions.DependencyInjection
;
using
Microsoft.Extensions.Logging
;
using
MQTTnet
;
using
MQTTnet.Server
;
using
System
;
using
System.Collections.Generic
;
using
System.Linq
;
using
System.Threading
;
using
System.Threading.Tasks
;
using
IoTSharp.Extensions
;
using
Silkier.AspNetCore
;
namespace
IoTSharp.Handlers
{
public
class
RetainedMessageHandler
:
IMqttServerStorage
{
private
ApplicationDbContext
_context
;
private
ILogger
_logger
;
public
RetainedMessageHandler
(
ILogger
<
RetainedMessageHandler
>
logger
,
IServiceScopeFactory
scopeFactor
)
{
_context
=
scopeFactor
.
GetRequiredService
<
ApplicationDbContext
>();
_logger
=
logger
;
}
public
async
Task
<
IList
<
MqttApplicationMessage
>>
LoadRetainedMessagesAsync
()
{
//try
//{
// var lst = from m in _context.RetainedMessage select (MqttApplicationMessage)m;
// return await lst.ToListAsync();
//}
//catch (Exception ex)
//{
// _logger.LogError(ex, $"load RetainedMessage error {ex.Message} ");
// return new List<MqttApplicationMessage>();
//}
return
await
Task
.
FromResult
(
new
List
<
MqttApplicationMessage
>());
}
public
Task
SaveRetainedMessagesAsync
(
IList
<
MqttApplicationMessage
>
messages
)
{
// BaseTask.Factory.StartNew(() =>
//{
// _context.Database.BeginTransaction();
// try
// {
// DateTime dateTime = DateTime.Now;
// var needsave = from mam in messages select new RetainedMessage(mam);
// var ids = needsave.Select(x => x.Id).ToList();
// var dbids = _context.RetainedMessage.Select(x => x.Id).ToArray();
// var needdelete = dbids.Except(ids);//.Except(dbids);
// var del = from f in _context.RetainedMessage where needdelete.Contains(f.Id) select f;
// var needadd = ids.Except(dbids);
// var add = from f in needsave where needadd.Contains(f.Id) select f;
// if (del.Any()) _context.RetainedMessage.RemoveRange(del);
// if (add.Any()) _context.RetainedMessage.AddRange(add);
// int ret = _context.SaveChanges();
// _context.Database.CommitTransaction();
// _logger.LogInformation($"{ret} pieces of data were saved and took {DateTime.Now.Subtract(dateTime).TotalSeconds} seconds.");
// }
// catch (Exception ex)
// {
// _context.Database.RollbackTransaction();
// _logger.LogError(ex, $" An exception was encountered,{ex.Message}");
// }
//}, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
return
Task
.
CompletedTask
;
}
}
}
\ No newline at end of file
IoTSharp/IoTSharp.csproj
浏览文件 @
826c28c7
...
...
@@ -63,12 +63,12 @@
<PackageReference Include="DotNetCore.CAP.MongoDB" Version="6.0.0" />
<PackageReference Include="DotNetCore.CAP.RabbitMQ" Version="6.0.0" />
<PackageReference Include="DotNetCore.CAP.PostgreSql" Version="6.0.0" />
<PackageReference Include="EasyCaching.Core" Version="1.
4.1
" />
<PackageReference Include="EasyCaching.InMemory" Version="1.
4.1
" />
<PackageReference Include="EasyCaching.LiteDB" Version="1.
4.1
" />
<PackageReference Include="EasyCaching.Redis" Version="1.
4.1
" />
<PackageReference Include="EasyCaching.Core" Version="1.
5.0
" />
<PackageReference Include="EasyCaching.InMemory" Version="1.
5.0
" />
<PackageReference Include="EasyCaching.LiteDB" Version="1.
5.0
" />
<PackageReference Include="EasyCaching.Redis" Version="1.
5.0
" />
<PackageReference Include="hyjiacan.pinyin4net" Version="4.1.0" />
<PackageReference Include="InfluxDB.Client" Version="3.
3.0-dev.4823
" />
<PackageReference Include="InfluxDB.Client" Version="3.
2.0
" />
<PackageReference Include="IoTSharp.CoAP.NET" Version="2.0.8" />
<PackageReference Include="IoTSharp.X509Extensions" Version="1.4.27" />
<PackageReference Include="Jdenticon-net" Version="3.1.2" />
...
...
@@ -89,10 +89,9 @@
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.14.0" />
<PackageReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Design" Version="6.0.1" />
<PackageReference Include="MQTTnet" Version="3.1.1" />
<PackageReference Include="MQTTnet.AspNetCore" Version="3.1.1" />
<PackageReference Include="MQTTnet.AspNetCoreEx" Version="3.1.2" />
<PackageReference Include="MQTTnet.Extensions.Rpc" Version="3.1.1" />
<PackageReference Include="MQTTnet" Version="4.0.0-preview1" />
<PackageReference Include="MQTTnet.AspNetCore" Version="4.0.0-preview1" />
<PackageReference Include="MQTTnet.Extensions.Rpc" Version="4.0.0-preview1" />
<PackageReference Include="NetMQ" Version="4.0.1.6" />
<PackageReference Include="Npgsql" Version="6.0.2" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="6.0.2" />
...
...
@@ -102,7 +101,7 @@
<PackageReference Include="PinusDB.HealthChecks" Version="1.0.10" />
<PackageReference Include="ProxyKit" Version="2.3.4" />
<PackageReference Include="Quartz.Serialization.Json" Version="3.3.3" />
<PackageReference Include="RestSharp" Version="10
7.1.1
" />
<PackageReference Include="RestSharp" Version="10
6.15.0
" />
<PackageReference Include="Rin" Version="2.6.0" />
<PackageReference Include="Rin.Mvc" Version="2.6.0" />
<PackageReference Include="RulesEngine" Version="3.5.0" />
...
...
@@ -124,7 +123,7 @@
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.WindowsServices" Version="6.0.0" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="
3.1.
1" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="
4.0.0-preview
1" />
<PackageReference Include="Microsoft.AspNetCore.SpaServices.Extensions" Version="6.0.1" />
<PackageReference Include="Microsoft.AspNetCore.ApiAuthorization.IdentityServer" Version="6.0.1" />
<PackageReference Include="Microsoft.AspNetCore.Diagnostics.EntityFrameworkCore" Version="6.0.1" />
...
...
IoTSharp/Jobs/CheckDevices.cs
浏览文件 @
826c28c7
...
...
@@ -3,7 +3,7 @@ using IoTSharp.Data;
using
Microsoft.Extensions.DependencyInjection
;
using
Microsoft.Extensions.Logging
;
using
Microsoft.Extensions.Options
;
using
MQTTnet.
AspNetCoreEx
;
using
MQTTnet.
Server
;
using
Quartz
;
using
SilkierQuartz
;
using
System
;
...
...
@@ -20,9 +20,9 @@ namespace IoTSharp.Jobs
private
readonly
MqttClientSetting
_mcsetting
;
private
readonly
ILogger
<
CheckDevices
>
_logger
;
private
readonly
IServiceScopeFactory
_scopeFactor
;
private
readonly
IMqttServerEx
_serverEx
;
private
readonly
MqttServer
_serverEx
;
public
CheckDevices
(
ILogger
<
CheckDevices
>
logger
,
IServiceScopeFactory
scopeFactor
,
IMqttServerEx
serverEx
public
CheckDevices
(
ILogger
<
CheckDevices
>
logger
,
IServiceScopeFactory
scopeFactor
,
MqttServer
serverEx
,
IOptions
<
AppSettings
>
options
)
{
_mcsetting
=
options
.
Value
.
MqttClient
;
...
...
@@ -38,12 +38,12 @@ namespace IoTSharp.Jobs
using
(
var
scope
=
_scopeFactor
.
CreateScope
())
using
(
var
_dbContext
=
scope
.
ServiceProvider
.
GetRequiredService
<
ApplicationDbContext
>())
{
var
clientstatus
=
await
_serverEx
.
GetClient
Statu
sAsync
();
var
clientstatus
=
await
_serverEx
.
GetClientsAsync
();
clientstatus
.
ToList
().
ForEach
(
cs
=>
{
try
{
var
_device
=
cs
.
Session
.
Items
?.
FirstOrDefault
(
k
=>
(
string
)
k
.
Key
==
nameof
(
Device
)).
Value
as
Device
;
var
_device
=
cs
.
Session
.
Items
[
nameof
(
Device
)]
as
Device
;
if
(
_device
!=
null
)
{
var
d
=
_dbContext
.
Device
.
FirstOrDefault
(
d
=>
d
.
Id
==
_device
.
Id
);
...
...
@@ -58,7 +58,7 @@ namespace IoTSharp.Jobs
}
catch
(
Exception
ex
)
{
_logger
.
LogInformation
(
$"检查设备
{
cs
.
Client
Id
}
-
{
cs
.
Endpoint
}
) 时遇到异常
{
ex
.
Message
}{
ex
.
InnerException
?.
Message
}
发送消息:
{
cs
.
SentApplicationMessagesCount
}
(
{
cs
.
BytesSent
}
kb) 收到
{
cs
.
ReceivedApplicationMessagesCount
}
(
{
cs
.
BytesReceived
/
1024
}
KB ) "
);
_logger
.
LogInformation
(
$"检查设备
{
cs
.
Id
}
-
{
cs
.
Endpoint
}
) 时遇到异常
{
ex
.
Message
}{
ex
.
InnerException
?.
Message
}
发送消息:
{
cs
.
SentApplicationMessagesCount
}
(
{
cs
.
BytesSent
}
kb) 收到
{
cs
.
ReceivedApplicationMessagesCount
}
(
{
cs
.
BytesReceived
/
1024
}
KB ) "
);
}
});
...
...
IoTSharp/Services/CoAPService.cs
浏览文件 @
826c28c7
...
...
@@ -7,10 +7,6 @@ using Microsoft.Extensions.Hosting;
using
Microsoft.Extensions.Logging
;
using
Microsoft.Extensions.Options
;
using
MQTTnet.Client
;
using
MQTTnet.Client.Connecting
;
using
MQTTnet.Client.Disconnecting
;
using
MQTTnet.Client.Options
;
using
MQTTnet.Client.Receiving
;
using
System
;
using
System.Collections.Generic
;
using
System.Linq
;
...
...
IoTSharp/Startup.cs
浏览文件 @
826c28c7
...
...
@@ -158,7 +158,6 @@ namespace IoTSharp
services
.
AddTransient
<
ApplicationDBInitializer
>();
services
.
AddIoTSharpMqttServer
(
settings
.
MqttBroker
);
services
.
AddMqttClient
(
settings
.
MqttClient
);
services
.
AddSingleton
<
RetainedMessageHandler
>();
services
.
AddSilkierQuartz
(
options
=>
{
options
.
VirtualPathRoot
=
"/quartz"
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录