提交 84c7fe4f 编写于 作者: 丁劲犇's avatar 丁劲犇 😸

wrote cluster nodes method. not complete, and has still bugs in heart-beating.

上级 b866662b
......@@ -10,12 +10,10 @@ namespace ZP_Cluster{
struct tag_header{
__UINT16_TYPE__ Mark; //Always be "0x1234"
__UINT8_TYPE__ messagetype;
__INT32_TYPE__ data_length;
} hearder;
union uni_payload{
struct tag_plainData{
__UINT16_TYPE__ data_length;
__UINT8_TYPE__ data[1];
} plainData;
__UINT8_TYPE__ data[1];
} payload;
} CROSS_SVR_MSG;
......@@ -28,14 +26,10 @@ namespace ZP_Cluster{
struct tag_header{
unsigned __int16 Mark; //Always be 0x1234
unsigned __int8 messagetype;
__int32 data_length;
} hearder;
union uni_payload{
struct tag_plainData{
unsigned __int16 data_length;
unsigned __int8 data[1];
} plainData;
unsigned __int8 data[1];
} payload;
} CROSS_SVR_MSG;
......
#include "zp_clusternode.h"
#include "zp_clusterterm.h"
namespace ZP_Cluster{
zp_ClusterNode::zp_ClusterNode(QObject *parent) :
zp_ClusterNode::zp_ClusterNode(zp_ClusterTerm * pTerm, QObject * psock,QObject *parent) :
ZPTaskEngine::zp_plTaskBase(parent)
,m_pTerm(pTerm)
,m_pSock(psock)
,bTermSet(false)
{
m_currentReadOffset = 0;
m_currentMessageSize = 0;
m_last_Report = QDateTime::currentDateTime();
}
int zp_ClusterNode::run()
{
if (bTermSet==true)
{
//qDebug()<<QString("%1(%2) Node Martked Deleted, return.\n").arg((unsigned int)this).arg(ref());
return 0;
}
int nCurrSz = -1;
int nMessage = m_nMessageBlockSize;
while (--nMessage>=0 && nCurrSz!=0 )
{
QByteArray block;
m_mutex_rawData.lock();
if (m_list_RawData.size())
block = *m_list_RawData.begin();
m_mutex_rawData.unlock();
if (block.isEmpty()==false && block.isNull()==false)
{
m_currentReadOffset = filter_message(block,m_currentReadOffset);
if (m_currentReadOffset >= block.size())
{
m_mutex_rawData.lock();
m_list_RawData.pop_front();
m_currentReadOffset = 0;
m_mutex_rawData.unlock();
}
}
else
{
m_mutex_rawData.lock();
//pop empty cabs
if (m_list_RawData.empty()==false)
m_list_RawData.pop_front();
m_mutex_rawData.unlock();
}
m_mutex_rawData.lock();
nCurrSz = m_list_RawData.size();
m_mutex_rawData.unlock();
}
m_mutex_rawData.lock();
nCurrSz = m_list_RawData.size();
m_mutex_rawData.unlock();
if (nCurrSz==0)
return 0;
return -1;
}
//push new binary data into queue
int zp_ClusterNode::push_new_data(const QByteArray & dtarray)
{
int res = 0;
m_mutex_rawData.lock();
m_list_RawData.push_back(dtarray);
res = m_list_RawData.size();
m_mutex_rawData.unlock();
m_last_Report = QDateTime::currentDateTime();
return res;
}
//!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader
//!return bytes Used.
int zp_ClusterNode::filter_message(const QByteArray & block, int offset)
{
const int blocklen = block.length();
while (blocklen>offset)
{
const char * dataptr = block.constData();
//Recieve First 2 byte
while (m_currentMessageSize<2 && blocklen>offset )
{
m_currentBlock.push_back(dataptr[offset++]);
m_currentMessageSize++;
}
if (m_currentMessageSize < 2) //First 2 byte not complete
continue;
if (m_currentMessageSize==2)
{
const char * headerptr = m_currentBlock.constData();
memcpy((void *)&m_currentHeader,headerptr,2);
}
const char * ptrCurrData = m_currentBlock.constData();
if (m_currentHeader.Mark == 0x1234)
//Valid Message
{
while (m_currentMessageSize< sizeof(CROSS_SVR_MSG::tag_header) && blocklen>offset)
{
m_currentBlock.push_back(dataptr[offset++]);
m_currentMessageSize++;
}
if (m_currentMessageSize < sizeof(CROSS_SVR_MSG::tag_header)) //Header not completed.
continue;
else if (m_currentMessageSize == sizeof(CROSS_SVR_MSG::tag_header))//Header just completed.
{
const char * headerptr = m_currentBlock.constData();
memcpy((void *)&m_currentHeader,headerptr,sizeof(CROSS_SVR_MSG::tag_header));
//continue reading if there is data left behind
if (block.length()>offset)
{
qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header)
-m_currentMessageSize ;
while (bitLeft>0 && blocklen>offset)
{
m_currentBlock.push_back(dataptr[offset++]);
m_currentMessageSize++;
bitLeft--;
}
//deal block, may be send data as soon as possible;
deal_current_message_block();
if (bitLeft>0)
continue;
//This Message is Over. Start a new one.
m_currentMessageSize = 0;
m_currentBlock = QByteArray();
continue;
}
}
else
{
if (block.length()>offset)
{
qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header)
-m_currentMessageSize ;
while (bitLeft>0 && blocklen>offset)
{
m_currentBlock.push_back(dataptr[offset++]);
m_currentMessageSize++;
bitLeft--;
}
//deal block, may be processed as soon as possible;
deal_current_message_block();
if (bitLeft>0)
continue;
//This Message is Over. Start a new one.
m_currentMessageSize = 0;
m_currentBlock = QByteArray();
continue;
}
} // end if there is more bytes to append
} //end deal trans message
else
{
emit evt_Message(this,tr("Client Send a unknown start Header %1 %2. Close client immediately.")
.arg((int)(ptrCurrData[0])).arg((int)(ptrCurrData[1])));
m_currentMessageSize = 0;
m_currentBlock = QByteArray();
offset = blocklen;
emit evt_close_client(this->sock());
}
} // end while block len > offset
return offset;
}
//in Trans-Level, do nothing.
int zp_ClusterNode::deal_current_message_block()
{
return 0;
}
void zp_ClusterNode::CheckHeartBeating()
{
QDateTime dtm = QDateTime::currentDateTime();
qint64 usc = this->m_last_Report.secsTo(dtm);
int nThredHold = m_pTerm->heartBeatingThrd();
if (usc >= nThredHold)
{
emit evt_Message(this,tr("Client ") + QString("%1").arg((unsigned int)((quint64)this)) + tr(" is dead, kick out."));
emit evt_close_client(this->sock());
}
}
}
......@@ -2,8 +2,12 @@
#define ZP_CLUSTERNODE_H
#include <QObject>
#include <QHostAddress>
#include <QDateTime>
#include "../pipeline/zp_pltaskbase.h"
#include "cross_svr_messages.h"
namespace ZP_Cluster{
class zp_ClusterTerm;
/**
* @brief This class stand for a remote server.
* when local server establish a connection between itself and remote svr,
......@@ -13,12 +17,58 @@ namespace ZP_Cluster{
{
Q_OBJECT
public:
explicit zp_ClusterNode(QObject *parent = 0);
explicit zp_ClusterNode(zp_ClusterTerm * pTerm, QObject * psock,QObject *parent = 0);
int run();
signals:
bool bTermSet;
//!deal at most m_nMessageBlockSize messages per deal_message();
static const int m_nMessageBlockSize = 8;
//push new binary data into queue
int push_new_data(const QByteArray & dtarray);
//!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader
//!return bytes Used.
int filter_message(const QByteArray &, int offset);
//!in Trans-Layer, it does nothing.
int deal_current_message_block();
QDateTime lastActiveTime(){ return m_last_Report;}
void CheckHeartBeating();
public:
QString termName(){return m_strTermName;}
QHostAddress addrPublish(){return m_addrPublish;}
int portPublish() {return m_nPortPublish;}
QObject * sock() { return m_pSock;}
protected:
zp_ClusterTerm * m_pTerm;
//Client socket handle of this connection
QObject * m_pSock;
//the data members.
QString m_strTermName; //the Terminal's name
QHostAddress m_addrPublish; //The publish address for other terms to connect to
int m_nPortPublish; //The publish port for other terms to connect to
public slots:
//Data Process
//The raw data queue and its mutex
QList<QByteArray> m_list_RawData;
QMutex m_mutex_rawData;
//The current Read Offset, from m_list_RawData's beginning
int m_currentReadOffset;
//Current Message Offset, according to m_currentHeader
int m_currentMessageSize;
//Current un-procssed message block.for large blocks,
//this array will be re-setted as soon as some part of data has been
//dealed, eg, send a 200MB block, the 200MB data will be splitted into pieces
QByteArray m_currentBlock;
CROSS_SVR_MSG::tag_header m_currentHeader;
QDateTime m_last_Report;
signals:
void evt_SendDataToClient(QObject * objClient,const QByteArray & dtarray);
void evt_BroadcastData(QObject * objFromClient,const QByteArray & dtarray);
void evt_close_client(QObject * objClient);
void evt_Message (QObject * psource,const QString &);
};
}
#endif // ZP_CLUSTERNODE_H
#include "zp_clusterterm.h"
#include "zp_clusternode.h"
#include <assert.h>
namespace ZP_Cluster{
zp_ClusterTerm::zp_ClusterTerm(const QString & name,QObject *parent ) :
QObject(parent)
......@@ -14,6 +16,7 @@ namespace ZP_Cluster{
connect(m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::evt_NewClientConnected, this,&zp_ClusterTerm::on_evt_NewClientConnected);
//connect(m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::evt_ClientEncrypted, this,&zp_ClusterTerm::on_evt_ClientEncrypted);
m_nPortPublish = 0;
m_nHeartBeatingDeadThrd = 20;
}
void zp_ClusterTerm::StartListen(const QHostAddress &addr, int nPort)
......@@ -29,30 +32,182 @@ namespace ZP_Cluster{
{
return m_pClusterEng->canClose() && m_pClusterNet->CanExit();
}
//this event indicates new client connected.
void zp_ClusterTerm::on_evt_NewClientConnected(QObject * /*clientHandle*/)
bool zp_ClusterTerm::regisitNewServer(zp_ClusterNode * c)
{
//Before reg, termname must be recieved.
if (c->termName().length()<1)
return false;
m_hash_mutex.lock();
m_hash_Name2node[c->termName()] = c;
m_hash_mutex.unlock();
return true;
}
zp_ClusterNode * zp_ClusterTerm::SvrNodeFromName(const QString & uuid)
{
m_hash_mutex.lock();
if (m_hash_Name2node.contains(uuid))
{
m_hash_mutex.unlock();
return m_hash_Name2node[uuid];
}
m_hash_mutex.unlock();
return NULL;
}
//this event indicates new client encrypted.
void zp_ClusterTerm::on_evt_ClientEncrypted(QObject * /*clientHandle*/)
zp_ClusterNode * zp_ClusterTerm::SvrNodeFromSocket(QObject * sock)
{
m_hash_mutex.lock();
if (m_hash_sock2node.contains(sock))
{
m_hash_mutex.unlock();
return m_hash_sock2node[sock];
}
m_hash_mutex.unlock();
return NULL;
}
//this event indicates new client connected.
void zp_ClusterTerm::on_evt_NewClientConnected(QObject * clientHandle)
{
bool nHashContains = false;
zp_ClusterNode * pClientNode = 0;
m_hash_mutex.lock();
nHashContains = m_hash_sock2node.contains(clientHandle);
if (false==nHashContains)
{
zp_ClusterNode * pnode = new zp_ClusterNode(this,clientHandle,0);
//using queued connection of send and revieve;
connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::evt_BroadcastData,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::KickClients,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message,Qt::QueuedConnection);
m_hash_sock2node[clientHandle] = pnode;
nHashContains = true;
pClientNode = pnode;
}
else
{
pClientNode = m_hash_sock2node[clientHandle];
}
m_hash_mutex.unlock();
assert(nHashContains!=0 && pClientNode !=0);
}
//this event indicates new client encrypted.
void zp_ClusterTerm::on_evt_ClientEncrypted(QObject * clientHandle)
{
bool nHashContains = false;
zp_ClusterNode * pClientNode = 0;
m_hash_mutex.lock();
nHashContains = m_hash_sock2node.contains(clientHandle);
if (false==nHashContains)
{
zp_ClusterNode * pnode = new zp_ClusterNode(this,clientHandle,0);
//using queued connection of send and revieve;
connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::evt_BroadcastData,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::KickClients,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message,Qt::QueuedConnection);
m_hash_sock2node[clientHandle] = pnode;
nHashContains = true;
pClientNode = pnode;
}
else
{
pClientNode = m_hash_sock2node[clientHandle];
}
m_hash_mutex.unlock();
assert(nHashContains!=0 && pClientNode !=0);
}
//this event indicates a client disconnected.
void zp_ClusterTerm::on_evt_ClientDisconnected(QObject * /*clientHandle*/)
void zp_ClusterTerm::on_evt_ClientDisconnected(QObject * clientHandle)
{
bool nHashContains = false;
zp_ClusterNode * pClientNode = 0;
m_hash_mutex.lock();
nHashContains = m_hash_sock2node.contains(clientHandle);
if (nHashContains)
pClientNode = m_hash_sock2node[clientHandle];
if (pClientNode)
{
m_hash_sock2node.remove(clientHandle);
if (pClientNode->termName().length()>0)
m_hash_Name2node.remove(pClientNode->termName());
pClientNode->bTermSet = true;
disconnect (pClientNode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::SendDataToClient);
disconnect (pClientNode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::evt_BroadcastData);
disconnect (pClientNode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::KickClients);
disconnect (pClientNode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message);
m_nodeToBeDel.push_back(pClientNode);
//qDebug()<<QString("%1(ref %2) Node Push in queue.\n").arg((unsigned int)pClientNode).arg(pClientNode->ref());
}
m_hash_mutex.unlock();
//Try to delete objects
QList <zp_ClusterNode *> toBedel;
foreach(zp_ClusterNode * pdelobj,m_nodeToBeDel)
{
if (pdelobj->ref() ==0)
toBedel.push_back(pdelobj);
else
{
//qDebug()<<QString("%1(ref %2) Waiting in del queue.\n").arg((unsigned int)pdelobj).arg(pdelobj->ref());
}
}
foreach(zp_ClusterNode * pdelobj,toBedel)
{
m_nodeToBeDel.removeAll(pdelobj);
//qDebug()<<QString("%1(ref %2) deleting.\n").arg((unsigned int)pdelobj).arg(pdelobj->ref());
pdelobj->deleteLater();
}
}
//some data arrival
void zp_ClusterTerm::on_evt_Data_recieved(QObject * /*clientHandle*/,const QByteArray & /*datablock*/ )
void zp_ClusterTerm::on_evt_Data_recieved(QObject * clientHandle,const QByteArray & datablock )
{
//Push Clients to nodes if it is not exist
bool nHashContains = false;
zp_ClusterNode * pClientNode = 0;
m_hash_mutex.lock();
nHashContains = m_hash_sock2node.contains(clientHandle);
if (false==nHashContains)
{
zp_ClusterNode * pnode = new zp_ClusterNode(this,clientHandle,0);
//using queued connection of send and revieve;
connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::evt_BroadcastData,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::KickClients,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message,Qt::QueuedConnection);
m_hash_sock2node[clientHandle] = pnode;
nHashContains = true;
pClientNode = pnode;
}
else
{
pClientNode = m_hash_sock2node[clientHandle];
}
assert(nHashContains!=0 && pClientNode !=0);
int nblocks = pClientNode->push_new_data(datablock);
if (nblocks<=1)
m_pClusterEng->pushTask(pClientNode);
m_hash_mutex.unlock();
}
void zp_ClusterTerm::KickDeadClients()
{
m_hash_mutex.lock();
for (QMap<QObject *,zp_ClusterNode *>::iterator p =m_hash_sock2node.begin();
p!=m_hash_sock2node.end();p++)
{
p.value()->CheckHeartBeating();
}
m_hash_mutex.unlock();
}
//a block of data has been successfuly sent
void zp_ClusterTerm::on_evt_Data_transferred(QObject * /*clientHandle*/,qint64 /*bytes sent*/)
{
......
......@@ -3,11 +3,15 @@
#include <QObject>
#include <QHostAddress>
#include <QList>
#include <QMutex>
#include <QMap>
#include "../network/zp_net_threadpool.h"
#include "../pipeline/zp_pipeline.h"
#include "../pipeline/zp_pltaskbase.h"
namespace ZP_Cluster{
class zp_ClusterNode;
//!this class enable server processes can
//! communicate with each other.
class zp_ClusterTerm : public QObject
......@@ -27,12 +31,26 @@ namespace ZP_Cluster{
int publishPort(){return m_nPortPublish;}
QHostAddress setPublishAddr(QHostAddress addr){return m_addrPublish = addr;}
int setPublishPort(int port){return m_nPortPublish = port;}
int heartBeatingThrd() {return m_nHeartBeatingDeadThrd;}
void setHeartBeatingThrd(const int n){m_nHeartBeatingDeadThrd = n;}
protected:
QString m_strTermName;//the Terminal's name
QHostAddress m_addrPublish; //The publish address for other terms to connect to
int m_nPortPublish;//The publish port for other terms to connect to
ZPNetwork::zp_net_ThreadPool * m_pClusterNet;
ZPTaskEngine::zp_pipeline * m_pClusterEng;
int m_nHeartBeatingDeadThrd;
//Server Group Mapping
protected:
//This list hold dead nodes that still in task queue,avoiding crash
QList<zp_ClusterNode *> m_nodeToBeDel;
//important hashes. server name to socket, socket to server name
QMutex m_hash_mutex;
QMap<QString , zp_ClusterNode *> m_hash_Name2node;
QMap<QObject *,zp_ClusterNode *> m_hash_sock2node;
bool regisitNewServer(zp_ClusterNode *);
zp_ClusterNode * SvrNodeFromName(const QString &);
zp_ClusterNode * SvrNodeFromSocket(QObject *);
signals:
void evt_Message(QObject * ,const QString &);
......@@ -57,6 +75,7 @@ namespace ZP_Cluster{
//!as soon as connection established, more existing terms will be sent to this term,
//!an p2p connection will start
bool JoinCluster(const QHostAddress &addr, int nPort,bool bSSL=false);
void KickDeadClients();
};
}
......
......@@ -23,7 +23,7 @@ namespace SmartLink{
st_client_table::~st_client_table()
{
}
void st_client_table::KickDealClients()
void st_client_table::KickDeadClients()
{
m_hash_mutex.lock();
for (QMap<QObject *,st_clientNode_baseTrans *>::iterator p =m_hash_sock2node.begin();
......@@ -71,7 +71,7 @@ namespace SmartLink{
//this event indicates new client encrypted.
void st_client_table::on_evt_ClientEncrypted(QObject * clientHandle)
{
bool nHashContains = 0;
bool nHashContains = false;
st_clientNode_baseTrans * pClientNode = 0;
m_hash_mutex.lock();
nHashContains = m_hash_sock2node.contains(clientHandle);
......@@ -98,7 +98,7 @@ namespace SmartLink{
//this event indicates new client connected.
void st_client_table::on_evt_NewClientConnected(QObject * clientHandle)
{
bool nHashContains = 0;
bool nHashContains = false;
st_clientNode_baseTrans * pClientNode = 0;
m_hash_mutex.lock();
nHashContains = m_hash_sock2node.contains(clientHandle);
......@@ -174,7 +174,7 @@ namespace SmartLink{
void st_client_table::on_evt_Data_recieved(QObject * clientHandle,const QByteArray & datablock )
{
//Push Clients to nodes if it is not exist
bool nHashContains = 0;
bool nHashContains = false;
st_clientNode_baseTrans * pClientNode = 0;
m_hash_mutex.lock();
nHashContains = m_hash_sock2node.contains(clientHandle);
......
......@@ -23,7 +23,7 @@ namespace SmartLink{
st_clientNode_baseTrans * clientNodeFromSocket(QObject *);
//Heart beating and healthy
void KickDealClients();
void KickDeadClients();
int heartBeatingThrd(){return m_nHeartBeatingDeadThrd;}
void setHeartBeatingThrd(int h) {m_nHeartBeatingDeadThrd = h;}
......
......@@ -29,10 +29,7 @@ namespace SmartLink{
bool uuidValid(){return m_bUUIDRecieved;}
bool bTermSet;
QDateTime lastActiveTime()
{
return m_last_Report;
}
QDateTime lastActiveTime(){ return m_last_Report;}
qint32 bytesLeft()
{
return m_currentHeader.data_length + sizeof(SMARTLINK_MSG) - 1
......
......@@ -84,7 +84,10 @@ ZPMainFrame::~ZPMainFrame()
break;
}
}
m_netEngine->deleteLater();
m_pDatabases->deleteLater();
m_taskEngine->deleteLater();
m_pClusterTerm->deleteLater();
delete ui;
}
......@@ -282,8 +285,9 @@ void ZPMainFrame::timerEvent(QTimerEvent * e)
{
killTimer(m_nTimerCheck);
m_nTimerCheck = -1;
m_clientTable->KickDealClients();
m_nTimerCheck = startTimer(10000);
m_clientTable->KickDeadClients();
m_pClusterTerm->KickDeadClients();
m_nTimerCheck = startTimer(5000);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册