From 38cf314be4d6619a4c36146e80a8c3a79e12a299 Mon Sep 17 00:00:00 2001 From: goldenhawking Date: Tue, 29 Apr 2014 14:04:30 +0800 Subject: [PATCH] Add Cluster App-Layer function interface. need further approachs. --- .../cluster/zp_clusternode.cpp | 21 +++++++++++-- ZoomPipeline_FuncSvr/cluster/zp_clusternode.h | 5 +++ .../cluster/zp_clusterterm.cpp | 31 +++++++++++++++++-- ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h | 12 +++++++ 4 files changed, 64 insertions(+), 5 deletions(-) diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp index dd88891..fbc48e5 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp @@ -222,8 +222,11 @@ namespace ZP_Cluster{ emit evt_Message(this,tr("Info: New Svr already regisited. Ignored.")+strName); emit evt_close_client(this->sock()); } - //else - //m_pTerm->BroadcastServers(); + else + { + emit evt_NewSvrConnected(this->termName()); + m_pTerm->BroadcastServers(); + } } else { @@ -253,6 +256,20 @@ namespace ZP_Cluster{ } } break; + case 0x03: + if (m_currentMessageSize==m_currentBlock.size()) + { + QByteArray arraySend ((const char *)(pMsg) + sizeof(CROSS_SVR_MSG::tag_header), + m_currentMessageSize - sizeof(CROSS_SVR_MSG::tag_header)); + emit evt_RemoteData_recieved(this->termName(),arraySend); + } + else + { + QByteArray arraySend(m_currentBlock); + emit evt_RemoteData_recieved(this->termName(),arraySend); + } + m_currentBlock = QByteArray(); + break; default: emit evt_Message(this,tr("Info:Unknown Msg Type got.")); emit evt_close_client(this->sock()); diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h index a07ec77..c8124d1 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h @@ -72,6 +72,11 @@ namespace ZP_Cluster{ void evt_close_client(QObject * objClient); void evt_connect_to(const QHostAddress & address , quint16 nPort,bool bSSLConn); void evt_Message (QObject * psource,const QString &); + + //Notify Messages + void evt_NewSvrConnected(const QString & /*svrHandle*/); + //some data arrival + void evt_RemoteData_recieved(const QString & /*svrHandle*/,const QByteArray & /*svrHandle*/ ); }; } #endif // ZP_CLUSTERNODE_H diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp index 540a6f0..e577c6a 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp @@ -165,6 +165,8 @@ namespace ZP_Cluster{ connect (pnode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_connect_to,m_pClusterNet,&ZPNetwork::zp_net_Engine::connectTo,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_NewSvrConnected,this,&zp_ClusterTerm::evt_NewSvrConnected,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_RemoteData_recieved,this,&zp_ClusterTerm::evt_RemoteData_recieved,Qt::QueuedConnection); m_hash_sock2node[clientHandle] = pnode; nHashContains = true; pClientNode = pnode; @@ -195,6 +197,8 @@ namespace ZP_Cluster{ connect (pnode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_connect_to,m_pClusterNet,&ZPNetwork::zp_net_Engine::connectTo,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_NewSvrConnected,this,&zp_ClusterTerm::evt_NewSvrConnected,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_RemoteData_recieved,this,&zp_ClusterTerm::evt_RemoteData_recieved,Qt::QueuedConnection); m_hash_sock2node[clientHandle] = pnode; nHashContains = true; pClientNode = pnode; @@ -213,6 +217,7 @@ namespace ZP_Cluster{ void zp_ClusterTerm::on_evt_ClientDisconnected(QObject * clientHandle) { bool nHashContains = false; + zp_ClusterNode * pClientNode = 0; m_hash_mutex.lock(); nHashContains = m_hash_sock2node.contains(clientHandle); @@ -220,17 +225,22 @@ namespace ZP_Cluster{ pClientNode = m_hash_sock2node[clientHandle]; if (pClientNode) { + QString nameCurr = pClientNode->termName(); disconnect (pClientNode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_Engine::SendDataToClient); disconnect (pClientNode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_BroadcastData); disconnect (pClientNode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_Engine::KickClients); disconnect (pClientNode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message); disconnect (pClientNode,&zp_ClusterNode::evt_connect_to,m_pClusterNet,&ZPNetwork::zp_net_Engine::connectTo); + disconnect (pClientNode,&zp_ClusterNode::evt_NewSvrConnected,this,&zp_ClusterTerm::evt_NewSvrConnected); + disconnect (pClientNode,&zp_ClusterNode::evt_RemoteData_recieved,this,&zp_ClusterTerm::evt_RemoteData_recieved); m_hash_sock2node.remove(clientHandle); if (pClientNode->termName().length()>0) m_hash_Name2node.remove(pClientNode->termName()); pClientNode->bTermSet = true; m_nodeToBeDel.push_back(pClientNode); + if (nameCurr.length()>0) + emit evt_NewSvrDisconnected(nameCurr); //qDebug()<ref()); } m_hash_mutex.unlock(); @@ -271,6 +281,8 @@ namespace ZP_Cluster{ connect (pnode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_connect_to,m_pClusterNet,&ZPNetwork::zp_net_Engine::connectTo,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_NewSvrConnected,this,&zp_ClusterTerm::evt_NewSvrConnected,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_RemoteData_recieved,this,&zp_ClusterTerm::evt_RemoteData_recieved,Qt::QueuedConnection); m_hash_sock2node[clientHandle] = pnode; nHashContains = true; pClientNode = pnode; @@ -296,9 +308,9 @@ namespace ZP_Cluster{ m_hash_mutex.unlock(); } //a block of data has been successfuly sent - void zp_ClusterTerm::on_evt_Data_transferred(QObject * /*clientHandle*/,qint64 /*bytes sent*/) + void zp_ClusterTerm::on_evt_Data_transferred(QObject * clientHandle, qint64 bytesent) { - + emit evt_RemoteData_transferred(clientHandle,bytesent); } void zp_ClusterTerm::BroadcastServers() { @@ -363,6 +375,19 @@ namespace ZP_Cluster{ netEng()->SendDataToClient(m_hash_Name2node[key]->sock(),array); } m_hash_mutex.unlock(); - BroadcastServers(); + } + void zp_ClusterTerm::SendDataToRemoteServer(const QString & svrName,const QByteArray & SourceArray) + { + int nMsgLen = sizeof(CROSS_SVR_MSG::tag_header) + SourceArray.size(); + QByteArray array(nMsgLen,0); + CROSS_SVR_MSG * pMsg =(CROSS_SVR_MSG *) array.data(); + pMsg->hearder.Mark = 0x1234; + pMsg->hearder.data_length = SourceArray.size(); + pMsg->hearder.messagetype = 0x03; + memcpy (pMsg->payload.data,SourceArray.constData(),SourceArray.size()); + m_hash_mutex.lock(); + if (m_hash_Name2node.contains(svrName)) + netEng()->SendDataToClient(m_hash_Name2node[svrName]->sock(),array); + m_hash_mutex.unlock(); } } diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h index 6ffb946..676c10e 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h @@ -61,6 +61,15 @@ namespace ZP_Cluster{ void evt_Message(QObject * ,const QString &); //The socket error message void evt_SocketError(QObject * senderSock ,QAbstractSocket::SocketError socketError); + //this event indicates new svr successfully hand-shaked. + void evt_NewSvrConnected(const QString &/*svrHandle*/); + //this event indicates a client disconnected. + void evt_NewSvrDisconnected(const QString &/*svrHandle*/); + //some data arrival + void evt_RemoteData_recieved(const QString &/*svrHandle*/,const QByteArray & /*svrHandle*/ ); + //a block of data has been successfuly sent + void evt_RemoteData_transferred(QObject * /*svrHandle*/,qint64 /*bytes sent*/); + protected slots: //this event indicates new client connected. void on_evt_NewClientConnected(QObject * /*clientHandle*/); @@ -82,6 +91,9 @@ namespace ZP_Cluster{ bool JoinCluster(const QHostAddress &addr, int nPort,bool bSSL=false); void KickDeadClients(); + //Send Data to Server + void SendDataToRemoteServer(const QString & svrName,const QByteArray &); + }; } #endif // ZP_CLUSTERTERM_H -- GitLab