From f524aa5fbf0e556a3e67ad652062393343313d33 Mon Sep 17 00:00:00 2001 From: goldenhawking Date: Tue, 29 Apr 2014 23:22:36 +0800 Subject: [PATCH] Test Node 2 Cluster Messages, OK. --- .../cluster/zp_clusternode.cpp | 12 +-- .../cluster/zp_clusterterm.cpp | 7 +- .../smartlink/st_client_table.cpp | 83 ++++++++++++++----- .../smartlink/st_client_table.h | 23 ++++- ZoomPipeline_FuncSvr/zpmainframe.cpp | 1 + 5 files changed, 94 insertions(+), 32 deletions(-) diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp index fbc48e5..fd5ae3d 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp @@ -195,14 +195,10 @@ namespace ZP_Cluster{ { qint32 bytesLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header) -m_currentMessageSize ; - if (bytesLeft==0 && m_currentMessageSize>0) - { - if (m_currentBlock.length()>=64) - emit evt_Message(this,"Debug:" + m_currentBlock.toHex().left(64) + "..." + m_currentBlock.toHex().right(64)); - else - emit evt_Message(this,"Debug:" + m_currentBlock.toHex()); - - } + if (m_currentBlock.length()>=64) + emit evt_Message(this,"Debug:" + m_currentBlock.toHex().left(64) + "..." + m_currentBlock.toHex().right(64)); + else + emit evt_Message(this,"Debug:" + m_currentBlock.toHex()); const CROSS_SVR_MSG * pMsg =(const CROSS_SVR_MSG *) m_currentBlock.constData(); switch(m_currentHeader.messagetype) { diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp index e577c6a..6700bad 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp @@ -235,7 +235,12 @@ namespace ZP_Cluster{ disconnect (pClientNode,&zp_ClusterNode::evt_RemoteData_recieved,this,&zp_ClusterTerm::evt_RemoteData_recieved); m_hash_sock2node.remove(clientHandle); if (pClientNode->termName().length()>0) - m_hash_Name2node.remove(pClientNode->termName()); + { + //This is important. some time m_hash_Name2node and m_hash_sock2node, same uuid has different socket. + if (m_hash_Name2node.contains(pClientNode->termName())) + if (m_hash_Name2node[pClientNode->termName()]==pClientNode) + m_hash_Name2node.remove(pClientNode->termName()); + } pClientNode->bTermSet = true; m_nodeToBeDel.push_back(pClientNode); diff --git a/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp b/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp index 556ab9f..d2b2cb4 100644 --- a/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp +++ b/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp @@ -4,21 +4,29 @@ #include namespace SmartLink{ st_client_table::st_client_table( - ZPNetwork::zp_net_Engine * pool, + ZPNetwork::zp_net_Engine * NetEngine, ZPTaskEngine::zp_pipeline * taskeng, ZPDatabase::DatabaseResource * pDb, + ZP_Cluster::zp_ClusterTerm * pCluster, QObject *parent) : QObject(parent) - ,m_pThreadPool(pool) + ,m_pThreadEngine(NetEngine) ,m_pTaskEngine(taskeng) ,m_pDatabaseRes(pDb) + ,m_pCluster(pCluster) { m_nHeartBeatingDeadThrd = 180; - connect (m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_NewClientConnected,this,&st_client_table::on_evt_NewClientConnected,Qt::QueuedConnection); - connect (m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_ClientEncrypted,this,&st_client_table::on_evt_ClientEncrypted,Qt::QueuedConnection); - connect (m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_ClientDisconnected,this,&st_client_table::on_evt_ClientDisconnected,Qt::QueuedConnection); - connect (m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_Data_recieved,this,&st_client_table::on_evt_Data_recieved,Qt::QueuedConnection); - connect (m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_Data_transferred,this,&st_client_table::on_evt_Data_transferred,Qt::QueuedConnection); + connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_NewClientConnected,this,&st_client_table::on_evt_NewClientConnected,Qt::QueuedConnection); + connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_ClientEncrypted,this,&st_client_table::on_evt_ClientEncrypted,Qt::QueuedConnection); + connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_ClientDisconnected,this,&st_client_table::on_evt_ClientDisconnected,Qt::QueuedConnection); + connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_Data_recieved,this,&st_client_table::on_evt_Data_recieved,Qt::QueuedConnection); + connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_Data_transferred,this,&st_client_table::on_evt_Data_transferred,Qt::QueuedConnection); + + connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_NewSvrConnected,this,&st_client_table::on_evt_NewSvrConnected,Qt::QueuedConnection); + connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_NewSvrDisconnected,this,&st_client_table::on_evt_NewSvrDisconnected,Qt::QueuedConnection); + connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_RemoteData_recieved,this,&st_client_table::on_evt_RemoteData_recieved,Qt::QueuedConnection); + connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_RemoteData_transferred,this,&st_client_table::on_evt_RemoteData_transferred,Qt::QueuedConnection); + } int st_client_table::heartBeatingThrd() @@ -120,9 +128,9 @@ namespace SmartLink{ { st_clientNode_baseTrans * pnode = new st_clientNodeAppLayer(this,clientHandle,0); //using queued connection of send and revieve; - connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadPool,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection); - connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection); - connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadPool,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection); + connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadEngine,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection); + connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection); + connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadEngine,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection); connect (pnode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message,Qt::QueuedConnection); m_hash_sock2node[clientHandle] = pnode; nHashContains = true; @@ -147,9 +155,9 @@ namespace SmartLink{ { st_clientNode_baseTrans * pnode = new st_clientNodeAppLayer(this,clientHandle,0); //using queued connection of send and revieve; - connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadPool,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection); - connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection); - connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadPool,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection); + connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadEngine,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection); + connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection); + connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadEngine,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection); connect (pnode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message,Qt::QueuedConnection); m_hash_sock2node[clientHandle] = pnode; nHashContains = true; @@ -176,12 +184,17 @@ namespace SmartLink{ { m_hash_sock2node.remove(clientHandle); if (pClientNode->uuidValid()) - m_hash_uuid2node.remove(pClientNode->uuid()); + { + //This is important. some time m_hash_sock2node and m_hash_uuid2node, same uuid has different socket. + if (m_hash_uuid2node.contains(pClientNode->uuid())) + if (m_hash_uuid2node[pClientNode->uuid()]==pClientNode) + m_hash_uuid2node.remove(pClientNode->uuid()); + } pClientNode->bTermSet = true; - disconnect (pClientNode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadPool,&ZPNetwork::zp_net_Engine::SendDataToClient); - disconnect (pClientNode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_BroadcastData); - disconnect (pClientNode,&st_clientNode_baseTrans::evt_close_client,m_pThreadPool,&ZPNetwork::zp_net_Engine::KickClients); + disconnect (pClientNode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadEngine,&ZPNetwork::zp_net_Engine::SendDataToClient); + disconnect (pClientNode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_BroadcastData); + disconnect (pClientNode,&st_clientNode_baseTrans::evt_close_client,m_pThreadEngine,&ZPNetwork::zp_net_Engine::KickClients); disconnect (pClientNode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message); m_nodeToBeDel.push_back(pClientNode); @@ -223,9 +236,9 @@ namespace SmartLink{ { st_clientNode_baseTrans * pnode = new st_clientNodeAppLayer(this,clientHandle,0); //using queued connection of send and revieve; - connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadPool,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection); - connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection); - connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadPool,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection); + connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadEngine,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection); + connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection); + connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadEngine,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection); connect (pnode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message,Qt::QueuedConnection); m_hash_sock2node[clientHandle] = pnode; nHashContains = true; @@ -252,5 +265,35 @@ namespace SmartLink{ } + //this event indicates new svr successfully hand-shaked. + void st_client_table::on_evt_NewSvrConnected(const QString & svrHandle) + { + const char * pstr = "Hello World!"; + m_pCluster->SendDataToRemoteServer(svrHandle,QByteArray(pstr)); + emit evt_Message(this,"Send Svr Msg to "+svrHandle); + } + + //this event indicates a client disconnected. + void st_client_table::on_evt_NewSvrDisconnected(const QString & svrHandle) + { + emit evt_Message(this,"Svr DisConnected. " + svrHandle); + } + + //some data arrival + void st_client_table::on_evt_RemoteData_recieved(const QString & svrHandle,const QByteArray & array ) + { + const char * ptr = array.constData(); + QString str; + for (int i=0;i m_hash_sock2node; //Concurrent Network frame work - ZPNetwork::zp_net_Engine * m_pThreadPool; + ZPNetwork::zp_net_Engine * m_pThreadEngine; //The piple-line ZPTaskEngine::zp_pipeline * m_pTaskEngine; //The database pool ZPDatabase::DatabaseResource * m_pDatabaseRes; + //The Server Cluster Group + ZP_Cluster::zp_ClusterTerm * m_pCluster; //The max seconds before dead client be kicked out int m_nHeartBeatingDeadThrd; @@ -63,7 +71,7 @@ namespace SmartLink{ signals: void evt_Message (QObject * psource,const QString &); - public slots: + protected slots: //this event indicates new client connected. void on_evt_NewClientConnected(QObject * /*clientHandle*/); //this event indicates new client encrypted. @@ -75,6 +83,15 @@ namespace SmartLink{ //a block of data has been successfuly sent void on_evt_Data_transferred(QObject * /*clientHandle*/,qint64 /*bytes sent*/); + //this event indicates new svr successfully hand-shaked. + void on_evt_NewSvrConnected(const QString &/*svrHandle*/); + //this event indicates a client disconnected. + void on_evt_NewSvrDisconnected(const QString &/*svrHandle*/); + //some data arrival + void on_evt_RemoteData_recieved(const QString &/*svrHandle*/,const QByteArray & /*svrHandle*/ ); + //a block of data has been successfuly sent + void on_evt_RemoteData_transferred(QObject * /*svrHandle*/,qint64 /*bytes sent*/); + }; } #endif // ST_CLIENT_TABLE_H diff --git a/ZoomPipeline_FuncSvr/zpmainframe.cpp b/ZoomPipeline_FuncSvr/zpmainframe.cpp index 6cd7229..cc59e67 100644 --- a/ZoomPipeline_FuncSvr/zpmainframe.cpp +++ b/ZoomPipeline_FuncSvr/zpmainframe.cpp @@ -42,6 +42,7 @@ ZPMainFrame::ZPMainFrame(QWidget *parent) : m_clientTable = new SmartLink::st_client_table (m_netEngine, m_taskEngine, m_pDatabases, + m_pClusterTerm, this); connect (m_clientTable,&SmartLink::st_client_table::evt_Message,this,&ZPMainFrame::on_evt_Message_Smartlink); -- GitLab