// Implementation of ZP_Cluster::zp_ClusterNode (constructor, run, push_new_data).
//
// NOTE(review): this file has been collapsed so that several definitions share
// one physical line. As written, every "//" comment on the line below swallows
// the code that follows it; the original line breaks need to be restored before
// this can compile.
//
// NOTE(review): the beginning of run() is damaged. The text between
// "//qDebug()<" and "=0 && nCurrSz!=0 )" is missing, which presumably held the
// early return for bTermSet, the declaration of nCurrSz and the header of the
// processing loop. Recover it from version control; do not guess.
//
// zp_ClusterNode(pTerm, psock, parent):
//   Stores pTerm and psock, clears bTermSet, zeroes m_currentReadOffset and
//   m_currentMessageSize, and stamps m_last_Report with the current time.
//
// run():
//   Visible part: copies the front entry of m_list_RawData while holding
//   m_mutex_rawData, passes it to filter_message() together with
//   m_currentReadOffset, and pops the entry once the returned offset reaches
//   the entry's size (resetting m_currentReadOffset to 0). Empty or null front
//   entries are popped without being parsed. Returns 0 when the queue is empty
//   after processing, -1 otherwise.
//
// push_new_data(dtarray):
//   Appends dtarray to m_list_RawData while holding m_mutex_rawData, refreshes
//   m_last_Report with the current time, and returns the queue size measured
//   right after the append.
#include "zp_clusternode.h" #include "zp_clusterterm.h" namespace ZP_Cluster{ zp_ClusterNode::zp_ClusterNode(zp_ClusterTerm * pTerm, QObject * psock,QObject *parent) : ZPTaskEngine::zp_plTaskBase(parent) ,m_pTerm(pTerm) ,m_pSock(psock) ,bTermSet(false) { m_currentReadOffset = 0; m_currentMessageSize = 0; m_last_Report = QDateTime::currentDateTime(); } int zp_ClusterNode::run() { if (bTermSet==true) { //qDebug()<=0 && nCurrSz!=0 ) { QByteArray block; m_mutex_rawData.lock(); if (m_list_RawData.size()) block = *m_list_RawData.begin(); m_mutex_rawData.unlock(); if (block.isEmpty()==false && block.isNull()==false) { m_currentReadOffset = filter_message(block,m_currentReadOffset); if (m_currentReadOffset >= block.size()) { m_mutex_rawData.lock(); m_list_RawData.pop_front(); m_currentReadOffset = 0; m_mutex_rawData.unlock(); } } else { m_mutex_rawData.lock(); //pop empty cabs if (m_list_RawData.empty()==false) m_list_RawData.pop_front(); m_mutex_rawData.unlock(); } m_mutex_rawData.lock(); nCurrSz = m_list_RawData.size(); m_mutex_rawData.unlock(); } m_mutex_rawData.lock(); nCurrSz = m_list_RawData.size(); m_mutex_rawData.unlock(); if (nCurrSz==0) return 0; return -1; } //push new binary data into queue int zp_ClusterNode::push_new_data(const QByteArray & dtarray) { int res = 0; m_mutex_rawData.lock(); m_list_RawData.push_back(dtarray); res = m_list_RawData.size(); m_mutex_rawData.unlock(); m_last_Report = QDateTime::currentDateTime(); return res; } //!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader //!return bytes Used. 
int zp_ClusterNode::filter_message(const QByteArray & block, int offset) { const int blocklen = block.length(); while (blocklen>offset) { const char * dataptr = block.constData(); //Recieve First 2 byte while (m_currentMessageSize<2 && blocklen>offset ) { m_currentBlock.push_back(dataptr[offset++]); m_currentMessageSize++; } if (m_currentMessageSize < 2) //First 2 byte not complete continue; if (m_currentMessageSize==2) { const char * headerptr = m_currentBlock.constData(); memcpy((void *)&m_currentHeader,headerptr,2); } const char * ptrCurrData = m_currentBlock.constData(); if (m_currentHeader.Mark == 0x1234) //Valid Message { while (m_currentMessageSize< sizeof(CROSS_SVR_MSG::tag_header) && blocklen>offset) { m_currentBlock.push_back(dataptr[offset++]); m_currentMessageSize++; } if (m_currentMessageSize < sizeof(CROSS_SVR_MSG::tag_header)) //Header not completed. continue; else if (m_currentMessageSize == sizeof(CROSS_SVR_MSG::tag_header))//Header just completed. { const char * headerptr = m_currentBlock.constData(); memcpy((void *)&m_currentHeader,headerptr,sizeof(CROSS_SVR_MSG::tag_header)); //continue reading if there is data left behind if (block.length()>offset) { qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header) -m_currentMessageSize ; while (bitLeft>0 && blocklen>offset) { m_currentBlock.push_back(dataptr[offset++]); m_currentMessageSize++; bitLeft--; } //deal block, may be send data as soon as possible; deal_current_message_block(); if (bitLeft>0) continue; //This Message is Over. Start a new one. 
m_currentMessageSize = 0; m_currentBlock = QByteArray(); continue; } } else { if (block.length()>offset) { qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header) -m_currentMessageSize ; while (bitLeft>0 && blocklen>offset) { m_currentBlock.push_back(dataptr[offset++]); m_currentMessageSize++; bitLeft--; } //deal block, may be processed as soon as possible; deal_current_message_block(); if (bitLeft>0) continue; //This Message is Over. Start a new one. m_currentMessageSize = 0; m_currentBlock = QByteArray(); continue; } } // end if there is more bytes to append } //end deal trans message else { emit evt_Message(this,tr("Client Send a unknown start Header %1 %2. Close client immediately.") .arg((int)(ptrCurrData[0])).arg((int)(ptrCurrData[1]))); m_currentMessageSize = 0; m_currentBlock = QByteArray(); offset = blocklen; emit evt_close_client(this->sock()); } } // end while block len > offset return offset; } //in Trans-Level, do nothing. int zp_ClusterNode::deal_current_message_block() { return 0; } void zp_ClusterNode::CheckHeartBeating() { QDateTime dtm = QDateTime::currentDateTime(); qint64 usc = this->m_last_Report.secsTo(dtm); int nThredHold = m_pTerm->heartBeatingThrd(); if (usc >= nThredHold) { emit evt_Message(this,tr("Client ") + QString("%1").arg((unsigned int)((quint64)this)) + tr(" is dead, kick out.")); emit evt_close_client(this->sock()); } } }