ShardingStorage.cs 5.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
using EFCore.Sharding;
using IoTSharp.Data;
using IoTSharp.Dtos;
using IoTSharp.Extensions;
using IoTSharp.Queue;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace IoTSharp.Storage
{
    /// <summary>
    /// 由于此模式目前无法通过EFCore.Sharding 进行Group By 获取最新遥测数据, 和Select  新对象, 所以, 最新遥测依然从DataStorage表里获取,历史从分表里获取
    /// 更多内容可以参考<seealso cref="https://github.com/Coldairarrow/EFCore.Sharding/issues/52"/>
    /// </summary>
    public class ShardingStorage : IStorage
    {
        private readonly AppSettings _appSettings;
        private readonly ILogger _logger;
        private readonly IServiceScope scope;

        public ShardingStorage(ILogger<ShardingStorage> logger, IServiceScopeFactory scopeFactor
           , IOptions<AppSettings> options
            )
        {
            _appSettings = options.Value;
            _logger = logger;
            scope = scopeFactor.CreateScope();
        }

        public Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId)
        {
            using (var _context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
            {
麦壳饼's avatar
麦壳饼 已提交
40 41 42
                var devid = from t in _context.TelemetryLatest
                            where t.DeviceId == deviceId
                            select new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() };
43 44 45 46 47 48 49 50 51

                return devid.ToListAsync();
            }
        }

        public Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId, string keys)
        {
            using (var _context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
            {
麦壳饼's avatar
麦壳饼 已提交
52 53 54 55
                var devid = from t in _context.TelemetryLatest
                            where t.DeviceId == deviceId && keys.Split(',', ' ', ';').Contains(t.KeyName)

                            select new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() };
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136

                return devid.ToListAsync();
            }
        }

        public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin)
        {
            return LoadTelemetryAsync(deviceId, keys, begin, DateTime.Now);
        }

        public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin, DateTime end)
        {
            return Task.Run(() =>
            {
                using (var _context = scope.ServiceProvider.GetRequiredService<IShardingDbAccessor>())
                {
                    var lst = new List<TelemetryDataDto>();
                    var kv = _context.GetIShardingQueryable<TelemetryData>()
                        .Where(t => t.DeviceId == deviceId && keys.Split(',', ' ', ';').Contains(t.KeyName) && t.DateTime >= begin && t.DateTime < end)
                        .ToList().Select(t => new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() });
                    return kv.ToList();
                }
            });
        }

        public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, DateTime begin)
        {
            return LoadTelemetryAsync(deviceId, begin, DateTime.Now);
        }

        public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, DateTime begin, DateTime end)
        {
            return Task.Run(() =>
            {
                using (var _context = scope.ServiceProvider.GetRequiredService<IShardingDbAccessor>())
                {
                    var lst = new List<TelemetryDataDto>();
                    var kv = _context.GetIShardingQueryable<TelemetryData>()
                        .Where(t => t.DeviceId == deviceId && t.DateTime >= begin && t.DateTime < end)
                        .ToList().Select(t => new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() });
                    return kv.ToList();
                }
            });
        }

        public async Task<bool> StoreTelemetryAsync(RawMsg msg)
        {
            bool result = false;
            try
            {
                using (var db = scope.ServiceProvider.GetService<IShardingDbAccessor>())
                {
                    var lst = new List<TelemetryData>();
                    msg.MsgBody.ToList().ForEach(kp =>
                                     {
                                         var tdata = new TelemetryData() { DateTime = DateTime.Now, DeviceId = msg.DeviceId, KeyName = kp.Key, Value_DateTime = new DateTime(1970, 1, 1) };
                                         tdata.FillKVToMe(kp);
                                         lst.Add(tdata);
                                     });
                    int ret = db.Insert(lst);
                    _logger.LogInformation($"新增({msg.DeviceId})遥测数据{ret}");
                }
                using (var _dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
                {
                    var result1 = await _dbContext.SaveAsync<TelemetryLatest>(msg.MsgBody, msg.DeviceId, msg.DataSide);
                    result1.exceptions?.ToList().ForEach(ex =>
                    {
                        _logger.LogError($"{ex.Key} {ex.Value} {Newtonsoft.Json.JsonConvert.SerializeObject(msg.MsgBody[ex.Key])}");
                    });
                    _logger.LogInformation($"新增({msg.DeviceId})遥测数据更新最新信息{result1.ret}");
                    result = true;
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"{msg.DeviceId}数据处理失败{ex.Message} {ex.InnerException?.Message} ");
            }
            return result;
        }
    }
}