提交 338f6eea 编写于 作者: H Haojun Liao

fix(query): fix invalid free in exchange.

上级 40feeffd
......@@ -2418,6 +2418,9 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
pRsp->compLen = htonl(pRsp->compLen);
pRsp->numOfCols = htonl(pRsp->numOfCols);
pRsp->useconds = htobe64(pRsp->useconds);
ASSERT(pSourceDataInfo->pRsp != NULL);
qDebug("fetch rsp received, index:%d, rows:%d", pSourceDataInfo->index, pRsp->numOfRows);
} else {
pSourceDataInfo->code = code;
}
......@@ -2466,6 +2469,8 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, GET_TASKID(pTaskInfo),
pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources);
......@@ -2643,7 +2648,6 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
int32_t completed = 0;
for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
continue;
......@@ -2653,6 +2657,11 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
continue;
}
if (pDataInfo->code != TSDB_CODE_SUCCESS) {
code = pDataInfo->code;
goto _error;
}
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);
......@@ -2690,6 +2699,8 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
pLoadInfo->totalSize);
}
taosMemoryFreeClear(pDataInfo->pRsp);
if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
......@@ -2699,7 +2710,6 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
}
}
taosMemoryFreeClear(pDataInfo->pRsp);
return pExchangeInfo->pResult;
}
......@@ -2719,26 +2729,9 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) {
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
} else {
ASSERT(0);
}
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
int64_t startTs = taosGetTimestampUs();
// Asynchronously send all fetch requests to all sources.
for (int32_t i = 0; i < totalSources; ++i) {
int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
}
int64_t endTs = taosGetTimestampUs();
qDebug("%s send all fetch request to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo),
totalSources, endTs - startTs);
tsem_wait(&pExchangeInfo->ready);
pOperator->status = OP_RES_TO_RETURN;
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
......@@ -2758,10 +2751,11 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
}
int64_t endTs = taosGetTimestampUs();
qDebug("%s send all fetch request to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo),
qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo),
totalSources, endTs - startTs);
tsem_wait(&pExchangeInfo->ready);
pOperator->status = OP_RES_TO_RETURN;
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
return TSDB_CODE_SUCCESS;
......@@ -2875,7 +2869,8 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
if (pExchangeInfo->seqLoadData) {
return seqLoadRemoteData(pOperator);
} else {
return concurrentlyLoadRemoteData(pOperator);
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
// return concurrentlyLoadRemoteData(pOperator);
}
}
......@@ -2927,7 +2922,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p
}
pInfo->pResult = pBlock;
pInfo->seqLoadData = true;
pInfo->seqLoadData = false;
tsem_init(&pInfo->ready, 0, 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册