zp_clusternode.cpp 11.6 KB
Newer Older
丁劲犇's avatar
丁劲犇 已提交
1
#include "zp_clusternode.h"
2
#include "zp_clusterterm.h"
3
#include <assert.h>
丁劲犇's avatar
丁劲犇 已提交
4
namespace ZP_Cluster{
5
	zp_ClusterNode::zp_ClusterNode(zp_ClusterTerm * pTerm, QObject * psock,QObject *parent) :
丁劲犇's avatar
丁劲犇 已提交
6
		ZPTaskEngine::zp_plTaskBase(parent)
7 8 9
	  ,m_pTerm(pTerm)
	  ,m_pSock(psock)
	  ,bTermSet(false)
丁劲犇's avatar
丁劲犇 已提交
10
	{
11 12
		m_currentReadOffset = 0;
		m_currentMessageSize = 0;
13
		m_nPortLAN = m_nPortPub = 0;
14
		m_last_Report = QDateTime::currentDateTime();
15 16 17 18 19
		m_nRemoteClientNums = 0;
	}
	quint32 zp_ClusterNode::clientNums()
	{
		return m_nRemoteClientNums;
丁劲犇's avatar
丁劲犇 已提交
20
	}
21 22 23 24 25 26 27 28 29

	QDateTime zp_ClusterNode::lastActiveTime()
	{
		return m_last_Report;
	}
	QString zp_ClusterNode::termName()
	{
		return m_strTermName;
	}
30
	QHostAddress zp_ClusterNode::addrLAN()
31
	{
32
		return m_addrLAN;
33
	}
34
	int zp_ClusterNode::portLAN()
35
	{
36
		return m_nPortLAN;
37
	}
38 39 40 41 42 43 44 45 46 47

	QHostAddress zp_ClusterNode::addrPub()
	{
		return m_addrPub;
	}
	int zp_ClusterNode::portPub()
	{
		return m_nPortPub;
	}

48 49 50 51 52
	QObject * zp_ClusterNode::sock()
	{
		return m_pSock;
	}

丁劲犇's avatar
丁劲犇 已提交
53
	int zp_ClusterNode::run()
54 55 56
	{
		if (bTermSet==true)
		{
57
			//qDebug()<<QString("%1(%2) Node Martked Deleted, return.").arg((unsigned int)this).arg(ref());
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
			return 0;
		}
		int nCurrSz = -1;
		int nMessage = m_nMessageBlockSize;
		while (--nMessage>=0 && nCurrSz!=0  )
		{
			QByteArray block;
			m_mutex_rawData.lock();
			if (m_list_RawData.size())
				block =  *m_list_RawData.begin();
			m_mutex_rawData.unlock();
			if (block.isEmpty()==false && block.isNull()==false)
			{
				m_currentReadOffset = filter_message(block,m_currentReadOffset);
				if (m_currentReadOffset >= block.size())
				{
					m_mutex_rawData.lock();
75 76 77 78
					if (m_list_RawData.empty()==false)
						m_list_RawData.pop_front();
					else
						assert(false);
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
					m_currentReadOffset = 0;
					m_mutex_rawData.unlock();
				}
			}
			else
			{
				m_mutex_rawData.lock();
				//pop empty cabs
				if (m_list_RawData.empty()==false)
					m_list_RawData.pop_front();
				m_mutex_rawData.unlock();
			}
			m_mutex_rawData.lock();
			nCurrSz = m_list_RawData.size();
			m_mutex_rawData.unlock();
		}
		m_mutex_rawData.lock();
		nCurrSz = m_list_RawData.size();
		m_mutex_rawData.unlock();
		if (nCurrSz==0)
			return 0;
		return -1;
	}
	//push new binary data into queue
	int zp_ClusterNode::push_new_data(const  QByteArray &  dtarray)
	{
		int res = 0;
		m_mutex_rawData.lock();

		m_list_RawData.push_back(dtarray);
		res = m_list_RawData.size();
		m_mutex_rawData.unlock();
		m_last_Report = QDateTime::currentDateTime();
		return res;
	}

	//!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader
	//!return bytes Used.
117
	int zp_ClusterNode::filter_message(QByteArray  block, int offset)
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
	{
		const int blocklen = block.length();
		while (blocklen>offset)
		{
			const char * dataptr = block.constData();

			//Recieve First 2 byte
			while (m_currentMessageSize<2 && blocklen>offset )
			{
				m_currentBlock.push_back(dataptr[offset++]);
				m_currentMessageSize++;
			}
			if (m_currentMessageSize < 2) //First 2 byte not complete
				continue;

			if (m_currentMessageSize==2)
			{
				const char * headerptr = m_currentBlock.constData();
				memcpy((void *)&m_currentHeader,headerptr,2);
			}

139

140 141 142
			if (m_currentHeader.Mark == 0x1234)
				//Valid Message
			{
143 144
				//while (m_currentMessageSize< sizeof(CROSS_SVR_MSG::tag_header) && blocklen>offset)
				if (m_currentMessageSize< sizeof(CROSS_SVR_MSG::tag_header) && blocklen>offset)
145
				{
146 147 148 149 150 151 152 153 154
					int nCpy = sizeof(CROSS_SVR_MSG::tag_header) - m_currentMessageSize;
					if (nCpy > blocklen - offset)
						nCpy = blocklen - offset;
					QByteArray arrCpy(dataptr+offset,nCpy);
					m_currentBlock.push_back(arrCpy);
					//m_currentBlock.push_back(dataptr[offset++]);
					//m_currentMessageSize++;
					offset += nCpy;
					m_currentMessageSize += nCpy;
155 156 157 158 159 160 161 162 163 164 165 166 167
				}
				if (m_currentMessageSize < sizeof(CROSS_SVR_MSG::tag_header)) //Header not completed.
					continue;
				else if (m_currentMessageSize == sizeof(CROSS_SVR_MSG::tag_header))//Header just  completed.
				{
					const char * headerptr = m_currentBlock.constData();
					memcpy((void *)&m_currentHeader,headerptr,sizeof(CROSS_SVR_MSG::tag_header));

					//continue reading if there is data left behind
					if (block.length()>offset)
					{
						qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header)
								-m_currentMessageSize ;
168 169
						//while (bitLeft>0 && blocklen>offset)
						if (bitLeft>0 && blocklen>offset)
170
						{
171 172 173 174 175 176 177 178 179 180 181
							int nCpy = bitLeft;
							if (nCpy > blocklen - offset)
								nCpy = blocklen - offset;
							QByteArray arrCpy(dataptr+offset,nCpy);
							m_currentBlock.push_back(arrCpy);
							offset += nCpy;
							m_currentMessageSize += nCpy;
							bitLeft -= nCpy;
							//m_currentBlock.push_back(dataptr[offset++]);
							//m_currentMessageSize++;
							//bitLeft--;
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
						}
						//deal block, may be send data as soon as possible;
						deal_current_message_block();
						if (bitLeft>0)
							continue;
						//This Message is Over. Start a new one.
						m_currentMessageSize = 0;
						m_currentBlock = QByteArray();
						continue;
					}
				}
				else
				{
					if (block.length()>offset)
					{
						qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header)
								-m_currentMessageSize ;
199 200
						//while (bitLeft>0 && blocklen>offset)
						if (bitLeft>0 && blocklen>offset)
201
						{
202 203 204 205 206 207 208 209 210 211 212 213
							int nCpy = bitLeft;
							if (nCpy > blocklen - offset)
								nCpy = blocklen - offset;
							QByteArray arrCpy(dataptr+offset,nCpy);
							m_currentBlock.push_back(arrCpy);
							offset += nCpy;
							m_currentMessageSize += nCpy;
							bitLeft -= nCpy;

							//m_currentBlock.push_back(dataptr[offset++]);
							//m_currentMessageSize++;
							//bitLeft--;
214 215 216 217 218 219 220 221 222 223 224 225 226 227
						}
						//deal block, may be processed as soon as possible;
						deal_current_message_block();
						if (bitLeft>0)
							continue;
						//This Message is Over. Start a new one.
						m_currentMessageSize = 0;
						m_currentBlock = QByteArray();
						continue;
					}
				} // end if there is more bytes to append
			} //end deal trans message
			else
			{
228
				const char * ptrCurrData = m_currentBlock.constData();
229 230 231 232 233 234 235 236 237 238 239 240 241
				emit evt_Message(this,tr("Client Send a unknown start Header %1 %2. Close client immediately.")
								 .arg((int)(ptrCurrData[0])).arg((int)(ptrCurrData[1])));
				m_currentMessageSize = 0;
				m_currentBlock = QByteArray();
				offset = blocklen;
				emit evt_close_client(this->sock());
			}
		} // end while block len > offset

		return offset;
	}
	//in Trans-Level, do nothing.
	int zp_ClusterNode::deal_current_message_block()
丁劲犇's avatar
丁劲犇 已提交
242
	{
243 244 245 246 247 248
		qint32 bytesLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header)
				-m_currentMessageSize ;
		const CROSS_SVR_MSG * pMsg =(const CROSS_SVR_MSG *) m_currentBlock.constData();
		switch(m_currentHeader.messagetype)
		{
		case 0x00://Heart Beating
249 250 251 252
			if (bytesLeft==0)
			{
				m_nRemoteClientNums = pMsg->payload.heartBeating.nClients;
			}
253 254
			break;
		case 0x01://basicInfo, when connection established, this message should be used
255 256 257 258 259
//			UnComment code below, will generate debug output.
//			if (m_currentBlock.length()>=64)
//				emit evt_Message(this,"Debug:" + m_currentBlock.toHex().left(64) + "..." + m_currentBlock.toHex().right(64));
//			else
//				emit evt_Message(this,"Debug:" + m_currentBlock.toHex());
丁劲犇's avatar
丁劲犇 已提交
260

261 262 263 264 265 266
			if (bytesLeft==0)
			{
				QString strName ((const char *)pMsg->payload.basicInfo.name);
				if (strName != m_pTerm->name())
				{
					this->m_strTermName = strName;
267 268 269 270
					m_nPortLAN = pMsg->payload.basicInfo.port_LAN;
					m_addrLAN = QHostAddress((const char *)pMsg->payload.basicInfo.Address_LAN);
					m_nPortPub = pMsg->payload.basicInfo.port_Pub;
					m_addrPub = QHostAddress((const char *)pMsg->payload.basicInfo.Address_Pub);
271 272
					if (false==m_pTerm->regisitNewServer(this))
					{
丁劲犇's avatar
丁劲犇 已提交
273
						this->m_strTermName.clear();
丁劲犇's avatar
丁劲犇 已提交
274
						emit evt_Message(this,tr("Info: New Svr already regisited. Ignored.")+strName);
275 276
						emit evt_close_client(this->sock());
					}
277 278 279 280 281
					else
					{
						emit evt_NewSvrConnected(this->termName());
						m_pTerm->BroadcastServers();
					}
282 283 284 285 286 287 288 289 290
				}
				else
				{
					emit evt_Message(this,tr("Can not connect to it-self, Loopback connections is forbidden."));
					emit evt_close_client(this->sock());
				}
			}
			break;
		case 0x02: //Server - broadcast messages
291 292 293 294 295
//			UnComment code below, will generate debug output.
//			if (m_currentBlock.length()>=64)
//				emit evt_Message(this,"Debug:" + m_currentBlock.toHex().left(64) + "..." + m_currentBlock.toHex().right(64));
//			else
//				emit evt_Message(this,"Debug:" + m_currentBlock.toHex());
丁劲犇's avatar
丁劲犇 已提交
296

297 298 299 300 301 302 303 304
			if (bytesLeft==0)
			{
				int nSvrs = pMsg->hearder.data_length / sizeof(CROSS_SVR_MSG::uni_payload::tag_CSM_Broadcast);
				for (int i=0;i<nSvrs;i++)
				{
					QString strName ((const char *)pMsg->payload.broadcastMsg[i].name);
					if (strName != m_pTerm->name() && m_pTerm->SvrNodeFromName(strName)==NULL)
					{
305 306
						QHostAddress addrToConnectTo((const char *)pMsg->payload.broadcastMsg[i].Address_LAN);
						quint16 PortToConnectTo = pMsg->payload.broadcastMsg[i].port_LAN;
丁劲犇's avatar
丁劲犇 已提交
307
						//because cross-connection is not good, we just want the low Addr:port connect to max Addr:Port.
308
						//Connect to New Servers
丁劲犇's avatar
丁劲犇 已提交
309 310 311 312
						if (strName > m_pTerm->name())
							emit evt_connect_to(addrToConnectTo,PortToConnectTo,false);
						else
							emit evt_Message(this,tr("Name %1 <= %2, omitted.").arg(strName).arg(m_pTerm->name()));
313 314 315 316
					}
				}
			}
			break;
317
		case 0x03:
318 319 320 321 322
			//			UnComment code below, will generate debug output.
//			if (m_currentBlock.length()>=64)
//				emit evt_Message(this,"Debug:" + m_currentBlock.toHex().left(64) + "..." + m_currentBlock.toHex().right(64));
//			else
//				emit evt_Message(this,"Debug:" + m_currentBlock.toHex());
丁劲犇's avatar
丁劲犇 已提交
323

324 325 326 327
			if (m_currentMessageSize==m_currentBlock.size())
			{
				QByteArray arraySend ((const char *)(pMsg) + sizeof(CROSS_SVR_MSG::tag_header),
									  m_currentMessageSize - sizeof(CROSS_SVR_MSG::tag_header));
328 329
				if (deal_user_data(arraySend)==true)
					m_currentBlock = QByteArray();
330 331 332
			}
			else
			{
333 334
				if (deal_user_data(m_currentBlock)==true)
					m_currentBlock = QByteArray();
335 336
			}
			break;
337
		default:
338 339
			emit evt_Message(this,tr("Info:Unknown Msg Type got."));
			emit evt_close_client(this->sock());
340 341 342
			break;
		};

丁劲犇's avatar
丁劲犇 已提交
343 344
		return 0;
	}
345

346
	bool zp_ClusterNode::deal_user_data(QByteArray  data)
347 348 349 350 351
	{
		emit evt_RemoteData_recieved(this->termName(),data);
		return true;
	}

352 353 354 355
	void zp_ClusterNode::CheckHeartBeating()
	{
		QDateTime dtm = QDateTime::currentDateTime();
		qint64 usc = this->m_last_Report.secsTo(dtm);
356
		int nThredHold = m_pTerm->heartBeatingThrdHold();
357 358 359 360 361 362
		if (usc >= nThredHold)
		{
			emit evt_Message(this,tr("Client ") + QString("%1").arg((unsigned int)((quint64)this)) + tr(" is dead, kick out."));
			emit evt_close_client(this->sock());
		}
	}
363 364
	void zp_ClusterNode::SendHelloPackage()
	{
丁劲犇's avatar
丁劲犇 已提交
365
		int nMsgLen = sizeof(CROSS_SVR_MSG::tag_header) + sizeof (CROSS_SVR_MSG::uni_payload::tag_CSM_BasicInfo);
366 367 368 369 370
		QByteArray array(nMsgLen,0);
		CROSS_SVR_MSG * pMsg =(CROSS_SVR_MSG *) array.data();
		pMsg->hearder.Mark = 0x1234;
		pMsg->hearder.data_length = sizeof (CROSS_SVR_MSG::uni_payload::tag_CSM_BasicInfo);
		pMsg->hearder.messagetype = 0x01;
371
		strncpy((char *)(pMsg->payload.basicInfo.name),
372 373
				m_pTerm->name().toStdString().c_str(),
				sizeof(pMsg->payload.basicInfo.name)-1);
374 375 376 377 378 379 380 381 382
		strncpy((char *)(pMsg->payload.basicInfo.Address_LAN),
				m_pTerm->LANAddr().toString().toStdString().c_str(),
				sizeof(pMsg->payload.basicInfo.Address_LAN)-1);

		pMsg->payload.basicInfo.port_LAN = m_pTerm->LANPort();

		strncpy((char *)(pMsg->payload.basicInfo.Address_Pub),
				m_pTerm->PublishAddr().toString().toStdString().c_str(),
				sizeof(pMsg->payload.basicInfo.Address_Pub)-1);
383

384
		pMsg->payload.basicInfo.port_Pub = m_pTerm->PublishPort();
385 386


387 388
		emit evt_SendDataToClient(sock(),array);
	}
丁劲犇's avatar
丁劲犇 已提交
389
}