diff --git a/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp b/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp index 2c11f2d9428db5951d578aa8da8304920a32a50d..fb42a66c63d701b1795eb8521b1aa2cd585e189a 100644 --- a/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp +++ b/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp @@ -2,7 +2,9 @@ #include "st_clientnode_applayer.h" #include #include "st_cross_svr_node.h" +#include "st_cross_svr_msg.h" #include +#include namespace SmartLink{ using namespace std::placeholders; st_client_table::st_client_table( @@ -95,6 +97,7 @@ namespace SmartLink{ m_hash_mutex.lock(); m_hash_uuid2node[c->uuid()] = c; m_hash_mutex.unlock(); + broadcast_client_uuid(c->uuid(),true); return true; } @@ -194,7 +197,10 @@ namespace SmartLink{ //This is important. some time m_hash_sock2node and m_hash_uuid2node, same uuid has different socket. if (m_hash_uuid2node.contains(pClientNode->uuid())) if (m_hash_uuid2node[pClientNode->uuid()]==pClientNode) + { m_hash_uuid2node.remove(pClientNode->uuid()); + broadcast_client_uuid(pClientNode->uuid(),false); + } } pClientNode->bTermSet = true; @@ -274,14 +280,50 @@ namespace SmartLink{ //this event indicates new svr successfully hand-shaked. void st_client_table::on_evt_NewSvrConnected(const QString & svrHandle) { - const char * pstr = "Hello World!"; - m_pCluster->SendDataToRemoteServer(svrHandle,QByteArray(pstr)); - emit evt_Message(this,"Send Svr Msg to "+svrHandle); + //Send All Client UUIDs to new Svr + m_hash_mutex.lock(); + QList uuids = m_hash_uuid2node.keys(); + int nNodeSz = uuids.size(); + if (nNodeSz>0) + { + int nMsgLen = sizeof(STCROSSSVR_MSG::tag_msgHearder) + nNodeSz * sizeof(quint32); + QByteArray array(nMsgLen,0); + STCROSSSVR_MSG * pMsg = (STCROSSSVR_MSG *) array.data(); + pMsg->header.Mark = 0x4567; + pMsg->header.version = 1; + pMsg->header.messageLen = nNodeSz * sizeof(quint32); + pMsg->header.mesageType = 0x01; + int ct = -1; + foreach (quint32 uuid,uuids) + pMsg->payload.uuids[++ct] = uuid; + m_pCluster->SendDataToRemoteServer(svrHandle,array); + } + m_hash_mutex.unlock(); + emit evt_Message(this,tr("Send Initial UUIDs to Remote Svr:") + svrHandle); + } + void st_client_table::broadcast_client_uuid(quint32 uuid, bool bActive) + { + QStringList svrs = m_pCluster->SvrNames(); + if (svrs.empty()==false) + { + int nMsgLen = sizeof(STCROSSSVR_MSG::tag_msgHearder) + sizeof(quint32); + QByteArray array(nMsgLen,0); + STCROSSSVR_MSG * pMsg = (STCROSSSVR_MSG *) array.data(); + pMsg->header.Mark = 0x4567; + pMsg->header.version = 1; + pMsg->header.messageLen = sizeof(quint32); + pMsg->header.mesageType = bActive==true?0x01:0x02; + pMsg->payload.uuids[0] = uuid; + foreach (QString svr,svrs) + m_pCluster->SendDataToRemoteServer(svr,array); + } } //this event indicates a client disconnected. void st_client_table::on_evt_NewSvrDisconnected(const QString & svrHandle) { + //remove all client-maps belongs to this server. + this->cross_svr_del_uuids(svrHandle,NULL,0); emit evt_Message(this,"Svr DisConnected. " + svrHandle); } @@ -305,5 +347,42 @@ namespace SmartLink{ pNode->setClientTable(this); return pNode; } + //reg new uuids in m_hash_remoteClient2SvrName + void st_client_table::cross_svr_add_uuids(const QString & svrname,quint32 * pUUIDs, int nUUIDs) + { + m_mutex_cross_svr_map.lock(); + for (int i=0;i keys; + for(std::unordered_map::iterator p = + m_hash_remoteClient2SvrName.begin(); + p!=m_hash_remoteClient2SvrName.end();p++) + { + if ((*p).second == svrname ) + keys.push_back((*p).first); + } + foreach (quint32 key, keys) + { + m_hash_remoteClient2SvrName.erase(key); + } + } + else + { + for (int i=0;i m_nodeToBeDel; @@ -72,6 +80,7 @@ namespace SmartLink{ //cluster Nodes Map std::unordered_map m_hash_remoteClient2SvrName; + QMutex m_mutex_cross_svr_map; //Cluster Node Factory ZP_Cluster::zp_ClusterNode * cross_svr_node_factory( ZP_Cluster::zp_ClusterTerm * /*pTerm*/, diff --git a/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.cpp b/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.cpp index d2414f235cc82bd794b5537a9c09f3188d13f3aa..12b3053d0089b604017c0bc19155857af7549d78 100644 --- a/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.cpp +++ b/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.cpp @@ -6,11 +6,85 @@ namespace SmartLink{ st_cross_svr_node::st_cross_svr_node(ZP_Cluster::zp_ClusterTerm * pTerm, QObject * psock,QObject *parent) :ZP_Cluster::zp_ClusterNode(pTerm,psock,parent) { + m_currStMegSize = 0; } + int st_cross_svr_node::st_bytesLeft() + { + return m_st_Header.messageLen + sizeof(STCROSSSVR_MSG::tag_msgHearder) - m_currStMegSize ; + } + bool st_cross_svr_node::deal_user_data(const QByteArray &array) { + const char * pData = array.constData(); + int nBlockSize = array.size(); + int nOffset = 0; + while (nOffset < nBlockSize ) + { + while (m_currStMegSize < sizeof(STCROSSSVR_MSG::tag_msgHearder) && nOffset< nBlockSize) + { + m_currStBlock.push_back(pData[nOffset++]); + ++m_currStMegSize; + } + if (m_currStMegSize < sizeof(STCROSSSVR_MSG::tag_msgHearder)) + return true; + if (m_currStMegSize == sizeof(STCROSSSVR_MSG::tag_msgHearder)) + { + memcpy (&m_st_Header,m_currStBlock.constData(),sizeof(STCROSSSVR_MSG::tag_msgHearder)); + if (m_st_Header.Mark != 0x4567) + { + m_currStMegSize = 0; + m_currStBlock.clear(); + return true; + } + } + while (nOffset0) + return delCurrBlock; + STCROSSSVR_MSG * pMsg = (STCROSSSVR_MSG *) m_currStBlock.constData(); + int nUUIDs = pMsg->header.messageLen / sizeof(quint32); + this->m_pClientTable->cross_svr_add_uuids(this->termName(),pMsg->payload.uuids,nUUIDs); + } + break; + case 0x02: //client node exit + { + if (st_bytesLeft()>0) + return delCurrBlock; + STCROSSSVR_MSG * pMsg = (STCROSSSVR_MSG *) m_currStBlock.constData(); + int nUUIDs = pMsg->header.messageLen / sizeof(quint32); + this->m_pClientTable->cross_svr_del_uuids(this->termName(),pMsg->payload.uuids,nUUIDs); + } + break; + case 0x03: // data transfer + delCurrBlock = true; + break; + default: + break; + } + + return delCurrBlock; + + } + void st_cross_svr_node::setClientTable(st_client_table * table) { this->m_pClientTable = table; diff --git a/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.h b/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.h index 7df136ea1881dd71a7b9db8004ed42dba17c4894..e6cd3a674101fa54c58f3e1433e05879e73455da 100644 --- a/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.h +++ b/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.h @@ -1,6 +1,7 @@ #ifndef ST_CROSS_SVR_NODE_H #define ST_CROSS_SVR_NODE_H #include "../cluster/zp_clusternode.h" +#include "st_cross_svr_msg.h" namespace SmartLink{ class st_client_table; class st_cross_svr_node : public ZP_Cluster::zp_ClusterNode @@ -12,8 +13,18 @@ namespace SmartLink{ protected: //!virtual functions, dealing with the user-defined operations. virtual bool deal_user_data(const QByteArray &); + bool deal_msg(); + int st_bytesLeft(); protected: st_client_table * m_pClientTable; + //Current Message Offset, according to m_currentHeader + int m_currStMegSize; + //Current un-procssed message block.for large blocks, + //this array will be re-setted as soon as some part of data has been + //dealed, eg, send a 200MB block, the 200MB data will be splitted into pieces + QByteArray m_currStBlock; + //current Header + STCROSSSVR_MSG::tag_msgHearder m_st_Header; }; } #endif // ST_CROSS_SVR_NODE_H