From 5ebe6d3c8a5af0ac326ea5d254a57ec185d242e7 Mon Sep 17 00:00:00 2001 From: goldenhawking Date: Fri, 2 May 2014 00:10:49 +0800 Subject: [PATCH] use a call-back method to deal with cross-svr messages. --- ZoomPipeline_FuncSvr/ZoomPipeline_FuncSvr.pro | 7 ++- .../cluster/zp_clusternode.cpp | 15 +++++-- ZoomPipeline_FuncSvr/cluster/zp_clusternode.h | 10 +++-- .../cluster/zp_clusterterm.cpp | 41 +++++++++++++++-- ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h | 25 ++++++++++- ZoomPipeline_FuncSvr/pipeline/zp_pipeline.h | 1 - .../smartlink/st_client_table.cpp | 8 +--- .../smartlink/st_client_table.h | 6 +++ .../smartlink/st_cross_svr_msg.h | 44 +++++++++++++++++++ .../smartlink/st_cross_svr_node.cpp | 26 +++++++++++ .../smartlink/st_cross_svr_node.h | 23 ++++++++++ ZoomPipeline_FuncSvr/zpmainframe.cpp | 9 ++-- 12 files changed, 189 insertions(+), 26 deletions(-) create mode 100644 ZoomPipeline_FuncSvr/smartlink/st_cross_svr_msg.h create mode 100644 ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.cpp create mode 100644 ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.h diff --git a/ZoomPipeline_FuncSvr/ZoomPipeline_FuncSvr.pro b/ZoomPipeline_FuncSvr/ZoomPipeline_FuncSvr.pro index 2e8d479..839c968 100644 --- a/ZoomPipeline_FuncSvr/ZoomPipeline_FuncSvr.pro +++ b/ZoomPipeline_FuncSvr/ZoomPipeline_FuncSvr.pro @@ -28,7 +28,8 @@ SOURCES += main.cpp\ smartlink/st_clientnode_applayer.cpp \ cluster/zp_clusterterm.cpp \ dialogaddressinput.cpp \ - cluster/zp_clusternode.cpp + cluster/zp_clusternode.cpp \ + smartlink/st_cross_svr_node.cpp HEADERS += zpmainframe.h \ network/zp_tcpserver.h \ @@ -47,7 +48,9 @@ HEADERS += zpmainframe.h \ cluster/zp_clusterterm.h \ cluster/cross_svr_messages.h \ dialogaddressinput.h \ - cluster/zp_clusternode.h + cluster/zp_clusternode.h \ + smartlink/st_cross_svr_msg.h \ + smartlink/st_cross_svr_node.h FORMS += zpmainframe.ui \ dialogaddressinput.ui diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp index 6f4f300..19235bd 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp @@ -269,14 +269,14 @@ namespace ZP_Cluster{ { QByteArray arraySend ((const char *)(pMsg) + sizeof(CROSS_SVR_MSG::tag_header), m_currentMessageSize - sizeof(CROSS_SVR_MSG::tag_header)); - emit evt_RemoteData_recieved(this->termName(),arraySend); + if (deal_user_data(arraySend)==true) + m_currentBlock = QByteArray(); } else { - QByteArray arraySend(m_currentBlock); - emit evt_RemoteData_recieved(this->termName(),arraySend); + if (deal_user_data(m_currentBlock)==true) + m_currentBlock = QByteArray(); } - m_currentBlock = QByteArray(); break; default: emit evt_Message(this,tr("Info:Unknown Msg Type got.")); @@ -286,6 +286,13 @@ namespace ZP_Cluster{ return 0; } + + bool zp_ClusterNode::deal_user_data(const QByteArray & data) + { + emit evt_RemoteData_recieved(this->termName(),data); + return true; + } + void zp_ClusterNode::CheckHeartBeating() { QDateTime dtm = QDateTime::currentDateTime(); diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h index c8124d1..aa8a3b8 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h @@ -20,18 +20,22 @@ namespace ZP_Cluster{ explicit zp_ClusterNode(zp_ClusterTerm * pTerm, QObject * psock,QObject *parent = 0); int run(); bool bTermSet; - //!deal at most m_nMessageBlockSize messages per deal_message(); - static const int m_nMessageBlockSize = 8; //push new binary data into queue int push_new_data(const QByteArray & dtarray); + void CheckHeartBeating(); + + protected: + //!deal at most m_nMessageBlockSize messages per deal_message(); + static const int m_nMessageBlockSize = 8; //!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader //!return bytes Used. int filter_message(const QByteArray &, int offset); //!in Trans-Layer, it does nothing. int deal_current_message_block(); + //!virtual functions, dealing with the user-defined operations. + virtual bool deal_user_data(const QByteArray &); QDateTime lastActiveTime(); - void CheckHeartBeating(); public: QString termName(); diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp index 6700bad..065d587 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp @@ -2,6 +2,7 @@ #include "zp_clusternode.h" #include namespace ZP_Cluster{ + using namespace std::placeholders; zp_ClusterTerm::zp_ClusterTerm(const QString & name,QObject *parent ) : QObject(parent) ,m_strTermName(name) @@ -17,8 +18,42 @@ namespace ZP_Cluster{ //connect(m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_ClientEncrypted, this,&zp_ClusterTerm::on_evt_ClientEncrypted); m_nPortPublish = 0; m_nHeartBeatingTime = 20; + m_factory = std::bind(&zp_ClusterTerm::default_factory,this,_1,_2,_3); + } + + /** + * @brief The factory enables user-defined sub-classes inherits from zp_ClusterNode + * Using SetNodeFactory , set your own allocate method. + * @fn zp_ClusterTerm::default_factory the default factory function. just return zp_ClusterTerm * + * @param pTerm Term object + * @param psock Sock Object + * @param parent Parent + * @return zp_ClusterNode * + */ + zp_ClusterNode * zp_ClusterTerm::default_factory( + zp_ClusterTerm * pTerm, + QObject * psock, + QObject * parent) + { + return new zp_ClusterNode(pTerm,psock,parent); + } + /** + * @brief Using SetNodeFactory , set your own allocate method. + * + * @fn zp_ClusterTerm::SetNodeFactory + * @param fac The functor + */ + void zp_ClusterTerm::SetNodeFactory(std::function< + zp_ClusterNode * ( + zp_ClusterTerm * /*pTerm*/, + QObject * /*psock*/, + QObject * /*parent*/)> fac + ) + { + m_factory = fac; } + int zp_ClusterTerm::publishPort(){ return m_nPortPublish; @@ -158,7 +193,7 @@ namespace ZP_Cluster{ nHashContains = m_hash_sock2node.contains(clientHandle); if (false==nHashContains) { - zp_ClusterNode * pnode = new zp_ClusterNode(this,clientHandle,0); + zp_ClusterNode * pnode = m_factory(this,clientHandle,0); //using queued connection of send and revieve; connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection); @@ -190,7 +225,7 @@ namespace ZP_Cluster{ nHashContains = m_hash_sock2node.contains(clientHandle); if (false==nHashContains) { - zp_ClusterNode * pnode = new zp_ClusterNode(this,clientHandle,0); + zp_ClusterNode * pnode = m_factory(this,clientHandle,0); //using queued connection of send and revieve; connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection); @@ -279,7 +314,7 @@ namespace ZP_Cluster{ nHashContains = m_hash_sock2node.contains(clientHandle); if (false==nHashContains) { - zp_ClusterNode * pnode = new zp_ClusterNode(this,clientHandle,0); + zp_ClusterNode * pnode = m_factory(this,clientHandle,0); //using queued connection of send and revieve; connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection); diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h index 676c10e..8141a12 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h @@ -9,7 +9,7 @@ #include "../network/zp_net_threadpool.h" #include "../pipeline/zp_pipeline.h" #include "../pipeline/zp_pltaskbase.h" - +#include namespace ZP_Cluster{ class zp_ClusterNode; //!this class enable server processes can @@ -19,6 +19,19 @@ namespace ZP_Cluster{ Q_OBJECT public: explicit zp_ClusterTerm(const QString & name,QObject *parent = 0); + + + /** + * The factory enables user-defined sub-classes inherits from zp_ClusterNode + * Using SetNodeFactory , set your own allocate method. + */ + void SetNodeFactory(std::function< + zp_ClusterNode * ( + zp_ClusterTerm * /*pTerm*/, + QObject * /*psock*/, + QObject * /*parent*/)> + ); + //cluster status ZPNetwork::zp_net_Engine * netEng(); ZPTaskEngine::zp_pipeline * taskEng(); @@ -37,6 +50,16 @@ namespace ZP_Cluster{ void BroadcastServers(); void SendHeartBeatings(); protected: + std::function m_factory; + zp_ClusterNode * default_factory( + zp_ClusterTerm * /*pTerm*/, + QObject * /*psock*/, + QObject * /*parent*/); + protected: + int m_nHeartBeatingTime; QString m_strTermName;//the Terminal's name QHostAddress m_addrPublish; //The publish address for other terms to connect to diff --git a/ZoomPipeline_FuncSvr/pipeline/zp_pipeline.h b/ZoomPipeline_FuncSvr/pipeline/zp_pipeline.h index 4044cd6..7c935e3 100644 --- a/ZoomPipeline_FuncSvr/pipeline/zp_pipeline.h +++ b/ZoomPipeline_FuncSvr/pipeline/zp_pipeline.h @@ -5,7 +5,6 @@ #include #include #include -#include #include #include "zp_plworkingthread.h" #include "zp_pltaskbase.h" diff --git a/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp b/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp index d2b2cb4..639034f 100644 --- a/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp +++ b/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp @@ -1,7 +1,6 @@ #include "st_client_table.h" #include "st_clientnode_applayer.h" #include -#include namespace SmartLink{ st_client_table::st_client_table( ZPNetwork::zp_net_Engine * NetEngine, @@ -26,6 +25,7 @@ namespace SmartLink{ connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_NewSvrDisconnected,this,&st_client_table::on_evt_NewSvrDisconnected,Qt::QueuedConnection); connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_RemoteData_recieved,this,&st_client_table::on_evt_RemoteData_recieved,Qt::QueuedConnection); connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_RemoteData_transferred,this,&st_client_table::on_evt_RemoteData_transferred,Qt::QueuedConnection); + Reg_st_cross_svr_node(m_pCluster); } @@ -282,11 +282,7 @@ namespace SmartLink{ //some data arrival void st_client_table::on_evt_RemoteData_recieved(const QString & svrHandle,const QByteArray & array ) { - const char * ptr = array.constData(); - QString str; - for (int i=0;i #include #include +#include #include "../network/zp_net_threadpool.h" #include "../pipeline/zp_pipeline.h" #include "./st_message.h" #include "../database/databaseresource.h" #include "../cluster/zp_clusterterm.h" +#include "./st_cross_svr_node.h" namespace SmartLink{ class st_clientNode_baseTrans; class st_client_table : public QObject @@ -67,6 +69,10 @@ namespace SmartLink{ QString m_strDBName_event; QString m_largeFileFolder; + //cluster Nodes Map + std::hash_map m_hash_remoteClient2SvrName; + + signals: void evt_Message (QObject * psource,const QString &); diff --git a/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_msg.h b/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_msg.h new file mode 100644 index 0000000..f36601c --- /dev/null +++ b/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_msg.h @@ -0,0 +1,44 @@ +#ifndef ST_CROSS_SVR_MSG_H +#define ST_CROSS_SVR_MSG_H + +namespace SmartLink{ + +#pragma pack (push,1) + +#if defined(__GNUC__) +#include + typedef struct tag_smartlink_crosssvr_msg{ + struct tag_msgHearder{ + __UINT16_TYPE__ Mark; //Always be "0x4567" + __UINT16_TYPE__ version; //Structure Version + __UINT8_TYPE__ mesageType; + __UINT32_TYPE__ messageLen; + } header; + union union_payload{ + __UINT8_TYPE__ data[1]; + __UINT32_TYPE__ uuids[1]; + } payload; + } STCROSSSVR_MSG; +#endif + +#if defined(_MSC_VER) + typedef struct tag_smartlink_crosssvr_msg{ + struct tag_msgHearder{ + unsigned __int16 Mark; //Always be 0x4567 + unsigned __int16 version; //Structure Version + unsigned __int8 mesageType; + unsigned __int32 messageLen; + } header; + union union_payload{ + unsigned __int8 data[1]; + unsigned __int32 uuids[1]; + } payload; + } STCROSSSVR_MSG; + +#endif + + + +#pragma pack(pop) +} +#endif diff --git a/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.cpp b/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.cpp new file mode 100644 index 0000000..83b9f82 --- /dev/null +++ b/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.cpp @@ -0,0 +1,26 @@ +#include "st_cross_svr_node.h" +#include "../cluster/zp_clusterterm.h" +namespace SmartLink{ + ZP_Cluster::zp_ClusterNode * st_cross_svr_node_factory( + ZP_Cluster::zp_ClusterTerm * pTerm, + QObject * psock, + QObject * parent) + { + return new st_cross_svr_node(pTerm,psock,parent); + } + + void Reg_st_cross_svr_node(ZP_Cluster::zp_ClusterTerm *pTerm ) + { + pTerm->SetNodeFactory(st_cross_svr_node_factory); + } + + st_cross_svr_node::st_cross_svr_node(ZP_Cluster::zp_ClusterTerm * pTerm, QObject * psock,QObject *parent) + :ZP_Cluster::zp_ClusterNode(pTerm,psock,parent) + { + } + bool st_cross_svr_node::deal_user_data(const QByteArray &array) + { + return ZP_Cluster::zp_ClusterNode::deal_user_data(array); + } + +} diff --git a/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.h b/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.h new file mode 100644 index 0000000..daa0455 --- /dev/null +++ b/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.h @@ -0,0 +1,23 @@ +#ifndef ST_CROSS_SVR_NODE_H +#define ST_CROSS_SVR_NODE_H +#include "../cluster/zp_clusternode.h" +namespace SmartLink{ + + ZP_Cluster::zp_ClusterNode * st_cross_svr_node_factory( + ZP_Cluster::zp_ClusterTerm * /*pTerm*/, + QObject * /*psock*/, + QObject * /*parent*/); + + void Reg_st_cross_svr_node(ZP_Cluster::zp_ClusterTerm *pTerm ); + + class st_cross_svr_node : public ZP_Cluster::zp_ClusterNode + { + Q_OBJECT + public: + st_cross_svr_node(ZP_Cluster::zp_ClusterTerm * pTerm, QObject * psock,QObject *parent); + protected: + //!virtual functions, dealing with the user-defined operations. + virtual bool deal_user_data(const QByteArray &); + }; +} +#endif // ST_CROSS_SVR_NODE_H diff --git a/ZoomPipeline_FuncSvr/zpmainframe.cpp b/ZoomPipeline_FuncSvr/zpmainframe.cpp index cc59e67..cc73cff 100644 --- a/ZoomPipeline_FuncSvr/zpmainframe.cpp +++ b/ZoomPipeline_FuncSvr/zpmainframe.cpp @@ -20,7 +20,6 @@ ZPMainFrame::ZPMainFrame(QWidget *parent) : { m_currentConffile = QCoreApplication::applicationFilePath()+".ini"; ui->setupUi(this); - //Create net engine m_netEngine = new zp_net_Engine (8192); connect (m_netEngine,&zp_net_Engine::evt_Message,this,&ZPMainFrame::on_evt_MessageNetwork); @@ -142,6 +141,7 @@ void ZPMainFrame::initUI() } ui->comboBox_db_type->setModel(pCombo); + m_pModelCluster= new QStandardItemModel(0,3,this); m_pModelCluster->setHeaderData(0,Qt::Horizontal,tr("Name")); m_pModelCluster->setHeaderData(1,Qt::Horizontal,tr("Address")); @@ -290,7 +290,8 @@ void ZPMainFrame::timerEvent(QTimerEvent * e) //The Cluster Info QStringList lstCluster = m_pClusterTerm->SvrNames(); - m_pModelCluster->removeRows(0,m_pModelCluster->rowCount()); + if (m_pModelCluster->rowCount()>0) + m_pModelCluster->removeRows(0,m_pModelCluster->rowCount()); int nInserted = 0; foreach (QString strNodeName,lstCluster) { @@ -300,10 +301,6 @@ void ZPMainFrame::timerEvent(QTimerEvent * e) m_pModelCluster->setData(m_pModelCluster->index(nInserted,2),m_pClusterTerm->SvrPort(strNodeName)); ++nInserted; } - - - - nInserted++; } else if (e->timerId()==m_nTimerCheck) { -- GitLab