提交 8f1ce33f 编写于 作者: cdy816's avatar cdy816

修复历史文件读取,网络实时数据接口速度优化

上级 6ac80827
......@@ -268,7 +268,7 @@ namespace Cdy.Tag
public void ReAlloc(long size)
{
Init(size);
GC.Collect();
//GC.Collect();
}
......@@ -1232,7 +1232,7 @@ namespace Cdy.Tag
{
Marshal.FreeHGlobal(mHandles);
LoggerService.Service.Erro("MarshalFixedMemoryBlock", Name +" Disposed " );
GC.Collect();
//GC.Collect();
}
......
......@@ -1840,14 +1840,17 @@ namespace Cdy.Tag
public virtual void Dispose()
{
//mDataBuffer = null;
foreach(var vv in mHandles)
if (mHandles.Count > 0)
{
Marshal.FreeHGlobal(vv);
foreach (var vv in mHandles)
{
Marshal.FreeHGlobal(vv);
}
mHandles.Clear();
}
mHandles.Clear();
LoggerService.Service.Erro("MarshalMemoryBlock", Name +" Disposed " );
// LoggerService.Service.Erro("MarshalMemoryBlock", Name +" Disposed " );
GC.Collect();
//GC.Collect();
}
///// <summary>
......
......@@ -23,7 +23,7 @@ namespace Cdy.Tag
/// <param name="name"></param>
/// <param name="valueChanged"></param>
/// <param name="tagRegistor"></param>
void SubscribeValueChangedForConsumer(string name, ValueChangedNotifyProcesser.ValueChangedDelegate valueChanged,ValueChangedNotifyProcesser.BlockChangedDelegate blockChanged, Func<List<int>> tagRegistor);
void SubscribeValueChangedForConsumer(string name, ValueChangedNotifyProcesser.ValueChangedDelegate valueChanged,ValueChangedNotifyProcesser.BlockChangedDelegate blockChanged,Action BlockChangedNotify, Func<List<int>> tagRegistor);
/// <summary>
/// 取消订购
......
......@@ -131,6 +131,12 @@ namespace Cdy.Tag
///
/// </summary>
public BlockChangedDelegate BlockChanged { get; set; }
/// <summary>
///
/// </summary>
public Action BlockChangedNotify { get; set; }
/// <summary>
///
/// </summary>
......@@ -308,6 +314,7 @@ namespace Cdy.Tag
}
}
}
BlockChangedNotify?.Invoke();
}
Thread.Sleep(10);
......
<?xml version="1.0" encoding="utf-8" ?>
<Config>
<ProxyClient Ip="127.0.0.1" Port="14330" LoginUser="Admin" LoginPassword="Admin" WorkMode="1" PollCircle="1000" IsUseStandardHisDataServer="false"/>
<ProxyClient Ip="127.0.0.1" Port="14330" LoginUser="Admin" LoginPassword="Admin" WorkMode="0" PollCircle="1000" IsUseStandardHisDataServer="false"/>
</Config>
\ No newline at end of file
......@@ -17,9 +17,8 @@
}
},
"DbInRunWebApi": {
"commandName": "Executable",
"commandName": "Project",
"launchBrowser": true,
"launchUrl": "weatherforecast",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
......
......@@ -186,10 +186,11 @@ namespace Cdy.Tag
}
else if(e.ChangeType == System.IO.WatcherChangeTypes.Changed)
{
LoggerService.Service.Info("DataFileMananger", "HisDataFile " + e.Name + " is changed & will be processed!", ConsoleColor.Cyan);
var vtmp = new System.IO.FileInfo(e.FullPath);
lock (mLocker)
{
LoggerService.Service.Info("DataFileMananger", "HisDataFile " + e.Name + " is changed & will be processed!", ConsoleColor.Cyan);
var vtmp = new System.IO.FileInfo(e.FullPath);
if (vtmp.Extension == DataFileExtends)
{
var vfile = CheckAndGetDataFile(e.Name);
......
......@@ -25,7 +25,7 @@ namespace Cdy.Tag
private bool mInited = false;
private object mLockObj = new object();
private static object mLockObj = new object();
private DateTime mLastTime;
......@@ -1757,10 +1757,10 @@ namespace Cdy.Tag
{
DeCompressDataBlockAllValue(vv.Key, vv.Value.Item1, vv.Value.Item2, timetick, result);
}
foreach(var vv in data)
foreach (var vv in data)
{
vv.Key.Dispose();
}
data.Clear();
}
......
......@@ -103,6 +103,7 @@ namespace DBRuntime.Proxy
resetEvent.WaitOne();
if (mIsClosed) break;
resetEvent.Reset();
int icount = mCachDatas.Count;
while (mCachDatas.Count > 0)
{
// ProcessSingleBufferData(mCachDatas.Dequeue());
......@@ -142,7 +143,6 @@ namespace DBRuntime.Proxy
/// <param name="block"></param>
private void ProcessBlockBufferData(IByteBuffer block)
{
var realenginer = (ServiceLocator.Locator.Resolve<IRealTagComsumer>() as RealEnginer);
var start = block.ReadInt();
var size = block.ReadInt();
......@@ -150,6 +150,7 @@ namespace DBRuntime.Proxy
Buffer.BlockCopy(block.Array, block.ArrayOffset + block.ReaderIndex, realenginer.Memory, start, size);
block.SetReaderIndex(block.ReaderIndex + size);
block.ReleaseBuffer();
}
/// <summary>
......
......@@ -3480,7 +3480,7 @@ namespace Cdy.Tag
/// <param name="name"></param>
/// <param name="valueChanged"></param>
/// <param name="tagRegistor"></param>
public void SubscribeValueChangedForConsumer(string name, ValueChangedNotifyProcesser.ValueChangedDelegate valueChanged, ValueChangedNotifyProcesser.BlockChangedDelegate blockchanged, Func<List<int>> tagRegistor)
public void SubscribeValueChangedForConsumer(string name, ValueChangedNotifyProcesser.ValueChangedDelegate valueChanged, ValueChangedNotifyProcesser.BlockChangedDelegate blockchanged,Action BlockChangedNotify, Func<List<int>> tagRegistor)
{
var re = ComsumerValueChangedNotifyManager.Manager.GetNotifier(name);
if (tagRegistor != null)
......@@ -3507,6 +3507,7 @@ namespace Cdy.Tag
}
re.ValueChanged = valueChanged;
re.BlockChanged = blockchanged;
re.BlockChangedNotify = BlockChangedNotify;
re.Start();
}
......
......@@ -122,12 +122,14 @@ namespace DBRuntime.Api
{
lock (mChangedBlocks)
mChangedBlocks.Enqueue(bids);
if (!mIsClosed)
resetEvent.Set();
}
}), new Func<List<int>>(() => { return null; }));
}),()=> {
if (!mIsClosed)
{
resetEvent.Set();
}
}, new Func<List<int>>(() => { return null; }));
}
......@@ -207,10 +209,14 @@ namespace DBRuntime.Api
var cc = ServiceLocator.Locator.Resolve<IRealTagComsumer>() as RealEnginer;
if (start >= cc.Memory.Length) return;
var re = BufferManager.Manager.Allocate(ApiFunConst.RealDataRequestFun, size);
Buffer.BlockCopy(cc.Memory, start, re.Array, re.ArrayOffset + re.WriterIndex, size);
re.SetWriterIndex(re.WriterIndex + size);
Parent.AsyncCallback(clientId, re);
var re = BufferManager.Manager.Allocate(ApiFunConst.RealDataRequestFun, 0);
var ob = Unpooled.CompositeBuffer().AddComponents(true, re, Unpooled.WrappedBuffer(cc.Memory, start, size));
//var re = BufferManager.Manager.Allocate(ApiFunConst.RealDataRequestFun, size);
//Buffer.BlockCopy(cc.Memory, start, re.Array, re.ArrayOffset + re.WriterIndex, size);
//re.SetWriterIndex(re.WriterIndex + size);
Parent.AsyncCallback(clientId, ob);
}
......@@ -234,6 +240,28 @@ namespace DBRuntime.Api
return re;
}
private IByteBuffer GetBlockSendBuffer2(BlockItem item)
{
int start = item.StartAddress;
int size = item.EndAddress - start;
var cc = ServiceLocator.Locator.Resolve<IRealTagComsumer>() as RealEnginer;
if (start >= cc.Memory.Length) return null;
//var re = Unpooled.Buffer(10);
//re.WriteByte(ApiFunConst.RealDataPushFun);
var re = BufferManager.Manager.Allocate(ApiFunConst.RealDataPushFun, 9);
re.WriteByte(RealDataBlockPush);
re.WriteInt(start);
re.WriteInt(size);
return Unpooled.CompositeBuffer().AddComponents(true,re, Unpooled.WrappedBuffer(cc.Memory, start, size));
// Buffer.BlockCopy(cc.Memory, start, re.Array, re.ArrayOffset + re.WriterIndex, size);
// re.SetWriterIndex(re.WriterIndex + size);
// return re;
}
/// <summary>
///
/// </summary>
......@@ -802,15 +830,15 @@ namespace DBRuntime.Api
if(mBlockCallBackRegistorIds.Count>0)
{
Stopwatch sw = new Stopwatch();
sw.Start();
//Stopwatch sw = new Stopwatch();
//sw.Start();
int count = 0;
while (mChangedBlocks.Count>0)
{
var vv = mChangedBlocks.Dequeue();
if (vv == null) continue;
var buffer = GetBlockSendBuffer(vv);
var buffer = GetBlockSendBuffer2(vv);
foreach (var vvb in mBlockCallBackRegistorIds.ToArray())
{
// buffer.Retain();
......@@ -822,8 +850,8 @@ namespace DBRuntime.Api
//buffer.ReleaseBuffer();
count++;
}
sw.Stop();
LoggerService.Service.Erro("RealDataServerProcess", "推送数据耗时"+sw.ElapsedMilliseconds+" 大小:"+ count);
//sw.Stop();
//LoggerService.Service.Erro("RealDataServerProcess", "推送数据耗时"+sw.ElapsedMilliseconds+" 大小:"+ count);
}
else
{
......
......@@ -115,7 +115,7 @@ namespace Cdy.Tag
mChangedTags[vv] = true;
}
}
}),null, new Func<List<int>>(() => { return mTags.Keys.ToList(); }));
}),null,null, new Func<List<int>>(() => { return mTags.Keys.ToList(); }));
mRecordThread = new Thread(ThreadProcess);
mRecordThread.IsBackground=true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册