diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 3c27c8064a7fc63bed08d598092cb0e86e74df1a..cff0c50644beee63a6bc7d8b96b234407d8def19 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -75,7 +75,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } while (1) { - qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo)); + int32_t v = 0; + sem_getvalue(&pExchangeInfo->ready, &v); + qDebug("prepare wait for ready, sem:(%d,%p), %p, %s", v, pExchangeInfo->ready.__align, pExchangeInfo, GET_TASKID(pTaskInfo)); tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { @@ -370,7 +372,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { int32_t index = pWrapper->sourceIndex; SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index); - int32_t v = 0; + int32_t v = 0, v1 = 0; sem_getvalue(&pExchangeInfo->ready, &v); if (code == TSDB_CODE_SUCCESS) { @@ -384,12 +386,13 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); ASSERT(pRsp != NULL); - qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, sem:%d, %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, - pRsp->numOfRows, v, pExchangeInfo); + qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, sem:(%d,%p), %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, + pRsp->numOfRows, v, pExchangeInfo->ready.__align, pExchangeInfo); } else { taosMemoryFree(pMsg->pData); pSourceDataInfo->code = code; - qDebug("%s fetch rsp received, index:%d, error:%s, sem:%d, %p", pSourceDataInfo->taskId, index, tstrerror(code), v, pExchangeInfo); + qDebug("%s fetch rsp received, index:%d, error:%s, sem:(%d,%p), %p", pSourceDataInfo->taskId, index, tstrerror(code), v, + pExchangeInfo->ready.__align, pExchangeInfo); } pSourceDataInfo->status = EX_SOURCE_DATA_READY; @@ -429,9 +432,9 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas return pTaskInfo->code; } - qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %p, %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, - pSource->execId, sourceIndex, totalSources); + pSource->execId, pExchangeInfo, sourceIndex, totalSources); pMsg->header.vgId = htonl(pSource->addr.nodeId); pMsg->sId = htobe64(pSource->schedId);