diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp index c8570d8af0a7467aee71a33f62fdea4ab647683e..831a61321351acb22b66d7baa8d9cbdfdfb588d2 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp @@ -57,10 +57,6 @@ namespace ZP_Cluster{ //qDebug()<1) - return -1; int nCurrSz = -1; int nMessage = m_nMessageBlockSize; while (--nMessage>=0 && nCurrSz!=0 ) @@ -144,10 +140,18 @@ namespace ZP_Cluster{ if (m_currentHeader.Mark == 0x1234) //Valid Message { - while (m_currentMessageSize< sizeof(CROSS_SVR_MSG::tag_header) && blocklen>offset) + //while (m_currentMessageSize< sizeof(CROSS_SVR_MSG::tag_header) && blocklen>offset) + if (m_currentMessageSize< sizeof(CROSS_SVR_MSG::tag_header) && blocklen>offset) { - m_currentBlock.push_back(dataptr[offset++]); - m_currentMessageSize++; + int nCpy = sizeof(CROSS_SVR_MSG::tag_header) - m_currentMessageSize; + if (nCpy > blocklen - offset) + nCpy = blocklen - offset; + QByteArray arrCpy(dataptr+offset,nCpy); + m_currentBlock.push_back(arrCpy); + //m_currentBlock.push_back(dataptr[offset++]); + //m_currentMessageSize++; + offset += nCpy; + m_currentMessageSize += nCpy; } if (m_currentMessageSize < sizeof(CROSS_SVR_MSG::tag_header)) //Header not completed. continue; @@ -161,11 +165,20 @@ namespace ZP_Cluster{ { qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header) -m_currentMessageSize ; - while (bitLeft>0 && blocklen>offset) + //while (bitLeft>0 && blocklen>offset) + if (bitLeft>0 && blocklen>offset) { - m_currentBlock.push_back(dataptr[offset++]); - m_currentMessageSize++; - bitLeft--; + int nCpy = bitLeft; + if (nCpy > blocklen - offset) + nCpy = blocklen - offset; + QByteArray arrCpy(dataptr+offset,nCpy); + m_currentBlock.push_back(arrCpy); + offset += nCpy; + m_currentMessageSize += nCpy; + bitLeft -= nCpy; + //m_currentBlock.push_back(dataptr[offset++]); + //m_currentMessageSize++; + //bitLeft--; } //deal block, may be send data as soon as possible; deal_current_message_block(); @@ -183,11 +196,21 @@ namespace ZP_Cluster{ { qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header) -m_currentMessageSize ; - while (bitLeft>0 && blocklen>offset) + //while (bitLeft>0 && blocklen>offset) + if (bitLeft>0 && blocklen>offset) { - m_currentBlock.push_back(dataptr[offset++]); - m_currentMessageSize++; - bitLeft--; + int nCpy = bitLeft; + if (nCpy > blocklen - offset) + nCpy = blocklen - offset; + QByteArray arrCpy(dataptr+offset,nCpy); + m_currentBlock.push_back(arrCpy); + offset += nCpy; + m_currentMessageSize += nCpy; + bitLeft -= nCpy; + + //m_currentBlock.push_back(dataptr[offset++]); + //m_currentMessageSize++; + //bitLeft--; } //deal block, may be processed as soon as possible; deal_current_message_block(); diff --git a/ZoomPipeline_FuncSvr/pipeline/zp_pltaskbase.cpp b/ZoomPipeline_FuncSvr/pipeline/zp_pltaskbase.cpp index 19bacc8d0804559e410b8daaf59d5849521e1f0c..4bfb07557e3313f6a8d00231c20ecb24715e8623 100644 --- a/ZoomPipeline_FuncSvr/pipeline/zp_pltaskbase.cpp +++ b/ZoomPipeline_FuncSvr/pipeline/zp_pltaskbase.cpp @@ -25,4 +25,14 @@ namespace ZPTaskEngine{ QMutexLocker locker(&m_mutex_ref); return refCount; } + + bool zp_plTaskBase::LockRun() + { + return m_mutex_run.tryLock(); + } + + void zp_plTaskBase::UnlockRun() + { + return m_mutex_run.unlock(); + } } diff --git a/ZoomPipeline_FuncSvr/pipeline/zp_pltaskbase.h b/ZoomPipeline_FuncSvr/pipeline/zp_pltaskbase.h index 70e6bed77a78c6df8a0ebb3b0e866c999a953243..402dbc90fd605f58d2820732198542916480c26f 100644 --- a/ZoomPipeline_FuncSvr/pipeline/zp_pltaskbase.h +++ b/ZoomPipeline_FuncSvr/pipeline/zp_pltaskbase.h @@ -27,9 +27,13 @@ namespace ZPTaskEngine{ int ref(); + bool LockRun(); + void UnlockRun(); + private: int refCount; QMutex m_mutex_ref; + QMutex m_mutex_run; signals: public slots: diff --git a/ZoomPipeline_FuncSvr/pipeline/zp_plworkingthread.cpp b/ZoomPipeline_FuncSvr/pipeline/zp_plworkingthread.cpp index a45dc22620389a3294f92d63256d11d124f7a430..8644a2373a22ca533ac46e3a0dca6350c532591b 100644 --- a/ZoomPipeline_FuncSvr/pipeline/zp_plworkingthread.cpp +++ b/ZoomPipeline_FuncSvr/pipeline/zp_plworkingthread.cpp @@ -54,11 +54,21 @@ namespace ZPTaskEngine{ if (bValid==true && ptr!=NULL) { m_bBusy = true; - int res = ptr->run(); - ptr->delRef(); - m_bBusy = false; - if (res!=0 ) + if (ptr->LockRun()==true) + { + int res = ptr->run(); + ptr->delRef(); + if (res!=0 ) + this->m_pipeline->pushTask(ptr,false); + ptr->UnlockRun(); + } + else + { + ptr->delRef(); this->m_pipeline->pushTask(ptr,false); + } + m_bBusy = false; + } emit taskFinished(this); diff --git a/ZoomPipeline_FuncSvr/smartlink/st_clientnode_basetrans.cpp b/ZoomPipeline_FuncSvr/smartlink/st_clientnode_basetrans.cpp index d4547016603dc43409bbd3196d9372c39420fe60..95c9d4fe6fec22e205de26f93ecbc2d0330731f8 100644 --- a/ZoomPipeline_FuncSvr/smartlink/st_clientnode_basetrans.cpp +++ b/ZoomPipeline_FuncSvr/smartlink/st_clientnode_basetrans.cpp @@ -49,10 +49,6 @@ namespace ExampleServer{ //qDebug()<1) - return -1; int nCurrSz = -1; int nMessage = m_nMessageBlockSize; while (--nMessage>=0 && nCurrSz!=0 )