提交 a04fbf2a 编写于 作者: cdy816's avatar cdy816

增加历史记录由外部直接提交,未完成

上级 873b3da0
......@@ -24,7 +24,11 @@ namespace Cdy.Tag
/// <summary>
/// 值改变
/// </summary>
ValueChanged
ValueChanged,
/// <summary>
/// 手动
/// </summary>
Manual
}
......
......@@ -28,10 +28,14 @@ namespace Cdy.Tag
/// </summary>
private ManualResetEvent resetEvent;
private ManualResetEvent mManualEvent;
private ManualResetEvent closedEvent;
private Thread mCompressThread;
private Thread mManualCompressThread;
private bool mIsClosed = false;
private HisDataMemoryBlockCollection mSourceMemory;
......@@ -154,9 +158,16 @@ namespace Cdy.Tag
//Init();
resetEvent = new ManualResetEvent(false);
closedEvent = new ManualResetEvent(false);
mManualEvent = new ManualResetEvent(false);
mCompressThread = new Thread(ThreadPro);
mCompressThread.IsBackground = true;
mCompressThread.Start();
mManualCompressThread = new Thread(ManualThreadPro);
mManualCompressThread.IsBackground = true;
mManualCompressThread.Start();
}
/// <summary>
......@@ -168,6 +179,7 @@ namespace Cdy.Tag
mIsClosed = true;
resetEvent.Set();
mManualEvent.Set();
closedEvent.WaitOne();
resetEvent.Dispose();
......@@ -178,6 +190,9 @@ namespace Cdy.Tag
while (vv.Value.IsBusy())
vv.Value.DecRef();
}
while (mCompressThread.IsAlive) Thread.Sleep(1);
while (mManualCompressThread.IsAlive) Thread.Sleep(1);
}
/// <summary>
......@@ -300,6 +315,43 @@ namespace Cdy.Tag
mHisTagService = null;
}
/// <summary>
///
/// </summary>
/// <param name="data"></param>
public void RequestManualToCompress(ManualHisDataMemoryBlock data)
{
foreach (var vv in mTargetMemorys)
{
if (data.Id >= vv.Value.Id * TagCountOneFile && data.Id < (vv.Value.Id + 1) * TagCountOneFile)
{
vv.Value.AddRequestManualToCompress(data);
}
}
mManualEvent.Set();
}
/// <summary>
///
/// </summary>
private void ManualThreadPro()
{
ThreadHelper.AssignToCPU(CPUAssignHelper.Helper.CPUArray2);
while (!mIsClosed)
{
mManualEvent.WaitOne();
mManualEvent.Reset();
if (mIsClosed)
break;
foreach (var vv in mTargetMemorys.Values)
{
vv.RequestManualToCompress();
}
}
}
#endregion ...Methods...
#region ... Interfaces ...
......
......@@ -37,6 +37,7 @@ namespace Cdy.Tag
//private bool mIsDisposed=false;
private bool mIsRunning=false;
private Queue<ManualHisDataMemoryBlock> mMemoryCach = new Queue<ManualHisDataMemoryBlock>();
#endregion ...Variables...
......@@ -132,16 +133,43 @@ namespace Cdy.Tag
#region ... Methods ...
///// <summary>
/////
///// </summary>
//public void ResetTagAddress()
//{
// mTagAddress.Clear();
// mTagAddress = null;
// mCompressCach.Clear();
//}
/// <summary>
///
/// </summary>
public void RequestManualToCompress()
{
while (mMemoryCach.Count > 0)
{
RequestManualToCompress(mMemoryCach.Dequeue());
}
}
/// <summary>
///
/// </summary>
/// <param name="data"></param>
public void AddRequestManualToCompress(ManualHisDataMemoryBlock data)
{
mMemoryCach.Enqueue(data);
}
/// <summary>
///
/// </summary>
/// <param name="data"></param>
private void RequestManualToCompress(ManualHisDataMemoryBlock data)
{
int datasize = 0;
var cdata = CompressMemory(data, out datasize);
cdata.MakeMemoryBusy();
ServiceLocator.Locator.Resolve<IDataSerialize2>().ManualRequestToSeriseFile(data.Id, data.Time, cdata, datasize);
data.MakeMemoryNoBusy();
}
/// <summary>
///
/// </summary>
/// <param name="sourceM"></param>
public void Init(HisDataMemoryBlockCollection sourceM)
{
mTagIds.Clear();
......@@ -217,19 +245,27 @@ namespace Cdy.Tag
foreach (var vv in mTagIds)
{
var val = source.TagAddress[vv];
var size = CompressBlockMemory(val, Offset, val.QualityAddress, val.Length, vv);
if(dtmp.ContainsKey(vv))
dtmp[vv] = Offset;
Offset += size;
datasize += size;
if (val != null)
{
var size = CompressBlockMemory(val, Offset, val.QualityAddress, val.Length, vv);
if (dtmp.ContainsKey(vv))
dtmp[vv] = Offset;
Offset += size;
datasize += size;
}
else
{
dtmp[vv] = 0;
}
}
//更新指针区域
this.WriteInt(0, (int)datasize);
this.Write((int)mTagIds.Count);
this.WriteInt(0, (int)datasize);//写入整体数据大小
this.Write((int)mTagIds.Count); //写入变量个数
long ltmp2 = sw.ElapsedMilliseconds;
//写入变量、数据区对应的索引
int count = 0;
foreach (var vv in dtmp)
{
......@@ -298,6 +334,46 @@ namespace Cdy.Tag
return 0;
}
/// <summary>
///
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
private MarshalMemoryBlock CompressMemory(ManualHisDataMemoryBlock data,out int datasize)
{
MarshalMemoryBlock block = new MarshalMemoryBlock(data.Length * 2);
var histag = mHisTagService.GetHisTag(data.Id);
if (histag == null)
{
datasize = 0;
return null;
}
var qulityoffset = data.QualityAddress;
var comtype = histag.CompressType;//压缩类型
block.WriteByte(4, (byte)comtype);
var tp = mCompressCach[comtype];
if (tp != null)
{
tp.QulityOffset = (int)qulityoffset;
tp.TagType = histag.TagType;
tp.RecordType = histag.Type;
tp.StartTime = data.Time;
tp.Parameters = histag.Parameters;
tp.Precision = histag.Precision;
var size = tp.Compress(data, 0, block, 5, data.Length) + 1;
block.WriteInt(0, (int)size);
datasize = (int)(size + 5);
}
else
{
datasize = 0;
}
return block;
}
/// <summary>
///
/// </summary>
......
......@@ -318,7 +318,7 @@ namespace Cdy.Tag
mRecordTimerProcesser.Add(mLastProcesser);
}
}
else
else if(mHisTag.Type == RecordType.ValueChanged)
{
if(!mLastValueChangedProcesser.AddTag(mHisTag))
{
......@@ -498,7 +498,7 @@ namespace Cdy.Tag
mRecordTimerProcesser.Add(mLastProcesser);
}
}
else
else if(vv.Value.Type == RecordType.ValueChanged)
{
if (!mLastValueChangedProcesser.AddTag(vv.Value))
{
......@@ -588,6 +588,15 @@ namespace Cdy.Tag
}
}
/// <summary>
///
/// </summary>
/// <param name="tagType"></param>
/// <param name="recordType"></param>
/// <param name="headSize"></param>
/// <param name="dataOffset"></param>
/// <param name="qulityOffset"></param>
/// <returns></returns>
private int CalCachDatablockSize(Cdy.Tag.TagType tagType, Cdy.Tag.RecordType recordType, int headSize, out int dataOffset, out int qulityOffset)
{
//单个数据块内容包括:时间戳(2)+数值+质量戳(1)
......@@ -669,28 +678,39 @@ namespace Cdy.Tag
foreach (var vv in mHisTags)
{
var ss = CalMergeBlockSize(vv.Value.TagType,vv.Value.Type,blockheadsize,out valueOffset, out qulityOffset);
if (vv.Value.Type != RecordType.Manual)
{
var ss = CalMergeBlockSize(vv.Value.TagType, vv.Value.Type, blockheadsize, out valueOffset, out qulityOffset);
var abuffer = new HisDataMemoryBlock(ss) { TimerAddress=0, ValueAddress = valueOffset, QualityAddress = qulityOffset,Id=1 };
var bbuffer = new HisDataMemoryBlock(ss) { TimerAddress = 0, ValueAddress = valueOffset, QualityAddress = qulityOffset,Id=2 };
mMergeMemory1.AddTagAddress(vv.Value.Id, abuffer);
mMergeMemory2.AddTagAddress(vv.Value.Id, bbuffer);
var abuffer = new HisDataMemoryBlock(ss) { TimerAddress = 0, ValueAddress = valueOffset, QualityAddress = qulityOffset, Id = 1 };
var bbuffer = new HisDataMemoryBlock(ss) { TimerAddress = 0, ValueAddress = valueOffset, QualityAddress = qulityOffset, Id = 2 };
mMergeMemory1.AddTagAddress(vv.Value.Id, abuffer);
mMergeMemory2.AddTagAddress(vv.Value.Id, bbuffer);
storeHeadSize += ss;
storeHeadSize += ss;
var css = CalCachDatablockSize(vv.Value.TagType, vv.Value.Type, blockheadsize, out valueOffset,out qulityOffset);
var css = CalCachDatablockSize(vv.Value.TagType, vv.Value.Type, blockheadsize, out valueOffset, out qulityOffset);
var cbuffer = new HisDataMemoryBlock(css) { TimerAddress = 0, ValueAddress = valueOffset, QualityAddress = qulityOffset,Id=1 };
var dbuffer = new HisDataMemoryBlock(css) { TimerAddress = 0, ValueAddress = valueOffset, QualityAddress = qulityOffset,Id=2 };
var cbuffer = new HisDataMemoryBlock(css) { TimerAddress = 0, ValueAddress = valueOffset, QualityAddress = qulityOffset, Id = 1 };
var dbuffer = new HisDataMemoryBlock(css) { TimerAddress = 0, ValueAddress = valueOffset, QualityAddress = qulityOffset, Id = 2 };
vv.Value.HisValueMemory1 = cbuffer;
vv.Value.HisValueMemory2 = dbuffer;
mCachMemory1.AddTagAddress(vv.Value.Id, cbuffer);
mCachMemory2.AddTagAddress(vv.Value.Id, dbuffer);
vv.Value.HisValueMemory1 = cbuffer;
vv.Value.HisValueMemory2 = dbuffer;
mCachMemory1.AddTagAddress(vv.Value.Id, cbuffer);
mCachMemory2.AddTagAddress(vv.Value.Id, dbuffer);
vv.Value.DataSize = css;
vv.Value.DataSize = css;
cachHeadSize += css;
cachHeadSize += css;
}
else
{
mMergeMemory1.AddTagAddress(vv.Value.Id, null);
mMergeMemory2.AddTagAddress(vv.Value.Id, null);
mCachMemory1.AddTagAddress(vv.Value.Id, null);
mCachMemory2.AddTagAddress(vv.Value.Id, null);
}
}
......@@ -900,24 +920,28 @@ namespace Cdy.Tag
foreach (var tag in mHisTags)
{
var taddrs = mCurrentMergeMemory.TagAddress[tag.Value.Id];
var saddrs = mcc.TagAddress[tag.Value.Id];
var saddrs = mcc.TagAddress[tag.Value.Id];
//
if (taddrs == null || saddrs == null) continue;
//拷贝时间
var dlen = saddrs.ValueAddress;
var vtimeaddr = dlen * count + 2;
var vtimeaddr = dlen * count + 2;
saddrs.CopyTo(taddrs, 0, vtimeaddr, dlen);
//拷贝数值
dlen = saddrs.QualityAddress - saddrs.ValueAddress;
vtimeaddr = taddrs.ValueAddress + dlen * count+tag.Value.SizeOfValue;
vtimeaddr = taddrs.ValueAddress + dlen * count + tag.Value.SizeOfValue;
saddrs.CopyTo(taddrs, saddrs.ValueAddress, vtimeaddr, dlen);
//拷贝质量戳
dlen = tag.Value.DataSize -saddrs.QualityAddress;
dlen = tag.Value.DataSize - saddrs.QualityAddress;
vtimeaddr = taddrs.QualityAddress + dlen * count+1;
vtimeaddr = taddrs.QualityAddress + dlen * count + 1;
saddrs.CopyTo(taddrs, saddrs.QualityAddress, vtimeaddr, dlen);
}
//});
......@@ -1237,6 +1261,18 @@ namespace Cdy.Tag
mMergeMemory2?.Dispose();
}
/// <summary>
/// 手动插入历史数据
/// </summary>
/// <param name="id"></param>
/// <param name="values"></param>
/// <returns></returns>
public bool InsertHisValues(long id, SortedDictionary<DateTime, object> values)
{
return true;
}
#endregion ...Methods...
......
//==============================================================
// Copyright (C) 2020 Inc. All rights reserved.
//
//==============================================================
// Create by 种道洋 at 2020/9/21 15:04:04.
// Version 1.0
// 种道洋
//==============================================================
using System;
using System.Collections.Generic;
using System.Text;
namespace DBRuntime.His
{
/// <summary>
///
/// </summary>
public class ManualHisDataMemoryBlock: HisDataMemoryBlock
{
#region ... Variables ...
#endregion ...Variables...
#region ... Events ...
#endregion ...Events...
#region ... Constructor...
/// <summary>
///
/// </summary>
/// <param name="size"></param>
public ManualHisDataMemoryBlock(int size):base(size)
{
}
#endregion ...Constructor...
#region ... Properties ...
/// <summary>
///
/// </summary>
public DateTime Time { get; set; }
#endregion ...Properties...
#region ... Methods ...
#endregion ...Methods...
#region ... Interfaces ...
#endregion ...Interfaces...
}
}
......@@ -43,6 +43,12 @@ namespace Cdy.Tag
/// <param name="dataMemory"></param>
void RequestToCompress(HisDataMemoryBlockCollection dataMemory);
/// <summary>
/// 请求手动压缩数据
/// </summary>
/// <param name="data"></param>
void RequestManualToCompress(ManualHisDataMemoryBlock data);
#endregion ...Methods...
......
......@@ -42,6 +42,15 @@ namespace Cdy.Tag
/// <param name="date"></param>
void RequestToSeriseFile(CompressMemory2 dataMemory,DateTime date);
/// <summary>
/// 手动更新历史数据
/// </summary>
/// <param name="id"></param>
/// <param name="time"></param>
/// <param name="data"></param>
/// <param name="size"></param>
void ManualRequestToSeriseFile(int id, DateTime time, MarshalMemoryBlock data, int size);
/// <summary>
///
/// </summary>
......
......@@ -40,5 +40,13 @@ namespace Cdy.Tag
/// </summary>
/// <returns></returns>
public List<HisRunTag> ListAllTags();
/// <summary>
/// 手动插入历史数据
/// </summary>
/// <param name="id"></param>
/// <param name="values"></param>
/// <returns></returns>
public bool InsertHisValues(long id,SortedDictionary<DateTime, object> values);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册