提交 f524aa5f 编写于 作者: 丁劲犇's avatar 丁劲犇 😸

Test Node 2 Cluster Messages, OK.

上级 38cf314b
......@@ -195,14 +195,10 @@ namespace ZP_Cluster{
{
qint32 bytesLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header)
-m_currentMessageSize ;
if (bytesLeft==0 && m_currentMessageSize>0)
{
if (m_currentBlock.length()>=64)
emit evt_Message(this,"Debug:" + m_currentBlock.toHex().left(64) + "..." + m_currentBlock.toHex().right(64));
else
emit evt_Message(this,"Debug:" + m_currentBlock.toHex());
}
if (m_currentBlock.length()>=64)
emit evt_Message(this,"Debug:" + m_currentBlock.toHex().left(64) + "..." + m_currentBlock.toHex().right(64));
else
emit evt_Message(this,"Debug:" + m_currentBlock.toHex());
const CROSS_SVR_MSG * pMsg =(const CROSS_SVR_MSG *) m_currentBlock.constData();
switch(m_currentHeader.messagetype)
{
......
......@@ -235,7 +235,12 @@ namespace ZP_Cluster{
disconnect (pClientNode,&zp_ClusterNode::evt_RemoteData_recieved,this,&zp_ClusterTerm::evt_RemoteData_recieved);
m_hash_sock2node.remove(clientHandle);
if (pClientNode->termName().length()>0)
m_hash_Name2node.remove(pClientNode->termName());
{
//This is important. some time m_hash_Name2node and m_hash_sock2node, same uuid has different socket.
if (m_hash_Name2node.contains(pClientNode->termName()))
if (m_hash_Name2node[pClientNode->termName()]==pClientNode)
m_hash_Name2node.remove(pClientNode->termName());
}
pClientNode->bTermSet = true;
m_nodeToBeDel.push_back(pClientNode);
......
......@@ -4,21 +4,29 @@
#include <functional>
namespace SmartLink{
st_client_table::st_client_table(
ZPNetwork::zp_net_Engine * pool,
ZPNetwork::zp_net_Engine * NetEngine,
ZPTaskEngine::zp_pipeline * taskeng,
ZPDatabase::DatabaseResource * pDb,
ZP_Cluster::zp_ClusterTerm * pCluster,
QObject *parent) :
QObject(parent)
,m_pThreadPool(pool)
,m_pThreadEngine(NetEngine)
,m_pTaskEngine(taskeng)
,m_pDatabaseRes(pDb)
,m_pCluster(pCluster)
{
m_nHeartBeatingDeadThrd = 180;
connect (m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_NewClientConnected,this,&st_client_table::on_evt_NewClientConnected,Qt::QueuedConnection);
connect (m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_ClientEncrypted,this,&st_client_table::on_evt_ClientEncrypted,Qt::QueuedConnection);
connect (m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_ClientDisconnected,this,&st_client_table::on_evt_ClientDisconnected,Qt::QueuedConnection);
connect (m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_Data_recieved,this,&st_client_table::on_evt_Data_recieved,Qt::QueuedConnection);
connect (m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_Data_transferred,this,&st_client_table::on_evt_Data_transferred,Qt::QueuedConnection);
connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_NewClientConnected,this,&st_client_table::on_evt_NewClientConnected,Qt::QueuedConnection);
connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_ClientEncrypted,this,&st_client_table::on_evt_ClientEncrypted,Qt::QueuedConnection);
connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_ClientDisconnected,this,&st_client_table::on_evt_ClientDisconnected,Qt::QueuedConnection);
connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_Data_recieved,this,&st_client_table::on_evt_Data_recieved,Qt::QueuedConnection);
connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_Data_transferred,this,&st_client_table::on_evt_Data_transferred,Qt::QueuedConnection);
connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_NewSvrConnected,this,&st_client_table::on_evt_NewSvrConnected,Qt::QueuedConnection);
connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_NewSvrDisconnected,this,&st_client_table::on_evt_NewSvrDisconnected,Qt::QueuedConnection);
connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_RemoteData_recieved,this,&st_client_table::on_evt_RemoteData_recieved,Qt::QueuedConnection);
connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_RemoteData_transferred,this,&st_client_table::on_evt_RemoteData_transferred,Qt::QueuedConnection);
}
int st_client_table::heartBeatingThrd()
......@@ -120,9 +128,9 @@ namespace SmartLink{
{
st_clientNode_baseTrans * pnode = new st_clientNodeAppLayer(this,clientHandle,0);
//using queued connection of send and revieve;
connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadPool,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadPool,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadEngine,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadEngine,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message,Qt::QueuedConnection);
m_hash_sock2node[clientHandle] = pnode;
nHashContains = true;
......@@ -147,9 +155,9 @@ namespace SmartLink{
{
st_clientNode_baseTrans * pnode = new st_clientNodeAppLayer(this,clientHandle,0);
//using queued connection of send and revieve;
connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadPool,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadPool,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadEngine,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadEngine,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message,Qt::QueuedConnection);
m_hash_sock2node[clientHandle] = pnode;
nHashContains = true;
......@@ -176,12 +184,17 @@ namespace SmartLink{
{
m_hash_sock2node.remove(clientHandle);
if (pClientNode->uuidValid())
m_hash_uuid2node.remove(pClientNode->uuid());
{
//This is important. some time m_hash_sock2node and m_hash_uuid2node, same uuid has different socket.
if (m_hash_uuid2node.contains(pClientNode->uuid()))
if (m_hash_uuid2node[pClientNode->uuid()]==pClientNode)
m_hash_uuid2node.remove(pClientNode->uuid());
}
pClientNode->bTermSet = true;
disconnect (pClientNode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadPool,&ZPNetwork::zp_net_Engine::SendDataToClient);
disconnect (pClientNode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_BroadcastData);
disconnect (pClientNode,&st_clientNode_baseTrans::evt_close_client,m_pThreadPool,&ZPNetwork::zp_net_Engine::KickClients);
disconnect (pClientNode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadEngine,&ZPNetwork::zp_net_Engine::SendDataToClient);
disconnect (pClientNode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_BroadcastData);
disconnect (pClientNode,&st_clientNode_baseTrans::evt_close_client,m_pThreadEngine,&ZPNetwork::zp_net_Engine::KickClients);
disconnect (pClientNode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message);
m_nodeToBeDel.push_back(pClientNode);
......@@ -223,9 +236,9 @@ namespace SmartLink{
{
st_clientNode_baseTrans * pnode = new st_clientNodeAppLayer(this,clientHandle,0);
//using queued connection of send and revieve;
connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadPool,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadPool,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadPool,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadEngine,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadEngine,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message,Qt::QueuedConnection);
m_hash_sock2node[clientHandle] = pnode;
nHashContains = true;
......@@ -252,5 +265,35 @@ namespace SmartLink{
}
//this event indicates new svr successfully hand-shaked.
void st_client_table::on_evt_NewSvrConnected(const QString & svrHandle)
{
const char * pstr = "Hello World!";
m_pCluster->SendDataToRemoteServer(svrHandle,QByteArray(pstr));
emit evt_Message(this,"Send Svr Msg to "+svrHandle);
}
//this event indicates a client disconnected.
void st_client_table::on_evt_NewSvrDisconnected(const QString & svrHandle)
{
emit evt_Message(this,"Svr DisConnected. " + svrHandle);
}
//some data arrival
void st_client_table::on_evt_RemoteData_recieved(const QString & svrHandle,const QByteArray & array )
{
const char * ptr = array.constData();
QString str;
for (int i=0;i<array.size();i++)
str.push_back(QChar(ptr[i]));
emit evt_Message(this,"Recieved Svr Msg from " + svrHandle +":" +str);
}
//a block of data has been successfuly sent
void st_client_table::on_evt_RemoteData_transferred(QObject * /*svrHandle*/,qint64 /*bytes sent*/)
{
}
}
......@@ -9,13 +9,19 @@
#include "../pipeline/zp_pipeline.h"
#include "./st_message.h"
#include "../database/databaseresource.h"
#include "../cluster/zp_clusterterm.h"
namespace SmartLink{
class st_clientNode_baseTrans;
class st_client_table : public QObject
{
Q_OBJECT
public:
explicit st_client_table(ZPNetwork::zp_net_Engine * pool, ZPTaskEngine::zp_pipeline * taskeng, ZPDatabase::DatabaseResource *pDb, QObject *parent = 0);
explicit st_client_table(
ZPNetwork::zp_net_Engine * NetEngine
,ZPTaskEngine::zp_pipeline * taskeng
,ZPDatabase::DatabaseResource *pDb
,ZP_Cluster::zp_ClusterTerm * pCluster
,QObject *parent = 0);
~st_client_table();
bool regisitClientUUID(st_clientNode_baseTrans *);
......@@ -46,11 +52,13 @@ namespace SmartLink{
QMap<QObject *,st_clientNode_baseTrans *> m_hash_sock2node;
//Concurrent Network frame work
ZPNetwork::zp_net_Engine * m_pThreadPool;
ZPNetwork::zp_net_Engine * m_pThreadEngine;
//The piple-line
ZPTaskEngine::zp_pipeline * m_pTaskEngine;
//The database pool
ZPDatabase::DatabaseResource * m_pDatabaseRes;
//The Server Cluster Group
ZP_Cluster::zp_ClusterTerm * m_pCluster;
//The max seconds before dead client be kicked out
int m_nHeartBeatingDeadThrd;
......@@ -63,7 +71,7 @@ namespace SmartLink{
signals:
void evt_Message (QObject * psource,const QString &);
public slots:
protected slots:
//this event indicates new client connected.
void on_evt_NewClientConnected(QObject * /*clientHandle*/);
//this event indicates new client encrypted.
......@@ -75,6 +83,15 @@ namespace SmartLink{
//a block of data has been successfuly sent
void on_evt_Data_transferred(QObject * /*clientHandle*/,qint64 /*bytes sent*/);
//this event indicates new svr successfully hand-shaked.
void on_evt_NewSvrConnected(const QString &/*svrHandle*/);
//this event indicates a client disconnected.
void on_evt_NewSvrDisconnected(const QString &/*svrHandle*/);
//some data arrival
void on_evt_RemoteData_recieved(const QString &/*svrHandle*/,const QByteArray & /*svrHandle*/ );
//a block of data has been successfuly sent
void on_evt_RemoteData_transferred(QObject * /*svrHandle*/,qint64 /*bytes sent*/);
};
}
#endif // ST_CLIENT_TABLE_H
......@@ -42,6 +42,7 @@ ZPMainFrame::ZPMainFrame(QWidget *parent) :
m_clientTable = new SmartLink::st_client_table (m_netEngine,
m_taskEngine,
m_pDatabases,
m_pClusterTerm,
this);
connect (m_clientTable,&SmartLink::st_client_table::evt_Message,this,&ZPMainFrame::on_evt_Message_Smartlink);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册