diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 63067b9bd924fa08450976d66c9cfdd5a4863662..4d50c3ed765e01ad5877e88220a6bd6e5a94496b 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -620,36 +620,38 @@ TEST(testCase, create_topic_Test) { ////TEST(testCase, tmq_commit_TEST) { ////} // -//TEST(testCase, projection_query_tables) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_NE(pConn, nullptr); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create table tu using st1 tags(1)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// for(int32_t i = 0; i < 100000; ++i) { -// char sql[512] = {0}; -// sprintf(sql, "insert into tu values(now+%da, %d)", i, i); -// TAOS_RES* p = taos_query(pConn, sql); -// if (taos_errno(p) != 0) { -// printf("failed to insert data, reason:%s\n", taos_errstr(p)); -// } -// -// taos_free_result(p); -// } -// +TEST(testCase, projection_query_tables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tu using st1 tags(1) tu1 using st1 tags(2)"); + if (taos_errno(pRes) != 0) { + printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + for(int32_t i = 0; i < 5000000; i += 4) { + char sql[512] = {0}; + sprintf(sql, "insert into tu values(now+%da, %d) (now+%da, %d) (now+%da, %d) (now+%da, %d)" + " tu1 values(now+%da, %d) (now+%da, %d) (now+%da, %d) (now+%da, %d)", i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, + i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3); + TAOS_RES* p = taos_query(pConn, sql); + if (taos_errno(p) != 0) { + printf("failed to insert data, reason:%s\n", taos_errstr(p)); + } + + taos_free_result(p); + } + // pRes = taos_query(pConn, "select * from tu"); // if (taos_errno(pRes) != 0) { // printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); @@ -668,36 +670,36 @@ TEST(testCase, create_topic_Test) { // } // // taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, projection_query_stables) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_NE(pConn, nullptr); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "select ts from st1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} + taos_close(pConn); +} + +TEST(testCase, projection_query_stables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "select ts from st1"); + if (taos_errno(pRes) != 0) { + printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} // //TEST(testCase, agg_query_tables) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5f5abca2f78cbe75910a7bec437dc53b0a1988ca..02bacb15cacbe9c62b2bced0576b54843f623912 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -371,18 +371,31 @@ typedef struct STaskParam { struct SUdfInfo* pUdfInfo; } STaskParam; +enum { + DATA_NOT_READY = 0x1, + DATA_READY = 0x2, + DATA_EXHAUSTED = 0x3, +}; + +typedef struct SSourceDataInfo { + struct SExchangeInfo *pEx; + int32_t index; + SRetrieveTableRsp *pRsp; + uint64_t totalRows; + int32_t status; +} SSourceDataInfo; + typedef struct SExchangeInfo { SArray* pSources; + SArray* pSourceDataInfo; tsem_t ready; void* pTransporter; - SRetrieveTableRsp* pRsp; SSDataBlock* pResult; + bool seqLoadData; int32_t current; - uint64_t rowsOfCurrentSource; - - uint64_t totalSize; // total load bytes from remote - uint64_t totalRows; // total number of rows - uint64_t totalElapsed; // total elapsed time + uint64_t totalSize; // total load bytes from remote + uint64_t totalRows; // total number of rows + uint64_t totalElapsed; // total elapsed time } SExchangeInfo; typedef struct STableScanInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c1d2207d606e3d91f65e4e9a488fe70d7cc7b0d7..7452edec04a826b7d280a7eadcab5e2f88f5bd1d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4918,14 +4918,16 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) { } int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { - SExchangeInfo* pEx = (SExchangeInfo*) param; - pEx->pRsp = pMsg->pData; + SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*) param; + pSourceDataInfo->pRsp = pMsg->pData; - pEx->pRsp->numOfRows = htonl(pEx->pRsp->numOfRows); - pEx->pRsp->useconds = htobe64(pEx->pRsp->useconds); - pEx->pRsp->compLen = htonl(pEx->pRsp->compLen); + SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; + pRsp->numOfRows = htonl(pRsp->numOfRows); + pRsp->useconds = htobe64(pRsp->useconds); + pRsp->compLen = htonl(pRsp->compLen); - tsem_post(&pEx->ready); + pSourceDataInfo->status = DATA_READY; + tsem_post(&pSourceDataInfo->pEx->ready); } static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { @@ -4955,115 +4957,234 @@ void qProcessFetchRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { destroySendMsgInfo(pSendInfo); } -static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { - SOperatorInfo *pOperator = (SOperatorInfo*) param; +static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInfo *pTaskInfo, int32_t sourceIndex) { + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); - SExchangeInfo *pExchangeInfo = pOperator->info; - SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; + SResFetchReq* pMsg = calloc(1, sizeof(SResFetchReq)); + if (NULL == pMsg) { + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return pTaskInfo->code; + } - *newgroup = false; + SDownstreamSource *pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex); + SSourceDataInfo *pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); - if (pExchangeInfo->current >= totalSources) { - qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, - pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); - return NULL; + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epset.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources); + + pMsg->header.vgId = htonl(pSource->addr.nodeId); + pMsg->sId = htobe64(pSource->schedId); + pMsg->taskId = htobe64(pSource->taskId); + pMsg->queryId = htobe64(pTaskInfo->id.queryId); + + // send the fetch remote task result reques + SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + tfree(pMsg); + qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return pTaskInfo->code; } - SResFetchReq* pMsg = NULL; - SMsgSendInfo* pMsgSendInfo = NULL; + pMsgSendInfo->param = pDataInfo; + pMsgSendInfo->msgInfo.pData = pMsg; + pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); + pMsgSendInfo->msgType = TDMT_VND_FETCH; + pMsgSendInfo->fp = loadRemoteDataCallback; - while(1) { - pMsg = calloc(1, sizeof(SResFetchReq)); - if (NULL == pMsg) { // todo handle malloc error - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _error; + int64_t transporterId = 0; + int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epset, &transporterId, pMsgSendInfo); + return TSDB_CODE_SUCCESS; +} + +static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SExchangeInfo *pExchangeInfo, SSourceDataInfo* pDataInfo, int32_t numOfOutput, int64_t startTs) { + char* pData = pDataInfo->pRsp->data; + SRetrieveTableRsp* pRsp = pDataInfo->pRsp; + + for (int32_t i = 0; i < numOfOutput; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); + + char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pRsp->numOfRows); + if (tmp == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; } - SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); + size_t len = pRsp->numOfRows * pColInfoData->info.bytes; + memcpy(tmp, pData, len); - int64_t startTs = taosGetTimestampUs(); - qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epset.eps[0].fqdn, pSource->taskId, pExchangeInfo->current, totalSources); + pColInfoData->pData = tmp; + pData += len; + } - pMsg->header.vgId = htonl(pSource->addr.nodeId); - pMsg->sId = htobe64(pSource->schedId); - pMsg->taskId = htobe64(pSource->taskId); - pMsg->queryId = htobe64(pTaskInfo->id.queryId); + pRes->info.rows = pRsp->numOfRows; - // send the fetch remote task result reques - pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); - if (NULL == pMsgSendInfo) { - qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _error; - } + int64_t el = taosGetTimestampUs() - startTs; - pMsgSendInfo->param = pExchangeInfo; - pMsgSendInfo->msgInfo.pData = pMsg; - pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); - pMsgSendInfo->msgType = TDMT_VND_FETCH; - pMsgSendInfo->fp = loadRemoteDataCallback; + pExchangeInfo->totalRows += pRsp->numOfRows; + pExchangeInfo->totalSize += pRsp->compLen; + pDataInfo->totalRows += pRsp->numOfRows; - int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epset, &transporterId, pMsgSendInfo); - tsem_wait(&pExchangeInfo->ready); + pExchangeInfo->totalElapsed += el; - SRetrieveTableRsp* pRsp = pExchangeInfo->pRsp; - if (pRsp->numOfRows == 0) { - qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next", - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, - pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows); + return TSDB_CODE_SUCCESS; +} - pExchangeInfo->rowsOfCurrentSource = 0; - pExchangeInfo->current += 1; +static void* setAllSourcesCompleted(SOperatorInfo *pOperator, int64_t startTs) { + SExchangeInfo *pExchangeInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - if (pExchangeInfo->current >= totalSources) { - int64_t el = taosGetTimestampUs() - startTs; - pExchangeInfo->totalElapsed += el; + int64_t el = taosGetTimestampUs() - startTs; + pExchangeInfo->totalElapsed += el; - qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, - pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); - return NULL; - } else { + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, + pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); + + doSetOperatorCompleted(pOperator); + return NULL; +} + +static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SExchangeInfo *pExchangeInfo, SExecTaskInfo *pTaskInfo) { + int32_t code = 0; + int64_t startTs = taosGetTimestampUs(); + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + + while (1) { + int32_t completed = 0; + for (int32_t i = 0; i < totalSources; ++i) { + SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); + + if (pDataInfo->status == DATA_EXHAUSTED) { + completed += 1; continue; } - } - SSDataBlock* pRes = pExchangeInfo->pResult; - char* pData = pRsp->data; + if (pDataInfo->status != DATA_READY) { + continue; + } + + SRetrieveTableRsp* pRsp = pDataInfo->pRsp; + SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, i); + + SSDataBlock* pRes = pExchangeInfo->pResult; - for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); - char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pRsp->numOfRows); - if (tmp == NULL) { + if (pRsp->numOfRows == 0) { + qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows, + pExchangeInfo->totalRows); + pDataInfo->status = DATA_EXHAUSTED; + completed += 1; + continue; + } + + code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pExchangeInfo, pDataInfo, pOperator->numOfOutput, startTs); + if (code != 0) { goto _error; } - size_t len = pRsp->numOfRows * pColInfoData->info.bytes; - memcpy(tmp, pData, len); + if (pRsp->completed == 1) { + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 + ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, + pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, i + 1, + totalSources); + pDataInfo->status = DATA_EXHAUSTED; + } else { + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, + pExchangeInfo->totalSize); + } + + if (pDataInfo->status != DATA_EXHAUSTED) { + pDataInfo->status = DATA_NOT_READY; + code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + + return pExchangeInfo->pResult; + } + + if (completed == totalSources) { + return setAllSourcesCompleted(pOperator, startTs); + } + } + +_error: + pTaskInfo->code = code; + return NULL; +} + +static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) { + SExchangeInfo *pExchangeInfo = pOperator->info; + SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; + + if (pOperator->status == OP_RES_TO_RETURN) { + return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); + } + + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + int64_t startTs = taosGetTimestampUs(); + + // Asynchronously send all fetch requests to all sources. + for(int32_t i = 0; i < totalSources; ++i) { + int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + } + + int64_t endTs = taosGetTimestampUs(); + qDebug("%s send all fetch request to %"PRIzu" sources completed, elapsed:%"PRId64, GET_TASKID(pTaskInfo), totalSources, endTs - startTs); + + tsem_wait(&pExchangeInfo->ready); + + pOperator->status = OP_RES_TO_RETURN; + return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); +} + +static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { + SExchangeInfo *pExchangeInfo = pOperator->info; + SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; - pColInfoData->pData = tmp; - pData += len; + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + int64_t startTs = taosGetTimestampUs(); + + while(1) { + if (pExchangeInfo->current >= totalSources) { + return setAllSourcesCompleted(pOperator, startTs); } - pRes->info.numOfCols = pOperator->numOfOutput; - pRes->info.rows = pRsp->numOfRows; + doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); + + tsem_wait(&pExchangeInfo->ready); - int64_t el = taosGetTimestampUs() - startTs; + SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current); + SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); + + SRetrieveTableRsp* pRsp = pDataInfo->pRsp; + if (pRsp->numOfRows == 0) { + qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next", + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, + pDataInfo->totalRows, pExchangeInfo->totalRows); + + pDataInfo->status = DATA_EXHAUSTED; + pExchangeInfo->current += 1; + continue; + } - pExchangeInfo->totalRows += pRsp->numOfRows; - pExchangeInfo->totalSize += pRsp->compLen; - pExchangeInfo->rowsOfCurrentSource += pRsp->numOfRows; - pExchangeInfo->totalElapsed += el; + SSDataBlock* pRes = pExchangeInfo->pResult; + setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pExchangeInfo, pDataInfo, pOperator->numOfOutput, startTs); if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows, pExchangeInfo->totalSize, - pExchangeInfo->current + 1, totalSources); + ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, + pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->current + 1, + totalSources); - pExchangeInfo->rowsOfCurrentSource = 0; + pDataInfo->status = DATA_EXHAUSTED; pExchangeInfo->current += 1; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, @@ -5072,13 +5193,38 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { return pExchangeInfo->pResult; } +} + +static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { + SOperatorInfo *pOperator = (SOperatorInfo*) param; + + SExchangeInfo *pExchangeInfo = pOperator->info; + SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; + + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + + if (pOperator->status == OP_EXEC_DONE) { + qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, + pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); + return NULL; + } + + *newgroup = false; + + if (pExchangeInfo->seqLoadData) { + return seqLoadRemoteData(pOperator); + } else { + return concurrentlyLoadRemoteData(pOperator); + } +#if 0 _error: tfree(pMsg); tfree(pMsgSendInfo); terrno = pTaskInfo->code; return NULL; +#endif } static SSDataBlock* createResultDataBlock(const SArray* pExprInfo); @@ -5094,11 +5240,33 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* return NULL; } + size_t numOfSources = taosArrayGetSize(pSources); + pInfo->pSources = taosArrayDup(pSources); - assert(taosArrayGetSize(pInfo->pSources) > 0); + pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); + if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) { + tfree(pInfo); + tfree(pOperator); + taosArrayDestroy(pInfo->pSources); + taosArrayDestroy(pInfo->pSourceDataInfo); + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + for(int32_t i = 0; i < numOfSources; ++i) { + SSourceDataInfo dataInfo = {0}; + dataInfo.status = DATA_NOT_READY; + dataInfo.pEx = pInfo; + dataInfo.index = i; + + taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); + } size_t size = taosArrayGetSize(pExprInfo); pInfo->pResult = createResultDataBlock(pExprInfo); + pInfo->seqLoadData = true; + + tsem_init(&pInfo->ready, 0, 0); pOperator->name = "ExchangeOperator"; pOperator->operatorType = OP_Exchange; @@ -5132,6 +5300,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* } } #endif + return pOperator; }