提交 82d758cf 编写于 作者: 麦壳饼's avatar 麦壳饼

Merge branch 'xuejmnet-master'

......@@ -15,7 +15,6 @@
<PackageReference Include="AspNetCore.HealthChecks.UI.MySql.Storage" Version="6.0.5" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="6.0.9" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="6.0.2" />
<PackageReference Include="EFCore.Sharding.MySql" Version="6.0.7" />
</ItemGroup>
<ItemGroup>
......

using EFCore.Sharding;
using IoTSharp;
using IoTSharp.Contracts;
using IoTSharp.Data;
......@@ -8,6 +7,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Diagnostics;
using ShardingCore.Core.ShardingConfigurations;
namespace Microsoft.Extensions.DependencyInjection
{
......@@ -38,10 +38,16 @@ namespace Microsoft.Extensions.DependencyInjection
healthChecksUI.AddMySqlStorage(connectionString);
}
public static void UseMySqlToSharding(this IShardingBuilder builder, string connectionString, ShardingByDateMode expandBy)
public static void UseMySqlToSharding(this ShardingConfigOptions options)
{
builder.AddDataSource(connectionString, ReadWriteType.Read | ReadWriteType.Write, DatabaseType.MySql);
builder.SetDateSharding<TelemetryData>(nameof(TelemetryData.DateTime), (ExpandByDateMode)(int)expandBy, DateTime.Now);
options.UseShardingQuery((conStr, builder) =>
{
builder.UseMySql(conStr, new MySqlServerVersion(new Version()));
});
options.UseShardingTransaction((conn, builder) =>
{
builder.UseMySql(conn, new MySqlServerVersion(new Version()));
});
}
}
}
......@@ -12,7 +12,6 @@
<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Oracle" Version="6.0.3" />
<PackageReference Include="AspNetCore.HealthChecks.UI.InMemory.Storage" Version="6.0.5" />
<PackageReference Include="EFCore.Sharding.Oracle" Version="6.0.7" />
<PackageReference Include="Oracle.EntityFrameworkCore" Version="6.21.61" />
</ItemGroup>
......

using EFCore.Sharding;
using IoTSharp;
using IoTSharp.Contracts;
using IoTSharp.Data;
......@@ -7,6 +6,7 @@ using IoTSharp.Data.Oracle;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using System;
using ShardingCore.Core.ShardingConfigurations;
namespace Microsoft.Extensions.DependencyInjection
{
......@@ -29,10 +29,16 @@ namespace Microsoft.Extensions.DependencyInjection
}
public static void UseOracleToSharding(this IShardingBuilder builder, string connectionString, ShardingByDateMode expandBy)
public static void UseOracleToSharding(this ShardingConfigOptions options)
{
builder.AddDataSource(connectionString, ReadWriteType.Read | ReadWriteType.Write, DatabaseType.Oracle);
builder.SetDateSharding<TelemetryData>(nameof(TelemetryData.DateTime), (ExpandByDateMode)(int)expandBy, DateTime.Now);
options.UseShardingQuery((conStr, builder) =>
{
builder.UseOracle(conStr);
});
options.UseShardingTransaction((conn, builder) =>
{
builder.UseOracle(conn);
});
}
}
}
......@@ -12,7 +12,6 @@
<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.NpgSql" Version="6.0.2" />
<PackageReference Include="AspNetCore.HealthChecks.UI.PostgreSQL.Storage" Version="6.0.4" />
<PackageReference Include="EFCore.Sharding.PostgreSql" Version="6.0.7" />
</ItemGroup>
<ItemGroup>
......

using EFCore.Sharding;
using IoTSharp;
using IoTSharp.Contracts;
using IoTSharp.Data;
......@@ -7,6 +6,7 @@ using IoTSharp.Data.PostgreSQL;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using System;
using ShardingCore.Core.ShardingConfigurations;
namespace Microsoft.Extensions.DependencyInjection
{
......@@ -29,10 +29,16 @@ namespace Microsoft.Extensions.DependencyInjection
}
public static void UseNpgsqlToSharding(this IShardingBuilder builder, string connectionString, ShardingByDateMode expandBy)
public static void UseNpgsqlToSharding(this ShardingConfigOptions options)
{
builder.AddDataSource(connectionString, ReadWriteType.Read | ReadWriteType.Write, DatabaseType.PostgreSql);
builder.SetDateSharding<TelemetryData>(nameof(TelemetryData.DateTime),(ExpandByDateMode)(int)expandBy, DateTime.Now);
options.UseShardingQuery((conStr, builder) =>
{
builder.UseNpgsql(conStr);
});
options.UseShardingTransaction((conn, builder) =>
{
builder.UseNpgsql(conn);
});
}
}
}
......@@ -12,7 +12,6 @@
<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.SqlServer" Version="6.0.2" />
<PackageReference Include="AspNetCore.HealthChecks.UI.SqlServer.Storage" Version="6.0.5" />
<PackageReference Include="EFCore.Sharding.SqlServer" Version="6.0.7" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.9" />
</ItemGroup>
......

using EFCore.Sharding;
using IoTSharp;
using IoTSharp.Contracts;
using IoTSharp.Data;
......@@ -7,6 +6,7 @@ using IoTSharp.Data.SqlServer;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using System;
using ShardingCore.Core.ShardingConfigurations;
namespace Microsoft.Extensions.DependencyInjection
{
......@@ -29,10 +29,16 @@ namespace Microsoft.Extensions.DependencyInjection
}
public static void UseSqlServerToSharding(this IShardingBuilder builder, string connectionString, ShardingByDateMode expandBy)
public static void UseSqlServerToSharding(this ShardingConfigOptions options)
{
builder.AddDataSource(connectionString, ReadWriteType.Read | ReadWriteType.Write, DatabaseType.SqlServer);
builder.SetDateSharding<TelemetryData>(nameof(TelemetryData.DateTime), (ExpandByDateMode)(int)expandBy, DateTime.Now);
options.UseShardingQuery((conStr, builder) =>
{
builder.UseSqlServer(conStr);
});
options.UseShardingTransaction((conn, builder) =>
{
builder.UseSqlServer(conn);
});
}
}
}
......@@ -13,7 +13,6 @@
<PackageReference Include="AspNetCore.HealthChecks.UI.Sqlite.Storage" Version="6.0.5" />
<PackageReference Include="AspNetCore.HealthChecks.UI.Core" Version="6.0.5" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="6.0.9" />
<PackageReference Include="EFCore.Sharding.SQLite" Version="6.0.7" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\IoTSharp.Data\IoTSharp.Data.csproj" />
......

using EFCore.Sharding;
using IoTSharp;
using IoTSharp.Contracts;
using IoTSharp.Data;
......@@ -8,6 +7,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Linq;
using ShardingCore.Core.ShardingConfigurations;
namespace Microsoft.Extensions.DependencyInjection
{
......@@ -30,10 +30,16 @@ namespace Microsoft.Extensions.DependencyInjection
}
public static void UseSQLiteToSharding(this IShardingBuilder builder, string connectionString, ShardingByDateMode expandBy)
public static void UseSQLiteToSharding(this ShardingConfigOptions options)
{
builder.AddDataSource(connectionString, ReadWriteType.Read | ReadWriteType.Write, DatabaseType.SQLite);
builder.SetDateSharding<TelemetryData>(nameof(TelemetryData.DateTime), (ExpandByDateMode)(int)expandBy, DateTime.Now);
options.UseShardingQuery((conStr, builder) =>
{
builder.UseSqlite(conStr);
});
options.UseShardingTransaction((conn, builder) =>
{
builder.UseSqlite(conn);
});
}
public static void SetCaseInsensitiveSearchesForSQLite(this ModelBuilder modelBuilder)
......
......@@ -9,7 +9,7 @@
<PackageReference Include="Apache.IoTDB.Data" Version="0.13.0.8" />
<PackageReference Include="EFCore.Sharding" Version="6.0.7" />
<!-- <PackageReference Include="EFCore.Sharding" Version="6.0.7" />-->
<PackageReference Include="hyjiacan.pinyin4net" Version="4.1.1" />
......
using EFCore.Sharding;
using IoTSharp.Contracts;
using IoTSharp.Contracts;
using IoTSharp.Data;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
......@@ -10,6 +9,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using IoTSharp.Data.Shardings;
namespace IoTSharp.Storage
{
......@@ -92,10 +92,10 @@ namespace IoTSharp.Storage
{
using (var scope = _scopeFactor.CreateScope())
{
using (var context = scope.ServiceProvider.GetService<IShardingDbAccessor>())
using (var context = scope.ServiceProvider.GetRequiredService<ShardingDbContext>())
{
var lst = new List<TelemetryDataDto>();
var kv = context.GetIShardingQueryable<TelemetryData>()
var kv = context.Set<TelemetryData>()
.Where(t => t.DeviceId == deviceId && t.DateTime >= begin && t.DateTime < end)
.ToList().Select(t => new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() });
if (!string.IsNullOrEmpty(keys))
......@@ -128,7 +128,7 @@ namespace IoTSharp.Storage
{
using var scope = _scopeFactor.CreateScope();
using (var db = scope.ServiceProvider.GetService<IShardingDbAccessor>())
using (var db = scope.ServiceProvider.GetRequiredService<ShardingDbContext>())
{
var lst = new List<TelemetryData>();
msg.MsgBody.ToList().ForEach(kp =>
......@@ -141,8 +141,8 @@ namespace IoTSharp.Storage
telemetries.Add(tdata);
}
});
int ret = await db.InsertAsync(lst);
_logger.LogInformation($"新增({msg.DeviceId})遥测数据{ret}");
await db.AddAsync(lst);
_logger.LogInformation($"新增({msg.DeviceId})遥测数据1");
}
}
catch (Exception ex)
......
......@@ -8,13 +8,13 @@ using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.Extensions.DependencyInjection;
using IoTSharp.Data.Configurations;
using IoTSharp.Contracts;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Sharding.Abstractions;
namespace IoTSharp.Data
{
public class ApplicationDbContext : IdentityDbContext
{
public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options)
: base(options)
{
......@@ -46,9 +46,6 @@ namespace IoTSharp.Data
base.OnModelCreating(modelBuilder);
}
public DbSet<Tenant> Tenant { get; set; }
public DbSet<Customer> Customer { get; set; }
public DbSet<Relationship> Relationship { get; set; }
......
......@@ -23,6 +23,7 @@
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="6.0.9" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="6.0.9" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="ShardingCore" Version="6.6.0.36" />
</ItemGroup>
......
// using System;
// using System.Collections.Generic;
// using System.Linq;
// using System.Threading;
// using System.Threading.Tasks;
// using Microsoft.AspNetCore.Identity.EntityFrameworkCore;
// using Microsoft.EntityFrameworkCore;
// using Microsoft.EntityFrameworkCore.ChangeTracking;
// using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
// using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
// using ShardingCore.EFCores.OptionsExtensions;
// using ShardingCore.Sharding;
// using ShardingCore.Sharding.Abstractions;
// using ShardingCore.Sharding.ShardingDbContextExecutors;
//
// namespace IoTSharp.Data
// {
// /// <summary>
// /// 因为直接让ApplicationDbContext支持sharding对程序影响太大
// /// 目前就暂时注释掉这个如果需要可以继承这个抽象
// /// </summary>
// public abstract class AbstractShardingApplicationDbContext:IdentityDbContext,IShardingDbContext, ISupportShardingReadWrite,ICurrentDbContextDiscover
// {
// private readonly IShardingDbContextExecutor _shardingDbContextExecutor;
// protected AbstractShardingApplicationDbContext(DbContextOptions options)
// {
// var wrapOptionsExtension = options.FindExtension<ShardingWrapOptionsExtension>();
// if (wrapOptionsExtension != null)
// {
// _shardingDbContextExecutor = new ShardingDbContextExecutor(this);
// }
// }
//
// /// <summary>
// /// 读写分离优先级
// /// </summary>
// public int ReadWriteSeparationPriority
// {
// get => _shardingDbContextExecutor.ReadWriteSeparationPriority;
// set => _shardingDbContextExecutor.ReadWriteSeparationPriority = value;
// }
// /// <summary>
// /// 是否使用读写分离
// /// </summary>
// public bool ReadWriteSeparation
// {
// get => _shardingDbContextExecutor.ReadWriteSeparation;
// set => _shardingDbContextExecutor.ReadWriteSeparation = value;
// }
//
// /// <summary>
// /// 是否是真正的执行者
// /// </summary>
// private bool isExecutor => _shardingDbContextExecutor == null;
//
// //public void ShardingUpgrade()
// //{
// // //isExecutor = true;
// //}
//
// public DbContext GetDbContext(string dataSourceName, CreateDbContextStrategyEnum strategy, IRouteTail routeTail)
// {
// var dbContext = _shardingDbContextExecutor.CreateDbContext(strategy, dataSourceName, routeTail);
//
// return dbContext;
// }
//
// /// <summary>
// /// 根据对象创建通用的dbcontext
// /// </summary>
// /// <typeparam name="TEntity"></typeparam>
// /// <param name="entity"></param>
// /// <returns></returns>
// public DbContext CreateGenericDbContext<TEntity>(TEntity entity) where TEntity : class
// {
// var dbContext = _shardingDbContextExecutor.CreateGenericDbContext(entity);
//
// return dbContext;
// }
//
// public IVirtualDataSource GetVirtualDataSource()
// {
// return _shardingDbContextExecutor.GetVirtualDataSource();
// }
//
// public override EntityEntry Add(object entity)
// {
// if (isExecutor)
// base.Add(entity);
// return CreateGenericDbContext(entity).Add(entity);
// }
//
// public override EntityEntry<TEntity> Add<TEntity>(TEntity entity)
// {
// if (isExecutor)
// return base.Add(entity);
// return CreateGenericDbContext(entity).Add(entity);
// }
//
//
// public override ValueTask<EntityEntry<TEntity>> AddAsync<TEntity>(TEntity entity, CancellationToken cancellationToken = new CancellationToken())
// {
// if (isExecutor)
// return base.AddAsync(entity, cancellationToken);
// return CreateGenericDbContext(entity).AddAsync(entity, cancellationToken);
// }
//
// public override ValueTask<EntityEntry> AddAsync(object entity, CancellationToken cancellationToken = new CancellationToken())
// {
// if (isExecutor)
// return base.AddAsync(entity, cancellationToken);
// return CreateGenericDbContext(entity).AddAsync(entity, cancellationToken);
// }
//
// public override void AddRange(params object[] entities)
// {
// if (isExecutor)
// {
// base.AddRange(entities);
// return;
// }
// var groups = entities.Select(o =>
// {
// var dbContext = CreateGenericDbContext(o);
// return new
// {
// DbContext = dbContext,
// Entity = o
// };
// }).GroupBy(g => g.DbContext);
//
// foreach (var group in groups)
// {
// group.Key.AddRange(group.Select(o => o.Entity));
// }
// }
//
// public override void AddRange(IEnumerable<object> entities)
// {
// if (isExecutor)
// {
// base.AddRange(entities);
// return;
// }
// var groups = entities.Select(o =>
// {
// var dbContext = CreateGenericDbContext(o);
// return new
// {
// DbContext = dbContext,
// Entity = o
// };
// }).GroupBy(g => g.DbContext);
//
// foreach (var group in groups)
// {
// group.Key.AddRange(group.Select(o => o.Entity));
// }
// }
//
// public override async Task AddRangeAsync(params object[] entities)
// {
// if (isExecutor)
// {
// await base.AddRangeAsync(entities);
// return;
// }
// var groups = entities.Select(o =>
// {
// var dbContext = CreateGenericDbContext(o);
// return new
// {
// DbContext = dbContext,
// Entity = o
// };
// }).GroupBy(g => g.DbContext);
//
// foreach (var group in groups)
// {
// await group.Key.AddRangeAsync(group.Select(o => o.Entity));
// }
// }
//
// public override async Task AddRangeAsync(IEnumerable<object> entities, CancellationToken cancellationToken = new CancellationToken())
// {
// if (isExecutor)
// {
// await base.AddRangeAsync(entities, cancellationToken);
// return;
// }
// var groups = entities.Select(o =>
// {
// var dbContext = CreateGenericDbContext(o);
// return new
// {
// DbContext = dbContext,
// Entity = o
// };
// }).GroupBy(g => g.DbContext);
//
// foreach (var group in groups)
// {
// await group.Key.AddRangeAsync(group.Select(o => o.Entity));
// }
// }
//
// public override EntityEntry<TEntity> Attach<TEntity>(TEntity entity)
// {
// if (isExecutor)
// return base.Attach(entity);
// return CreateGenericDbContext(entity).Attach(entity);
// }
//
// public override EntityEntry Attach(object entity)
// {
// if (isExecutor)
// return base.Attach(entity);
// return CreateGenericDbContext(entity).Attach(entity);
// }
//
// public override void AttachRange(params object[] entities)
// {
// if (isExecutor)
// {
// base.AttachRange(entities);
// return;
// }
// var groups = entities.Select(o =>
// {
// var dbContext = CreateGenericDbContext(o);
// return new
// {
// DbContext = dbContext,
// Entity = o
// };
// }).GroupBy(g => g.DbContext);
//
// foreach (var group in groups)
// {
// group.Key.AttachRange(group.Select(o => o.Entity));
// }
// }
//
// public override void AttachRange(IEnumerable<object> entities)
// {
// if (isExecutor)
// {
// base.AttachRange(entities);
// return;
// }
// var groups = entities.Select(o =>
// {
// var dbContext = CreateGenericDbContext(o);
// return new
// {
// DbContext = dbContext,
// Entity = o
// };
// }).GroupBy(g => g.DbContext);
//
// foreach (var group in groups)
// {
// group.Key.AttachRange(group.Select(o => o.Entity));
// }
// }
//
//
// //public override DatabaseFacade Database => _dbContextCaches.Any()
// // ? _dbContextCaches.First().Value.Database
// // : GetDbContext(true, string.Empty).Database;
//
// public override EntityEntry<TEntity> Entry<TEntity>(TEntity entity)
// {
// if (isExecutor)
// return base.Entry(entity);
// return CreateGenericDbContext(entity).Entry(entity);
// }
//
// public override EntityEntry Entry(object entity)
// {
// if (isExecutor)
// return base.Entry(entity);
// return CreateGenericDbContext(entity).Entry(entity);
// }
//
// public override EntityEntry<TEntity> Update<TEntity>(TEntity entity)
// {
// if (isExecutor)
// return base.Update(entity);
// return CreateGenericDbContext(entity).Update(entity);
// }
//
// public override EntityEntry Update(object entity)
// {
// if (isExecutor)
// return base.Update(entity);
// return CreateGenericDbContext(entity).Update(entity);
// }
//
// public override void UpdateRange(params object[] entities)
// {
// if (isExecutor)
// {
// base.UpdateRange(entities);
// return;
// }
// var groups = entities.Select(o =>
// {
// var dbContext = CreateGenericDbContext(o);
// return new
// {
// DbContext = dbContext,
// Entity = o
// };
// }).GroupBy(g => g.DbContext);
//
// foreach (var group in groups)
// {
// group.Key.UpdateRange(group.Select(o => o.Entity));
// }
// }
//
// public override void UpdateRange(IEnumerable<object> entities)
// {
// if (isExecutor)
// {
// base.UpdateRange(entities);
// return;
// }
// var groups = entities.Select(o =>
// {
// var dbContext = CreateGenericDbContext(o);
// return new
// {
// DbContext = dbContext,
// Entity = o
// };
// }).GroupBy(g => g.DbContext);
//
// foreach (var group in groups)
// {
// group.Key.UpdateRange(group.Select(o => o.Entity));
// }
// }
//
// public override EntityEntry<TEntity> Remove<TEntity>(TEntity entity)
// {
// if (isExecutor)
// return base.Remove(entity);
// return CreateGenericDbContext(entity).Remove(entity);
// }
//
// public override EntityEntry Remove(object entity)
// {
// if (isExecutor)
// return base.Remove(entity);
// return CreateGenericDbContext(entity).Remove(entity);
// }
//
// public override void RemoveRange(params object[] entities)
// {
// if (isExecutor)
// {
// base.RemoveRange(entities);
// return;
// }
// var groups = entities.Select(o =>
// {
// var dbContext = CreateGenericDbContext(o);
// return new
// {
// DbContext = dbContext,
// Entity = o
// };
// }).GroupBy(g => g.DbContext);
//
// foreach (var group in groups)
// {
// group.Key.RemoveRange(group.Select(o => o.Entity));
// }
// }
//
// public override void RemoveRange(IEnumerable<object> entities)
// {
// if (isExecutor)
// {
// base.RemoveRange(entities);
// return;
// }
// var groups = entities.Select(o =>
// {
// var dbContext = CreateGenericDbContext(o);
// return new
// {
// DbContext = dbContext,
// Entity = o
// };
// }).GroupBy(g => g.DbContext);
//
// foreach (var group in groups)
// {
// group.Key.RemoveRange(group.Select(o => o.Entity));
// }
// }
//
// public override int SaveChanges()
// {
//
// if (isExecutor)
// return base.SaveChanges();
// return this.SaveChanges(true);
// }
//
// public override int SaveChanges(bool acceptAllChangesOnSuccess)
// {
// if (isExecutor)
// return base.SaveChanges(acceptAllChangesOnSuccess);
// //ApplyShardingConcepts();
// int i = 0;
// //如果是内部开的事务就内部自己消化
// if (Database.CurrentTransaction == null && _shardingDbContextExecutor.IsMultiDbContext)
// {
// using (var tran = Database.BeginTransaction())
// {
// i = _shardingDbContextExecutor.SaveChanges(acceptAllChangesOnSuccess);
// tran.Commit();
// }
// }
// else
// {
// i = _shardingDbContextExecutor.SaveChanges(acceptAllChangesOnSuccess);
// }
//
// return i;
// }
//
//
// public override Task<int> SaveChangesAsync(CancellationToken cancellationToken = new CancellationToken())
// {
// if (isExecutor)
// return base.SaveChangesAsync(cancellationToken);
// return this.SaveChangesAsync(true, cancellationToken);
// }
//
// public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken())
// {
// if (isExecutor)
// return await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
// //ApplyShardingConcepts();
// int i = 0;
// //如果是内部开的事务就内部自己消化
// if (Database.CurrentTransaction == null && _shardingDbContextExecutor.IsMultiDbContext)
// {
// using (var tran = await Database.BeginTransactionAsync((CancellationToken)cancellationToken))
// {
// i = await _shardingDbContextExecutor.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
//
// await tran.CommitAsync(cancellationToken);
// }
// }
// else
// {
// i = await _shardingDbContextExecutor.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
// }
//
//
// return i;
// }
//
// public override void Dispose()
// {
//
// if (isExecutor)
// {
// base.Dispose();
// }
// else
// {
// _shardingDbContextExecutor.Dispose();
// base.Dispose();
// }
// }
//
// public override async ValueTask DisposeAsync()
// {
// if (isExecutor)
// {
// await base.DisposeAsync();
// }
// else
// {
// await _shardingDbContextExecutor.DisposeAsync();
//
// await base.DisposeAsync();
// }
// }
// public Task RollbackAsync(CancellationToken cancellationToken = new CancellationToken())
// {
// return _shardingDbContextExecutor.RollbackAsync(cancellationToken);
// }
//
// public Task CommitAsync(CancellationToken cancellationToken = new CancellationToken())
// {
// return _shardingDbContextExecutor.CommitAsync(cancellationToken);
// }
//
// public void NotifyShardingTransaction()
// {
// _shardingDbContextExecutor.NotifyShardingTransaction();
// }
//
// public void Rollback()
// {
// _shardingDbContextExecutor.Rollback();
// }
//
// public void Commit()
// {
// _shardingDbContextExecutor.Commit();
// }
//
// public IDictionary<string, IDataSourceDbContext> GetCurrentDbContexts()
// {
// return _shardingDbContextExecutor.GetCurrentDbContexts();
// }
// }
// }
\ No newline at end of file
using System;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.VirtualRoutes.Days;
namespace IoTSharp.Data.Shardings.Routes
{
public class TelemetryDataDayRoute:AbstractSimpleShardingDayKeyDateTimeVirtualTableRoute<TelemetryData>
{
public override void Configure(EntityMetadataTableBuilder<TelemetryData> builder)
{
builder.ShardingProperty(o => o.DateTime);
}
public override bool AutoCreateTableByTime()
{
return true;
}
public override DateTime GetBeginTime()
{
//原则上不应该使用datetime.now但是我看你之前的程序是这么写的如果是now那么每次启动都算是开始时间之前的表可能会访问不到
//因为访问只会访问begintime之后的时间
return DateTime.Now;
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.VirtualRoutes.Abstractions;
namespace IoTSharp.Data.Shardings.Routes
{
public class TelemetryDataHourRoute:AbstractShardingTimeKeyDateTimeVirtualTableRoute<TelemetryData>
{
public override void Configure(EntityMetadataTableBuilder<TelemetryData> builder)
{
builder.ShardingProperty(o => o.DateTime);
}
public override Func<string, bool> GetRouteToFilter(DateTime shardingKey, ShardingOperatorEnum shardingOperator)
{
var t = TimeFormatToTail(shardingKey);
switch (shardingOperator)
{
case ShardingOperatorEnum.GreaterThan:
case ShardingOperatorEnum.GreaterThanOrEqual:
return tail => String.Compare(tail, t, StringComparison.Ordinal) >= 0;
case ShardingOperatorEnum.LessThan:
{
//处于临界值 o=>o.time < [2021-01-01 01:00:00] 尾巴2021010101不应该被返回
if (shardingKey.Minute==0&&shardingKey.Second==0)
return tail => String.Compare(tail, t, StringComparison.Ordinal) < 0;
return tail => String.Compare(tail, t, StringComparison.Ordinal) <= 0;
}
case ShardingOperatorEnum.LessThanOrEqual:
return tail => String.Compare(tail, t, StringComparison.Ordinal) <= 0;
case ShardingOperatorEnum.Equal: return tail => tail == t;
default:
{
#if DEBUG
Console.WriteLine($"shardingOperator is not equal scan all table tail");
#endif
return tail => true;
}
}
}
protected override List<string> CalcTailsOnStart()
{
var beginTime = GetBeginTime().Date;
var tails = new List<string>();
//提前创建表
var nowTimeStamp = DateTime.Now.Date;
if (beginTime > nowTimeStamp)
throw new ArgumentException("begin time error");
var currentTimeStamp = beginTime;
while (currentTimeStamp <= nowTimeStamp)
{
var tail = ShardingKeyToTail(currentTimeStamp);
tails.Add(tail);
currentTimeStamp = currentTimeStamp.AddDays(1);
}
return tails;
}
public DateTime GetBeginTime()
{
//原则上不应该使用datetime.now但是我看你之前的程序是这么写的如果是now那么每次启动都算是开始时间之前的表可能会访问不到
//因为访问只会访问begintime之后的时间
return DateTime.Now;
}
public override bool AutoCreateTableByTime()
{
return true;
}
public override string[] GetCronExpressions()
{
return new[]
{
"0 */1 * * * ?",
};
}
protected override string TimeFormatToTail(DateTime time)
{
return $"{time:yyyyMMdd}";
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.VirtualRoutes.Abstractions;
namespace IoTSharp.Data.Shardings.Routes
{
public class TelemetryDataMinuteRoute:AbstractShardingTimeKeyDateTimeVirtualTableRoute<TelemetryData>
{
public override void Configure(EntityMetadataTableBuilder<TelemetryData> builder)
{
builder.ShardingProperty(o => o.DateTime);
}
public override Func<string, bool> GetRouteToFilter(DateTime shardingKey, ShardingOperatorEnum shardingOperator)
{
var t = TimeFormatToTail(shardingKey);
switch (shardingOperator)
{
case ShardingOperatorEnum.GreaterThan:
case ShardingOperatorEnum.GreaterThanOrEqual:
return tail => String.Compare(tail, t, StringComparison.Ordinal) >= 0;
case ShardingOperatorEnum.LessThan:
{
//处于临界值 o=>o.time < [2021-01-01 01:01:00] 尾巴202101010101不应该被返回
if (shardingKey.Second==0)
return tail => String.Compare(tail, t, StringComparison.Ordinal) < 0;
return tail => String.Compare(tail, t, StringComparison.Ordinal) <= 0;
}
case ShardingOperatorEnum.LessThanOrEqual:
return tail => String.Compare(tail, t, StringComparison.Ordinal) <= 0;
case ShardingOperatorEnum.Equal: return tail => tail == t;
default:
{
#if DEBUG
Console.WriteLine($"shardingOperator is not equal scan all table tail");
#endif
return tail => true;
}
}
}
protected override List<string> CalcTailsOnStart()
{
var beginTime = GetBeginTime().Date;
var tails = new List<string>();
//提前创建表
var nowTimeStamp = DateTime.Now.Date;
if (beginTime > nowTimeStamp)
throw new ArgumentException("begin time error");
var currentTimeStamp = beginTime;
while (currentTimeStamp <= nowTimeStamp)
{
var tail = ShardingKeyToTail(currentTimeStamp);
tails.Add(tail);
currentTimeStamp = currentTimeStamp.AddMinutes(1);
}
return tails;
}
public DateTime GetBeginTime()
{
//原则上不应该使用datetime.now但是我看你之前的程序是这么写的如果是now那么每次启动都算是开始时间之前的表可能会访问不到
//因为访问只会访问begintime之后的时间
return DateTime.Now;
}
public override bool AutoCreateTableByTime()
{
return true;
}
public override string[] GetCronExpressions()
{
return new[]
{
"0 */1 * * * ?",
};
}
protected override string TimeFormatToTail(DateTime time)
{
return $"{time:yyyyMMddHHmm}";
}
}
}
\ No newline at end of file
using System;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.VirtualRoutes.Months;
namespace IoTSharp.Data.Shardings.Routes
{
public class TelemetryDataMonthRoute:AbstractSimpleShardingMonthKeyDateTimeVirtualTableRoute<TelemetryData>
{
public override void Configure(EntityMetadataTableBuilder<TelemetryData> builder)
{
builder.ShardingProperty(o => o.DateTime);
}
public override bool AutoCreateTableByTime()
{
return true;
}
public override DateTime GetBeginTime()
{
//原则上不应该使用datetime.now但是我看你之前的程序是这么写的如果是now那么每次启动都算是开始时间之前的表可能会访问不到
//因为访问只会访问begintime之后的时间
return DateTime.Now;
}
}
}
\ No newline at end of file
using System;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.VirtualRoutes.Years;
namespace IoTSharp.Data.Shardings.Routes
{
public class TelemetryDataYearRoute:AbstractSimpleShardingYearKeyDateTimeVirtualTableRoute<TelemetryData>
{
public override void Configure(EntityMetadataTableBuilder<TelemetryData> builder)
{
builder.ShardingProperty(o => o.DateTime);
}
public override bool AutoCreateTableByTime()
{
return true;
}
public override DateTime GetBeginTime()
{
//原则上不应该使用datetime.now但是我看你之前的程序是这么写的如果是now那么每次启动都算是开始时间之前的表可能会访问不到
//因为访问只会访问begintime之后的时间
return DateTime.Now;
}
}
}
\ No newline at end of file
using IoTSharp.Data.Configurations;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Sharding;
using ShardingCore.Sharding.Abstractions;
namespace IoTSharp.Data.Shardings
{
public class ShardingDbContext:AbstractShardingDbContext,IShardingTableDbContext
{
public ShardingDbContext(DbContextOptions<ShardingDbContext> options) : base(options)
{
}
public IRouteTail RouteTail { get; set; }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
modelBuilder.ApplyConfiguration(new TelemetryDataConfiguration());
}
}
}
\ No newline at end of file
using DotNetCore.CAP;
using Dynamitey;
using IoTSharp.Contracts;
using IoTSharp.Data;
using System;
......
using Dynamitey;
using IoTSharp.Contracts;
using IoTSharp.Contracts;
using IoTSharp.Data;
using RabbitMQ.Client;
using Shashlik.EventBus;
......
using Dynamitey;
using IoTSharp.Contracts;
using IoTSharp.Contracts;
using IoTSharp.Data;
using System;
using System.Collections;
......
......@@ -2,10 +2,8 @@
using IoTSharp.Controllers.Models;
using IoTSharp.Data;
using IoTSharp.Data.Extensions;
using IoTSharp.Dtos;
using IoTSharp.Extensions;
using IoTSharp.Models;
using LinqKit;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using System;
......@@ -13,6 +11,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading.Tasks;
using ShardingCore.Extensions;
namespace IoTSharp.Controllers
{
......
......@@ -9,11 +9,11 @@ using IoTSharp.Data;
using IoTSharp.Dtos;
using IoTSharp.Extensions;
using IoTSharp.Models;
using LinqKit;
using Microsoft.AspNetCore.Identity;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using ShardingCore.Extensions;
namespace IoTSharp.Controllers
{
......
......@@ -3,7 +3,6 @@ using IoTSharp.Controllers.Models;
using IoTSharp.Data;
using IoTSharp.Dtos;
using IoTSharp.Models;
using LinqKit;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
......@@ -11,6 +10,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using ShardingCore.Extensions;
namespace IoTSharp.Controllers
{
......
......@@ -12,7 +12,6 @@ using IoTSharp.Gateways;
using IoTSharp.Models;
using IoTSharp.Storage;
using IoTSharp.X509Extensions;
using LinqKit;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Identity;
......@@ -36,6 +35,7 @@ using System.Linq.Expressions;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using System.Xml;
using ShardingCore.Extensions;
using Dic = System.Collections.Generic.Dictionary<string, string>;
using DicKV = System.Collections.Generic.KeyValuePair<string, string>;
......
......@@ -3,13 +3,13 @@ using IoTSharp.Controllers.Models;
using IoTSharp.Data;
using IoTSharp.Dtos;
using IoTSharp.Models;
using LinqKit;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using System;
using System.Linq;
using System.Linq.Dynamic.Core;
using System.Linq.Expressions;
using ShardingCore.Extensions;
namespace IoTSharp.Controllers
{
......
......@@ -4,7 +4,6 @@ using IoTSharp.Controllers.Models;
using IoTSharp.Data;
using IoTSharp.Dtos;
using IoTSharp.Models;
using LinqKit;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
......@@ -18,6 +17,7 @@ using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Web;
using ShardingCore.Extensions;
namespace IoTSharp.Controllers
{
......
......@@ -9,11 +9,11 @@ using IoTSharp.Data;
using IoTSharp.Dtos;
using IoTSharp.Extensions;
using IoTSharp.Models;
using LinqKit;
using Microsoft.AspNetCore.Identity;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using ShardingCore.Extensions;
namespace IoTSharp.Controllers
{
......
......@@ -6,7 +6,6 @@ using IoTSharp.Extensions;
using IoTSharp.FlowRuleEngine;
using IoTSharp.Models;
using IoTSharp.Models.Rule;
using LinqKit;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Identity;
using Microsoft.AspNetCore.Mvc;
......@@ -21,6 +20,7 @@ using System.Linq;
using System.Linq.Dynamic.Core;
using System.Linq.Expressions;
using System.Threading.Tasks;
using ShardingCore.Extensions;
namespace IoTSharp.Controllers
{
......
using IoTSharp.Contracts;
using IoTSharp.Controllers.Models;
using IoTSharp.Data;
using IoTSharp.Dtos;
using IoTSharp.Extensions;
using IoTSharp.Models;
using LinqKit;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Identity;
using Microsoft.AspNetCore.Mvc;
......@@ -13,6 +11,7 @@ using System;
using System.Linq;
using System.Linq.Expressions;
using System.Threading.Tasks;
using ShardingCore.Extensions;
namespace IoTSharp.Controllers
{
......
using IoTSharp.EventBus;
using Dynamitey.DynamicObjects;
using EasyCaching.Core;
using IoTSharp.Contracts;
using IoTSharp.Data;
......
using IoTSharp.EventBus;
using Dynamitey.DynamicObjects;
using EasyCaching.Core;
using IoTSharp.Contracts;
using IoTSharp.Data;
......
using IoTSharp.EventBus;
using Dynamitey.DynamicObjects;
using EasyCaching.Core;
using IoTSharp.Contracts;
using IoTSharp.Data;
......
using IoTSharp.EventBus;
using Dynamitey.DynamicObjects;
using IoTSharp.Contracts;
using IoTSharp.Data;
using IoTSharp.Dtos;
......
using IoTSharp.EventBus;
using Dynamitey.DynamicObjects;
using EasyCaching.Core;
using IoTSharp.Contracts;
using IoTSharp.Data;
......
using IoTSharp.EventBus;
using Dynamitey.DynamicObjects;
using EasyCaching.Core;
using IoTSharp.Contracts;
using IoTSharp.Data;
......
using IoTSharp.EventBus;
using Dynamitey.DynamicObjects;
using EasyCaching.Core;
using IoTSharp.Data;
using IoTSharp.Extensions;
......
using IoTSharp.EventBus;
using EasyCaching.Core.Configurations;
using EFCore.Sharding;
using HealthChecks.UI.Client;
using InfluxDB.Client;
using IoTSharp.Controllers.Models;
using IoTSharp.Data;
using IoTSharp.Data.Sqlite;
using IoTSharp.FlowRuleEngine;
using IoTSharp.Interpreter;
using IoTSharp.Storage;
using IoTSharp.X509Extensions;
using Jdenticon.AspNetCore;
using Jdenticon.Rendering;
using IoTSharp.Data.Taos;
using Microsoft.AspNetCore.Authentication.Cookies;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Identity;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
......@@ -29,25 +24,21 @@ using MQTTnet.AspNetCore;
using Newtonsoft.Json.Serialization;
using PinusDB.Data;
using Quartz;
using RabbitMQ.Client;
using Savorboard.CAP.InMemoryMessageQueue;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using Jdenticon;
using IoTSharp.Gateways;
using System.Collections.Specialized;
using Microsoft.Extensions.ObjectPool;
using MaiKeBing.HostedService.ZeroMQ;
using IoTSharp.TaskActions;
using Dynamitey;
using IoTSharp.Contracts;
using IoTSharp.Data.Shardings;
using IoTSharp.Data.Shardings.Routes;
using IoTSharp.EventBus.CAP;
using IoTSharp.EventBus.Shashlik;
using Storage.Net.Blobs;
using Microsoft.EntityFrameworkCore;
using ShardingCore;
using Storage.Net;
namespace IoTSharp
......@@ -230,34 +221,47 @@ namespace IoTSharp
switch (settings.TelemetryStorage)
{
case TelemetryStorage.Sharding:
services.AddEFCoreSharding(config =>
ShardingByDateMode settingsShardingByDateMode = settings.ShardingByDateMode;
services.AddShardingDbContext<ShardingDbContext>().UseRouteConfig(o =>
{
switch (settingsShardingByDateMode)
{
case ShardingByDateMode.PerMinute : o.AddShardingTableRoute<TelemetryDataMinuteRoute>();break;
case ShardingByDateMode.PerHour : o.AddShardingTableRoute<TelemetryDataHourRoute>();break;
case ShardingByDateMode.PerDay : o.AddShardingTableRoute<TelemetryDataDayRoute>();break;
case ShardingByDateMode.PerMonth : o.AddShardingTableRoute<TelemetryDataMonthRoute>();break;
case ShardingByDateMode.PerYear : o.AddShardingTableRoute<TelemetryDataYearRoute>();break;
default: throw new InvalidOperationException($"unknown sharding mode:{settingsShardingByDateMode}");
}
}).UseConfig(o =>
{
o.ThrowIfQueryRouteNotMatch = false;
o.UseShellDbContextConfigure(builder=>builder.UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking));
o.AddDefaultDataSource("ds0",Configuration.GetConnectionString("TelemetryStorage"));
switch (settings.DataBase)
{
case DataBaseType.MySql:
config.UseMySqlToSharding(Configuration.GetConnectionString("TelemetryStorage"), settings.ShardingByDateMode);
o.UseMySqlToSharding();
break;
case DataBaseType.SqlServer:
config.UseSqlServerToSharding(Configuration.GetConnectionString("TelemetryStorage"), settings.ShardingByDateMode);
o.UseSqlServerToSharding();
break;
case DataBaseType.Oracle:
config.UseOracleToSharding(Configuration.GetConnectionString("TelemetryStorage"), settings.ShardingByDateMode);
o.UseOracleToSharding();
break;
case DataBaseType.Sqlite:
config.UseSQLiteToSharding(Configuration.GetConnectionString("TelemetryStorage"), settings.ShardingByDateMode);
o.UseSQLiteToSharding();
break;
case DataBaseType.PostgreSql:
default:
config.UseNpgsqlToSharding(Configuration.GetConnectionString("TelemetryStorage"), settings.ShardingByDateMode);
o.UseNpgsqlToSharding();
break;
}
config.SetEntityAssemblies(new Assembly[] { typeof(TelemetryData).Assembly });
});
}).AddShardingCore();
services.AddSingleton<IStorage, ShardingStorage>();
break;
......@@ -375,6 +379,13 @@ namespace IoTSharp
// The default HSTS value is 30 days. You may want to change this for production scenarios, see https://aka.ms/aspnetcore-hsts.
app.UseHsts();
}
//添加定时任务创建表
var settings = Configuration.Get<AppSettings>();
if (settings.TelemetryStorage == TelemetryStorage.Sharding)
{
app.ApplicationServices.UseAutoShardingCreate();
app.ApplicationServices.UseAutoTryCompensateTable();
}
app.UseRouting();
app.UseCors(option => option
.AllowAnyOrigin()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册