提交 93dfebe0 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 5603f4a1
......@@ -2420,17 +2420,19 @@ static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQuer
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*)param;
int32_t index = pSourceDataInfo->index;
if (code == TSDB_CODE_SUCCESS) {
pSourceDataInfo->pRsp = pMsg->pData;
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->compLen = htonl(pRsp->compLen);
pRsp->compLen = htonl(pRsp->compLen);
pRsp->numOfCols = htonl(pRsp->numOfCols);
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->useconds = htobe64(pRsp->useconds);
ASSERT(pSourceDataInfo->pRsp != NULL);
qDebug("fetch rsp received, index:%d, rows:%d", pSourceDataInfo->index, pRsp->numOfRows);
ASSERT(pRsp != NULL);
qDebug("fetch rsp received, index:%d, rows:%d", index, pRsp->numOfRows);
} else {
pSourceDataInfo->code = code;
}
......@@ -2628,6 +2630,8 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
while (1) {
int32_t completed = 0;
tsem_wait(&pExchangeInfo->ready);
for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
......@@ -2725,7 +2729,6 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo),
totalSources, endTs - startTs);
tsem_wait(&pExchangeInfo->ready);
pOperator->status = OP_RES_TO_RETURN;
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册