// zp_clusterterm.cpp
#include "zp_clusterterm.h"
#include "zp_clusternode.h"
#include <assert.h>
namespace ZP_Cluster{
	/**
	 * @brief Constructs a cluster terminal.
	 *
	 * Creates the task pipeline and the network engine (both parented to
	 * this object) and connects the network engine's signals to this
	 * terminal's signals and slots.
	 *
	 * @param name   Terminal name, stored in m_strTermName.
	 * @param parent QObject parent forwarded to the QObject base.
	 */
	zp_ClusterTerm::zp_ClusterTerm(const QString & name,QObject *parent ) :
		QObject(parent)
	  ,m_strTermName(name)
	{
		typedef ZPNetwork::zp_net_Engine NetEngine;

		// Plain settings; they do not depend on the engines created below.
		m_nPortPublish = 0;
		m_nHeartBeatingTime = 20;

		m_pClusterEng = new ZPTaskEngine::zp_pipeline(this);
		m_pClusterNet = new NetEngine(8192,this);

		// Engine signals that are simply re-emitted by this terminal.
		connect(m_pClusterNet, &NetEngine::evt_Message,
				this, &zp_ClusterTerm::evt_Message);
		connect(m_pClusterNet, &NetEngine::evt_SocketError,
				this, &zp_ClusterTerm::evt_SocketError);

		// Engine signals handled by slots of this terminal.
		connect(m_pClusterNet, &NetEngine::evt_Data_recieved,
				this, &zp_ClusterTerm::on_evt_Data_recieved);
		connect(m_pClusterNet, &NetEngine::evt_Data_transferred,
				this, &zp_ClusterTerm::on_evt_Data_transferred);
		connect(m_pClusterNet, &NetEngine::evt_ClientDisconnected,
				this, &zp_ClusterTerm::on_evt_ClientDisconnected);
		connect(m_pClusterNet, &NetEngine::evt_NewClientConnected,
				this, &zp_ClusterTerm::on_evt_NewClientConnected);
		// The evt_ClientEncrypted signal is currently not connected:
		//connect(m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_ClientEncrypted, this,&zp_ClusterTerm::on_evt_ClientEncrypted);
	}
	/**
	 * @brief Adds a listening address to the network engine, registered
	 *        under this terminal's name (m_strTermName).
	 * @param addr  Address to listen on.
	 * @param nPort Port to listen on.
	 */
	void zp_ClusterTerm::StartListen(const QHostAddress &addr, int nPort)
	{
		// NOTE(review): the trailing 'false' is presumably the SSL flag of
		// AddListeningAddress - confirm against zp_net_Engine.
		m_pClusterNet->AddListeningAddress(
					m_strTermName,
					addr,
					nPort,
					false);
	}
	bool zp_ClusterTerm::JoinCluster(const QHostAddress &addr, int nPort,bool bSSL)
丁劲犇's avatar
丁劲犇 已提交
28
	{
丁劲犇's avatar
丁劲犇 已提交
29
		return m_pClusterNet->connectTo(addr,nPort,bSSL);
丁劲犇's avatar
丁劲犇 已提交
30 31 32 33 34
	}
	/**
	 * @brief Tells whether both the task pipeline and the network engine
	 *        report that they can be closed.
	 * @return true only if m_pClusterEng->canClose() and
	 *         m_pClusterNet->CanExit() are both true.
	 */
	bool zp_ClusterTerm::canExit()
	{
		// The network engine is only asked when the pipeline can already close.
		if (!m_pClusterEng->canClose())
			return false;
		return m_pClusterNet->CanExit();
	}

	/**
	 * @brief Registers a node in the name-to-node table.
	 *
	 * The node must already have a non-empty terminal name, and that name
	 * must not be present in the table yet.
	 *
	 * @param c Node to register.
	 * @return true if the node was inserted; false if its name is empty or
	 *         already registered.
	 */
	bool zp_ClusterTerm::regisitNewServer(zp_ClusterNode * c)
	{
		// A node cannot be registered before its terminal name is known.
		if (c->termName().length()<1)
			return false;

		bool bInserted = false;
		m_hash_mutex.lock();
		if (!m_hash_Name2node.contains(c->termName()))
		{
			m_hash_Name2node[c->termName()] = c;
			bInserted = true;
		}
		m_hash_mutex.unlock();
		return bInserted;
	}

	/**
	 * @brief Looks up a registered node by its terminal name.
	 *
	 * The lookup is done entirely while m_hash_mutex is held. The earlier
	 * version released the mutex before reading the table with operator[],
	 * which raced with concurrent writers and could insert an entry.
	 *
	 * @param uuid Terminal name used as the key of m_hash_Name2node.
	 * @return The node stored for that name, or a null pointer if none.
	 *
	 * NOTE(review): the returned pointer is not protected once the mutex is
	 * released; callers presumably rely on the node's reference counting -
	 * confirm.
	 */
	zp_ClusterNode * zp_ClusterTerm::SvrNodeFromName(const QString & uuid)
	{
		m_hash_mutex.lock();
		// value() never inserts; a missing key gives a default-constructed
		// (null) pointer.
		zp_ClusterNode * pNode = m_hash_Name2node.value(uuid);
		m_hash_mutex.unlock();
		return pNode;
	}

	/**
	 * @brief Looks up the node associated with a socket object.
	 *
	 * The lookup is done entirely while m_hash_mutex is held. The earlier
	 * version released the mutex before reading the table with operator[],
	 * which raced with concurrent writers and could insert an entry.
	 *
	 * @param sock Socket object used as the key of m_hash_sock2node.
	 * @return The node stored for that socket, or a null pointer if none.
	 *
	 * NOTE(review): the returned pointer is not protected once the mutex is
	 * released; callers presumably rely on the node's reference counting -
	 * confirm.
	 */
	zp_ClusterNode * zp_ClusterTerm::SvrNodeFromSocket(QObject * sock)
	{
		m_hash_mutex.lock();
		// value() never inserts; a missing key gives a default-constructed
		// (null) pointer.
		zp_ClusterNode * pNode = m_hash_sock2node.value(sock);
		m_hash_mutex.unlock();
		return pNode;
	}
	/**
	 * @brief Slot for the network engine's evt_NewClientConnected signal.
	 *
	 * Makes sure a zp_ClusterNode exists for the socket (creating and
	 * wiring one if needed) and then sends the hello package through it.
	 *
	 * @param clientHandle Socket object of the connected client.
	 */
	void  zp_ClusterTerm::on_evt_NewClientConnected(QObject * clientHandle)
	{
		typedef ZPNetwork::zp_net_Engine NetEngine;
		zp_ClusterNode * pNode = 0;

		m_hash_mutex.lock();
		if (m_hash_sock2node.contains(clientHandle))
		{
			pNode = m_hash_sock2node[clientHandle];
		}
		else
		{
			pNode = new zp_ClusterNode(this,clientHandle,0);
			// Node -> engine / terminal signals all use queued connections.
			connect(pNode, &zp_ClusterNode::evt_SendDataToClient,
					m_pClusterNet, &NetEngine::SendDataToClient, Qt::QueuedConnection);
			connect(pNode, &zp_ClusterNode::evt_BroadcastData,
					m_pClusterNet, &NetEngine::evt_BroadcastData, Qt::QueuedConnection);
			connect(pNode, &zp_ClusterNode::evt_close_client,
					m_pClusterNet, &NetEngine::KickClients, Qt::QueuedConnection);
			connect(pNode, &zp_ClusterNode::evt_Message,
					this, &zp_ClusterTerm::evt_Message, Qt::QueuedConnection);
			connect(pNode, &zp_ClusterNode::evt_connect_to,
					m_pClusterNet, &NetEngine::connectTo, Qt::QueuedConnection);
			m_hash_sock2node[clientHandle] = pNode;
		}
		m_hash_mutex.unlock();

		assert(pNode != 0);
		//Send Hello Package
		pNode->SendHelloPackage();
	}

	/**
	 * @brief Slot meant for the network engine's evt_ClientEncrypted signal.
	 *
	 * Makes sure a zp_ClusterNode exists for the socket (creating and
	 * wiring one if needed) and then sends the hello package through it.
	 *
	 * @param clientHandle Socket object of the encrypted client.
	 */
	void  zp_ClusterTerm::on_evt_ClientEncrypted(QObject * clientHandle)
	{
		typedef ZPNetwork::zp_net_Engine NetEngine;
		zp_ClusterNode * pNode = 0;

		m_hash_mutex.lock();
		if (m_hash_sock2node.contains(clientHandle))
		{
			pNode = m_hash_sock2node[clientHandle];
		}
		else
		{
			pNode = new zp_ClusterNode(this,clientHandle,0);
			// Node -> engine / terminal signals all use queued connections.
			connect(pNode, &zp_ClusterNode::evt_SendDataToClient,
					m_pClusterNet, &NetEngine::SendDataToClient, Qt::QueuedConnection);
			connect(pNode, &zp_ClusterNode::evt_BroadcastData,
					m_pClusterNet, &NetEngine::evt_BroadcastData, Qt::QueuedConnection);
			connect(pNode, &zp_ClusterNode::evt_close_client,
					m_pClusterNet, &NetEngine::KickClients, Qt::QueuedConnection);
			connect(pNode, &zp_ClusterNode::evt_Message,
					this, &zp_ClusterTerm::evt_Message, Qt::QueuedConnection);
			connect(pNode, &zp_ClusterNode::evt_connect_to,
					m_pClusterNet, &NetEngine::connectTo, Qt::QueuedConnection);
			m_hash_sock2node[clientHandle] = pNode;
		}
		m_hash_mutex.unlock();

		assert(pNode != 0);
		//Send Hello Package
		pNode->SendHelloPackage();
	}

	//this event indicates a client disconnected.
138
	void  zp_ClusterTerm::on_evt_ClientDisconnected(QObject * clientHandle)
139
	{
140 141 142 143 144 145 146 147 148 149 150 151 152
		bool nHashContains  = false;
		zp_ClusterNode * pClientNode = 0;
		m_hash_mutex.lock();
		nHashContains = m_hash_sock2node.contains(clientHandle);
		if (nHashContains)
			pClientNode =  m_hash_sock2node[clientHandle];
		if (pClientNode)
		{
			m_hash_sock2node.remove(clientHandle);
			if (pClientNode->termName().length()>0)
				m_hash_Name2node.remove(pClientNode->termName());

			pClientNode->bTermSet = true;
153 154 155
			disconnect (pClientNode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_Engine::SendDataToClient);
			disconnect (pClientNode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_BroadcastData);
			disconnect (pClientNode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_Engine::KickClients);
156
			disconnect (pClientNode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message);
157
			disconnect (pClientNode,&zp_ClusterNode::evt_connect_to,m_pClusterNet,&ZPNetwork::zp_net_Engine::connectTo);
158 159 160 161
			m_nodeToBeDel.push_back(pClientNode);
			//qDebug()<<QString("%1(ref %2) Node Push in queue.\n").arg((unsigned int)pClientNode).arg(pClientNode->ref());
		}
		m_hash_mutex.unlock();
162

163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
		//Try to delete objects
		QList <zp_ClusterNode *> toBedel;
		foreach(zp_ClusterNode * pdelobj,m_nodeToBeDel)
		{
			if (pdelobj->ref() ==0)
				toBedel.push_back(pdelobj);
			else
			{
				//qDebug()<<QString("%1(ref %2) Waiting in del queue.\n").arg((unsigned int)pdelobj).arg(pdelobj->ref());
			}
		}
		foreach(zp_ClusterNode * pdelobj,toBedel)
		{
			m_nodeToBeDel.removeAll(pdelobj);
			//qDebug()<<QString("%1(ref %2) deleting.\n").arg((unsigned int)pdelobj).arg(pdelobj->ref());
			pdelobj->deleteLater();
		}
180 181
		//re-Broadcast Servers
		BroadcastServers();
182 183 184
	}

	//some data arrival
185
	void  zp_ClusterTerm::on_evt_Data_recieved(QObject *  clientHandle,const QByteArray & datablock )
186
	{
187 188 189 190 191 192 193 194 195
		//Push Clients to nodes if it is not exist
		bool nHashContains = false;
		zp_ClusterNode * pClientNode = 0;
		m_hash_mutex.lock();
		nHashContains = m_hash_sock2node.contains(clientHandle);
		if (false==nHashContains)
		{
			zp_ClusterNode * pnode = new zp_ClusterNode(this,clientHandle,0);
			//using queued connection of send and revieve;
196 197 198
			connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection);
			connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_Engine::evt_BroadcastData,Qt::QueuedConnection);
			connect (pnode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection);
199
			connect (pnode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message,Qt::QueuedConnection);
200
			connect (pnode,&zp_ClusterNode::evt_connect_to,m_pClusterNet,&ZPNetwork::zp_net_Engine::connectTo,Qt::QueuedConnection);
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
			m_hash_sock2node[clientHandle] = pnode;
			nHashContains = true;
			pClientNode = pnode;
		}
		else
		{
			pClientNode =  m_hash_sock2node[clientHandle];
		}
		assert(nHashContains!=0 && pClientNode !=0);
		int nblocks =  pClientNode->push_new_data(datablock);
		if (nblocks<=1)
			m_pClusterEng->pushTask(pClientNode);
		m_hash_mutex.unlock();
	}
	void  zp_ClusterTerm::KickDeadClients()
	{
		m_hash_mutex.lock();
		for (QMap<QObject *,zp_ClusterNode *>::iterator p =m_hash_sock2node.begin();
			 p!=m_hash_sock2node.end();p++)
		{
			p.value()->CheckHeartBeating();
		}
		m_hash_mutex.unlock();
224 225 226 227 228 229
	}
	/**
	 * @brief Slot for the network engine's evt_Data_transferred signal
	 *        (a block of data has been successfully sent).
	 *
	 * Both arguments are ignored and no action is taken.
	 */
	void  zp_ClusterTerm::on_evt_Data_transferred(QObject *   /*clientHandle*/,qint64 /*bytes sent*/)
	{
		// Nothing to do.
	}
	void zp_ClusterTerm::BroadcastServers()
	{
		m_hash_mutex.lock();
		QList<QString> keys =  m_hash_Name2node.keys();
		int nsz = keys.size();
		//Msgs
		int nMsgLen = sizeof(CROSS_SVR_MSG::hearder) + sizeof (CROSS_SVR_MSG::uni_payload::tag_CSM_Broadcast) * nsz;
		QByteArray array(nMsgLen,0);
		CROSS_SVR_MSG * pMsg =(CROSS_SVR_MSG *) array.data();
		pMsg->hearder.Mark = 0x1234;
		pMsg->hearder.data_length = sizeof (CROSS_SVR_MSG::uni_payload::tag_CSM_Broadcast) * nsz;
		pMsg->hearder.messagetype = 0x02;
		int ct = 0;
		foreach (QString key,keys)
		{
			zp_ClusterNode * pNode = m_hash_Name2node[key];
			strncpy((char *)pMsg->payload.broadcastMsg[ct].name,
					pNode->termName().toStdString().c_str(),
					sizeof(pMsg->payload.broadcastMsg[ct].name)-1);
			strncpy((char *)pMsg->payload.broadcastMsg[ct].Address,
					pNode->addrPublish().toString().toStdString().c_str(),
					sizeof(pMsg->payload.broadcastMsg[ct].Address)-1);
			pMsg->payload.broadcastMsg[ct].port = pNode->portPublish();
			++ct;
		}
		m_hash_mutex.unlock();
		m_pClusterNet->BroadcastData(0,array);
	}

	void zp_ClusterTerm::SendHeartBeatings()
	{
		//Msgs
		int nMsgLen = sizeof(CROSS_SVR_MSG::hearder);
		QByteArray array(nMsgLen,0);
		CROSS_SVR_MSG * pMsg =(CROSS_SVR_MSG *) array.data();
		pMsg->hearder.Mark = 0x1234;
		pMsg->hearder.data_length = 0;
		pMsg->hearder.messagetype = 0x00;
		m_pClusterNet->BroadcastData(0,array);
	}
}