提交 38cf314b 编写于 作者: 丁劲犇's avatar 丁劲犇 😸

Add Cluster App-Layer function interface. need further approachs.

上级 99fb36cb
......@@ -222,8 +222,11 @@ namespace ZP_Cluster{
emit evt_Message(this,tr("Info: New Svr already regisited. Ignored.")+strName);
emit evt_close_client(this->sock());
}
//else
//m_pTerm->BroadcastServers();
else
{
emit evt_NewSvrConnected(this->termName());
m_pTerm->BroadcastServers();
}
}
else
{
......@@ -253,6 +256,20 @@ namespace ZP_Cluster{
}
}
break;
case 0x03:
if (m_currentMessageSize==m_currentBlock.size())
{
QByteArray arraySend ((const char *)(pMsg) + sizeof(CROSS_SVR_MSG::tag_header),
m_currentMessageSize - sizeof(CROSS_SVR_MSG::tag_header));
emit evt_RemoteData_recieved(this->termName(),arraySend);
}
else
{
QByteArray arraySend(m_currentBlock);
emit evt_RemoteData_recieved(this->termName(),arraySend);
}
m_currentBlock = QByteArray();
break;
default:
emit evt_Message(this,tr("Info:Unknown Msg Type got."));
emit evt_close_client(this->sock());
......
......@@ -72,6 +72,11 @@ namespace ZP_Cluster{
void evt_close_client(QObject * objClient);
void evt_connect_to(const QHostAddress & address , quint16 nPort,bool bSSLConn);
void evt_Message (QObject * psource,const QString &);
//Notify Messages
void evt_NewSvrConnected(const QString & /*svrHandle*/);
//some data arrival
void evt_RemoteData_recieved(const QString & /*svrHandle*/,const QByteArray & /*svrHandle*/ );
};
}
#endif // ZP_CLUSTERNODE_H
......@@ -165,6 +165,8 @@ namespace ZP_Cluster{
connect (pnode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_connect_to,m_pClusterNet,&ZPNetwork::zp_net_Engine::connectTo,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_NewSvrConnected,this,&zp_ClusterTerm::evt_NewSvrConnected,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_RemoteData_recieved,this,&zp_ClusterTerm::evt_RemoteData_recieved,Qt::QueuedConnection);
m_hash_sock2node[clientHandle] = pnode;
nHashContains = true;
pClientNode = pnode;
......@@ -195,6 +197,8 @@ namespace ZP_Cluster{
connect (pnode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_connect_to,m_pClusterNet,&ZPNetwork::zp_net_Engine::connectTo,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_NewSvrConnected,this,&zp_ClusterTerm::evt_NewSvrConnected,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_RemoteData_recieved,this,&zp_ClusterTerm::evt_RemoteData_recieved,Qt::QueuedConnection);
m_hash_sock2node[clientHandle] = pnode;
nHashContains = true;
pClientNode = pnode;
......@@ -213,6 +217,7 @@ namespace ZP_Cluster{
void zp_ClusterTerm::on_evt_ClientDisconnected(QObject * clientHandle)
{
bool nHashContains = false;
zp_ClusterNode * pClientNode = 0;
m_hash_mutex.lock();
nHashContains = m_hash_sock2node.contains(clientHandle);
......@@ -220,17 +225,22 @@ namespace ZP_Cluster{
pClientNode = m_hash_sock2node[clientHandle];
if (pClientNode)
{
QString nameCurr = pClientNode->termName();
disconnect (pClientNode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_Engine::SendDataToClient);
disconnect (pClientNode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_BroadcastData);
disconnect (pClientNode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_Engine::KickClients);
disconnect (pClientNode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message);
disconnect (pClientNode,&zp_ClusterNode::evt_connect_to,m_pClusterNet,&ZPNetwork::zp_net_Engine::connectTo);
disconnect (pClientNode,&zp_ClusterNode::evt_NewSvrConnected,this,&zp_ClusterTerm::evt_NewSvrConnected);
disconnect (pClientNode,&zp_ClusterNode::evt_RemoteData_recieved,this,&zp_ClusterTerm::evt_RemoteData_recieved);
m_hash_sock2node.remove(clientHandle);
if (pClientNode->termName().length()>0)
m_hash_Name2node.remove(pClientNode->termName());
pClientNode->bTermSet = true;
m_nodeToBeDel.push_back(pClientNode);
if (nameCurr.length()>0)
emit evt_NewSvrDisconnected(nameCurr);
//qDebug()<<QString("%1(ref %2) Node Push in queue.\n").arg((unsigned int)pClientNode).arg(pClientNode->ref());
}
m_hash_mutex.unlock();
......@@ -271,6 +281,8 @@ namespace ZP_Cluster{
connect (pnode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_connect_to,m_pClusterNet,&ZPNetwork::zp_net_Engine::connectTo,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_NewSvrConnected,this,&zp_ClusterTerm::evt_NewSvrConnected,Qt::QueuedConnection);
connect (pnode,&zp_ClusterNode::evt_RemoteData_recieved,this,&zp_ClusterTerm::evt_RemoteData_recieved,Qt::QueuedConnection);
m_hash_sock2node[clientHandle] = pnode;
nHashContains = true;
pClientNode = pnode;
......@@ -296,9 +308,9 @@ namespace ZP_Cluster{
m_hash_mutex.unlock();
}
//a block of data has been successfuly sent
void zp_ClusterTerm::on_evt_Data_transferred(QObject * /*clientHandle*/,qint64 /*bytes sent*/)
void zp_ClusterTerm::on_evt_Data_transferred(QObject * clientHandle, qint64 bytesent)
{
emit evt_RemoteData_transferred(clientHandle,bytesent);
}
void zp_ClusterTerm::BroadcastServers()
{
......@@ -363,6 +375,19 @@ namespace ZP_Cluster{
netEng()->SendDataToClient(m_hash_Name2node[key]->sock(),array);
}
m_hash_mutex.unlock();
BroadcastServers();
}
void zp_ClusterTerm::SendDataToRemoteServer(const QString & svrName,const QByteArray & SourceArray)
{
int nMsgLen = sizeof(CROSS_SVR_MSG::tag_header) + SourceArray.size();
QByteArray array(nMsgLen,0);
CROSS_SVR_MSG * pMsg =(CROSS_SVR_MSG *) array.data();
pMsg->hearder.Mark = 0x1234;
pMsg->hearder.data_length = SourceArray.size();
pMsg->hearder.messagetype = 0x03;
memcpy (pMsg->payload.data,SourceArray.constData(),SourceArray.size());
m_hash_mutex.lock();
if (m_hash_Name2node.contains(svrName))
netEng()->SendDataToClient(m_hash_Name2node[svrName]->sock(),array);
m_hash_mutex.unlock();
}
}
......@@ -61,6 +61,15 @@ namespace ZP_Cluster{
void evt_Message(QObject * ,const QString &);
//The socket error message
void evt_SocketError(QObject * senderSock ,QAbstractSocket::SocketError socketError);
//this event indicates new svr successfully hand-shaked.
void evt_NewSvrConnected(const QString &/*svrHandle*/);
//this event indicates a client disconnected.
void evt_NewSvrDisconnected(const QString &/*svrHandle*/);
//some data arrival
void evt_RemoteData_recieved(const QString &/*svrHandle*/,const QByteArray & /*svrHandle*/ );
//a block of data has been successfuly sent
void evt_RemoteData_transferred(QObject * /*svrHandle*/,qint64 /*bytes sent*/);
protected slots:
//this event indicates new client connected.
void on_evt_NewClientConnected(QObject * /*clientHandle*/);
......@@ -82,6 +91,9 @@ namespace ZP_Cluster{
bool JoinCluster(const QHostAddress &addr, int nPort,bool bSSL=false);
void KickDeadClients();
//Send Data to Server
void SendDataToRemoteServer(const QString & svrName,const QByteArray &);
};
}
#endif // ZP_CLUSTERTERM_H
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册