提交 5ebe6d3c 编写于 作者: 丁劲犇's avatar 丁劲犇 😸

use a call-back method to deal with cross-svr messages.

上级 960c6d7c
...@@ -28,7 +28,8 @@ SOURCES += main.cpp\ ...@@ -28,7 +28,8 @@ SOURCES += main.cpp\
smartlink/st_clientnode_applayer.cpp \ smartlink/st_clientnode_applayer.cpp \
cluster/zp_clusterterm.cpp \ cluster/zp_clusterterm.cpp \
dialogaddressinput.cpp \ dialogaddressinput.cpp \
cluster/zp_clusternode.cpp cluster/zp_clusternode.cpp \
smartlink/st_cross_svr_node.cpp
HEADERS += zpmainframe.h \ HEADERS += zpmainframe.h \
network/zp_tcpserver.h \ network/zp_tcpserver.h \
...@@ -47,7 +48,9 @@ HEADERS += zpmainframe.h \ ...@@ -47,7 +48,9 @@ HEADERS += zpmainframe.h \
cluster/zp_clusterterm.h \ cluster/zp_clusterterm.h \
cluster/cross_svr_messages.h \ cluster/cross_svr_messages.h \
dialogaddressinput.h \ dialogaddressinput.h \
cluster/zp_clusternode.h cluster/zp_clusternode.h \
smartlink/st_cross_svr_msg.h \
smartlink/st_cross_svr_node.h
FORMS += zpmainframe.ui \ FORMS += zpmainframe.ui \
dialogaddressinput.ui dialogaddressinput.ui
......
...@@ -269,14 +269,14 @@ namespace ZP_Cluster{ ...@@ -269,14 +269,14 @@ namespace ZP_Cluster{
{ {
QByteArray arraySend ((const char *)(pMsg) + sizeof(CROSS_SVR_MSG::tag_header), QByteArray arraySend ((const char *)(pMsg) + sizeof(CROSS_SVR_MSG::tag_header),
m_currentMessageSize - sizeof(CROSS_SVR_MSG::tag_header)); m_currentMessageSize - sizeof(CROSS_SVR_MSG::tag_header));
emit evt_RemoteData_recieved(this->termName(),arraySend); if (deal_user_data(arraySend)==true)
m_currentBlock = QByteArray();
} }
else else
{ {
QByteArray arraySend(m_currentBlock); if (deal_user_data(m_currentBlock)==true)
emit evt_RemoteData_recieved(this->termName(),arraySend); m_currentBlock = QByteArray();
} }
m_currentBlock = QByteArray();
break; break;
default: default:
emit evt_Message(this,tr("Info:Unknown Msg Type got.")); emit evt_Message(this,tr("Info:Unknown Msg Type got."));
...@@ -286,6 +286,13 @@ namespace ZP_Cluster{ ...@@ -286,6 +286,13 @@ namespace ZP_Cluster{
return 0; return 0;
} }
bool zp_ClusterNode::deal_user_data(const QByteArray & data)
{
emit evt_RemoteData_recieved(this->termName(),data);
return true;
}
void zp_ClusterNode::CheckHeartBeating() void zp_ClusterNode::CheckHeartBeating()
{ {
QDateTime dtm = QDateTime::currentDateTime(); QDateTime dtm = QDateTime::currentDateTime();
......
...@@ -20,18 +20,22 @@ namespace ZP_Cluster{ ...@@ -20,18 +20,22 @@ namespace ZP_Cluster{
explicit zp_ClusterNode(zp_ClusterTerm * pTerm, QObject * psock,QObject *parent = 0); explicit zp_ClusterNode(zp_ClusterTerm * pTerm, QObject * psock,QObject *parent = 0);
int run(); int run();
bool bTermSet; bool bTermSet;
//!deal at most m_nMessageBlockSize messages per deal_message();
static const int m_nMessageBlockSize = 8;
//push new binary data into queue //push new binary data into queue
int push_new_data(const QByteArray & dtarray); int push_new_data(const QByteArray & dtarray);
void CheckHeartBeating();
protected:
//!deal at most m_nMessageBlockSize messages per deal_message();
static const int m_nMessageBlockSize = 8;
//!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader //!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader
//!return bytes Used. //!return bytes Used.
int filter_message(const QByteArray &, int offset); int filter_message(const QByteArray &, int offset);
//!in Trans-Layer, it does nothing. //!in Trans-Layer, it does nothing.
int deal_current_message_block(); int deal_current_message_block();
//!virtual functions, dealing with the user-defined operations.
virtual bool deal_user_data(const QByteArray &);
QDateTime lastActiveTime(); QDateTime lastActiveTime();
void CheckHeartBeating();
public: public:
QString termName(); QString termName();
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include "zp_clusternode.h" #include "zp_clusternode.h"
#include <assert.h> #include <assert.h>
namespace ZP_Cluster{ namespace ZP_Cluster{
using namespace std::placeholders;
zp_ClusterTerm::zp_ClusterTerm(const QString & name,QObject *parent ) : zp_ClusterTerm::zp_ClusterTerm(const QString & name,QObject *parent ) :
QObject(parent) QObject(parent)
,m_strTermName(name) ,m_strTermName(name)
...@@ -17,8 +18,42 @@ namespace ZP_Cluster{ ...@@ -17,8 +18,42 @@ namespace ZP_Cluster{
//connect(m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_ClientEncrypted, this,&zp_ClusterTerm::on_evt_ClientEncrypted); //connect(m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_ClientEncrypted, this,&zp_ClusterTerm::on_evt_ClientEncrypted);
m_nPortPublish = 0; m_nPortPublish = 0;
m_nHeartBeatingTime = 20; m_nHeartBeatingTime = 20;
m_factory = std::bind(&zp_ClusterTerm::default_factory,this,_1,_2,_3);
}
/**
* @brief The factory enables user-defined sub-classes inherits from zp_ClusterNode
* Using SetNodeFactory , set your own allocate method.
* @fn zp_ClusterTerm::default_factory the default factory function. just return zp_ClusterTerm *
* @param pTerm Term object
* @param psock Sock Object
* @param parent Parent
* @return zp_ClusterNode *
*/
zp_ClusterNode * zp_ClusterTerm::default_factory(
zp_ClusterTerm * pTerm,
QObject * psock,
QObject * parent)
{
return new zp_ClusterNode(pTerm,psock,parent);
}
/**
* @brief Using SetNodeFactory , set your own allocate method.
*
* @fn zp_ClusterTerm::SetNodeFactory
* @param fac The functor
*/
void zp_ClusterTerm::SetNodeFactory(std::function<
zp_ClusterNode * (
zp_ClusterTerm * /*pTerm*/,
QObject * /*psock*/,
QObject * /*parent*/)> fac
)
{
m_factory = fac;
} }
int zp_ClusterTerm::publishPort(){ int zp_ClusterTerm::publishPort(){
return m_nPortPublish; return m_nPortPublish;
...@@ -158,7 +193,7 @@ namespace ZP_Cluster{ ...@@ -158,7 +193,7 @@ namespace ZP_Cluster{
nHashContains = m_hash_sock2node.contains(clientHandle); nHashContains = m_hash_sock2node.contains(clientHandle);
if (false==nHashContains) if (false==nHashContains)
{ {
zp_ClusterNode * pnode = new zp_ClusterNode(this,clientHandle,0); zp_ClusterNode * pnode = m_factory(this,clientHandle,0);
//using queued connection of send and revieve; //using queued connection of send and revieve;
connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection);
...@@ -190,7 +225,7 @@ namespace ZP_Cluster{ ...@@ -190,7 +225,7 @@ namespace ZP_Cluster{
nHashContains = m_hash_sock2node.contains(clientHandle); nHashContains = m_hash_sock2node.contains(clientHandle);
if (false==nHashContains) if (false==nHashContains)
{ {
zp_ClusterNode * pnode = new zp_ClusterNode(this,clientHandle,0); zp_ClusterNode * pnode = m_factory(this,clientHandle,0);
//using queued connection of send and revieve; //using queued connection of send and revieve;
connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection);
...@@ -279,7 +314,7 @@ namespace ZP_Cluster{ ...@@ -279,7 +314,7 @@ namespace ZP_Cluster{
nHashContains = m_hash_sock2node.contains(clientHandle); nHashContains = m_hash_sock2node.contains(clientHandle);
if (false==nHashContains) if (false==nHashContains)
{ {
zp_ClusterNode * pnode = new zp_ClusterNode(this,clientHandle,0); zp_ClusterNode * pnode = m_factory(this,clientHandle,0);
//using queued connection of send and revieve; //using queued connection of send and revieve;
connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection); connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection);
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include "../network/zp_net_threadpool.h" #include "../network/zp_net_threadpool.h"
#include "../pipeline/zp_pipeline.h" #include "../pipeline/zp_pipeline.h"
#include "../pipeline/zp_pltaskbase.h" #include "../pipeline/zp_pltaskbase.h"
#include <functional>
namespace ZP_Cluster{ namespace ZP_Cluster{
class zp_ClusterNode; class zp_ClusterNode;
//!this class enable server processes can //!this class enable server processes can
...@@ -19,6 +19,19 @@ namespace ZP_Cluster{ ...@@ -19,6 +19,19 @@ namespace ZP_Cluster{
Q_OBJECT Q_OBJECT
public: public:
explicit zp_ClusterTerm(const QString & name,QObject *parent = 0); explicit zp_ClusterTerm(const QString & name,QObject *parent = 0);
/**
* The factory enables user-defined sub-classes inherits from zp_ClusterNode
* Using SetNodeFactory , set your own allocate method.
*/
void SetNodeFactory(std::function<
zp_ClusterNode * (
zp_ClusterTerm * /*pTerm*/,
QObject * /*psock*/,
QObject * /*parent*/)>
);
//cluster status //cluster status
ZPNetwork::zp_net_Engine * netEng(); ZPNetwork::zp_net_Engine * netEng();
ZPTaskEngine::zp_pipeline * taskEng(); ZPTaskEngine::zp_pipeline * taskEng();
...@@ -37,6 +50,16 @@ namespace ZP_Cluster{ ...@@ -37,6 +50,16 @@ namespace ZP_Cluster{
void BroadcastServers(); void BroadcastServers();
void SendHeartBeatings(); void SendHeartBeatings();
protected: protected:
std::function<zp_ClusterNode * (
zp_ClusterTerm * /*pTerm*/,
QObject * /*psock*/,
QObject * /*parent*/)> m_factory;
zp_ClusterNode * default_factory(
zp_ClusterTerm * /*pTerm*/,
QObject * /*psock*/,
QObject * /*parent*/);
protected:
int m_nHeartBeatingTime; int m_nHeartBeatingTime;
QString m_strTermName;//the Terminal's name QString m_strTermName;//the Terminal's name
QHostAddress m_addrPublish; //The publish address for other terms to connect to QHostAddress m_addrPublish; //The publish address for other terms to connect to
......
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
#include <QVector> #include <QVector>
#include <QMutex> #include <QMutex>
#include <list> #include <list>
#include <functional>
#include <QThread> #include <QThread>
#include "zp_plworkingthread.h" #include "zp_plworkingthread.h"
#include "zp_pltaskbase.h" #include "zp_pltaskbase.h"
......
#include "st_client_table.h" #include "st_client_table.h"
#include "st_clientnode_applayer.h" #include "st_clientnode_applayer.h"
#include <assert.h> #include <assert.h>
#include <functional>
namespace SmartLink{ namespace SmartLink{
st_client_table::st_client_table( st_client_table::st_client_table(
ZPNetwork::zp_net_Engine * NetEngine, ZPNetwork::zp_net_Engine * NetEngine,
...@@ -26,6 +25,7 @@ namespace SmartLink{ ...@@ -26,6 +25,7 @@ namespace SmartLink{
connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_NewSvrDisconnected,this,&st_client_table::on_evt_NewSvrDisconnected,Qt::QueuedConnection); connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_NewSvrDisconnected,this,&st_client_table::on_evt_NewSvrDisconnected,Qt::QueuedConnection);
connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_RemoteData_recieved,this,&st_client_table::on_evt_RemoteData_recieved,Qt::QueuedConnection); connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_RemoteData_recieved,this,&st_client_table::on_evt_RemoteData_recieved,Qt::QueuedConnection);
connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_RemoteData_transferred,this,&st_client_table::on_evt_RemoteData_transferred,Qt::QueuedConnection); connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_RemoteData_transferred,this,&st_client_table::on_evt_RemoteData_transferred,Qt::QueuedConnection);
Reg_st_cross_svr_node(m_pCluster);
} }
...@@ -282,11 +282,7 @@ namespace SmartLink{ ...@@ -282,11 +282,7 @@ namespace SmartLink{
//some data arrival //some data arrival
void st_client_table::on_evt_RemoteData_recieved(const QString & svrHandle,const QByteArray & array ) void st_client_table::on_evt_RemoteData_recieved(const QString & svrHandle,const QByteArray & array )
{ {
const char * ptr = array.constData(); emit evt_Message(this,tr("Recieved %1 bytes Msg from ").arg(array.length()) + svrHandle);
QString str;
for (int i=0;i<array.size();i++)
str.push_back(QChar(ptr[i]));
emit evt_Message(this,"Recieved Svr Msg from " + svrHandle +":" +str);
} }
//a block of data has been successfuly sent //a block of data has been successfuly sent
......
...@@ -5,11 +5,13 @@ ...@@ -5,11 +5,13 @@
#include <QList> #include <QList>
#include <QMutex> #include <QMutex>
#include <QMap> #include <QMap>
#include <hash_map>
#include "../network/zp_net_threadpool.h" #include "../network/zp_net_threadpool.h"
#include "../pipeline/zp_pipeline.h" #include "../pipeline/zp_pipeline.h"
#include "./st_message.h" #include "./st_message.h"
#include "../database/databaseresource.h" #include "../database/databaseresource.h"
#include "../cluster/zp_clusterterm.h" #include "../cluster/zp_clusterterm.h"
#include "./st_cross_svr_node.h"
namespace SmartLink{ namespace SmartLink{
class st_clientNode_baseTrans; class st_clientNode_baseTrans;
class st_client_table : public QObject class st_client_table : public QObject
...@@ -67,6 +69,10 @@ namespace SmartLink{ ...@@ -67,6 +69,10 @@ namespace SmartLink{
QString m_strDBName_event; QString m_strDBName_event;
QString m_largeFileFolder; QString m_largeFileFolder;
//cluster Nodes Map
std::hash_map<quint32,QString> m_hash_remoteClient2SvrName;
signals: signals:
void evt_Message (QObject * psource,const QString &); void evt_Message (QObject * psource,const QString &);
......
#ifndef ST_CROSS_SVR_MSG_H
#define ST_CROSS_SVR_MSG_H
namespace SmartLink{
#pragma pack (push,1)
#if defined(__GNUC__)
#include <stdint.h>
typedef struct tag_smartlink_crosssvr_msg{
struct tag_msgHearder{
__UINT16_TYPE__ Mark; //Always be "0x4567"
__UINT16_TYPE__ version; //Structure Version
__UINT8_TYPE__ mesageType;
__UINT32_TYPE__ messageLen;
} header;
union union_payload{
__UINT8_TYPE__ data[1];
__UINT32_TYPE__ uuids[1];
} payload;
} STCROSSSVR_MSG;
#endif
#if defined(_MSC_VER)
typedef struct tag_smartlink_crosssvr_msg{
struct tag_msgHearder{
unsigned __int16 Mark; //Always be 0x4567
unsigned __int16 version; //Structure Version
unsigned __int8 mesageType;
unsigned __int32 messageLen;
} header;
union union_payload{
unsigned __int8 data[1];
unsigned __int32 uuids[1];
} payload;
} STCROSSSVR_MSG;
#endif
#pragma pack(pop)
}
#endif
#include "st_cross_svr_node.h"
#include "../cluster/zp_clusterterm.h"
namespace SmartLink{
ZP_Cluster::zp_ClusterNode * st_cross_svr_node_factory(
ZP_Cluster::zp_ClusterTerm * pTerm,
QObject * psock,
QObject * parent)
{
return new st_cross_svr_node(pTerm,psock,parent);
}
void Reg_st_cross_svr_node(ZP_Cluster::zp_ClusterTerm *pTerm )
{
pTerm->SetNodeFactory(st_cross_svr_node_factory);
}
st_cross_svr_node::st_cross_svr_node(ZP_Cluster::zp_ClusterTerm * pTerm, QObject * psock,QObject *parent)
:ZP_Cluster::zp_ClusterNode(pTerm,psock,parent)
{
}
bool st_cross_svr_node::deal_user_data(const QByteArray &array)
{
return ZP_Cluster::zp_ClusterNode::deal_user_data(array);
}
}
#ifndef ST_CROSS_SVR_NODE_H
#define ST_CROSS_SVR_NODE_H
#include "../cluster/zp_clusternode.h"
namespace SmartLink{
ZP_Cluster::zp_ClusterNode * st_cross_svr_node_factory(
ZP_Cluster::zp_ClusterTerm * /*pTerm*/,
QObject * /*psock*/,
QObject * /*parent*/);
void Reg_st_cross_svr_node(ZP_Cluster::zp_ClusterTerm *pTerm );
class st_cross_svr_node : public ZP_Cluster::zp_ClusterNode
{
Q_OBJECT
public:
st_cross_svr_node(ZP_Cluster::zp_ClusterTerm * pTerm, QObject * psock,QObject *parent);
protected:
//!virtual functions, dealing with the user-defined operations.
virtual bool deal_user_data(const QByteArray &);
};
}
#endif // ST_CROSS_SVR_NODE_H
...@@ -20,7 +20,6 @@ ZPMainFrame::ZPMainFrame(QWidget *parent) : ...@@ -20,7 +20,6 @@ ZPMainFrame::ZPMainFrame(QWidget *parent) :
{ {
m_currentConffile = QCoreApplication::applicationFilePath()+".ini"; m_currentConffile = QCoreApplication::applicationFilePath()+".ini";
ui->setupUi(this); ui->setupUi(this);
//Create net engine //Create net engine
m_netEngine = new zp_net_Engine (8192); m_netEngine = new zp_net_Engine (8192);
connect (m_netEngine,&zp_net_Engine::evt_Message,this,&ZPMainFrame::on_evt_MessageNetwork); connect (m_netEngine,&zp_net_Engine::evt_Message,this,&ZPMainFrame::on_evt_MessageNetwork);
...@@ -142,6 +141,7 @@ void ZPMainFrame::initUI() ...@@ -142,6 +141,7 @@ void ZPMainFrame::initUI()
} }
ui->comboBox_db_type->setModel(pCombo); ui->comboBox_db_type->setModel(pCombo);
m_pModelCluster= new QStandardItemModel(0,3,this); m_pModelCluster= new QStandardItemModel(0,3,this);
m_pModelCluster->setHeaderData(0,Qt::Horizontal,tr("Name")); m_pModelCluster->setHeaderData(0,Qt::Horizontal,tr("Name"));
m_pModelCluster->setHeaderData(1,Qt::Horizontal,tr("Address")); m_pModelCluster->setHeaderData(1,Qt::Horizontal,tr("Address"));
...@@ -290,7 +290,8 @@ void ZPMainFrame::timerEvent(QTimerEvent * e) ...@@ -290,7 +290,8 @@ void ZPMainFrame::timerEvent(QTimerEvent * e)
//The Cluster Info //The Cluster Info
QStringList lstCluster = m_pClusterTerm->SvrNames(); QStringList lstCluster = m_pClusterTerm->SvrNames();
m_pModelCluster->removeRows(0,m_pModelCluster->rowCount()); if (m_pModelCluster->rowCount()>0)
m_pModelCluster->removeRows(0,m_pModelCluster->rowCount());
int nInserted = 0; int nInserted = 0;
foreach (QString strNodeName,lstCluster) foreach (QString strNodeName,lstCluster)
{ {
...@@ -300,10 +301,6 @@ void ZPMainFrame::timerEvent(QTimerEvent * e) ...@@ -300,10 +301,6 @@ void ZPMainFrame::timerEvent(QTimerEvent * e)
m_pModelCluster->setData(m_pModelCluster->index(nInserted,2),m_pClusterTerm->SvrPort(strNodeName)); m_pModelCluster->setData(m_pModelCluster->index(nInserted,2),m_pClusterTerm->SvrPort(strNodeName));
++nInserted; ++nInserted;
} }
nInserted++;
} }
else if (e->timerId()==m_nTimerCheck) else if (e->timerId()==m_nTimerCheck)
{ {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册