提交 814d25a8 编写于 作者: cdy816's avatar cdy816

Merge branch 'master' of https://github.com/cdy816/mars

...@@ -67,10 +67,13 @@ namespace Cdy.Tag ...@@ -67,10 +67,13 @@ namespace Cdy.Tag
get { return mIsConnected; } get { return mIsConnected; }
set set
{ {
mIsConnected = value; if (mIsConnected != value)
if (PropertyChanged != null)
{ {
Task.Run(() => { PropertyChanged(this, new PropertyChangedEventArgs("IsConnected")); }); mIsConnected = value;
if (PropertyChanged != null)
{
Task.Run(() => { PropertyChanged(this, new PropertyChangedEventArgs("IsConnected")); });
}
} }
} }
} }
......
...@@ -330,15 +330,17 @@ namespace Cdy.Tag ...@@ -330,15 +330,17 @@ namespace Cdy.Tag
internal void Registor(IChannelHandlerContext channel) internal void Registor(IChannelHandlerContext channel)
{ {
string sname = GetClientId(channel); string sname = GetClientId(channel);
if (!mClients.ContainsKey(sname)) lock (mClients)
{ {
mClients[sname] = channel; if (!mClients.ContainsKey(sname))
} {
else mClients[sname] = channel;
{ }
mClients.Add(sname, channel); else
{
mClients.Add(sname, channel);
}
} }
OnClientConnected(sname); OnClientConnected(sname);
} }
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
"profiles": { "profiles": {
"DBInRun": { "DBInRun": {
"commandName": "Executable", "commandName": "Executable",
"executablePath": "C:\\Users\\chongdaoyang\\source\\repos\\mars\\Output\\DBInRun.exe" "executablePath": "C:\\Users\\cdy81\\source\\repos\\mars\\Output\\DBInRun.exe"
} }
} }
} }
\ No newline at end of file
...@@ -92,7 +92,8 @@ namespace Cdy.Tag ...@@ -92,7 +92,8 @@ namespace Cdy.Tag
/// </summary> /// </summary>
public void Start() public void Start()
{ {
foreach(var vv in mDrivers.Values) LoggerService.Service.Info("DriverManager", "start to Start");
foreach (var vv in mDrivers.Values)
{ {
vv.Start(mTagDriverService); vv.Start(mTagDriverService);
} }
...@@ -103,6 +104,7 @@ namespace Cdy.Tag ...@@ -103,6 +104,7 @@ namespace Cdy.Tag
/// </summary> /// </summary>
public void Stop() public void Stop()
{ {
LoggerService.Service.Info("DriverManager", "start to stop");
foreach (var vv in mDrivers.Values) foreach (var vv in mDrivers.Values)
{ {
vv.Stop(); vv.Stop();
......
...@@ -149,6 +149,7 @@ namespace Cdy.Tag ...@@ -149,6 +149,7 @@ namespace Cdy.Tag
/// </summary> /// </summary>
public void Start() public void Start()
{ {
LoggerService.Service.Info("CompressEnginer", "start to Start");
mIsClosed = false; mIsClosed = false;
//Init(); //Init();
resetEvent = new ManualResetEvent(false); resetEvent = new ManualResetEvent(false);
...@@ -163,6 +164,8 @@ namespace Cdy.Tag ...@@ -163,6 +164,8 @@ namespace Cdy.Tag
/// </summary> /// </summary>
public void Stop() public void Stop()
{ {
LoggerService.Service.Info("CompressEnginer", "start to stop");
mIsClosed = true; mIsClosed = true;
resetEvent.Set(); resetEvent.Set();
closedEvent.WaitOne(); closedEvent.WaitOne();
......
...@@ -736,6 +736,7 @@ namespace Cdy.Tag ...@@ -736,6 +736,7 @@ namespace Cdy.Tag
/// </summary> /// </summary>
public void Start() public void Start()
{ {
LoggerService.Service.Info("HisEnginer", "start to Start");
mIsClosed = false; mIsClosed = false;
mMegerProcessIsClosed = false; mMegerProcessIsClosed = false;
LoggerService.Service.Info("Record", "历史变量个数: " + this.mHisTags.Count); LoggerService.Service.Info("Record", "历史变量个数: " + this.mHisTags.Count);
...@@ -1133,9 +1134,10 @@ namespace Cdy.Tag ...@@ -1133,9 +1134,10 @@ namespace Cdy.Tag
/// </summary> /// </summary>
public void Stop() public void Stop()
{ {
mRecordTimer.Stop(); LoggerService.Service.Info("HisEnginer", "start to stop");
if(mRecordTimer!=null) if(mRecordTimer!=null)
{ {
mRecordTimer.Stop();
mRecordTimer.Elapsed -= MRecordTimer_Elapsed; mRecordTimer.Elapsed -= MRecordTimer_Elapsed;
mRecordTimer.Dispose(); mRecordTimer.Dispose();
mRecordTimer = null; mRecordTimer = null;
......
...@@ -194,6 +194,7 @@ namespace Cdy.Tag ...@@ -194,6 +194,7 @@ namespace Cdy.Tag
/// </summary> /// </summary>
public void Start() public void Start()
{ {
LoggerService.Service.Info("SeriseEnginer", "start to Start");
mIsClosed = false; mIsClosed = false;
//Init(); //Init();
resetEvent = new ManualResetEvent(false); resetEvent = new ManualResetEvent(false);
...@@ -208,6 +209,7 @@ namespace Cdy.Tag ...@@ -208,6 +209,7 @@ namespace Cdy.Tag
/// </summary> /// </summary>
public void Stop() public void Stop()
{ {
LoggerService.Service.Info("SeriseEnginer", "start to stop");
mIsClosed = true; mIsClosed = true;
resetEvent.Set(); resetEvent.Set();
closedEvent.WaitOne(); closedEvent.WaitOne();
......
...@@ -57,6 +57,11 @@ namespace DBRuntime.RDDC ...@@ -57,6 +57,11 @@ namespace DBRuntime.RDDC
} }
/// <summary>
///
/// </summary>
public bool Enable { get; set; }
#endregion ...Properties... #endregion ...Properties...
#region ... Methods ... #region ... Methods ...
...@@ -78,7 +83,7 @@ namespace DBRuntime.RDDC ...@@ -78,7 +83,7 @@ namespace DBRuntime.RDDC
{ {
while(!mIsStoped) while(!mIsStoped)
{ {
if (mClient.IsConnected) if (mClient.IsConnected && Enable)
{ {
SyncData(); SyncData();
Thread.Sleep(100); Thread.Sleep(100);
...@@ -98,9 +103,15 @@ namespace DBRuntime.RDDC ...@@ -98,9 +103,15 @@ namespace DBRuntime.RDDC
try try
{ {
var realenginer = ServiceLocator.Locator.Resolve<IRealTagConsumer>(); var realenginer = ServiceLocator.Locator.Resolve<IRealTagConsumer>();
var block = mClient.SyncRealData(); if (realenginer != null)
var size = block.ReadInt(); {
Buffer.BlockCopy(block.Array, block.ArrayOffset + block.ReaderIndex, (realenginer as RealEnginer).Memory, 0, size); var block = mClient.SyncRealData();
if (block != null)
{
var size = block.ReadInt();
Buffer.BlockCopy(block.Array, block.ArrayOffset + block.ReaderIndex, (realenginer as RealEnginer).Memory, 0, size);
}
}
} }
catch(Exception ex) catch(Exception ex)
{ {
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
using Cdy.Tag; using Cdy.Tag;
using DotNetty.Buffers; using DotNetty.Buffers;
using DotNetty.Common;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
...@@ -69,6 +70,8 @@ namespace DBRuntime.RDDC ...@@ -69,6 +70,8 @@ namespace DBRuntime.RDDC
private IByteBuffer mRealDataSyncData; private IByteBuffer mRealDataSyncData;
private object mWorkStateLockObj = new object();
#endregion ...Variables... #endregion ...Variables...
#region ... Events ... #region ... Events ...
...@@ -153,21 +156,25 @@ namespace DBRuntime.RDDC ...@@ -153,21 +156,25 @@ namespace DBRuntime.RDDC
/// <returns></returns> /// <returns></returns>
public DateTime? GetStartTime(int timeout = 5000) public DateTime? GetStartTime(int timeout = 5000)
{ {
var mb = GetBuffer(WorkStateFun,1); lock (mWorkStateLockObj)
mb.WriteByte(GetStartTimeFun);
mWorkStateEvent.Reset();
Send(mb);
try
{ {
if (mWorkStateEvent.WaitOne(timeout)) var mb = GetBuffer(WorkStateFun, 1);
mb.WriteByte(GetStartTimeFun);
mWorkStateEvent.Reset();
Send(mb);
try
{ {
return DateTime.FromBinary(mWorkStateData.ReadLong()); if (mWorkStateEvent.WaitOne(timeout))
{
return DateTime.FromBinary(mWorkStateData.ReadLong());
}
} }
} finally
finally {
{
}
} }
return null; return null;
} }
...@@ -178,20 +185,23 @@ namespace DBRuntime.RDDC ...@@ -178,20 +185,23 @@ namespace DBRuntime.RDDC
/// <returns></returns> /// <returns></returns>
public WorkState? GetWorkState(int timeout = 5000) public WorkState? GetWorkState(int timeout = 5000)
{ {
var mb = GetBuffer(WorkStateFun, 1); lock (mWorkStateLockObj)
mb.WriteByte(GetStateFun);
mWorkStateEvent.Reset();
Send(mb);
try
{ {
if (mWorkStateEvent.WaitOne(timeout)) var mb = GetBuffer(WorkStateFun, 1);
mb.WriteByte(GetStateFun);
mWorkStateEvent.Reset();
Send(mb);
try
{ {
return (WorkState)mWorkStateData.ReadByte(); if (mWorkStateEvent.WaitOne(timeout))
{
return (WorkState)mWorkStateData.ReadByte();
}
} }
} finally
finally {
{
}
} }
return null; return null;
} }
...@@ -203,20 +213,23 @@ namespace DBRuntime.RDDC ...@@ -203,20 +213,23 @@ namespace DBRuntime.RDDC
/// <returns></returns> /// <returns></returns>
public bool? SwitchToPrimary(int timeout = 5000) public bool? SwitchToPrimary(int timeout = 5000)
{ {
var mb = GetBuffer(WorkStateFun, 1); lock (mWorkStateLockObj)
mb.WriteByte(ChangeToPrimaryFun);
mWorkStateEvent.Reset();
Send(mb);
try
{ {
if (mWorkStateEvent.WaitOne(timeout)) var mb = GetBuffer(WorkStateFun, 1);
mb.WriteByte(ChangeToPrimaryFun);
mWorkStateEvent.Reset();
Send(mb);
try
{ {
return mWorkStateData.ReadByte()>0; if (mWorkStateEvent.WaitOne(timeout))
{
return mWorkStateData.ReadByte() > 0;
}
} }
} finally
finally {
{
}
} }
return false; return false;
} }
...@@ -229,20 +242,23 @@ namespace DBRuntime.RDDC ...@@ -229,20 +242,23 @@ namespace DBRuntime.RDDC
/// <returns></returns> /// <returns></returns>
public bool? SwitchToStandby(int timeout = 5000) public bool? SwitchToStandby(int timeout = 5000)
{ {
var mb = GetBuffer(WorkStateFun, 1); lock (mWorkStateLockObj)
mb.WriteByte(ChangeToStandbyFun);
mWorkStateEvent.Reset();
Send(mb);
try
{ {
if (mWorkStateEvent.WaitOne(timeout)) var mb = GetBuffer(WorkStateFun, 1);
mb.WriteByte(ChangeToStandbyFun);
mWorkStateEvent.Reset();
Send(mb);
try
{ {
return mWorkStateData.ReadByte() > 0; if (mWorkStateEvent.WaitOne(timeout))
{
return mWorkStateData.ReadByte() > 0;
}
} }
} finally
finally {
{
}
} }
return false; return false;
} }
......
...@@ -37,6 +37,12 @@ namespace DBRuntime ...@@ -37,6 +37,12 @@ namespace DBRuntime
private object mLockObj = new object(); private object mLockObj = new object();
private WorkState mCurrentState = WorkState.Unknow;
private bool mIsClosed = false;
private Thread mScanThread;
#endregion ...Variables... #endregion ...Variables...
#region ... Events ... #region ... Events ...
...@@ -45,6 +51,9 @@ namespace DBRuntime ...@@ -45,6 +51,9 @@ namespace DBRuntime
#region ... Constructor... #region ... Constructor...
/// <summary>
///
/// </summary>
public RDDCManager() public RDDCManager()
{ {
...@@ -62,7 +71,25 @@ namespace DBRuntime ...@@ -62,7 +71,25 @@ namespace DBRuntime
/// <summary> /// <summary>
/// 当前状态 /// 当前状态
/// </summary> /// </summary>
public WorkState CurrentState { get; set; } = WorkState.Unknow; public WorkState CurrentState
{
get { return mCurrentState; }
set
{
mCurrentState = value;
if (mSync != null)
{
if (value == WorkState.Standby)
{
mSync.Enable = true;
}
else
{
mSync.Enable = false;
}
}
}
}
/// <summary> /// <summary>
/// ///
...@@ -122,6 +149,11 @@ namespace DBRuntime ...@@ -122,6 +149,11 @@ namespace DBRuntime
mSync = new DataSync() { Client = mClient }; mSync = new DataSync() { Client = mClient };
mSync.Start(); mSync.Start();
//mScanThread = new Thread(ThreadPro);
//mScanThread.IsBackground = true;
//mScanThread.Start();
CheckWorkState(); CheckWorkState();
} }
else else
...@@ -150,12 +182,30 @@ namespace DBRuntime ...@@ -150,12 +182,30 @@ namespace DBRuntime
/// </summary> /// </summary>
public void Stop() public void Stop()
{ {
mIsClosed = true;
mSync.Stop(); mSync.Stop();
mServer.Stop(); mServer.Stop();
mClient.PropertyChanged -= MClient_PropertyChanged; mClient.PropertyChanged -= MClient_PropertyChanged;
mClient.Close(); mClient.Close();
} }
private void ThreadPro()
{
while (!mIsClosed)
{
if (mClient.NeedReConnected)
{
mClient.Connect(RemoteIp,Port);
Thread.Sleep(500);
}
else
{
Thread.Sleep(1000);
}
}
}
/// <summary> /// <summary>
/// ///
/// </summary> /// </summary>
...@@ -164,7 +214,9 @@ namespace DBRuntime ...@@ -164,7 +214,9 @@ namespace DBRuntime
if (!mClient.IsConnected) if (!mClient.IsConnected)
{ {
if(CurrentState != WorkState.Primary) if(CurrentState != WorkState.Primary)
SwitchTo(WorkState.Primary); {
SwitchTo(WorkState.Primary);
}
} }
else else
{ {
...@@ -207,6 +259,9 @@ namespace DBRuntime ...@@ -207,6 +259,9 @@ namespace DBRuntime
CurrentState = WorkState.Standby; CurrentState = WorkState.Standby;
} }
} }
LoggerService.Service.Info("RDDCManager", "running in " + CurrentState.ToString(), ConsoleColor.Cyan);
} }
/// <summary> /// <summary>
...@@ -219,7 +274,7 @@ namespace DBRuntime ...@@ -219,7 +274,7 @@ namespace DBRuntime
{ {
if (state == WorkState.Standby) if (state == WorkState.Standby)
{ {
if(mClient.SwitchToPrimary().Value) if(mClient.SwitchToPrimary(60000).Value)
{ {
return SwitchTo(WorkState.Standby); return SwitchTo(WorkState.Standby);
} }
...@@ -230,7 +285,7 @@ namespace DBRuntime ...@@ -230,7 +285,7 @@ namespace DBRuntime
} }
else else
{ {
if(mClient.SwitchToStandby().Value) if(mClient.SwitchToStandby(60000).Value)
{ {
return SwitchTo(WorkState.Primary); return SwitchTo(WorkState.Primary);
} }
...@@ -261,6 +316,7 @@ namespace DBRuntime ...@@ -261,6 +316,7 @@ namespace DBRuntime
/// <returns></returns> /// <returns></returns>
public bool SwitchTo(WorkState state) public bool SwitchTo(WorkState state)
{ {
if (CurrentState == WorkState.Switching) return true;
lock (mLockObj) lock (mLockObj)
{ {
var olds = CurrentState; var olds = CurrentState;
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
// 种道洋 // 种道洋
//============================================================== //==============================================================
using Cdy.Tag;
using DBRuntime.Api; using DBRuntime.Api;
using DotNetty.Buffers; using DotNetty.Buffers;
using System; using System;
...@@ -74,19 +75,21 @@ namespace DBRuntime.RDDC ...@@ -74,19 +75,21 @@ namespace DBRuntime.RDDC
break; break;
case ChangeToPrimary: case ChangeToPrimary:
var re = ProcessSwichToPrimary(); var re = ProcessSwichToPrimary();
Parent.AsyncCallback(client, ToByteBuffer(GetStartTime, re ? 1 : 0)); byte bval = re ? (byte)1 : (byte)0;
Parent.AsyncCallback(client, ToByteBuffer(FunId, bval));
break; break;
case ChangeToStandby: case ChangeToStandby:
re = ProcessSwichToStandby(); re = ProcessSwichToStandby();
Parent.AsyncCallback(client, ToByteBuffer(GetStartTime, re ? 1 : 0)); bval = re ? (byte)1 : (byte)0;
Parent.AsyncCallback(client, ToByteBuffer(FunId, bval));
break; break;
case GetState: case GetState:
var state = (byte)RDDCManager.Manager.CurrentState; var state = (byte)RDDCManager.Manager.CurrentState;
Parent.AsyncCallback(client, ToByteBuffer(GetStartTime, state)); Parent.AsyncCallback(client, ToByteBuffer(FunId, state));
break; break;
} }
base.ProcessSingleData(client, data); //base.ProcessSingleData(client, data);
} }
/// <summary> /// <summary>
...@@ -103,7 +106,15 @@ namespace DBRuntime.RDDC ...@@ -103,7 +106,15 @@ namespace DBRuntime.RDDC
/// <returns></returns> /// <returns></returns>
private bool ProcessSwichToPrimary() private bool ProcessSwichToPrimary()
{ {
return RDDCManager.Manager.SwitchTo(Cdy.Tag.WorkState.Primary); try
{
LoggerService.Service.Info("WorkStateServer", "Receive remote call to switch to primary,Start to switch!", ConsoleColor.Yellow);
return RDDCManager.Manager.SwitchTo(Cdy.Tag.WorkState.Primary);
}
finally
{
LoggerService.Service.Info("WorkStateServer", "Completely to switch", ConsoleColor.Yellow);
}
} }
/// <summary> /// <summary>
...@@ -112,7 +123,15 @@ namespace DBRuntime.RDDC ...@@ -112,7 +123,15 @@ namespace DBRuntime.RDDC
/// <returns></returns> /// <returns></returns>
private bool ProcessSwichToStandby() private bool ProcessSwichToStandby()
{ {
return RDDCManager.Manager.SwitchTo(Cdy.Tag.WorkState.Standby); try
{
LoggerService.Service.Info("WorkStateServer", "Receive remote call to switch to standby!", ConsoleColor.Yellow);
return RDDCManager.Manager.SwitchTo(Cdy.Tag.WorkState.Standby);
}
finally
{
LoggerService.Service.Info("WorkStateServer", "Completely to switch", ConsoleColor.Yellow);
}
} }
#endregion ...Methods... #endregion ...Methods...
......
...@@ -86,14 +86,30 @@ namespace Cdy.Tag ...@@ -86,14 +86,30 @@ namespace Cdy.Tag
{ {
RDDCManager.Manager.SwitchWorkStateAction = new Func<WorkState, bool>((state) => RDDCManager.Manager.SwitchWorkStateAction = new Func<WorkState, bool>((state) =>
{ {
if(state == WorkState.Primary) if (mIsStarted)
{ {
if (state == WorkState.Primary)
return SwitchToPrimary(); {
return SwitchToPrimary();
}
else
{
return SwitchToStandby();
}
} }
else else
{ {
return SwitchToStandby(); while (!mIsStarted) System.Threading.Thread.Sleep(100);
if (state == WorkState.Primary)
{
return SwitchToPrimary();
}
else
{
return SwitchToStandby();
}
} }
}); });
} }
...@@ -184,8 +200,6 @@ namespace Cdy.Tag ...@@ -184,8 +200,6 @@ namespace Cdy.Tag
CurrentDatabase = mRealDatabase.Name; CurrentDatabase = mRealDatabase.Name;
CurrentDatabaseLastUpdateTime = mRealDatabase.UpdateTime; CurrentDatabaseLastUpdateTime = mRealDatabase.UpdateTime;
RDDCManager.Manager.Load(mDatabaseName);
sw.Stop(); sw.Stop();
LoggerService.Service.Info("LoadDatabase", "load " +mDatabaseName +" take " + sw.ElapsedMilliseconds.ToString() +" ms"); LoggerService.Service.Info("LoadDatabase", "load " +mDatabaseName +" take " + sw.ElapsedMilliseconds.ToString() +" ms");
} }
...@@ -362,6 +376,8 @@ namespace Cdy.Tag ...@@ -362,6 +376,8 @@ namespace Cdy.Tag
LoggerService.Service.EnableLogger = true; LoggerService.Service.EnableLogger = true;
LoggerService.Service.Info("Runner", " 数据库 " + database+" 开始启动"); LoggerService.Service.Info("Runner", " 数据库 " + database+" 开始启动");
RDDCManager.Manager.Load(database);
RDDCManager.Manager.Start(); RDDCManager.Manager.Start();
var re = await InitAsync(database); var re = await InitAsync(database);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册