#include "zp_clusternode.h" #include "zp_clusterterm.h" namespace ZP_Cluster{ zp_ClusterNode::zp_ClusterNode(zp_ClusterTerm * pTerm, QObject * psock,QObject *parent) : ZPTaskEngine::zp_plTaskBase(parent) ,m_pTerm(pTerm) ,m_pSock(psock) ,bTermSet(false) { m_currentReadOffset = 0; m_currentMessageSize = 0; m_nPortPublish = 0; m_last_Report = QDateTime::currentDateTime(); } QDateTime zp_ClusterNode::lastActiveTime() { return m_last_Report; } QString zp_ClusterNode::termName() { return m_strTermName; } QHostAddress zp_ClusterNode::addrPublish() { return m_addrPublish; } int zp_ClusterNode::portPublish() { return m_nPortPublish; } QObject * zp_ClusterNode::sock() { return m_pSock; } int zp_ClusterNode::run() { if (bTermSet==true) { //qDebug()<=0 && nCurrSz!=0 ) { QByteArray block; m_mutex_rawData.lock(); if (m_list_RawData.size()) block = *m_list_RawData.begin(); m_mutex_rawData.unlock(); if (block.isEmpty()==false && block.isNull()==false) { m_currentReadOffset = filter_message(block,m_currentReadOffset); if (m_currentReadOffset >= block.size()) { m_mutex_rawData.lock(); m_list_RawData.pop_front(); m_currentReadOffset = 0; m_mutex_rawData.unlock(); } } else { m_mutex_rawData.lock(); //pop empty cabs if (m_list_RawData.empty()==false) m_list_RawData.pop_front(); m_mutex_rawData.unlock(); } m_mutex_rawData.lock(); nCurrSz = m_list_RawData.size(); m_mutex_rawData.unlock(); } m_mutex_rawData.lock(); nCurrSz = m_list_RawData.size(); m_mutex_rawData.unlock(); if (nCurrSz==0) return 0; return -1; } //push new binary data into queue int zp_ClusterNode::push_new_data(const QByteArray & dtarray) { int res = 0; m_mutex_rawData.lock(); m_list_RawData.push_back(dtarray); res = m_list_RawData.size(); m_mutex_rawData.unlock(); m_last_Report = QDateTime::currentDateTime(); return res; } //!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader //!return bytes Used. int zp_ClusterNode::filter_message(const QByteArray & block, int offset) { const int blocklen = block.length(); while (blocklen>offset) { const char * dataptr = block.constData(); //Recieve First 2 byte while (m_currentMessageSize<2 && blocklen>offset ) { m_currentBlock.push_back(dataptr[offset++]); m_currentMessageSize++; } if (m_currentMessageSize < 2) //First 2 byte not complete continue; if (m_currentMessageSize==2) { const char * headerptr = m_currentBlock.constData(); memcpy((void *)&m_currentHeader,headerptr,2); } const char * ptrCurrData = m_currentBlock.constData(); if (m_currentHeader.Mark == 0x1234) //Valid Message { while (m_currentMessageSize< sizeof(CROSS_SVR_MSG::tag_header) && blocklen>offset) { m_currentBlock.push_back(dataptr[offset++]); m_currentMessageSize++; } if (m_currentMessageSize < sizeof(CROSS_SVR_MSG::tag_header)) //Header not completed. continue; else if (m_currentMessageSize == sizeof(CROSS_SVR_MSG::tag_header))//Header just completed. { const char * headerptr = m_currentBlock.constData(); memcpy((void *)&m_currentHeader,headerptr,sizeof(CROSS_SVR_MSG::tag_header)); //continue reading if there is data left behind if (block.length()>offset) { qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header) -m_currentMessageSize ; while (bitLeft>0 && blocklen>offset) { m_currentBlock.push_back(dataptr[offset++]); m_currentMessageSize++; bitLeft--; } //deal block, may be send data as soon as possible; deal_current_message_block(); if (bitLeft>0) continue; //This Message is Over. Start a new one. m_currentMessageSize = 0; m_currentBlock = QByteArray(); continue; } } else { if (block.length()>offset) { qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header) -m_currentMessageSize ; while (bitLeft>0 && blocklen>offset) { m_currentBlock.push_back(dataptr[offset++]); m_currentMessageSize++; bitLeft--; } //deal block, may be processed as soon as possible; deal_current_message_block(); if (bitLeft>0) continue; //This Message is Over. Start a new one. m_currentMessageSize = 0; m_currentBlock = QByteArray(); continue; } } // end if there is more bytes to append } //end deal trans message else { emit evt_Message(this,tr("Client Send a unknown start Header %1 %2. Close client immediately.") .arg((int)(ptrCurrData[0])).arg((int)(ptrCurrData[1]))); m_currentMessageSize = 0; m_currentBlock = QByteArray(); offset = blocklen; emit evt_close_client(this->sock()); } } // end while block len > offset return offset; } //in Trans-Level, do nothing. int zp_ClusterNode::deal_current_message_block() { qint32 bytesLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header) -m_currentMessageSize ; const CROSS_SVR_MSG * pMsg =(const CROSS_SVR_MSG *) m_currentBlock.constData(); switch(m_currentHeader.messagetype) { case 0x00://Heart Beating break; case 0x01://basicInfo, when connection established, this message should be used if (m_currentBlock.length()>=64) emit evt_Message(this,"Debug:" + m_currentBlock.toHex().left(64) + "..." + m_currentBlock.toHex().right(64)); else emit evt_Message(this,"Debug:" + m_currentBlock.toHex()); if (bytesLeft==0) { QString strName ((const char *)pMsg->payload.basicInfo.name); if (strName != m_pTerm->name()) { this->m_strTermName = strName; m_nPortPublish = pMsg->payload.basicInfo.port; m_addrPublish = QHostAddress((const char *)pMsg->payload.basicInfo.Address); if (false==m_pTerm->regisitNewServer(this)) { this->m_strTermName.clear(); emit evt_Message(this,tr("Info: New Svr already regisited. Ignored.")+strName); emit evt_close_client(this->sock()); } else { emit evt_NewSvrConnected(this->termName()); m_pTerm->BroadcastServers(); } } else { emit evt_Message(this,tr("Can not connect to it-self, Loopback connections is forbidden.")); emit evt_close_client(this->sock()); } } break; case 0x02: //Server - broadcast messages if (m_currentBlock.length()>=64) emit evt_Message(this,"Debug:" + m_currentBlock.toHex().left(64) + "..." + m_currentBlock.toHex().right(64)); else emit evt_Message(this,"Debug:" + m_currentBlock.toHex()); if (bytesLeft==0) { int nSvrs = pMsg->hearder.data_length / sizeof(CROSS_SVR_MSG::uni_payload::tag_CSM_Broadcast); for (int i=0;ipayload.broadcastMsg[i].name); if (strName != m_pTerm->name() && m_pTerm->SvrNodeFromName(strName)==NULL) { QHostAddress addrToConnectTo((const char *)pMsg->payload.broadcastMsg[i].Address); quint16 PortToConnectTo = pMsg->payload.broadcastMsg[i].port; //because cross-connection is not good, we just want the low Addr:port connect to max Addr:Port. //Connect to New Servers if (strName > m_pTerm->name()) emit evt_connect_to(addrToConnectTo,PortToConnectTo,false); else emit evt_Message(this,tr("Name %1 <= %2, omitted.").arg(strName).arg(m_pTerm->name())); } } } break; case 0x03: if (m_currentBlock.length()>=64) emit evt_Message(this,"Debug:" + m_currentBlock.toHex().left(64) + "..." + m_currentBlock.toHex().right(64)); else emit evt_Message(this,"Debug:" + m_currentBlock.toHex()); if (m_currentMessageSize==m_currentBlock.size()) { QByteArray arraySend ((const char *)(pMsg) + sizeof(CROSS_SVR_MSG::tag_header), m_currentMessageSize - sizeof(CROSS_SVR_MSG::tag_header)); if (deal_user_data(arraySend)==true) m_currentBlock = QByteArray(); } else { if (deal_user_data(m_currentBlock)==true) m_currentBlock = QByteArray(); } break; default: emit evt_Message(this,tr("Info:Unknown Msg Type got.")); emit evt_close_client(this->sock()); break; }; return 0; } bool zp_ClusterNode::deal_user_data(const QByteArray & data) { emit evt_RemoteData_recieved(this->termName(),data); return true; } void zp_ClusterNode::CheckHeartBeating() { QDateTime dtm = QDateTime::currentDateTime(); qint64 usc = this->m_last_Report.secsTo(dtm); int nThredHold = m_pTerm->heartBeatingThrdHold(); if (usc >= nThredHold) { emit evt_Message(this,tr("Client ") + QString("%1").arg((unsigned int)((quint64)this)) + tr(" is dead, kick out.")); emit evt_close_client(this->sock()); } } void zp_ClusterNode::SendHelloPackage() { int nMsgLen = sizeof(CROSS_SVR_MSG::tag_header) + sizeof (CROSS_SVR_MSG::uni_payload::tag_CSM_BasicInfo); QByteArray array(nMsgLen,0); CROSS_SVR_MSG * pMsg =(CROSS_SVR_MSG *) array.data(); pMsg->hearder.Mark = 0x1234; pMsg->hearder.data_length = sizeof (CROSS_SVR_MSG::uni_payload::tag_CSM_BasicInfo); pMsg->hearder.messagetype = 0x01; strncpy((char *)(pMsg->payload.basicInfo.name), m_pTerm->name().toStdString().c_str(), sizeof(pMsg->payload.basicInfo.name)-1); strncpy((char *)(pMsg->payload.basicInfo.Address), m_pTerm->publishAddr().toString().toStdString().c_str(), sizeof(pMsg->payload.basicInfo.Address)-1); pMsg->payload.basicInfo.port = m_pTerm->publishPort(); emit evt_SendDataToClient(sock(),array); } }