提交 d47950f5 编写于 作者: H Haojun Liao

[td-11818]Multiway send and retrieve for exchange operator.

上级 699a94cc
...@@ -620,36 +620,38 @@ TEST(testCase, create_topic_Test) { ...@@ -620,36 +620,38 @@ TEST(testCase, create_topic_Test) {
////TEST(testCase, tmq_commit_TEST) { ////TEST(testCase, tmq_commit_TEST) {
////} ////}
// //
//TEST(testCase, projection_query_tables) { TEST(testCase, projection_query_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table tu using st1 tags(1)"); pRes = taos_query(pConn, "create table tu using st1 tags(1) tu1 using st1 tags(2)");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// for(int32_t i = 0; i < 100000; ++i) { for(int32_t i = 0; i < 5000000; i += 4) {
// char sql[512] = {0}; char sql[512] = {0};
// sprintf(sql, "insert into tu values(now+%da, %d)", i, i); sprintf(sql, "insert into tu values(now+%da, %d) (now+%da, %d) (now+%da, %d) (now+%da, %d)"
// TAOS_RES* p = taos_query(pConn, sql); " tu1 values(now+%da, %d) (now+%da, %d) (now+%da, %d) (now+%da, %d)", i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3,
// if (taos_errno(p) != 0) { i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3);
// printf("failed to insert data, reason:%s\n", taos_errstr(p)); TAOS_RES* p = taos_query(pConn, sql);
// } if (taos_errno(p) != 0) {
// printf("failed to insert data, reason:%s\n", taos_errstr(p));
// taos_free_result(p); }
// }
// taos_free_result(p);
}
// pRes = taos_query(pConn, "select * from tu"); // pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); // printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
...@@ -668,36 +670,36 @@ TEST(testCase, create_topic_Test) { ...@@ -668,36 +670,36 @@ TEST(testCase, create_topic_Test) {
// } // }
// //
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); taos_close(pConn);
//} }
//
//TEST(testCase, projection_query_stables) { TEST(testCase, projection_query_stables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(pConn, "select ts from st1"); pRes = taos_query(pConn, "select ts from st1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes); taos_free_result(pRes);
// ASSERT_TRUE(false); ASSERT_TRUE(false);
// } }
//
// TAOS_ROW pRow = NULL; TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0}; char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) { while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str); printf("%s\n", str);
// } }
//
// taos_free_result(pRes); taos_free_result(pRes);
// taos_close(pConn); taos_close(pConn);
//} }
// //
//TEST(testCase, agg_query_tables) { //TEST(testCase, agg_query_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......
...@@ -371,18 +371,31 @@ typedef struct STaskParam { ...@@ -371,18 +371,31 @@ typedef struct STaskParam {
struct SUdfInfo* pUdfInfo; struct SUdfInfo* pUdfInfo;
} STaskParam; } STaskParam;
enum {
DATA_NOT_READY = 0x1,
DATA_READY = 0x2,
DATA_EXHAUSTED = 0x3,
};
typedef struct SSourceDataInfo {
struct SExchangeInfo *pEx;
int32_t index;
SRetrieveTableRsp *pRsp;
uint64_t totalRows;
int32_t status;
} SSourceDataInfo;
typedef struct SExchangeInfo { typedef struct SExchangeInfo {
SArray* pSources; SArray* pSources;
SArray* pSourceDataInfo;
tsem_t ready; tsem_t ready;
void* pTransporter; void* pTransporter;
SRetrieveTableRsp* pRsp;
SSDataBlock* pResult; SSDataBlock* pResult;
bool seqLoadData;
int32_t current; int32_t current;
uint64_t rowsOfCurrentSource; uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows
uint64_t totalSize; // total load bytes from remote uint64_t totalElapsed; // total elapsed time
uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time
} SExchangeInfo; } SExchangeInfo;
typedef struct STableScanInfo { typedef struct STableScanInfo {
......
...@@ -4918,14 +4918,16 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) { ...@@ -4918,14 +4918,16 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) {
} }
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SExchangeInfo* pEx = (SExchangeInfo*) param; SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*) param;
pEx->pRsp = pMsg->pData; pSourceDataInfo->pRsp = pMsg->pData;
pEx->pRsp->numOfRows = htonl(pEx->pRsp->numOfRows); SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
pEx->pRsp->useconds = htobe64(pEx->pRsp->useconds); pRsp->numOfRows = htonl(pRsp->numOfRows);
pEx->pRsp->compLen = htonl(pEx->pRsp->compLen); pRsp->useconds = htobe64(pRsp->useconds);
pRsp->compLen = htonl(pRsp->compLen);
tsem_post(&pEx->ready); pSourceDataInfo->status = DATA_READY;
tsem_post(&pSourceDataInfo->pEx->ready);
} }
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
...@@ -4955,115 +4957,234 @@ void qProcessFetchRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { ...@@ -4955,115 +4957,234 @@ void qProcessFetchRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
destroySendMsgInfo(pSendInfo); destroySendMsgInfo(pSendInfo);
} }
static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInfo *pTaskInfo, int32_t sourceIndex) {
SOperatorInfo *pOperator = (SOperatorInfo*) param; size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
SExchangeInfo *pExchangeInfo = pOperator->info; SResFetchReq* pMsg = calloc(1, sizeof(SResFetchReq));
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; if (NULL == pMsg) {
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return pTaskInfo->code;
}
*newgroup = false; SDownstreamSource *pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
SSourceDataInfo *pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu,
if (pExchangeInfo->current >= totalSources) { GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epset.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources);
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); pMsg->header.vgId = htonl(pSource->addr.nodeId);
return NULL; pMsg->sId = htobe64(pSource->schedId);
pMsg->taskId = htobe64(pSource->taskId);
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
tfree(pMsg);
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return pTaskInfo->code;
} }
SResFetchReq* pMsg = NULL; pMsgSendInfo->param = pDataInfo;
SMsgSendInfo* pMsgSendInfo = NULL; pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = TDMT_VND_FETCH;
pMsgSendInfo->fp = loadRemoteDataCallback;
while(1) { int64_t transporterId = 0;
pMsg = calloc(1, sizeof(SResFetchReq)); int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epset, &transporterId, pMsgSendInfo);
if (NULL == pMsg) { // todo handle malloc error return TSDB_CODE_SUCCESS;
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; }
goto _error;
static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SExchangeInfo *pExchangeInfo, SSourceDataInfo* pDataInfo, int32_t numOfOutput, int64_t startTs) {
char* pData = pDataInfo->pRsp->data;
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pRsp->numOfRows);
if (tmp == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); size_t len = pRsp->numOfRows * pColInfoData->info.bytes;
memcpy(tmp, pData, len);
int64_t startTs = taosGetTimestampUs(); pColInfoData->pData = tmp;
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, pData += len;
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epset.eps[0].fqdn, pSource->taskId, pExchangeInfo->current, totalSources); }
pMsg->header.vgId = htonl(pSource->addr.nodeId); pRes->info.rows = pRsp->numOfRows;
pMsg->sId = htobe64(pSource->schedId);
pMsg->taskId = htobe64(pSource->taskId);
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
// send the fetch remote task result reques int64_t el = taosGetTimestampUs() - startTs;
pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _error;
}
pMsgSendInfo->param = pExchangeInfo; pExchangeInfo->totalRows += pRsp->numOfRows;
pMsgSendInfo->msgInfo.pData = pMsg; pExchangeInfo->totalSize += pRsp->compLen;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); pDataInfo->totalRows += pRsp->numOfRows;
pMsgSendInfo->msgType = TDMT_VND_FETCH;
pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0; pExchangeInfo->totalElapsed += el;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epset, &transporterId, pMsgSendInfo);
tsem_wait(&pExchangeInfo->ready);
SRetrieveTableRsp* pRsp = pExchangeInfo->pRsp; return TSDB_CODE_SUCCESS;
if (pRsp->numOfRows == 0) { }
qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows);
pExchangeInfo->rowsOfCurrentSource = 0; static void* setAllSourcesCompleted(SOperatorInfo *pOperator, int64_t startTs) {
pExchangeInfo->current += 1; SExchangeInfo *pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pExchangeInfo->current >= totalSources) { int64_t el = taosGetTimestampUs() - startTs;
int64_t el = taosGetTimestampUs() - startTs; pExchangeInfo->totalElapsed += el;
pExchangeInfo->totalElapsed += el;
qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
return NULL; pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
} else {
doSetOperatorCompleted(pOperator);
return NULL;
}
static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SExchangeInfo *pExchangeInfo, SExecTaskInfo *pTaskInfo) {
int32_t code = 0;
int64_t startTs = taosGetTimestampUs();
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
while (1) {
int32_t completed = 0;
for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == DATA_EXHAUSTED) {
completed += 1;
continue; continue;
} }
}
SSDataBlock* pRes = pExchangeInfo->pResult; if (pDataInfo->status != DATA_READY) {
char* pData = pRsp->data; continue;
}
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, i);
SSDataBlock* pRes = pExchangeInfo->pResult;
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { if (pRsp->numOfRows == 0) {
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pRsp->numOfRows); GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows,
if (tmp == NULL) { pExchangeInfo->totalRows);
pDataInfo->status = DATA_EXHAUSTED;
completed += 1;
continue;
}
code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pExchangeInfo, pDataInfo, pOperator->numOfOutput, startTs);
if (code != 0) {
goto _error; goto _error;
} }
size_t len = pRsp->numOfRows * pColInfoData->info.bytes; if (pRsp->completed == 1) {
memcpy(tmp, pData, len); qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows,
pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, i + 1,
totalSources);
pDataInfo->status = DATA_EXHAUSTED;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows,
pExchangeInfo->totalSize);
}
if (pDataInfo->status != DATA_EXHAUSTED) {
pDataInfo->status = DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
return pExchangeInfo->pResult;
}
if (completed == totalSources) {
return setAllSourcesCompleted(pOperator, startTs);
}
}
_error:
pTaskInfo->code = code;
return NULL;
}
static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) {
SExchangeInfo *pExchangeInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_RES_TO_RETURN) {
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
int64_t startTs = taosGetTimestampUs();
// Asynchronously send all fetch requests to all sources.
for(int32_t i = 0; i < totalSources; ++i) {
int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
}
int64_t endTs = taosGetTimestampUs();
qDebug("%s send all fetch request to %"PRIzu" sources completed, elapsed:%"PRId64, GET_TASKID(pTaskInfo), totalSources, endTs - startTs);
tsem_wait(&pExchangeInfo->ready);
pOperator->status = OP_RES_TO_RETURN;
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
SExchangeInfo *pExchangeInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
pColInfoData->pData = tmp; size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
pData += len; int64_t startTs = taosGetTimestampUs();
while(1) {
if (pExchangeInfo->current >= totalSources) {
return setAllSourcesCompleted(pOperator, startTs);
} }
pRes->info.numOfCols = pOperator->numOfOutput; doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
pRes->info.rows = pRsp->numOfRows;
tsem_wait(&pExchangeInfo->ready);
int64_t el = taosGetTimestampUs() - startTs; SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
pDataInfo->totalRows, pExchangeInfo->totalRows);
pDataInfo->status = DATA_EXHAUSTED;
pExchangeInfo->current += 1;
continue;
}
pExchangeInfo->totalRows += pRsp->numOfRows; SSDataBlock* pRes = pExchangeInfo->pResult;
pExchangeInfo->totalSize += pRsp->compLen; setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pExchangeInfo, pDataInfo, pOperator->numOfOutput, startTs);
pExchangeInfo->rowsOfCurrentSource += pRsp->numOfRows;
pExchangeInfo->totalElapsed += el;
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows, pExchangeInfo->totalSize, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows,
pExchangeInfo->current + 1, totalSources); pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->current + 1,
totalSources);
pExchangeInfo->rowsOfCurrentSource = 0; pDataInfo->status = DATA_EXHAUSTED;
pExchangeInfo->current += 1; pExchangeInfo->current += 1;
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
...@@ -5072,13 +5193,38 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { ...@@ -5072,13 +5193,38 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
return pExchangeInfo->pResult; return pExchangeInfo->pResult;
} }
}
static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
SExchangeInfo *pExchangeInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
if (pOperator->status == OP_EXEC_DONE) {
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
return NULL;
}
*newgroup = false;
if (pExchangeInfo->seqLoadData) {
return seqLoadRemoteData(pOperator);
} else {
return concurrentlyLoadRemoteData(pOperator);
}
#if 0
_error: _error:
tfree(pMsg); tfree(pMsg);
tfree(pMsgSendInfo); tfree(pMsgSendInfo);
terrno = pTaskInfo->code; terrno = pTaskInfo->code;
return NULL; return NULL;
#endif
} }
static SSDataBlock* createResultDataBlock(const SArray* pExprInfo); static SSDataBlock* createResultDataBlock(const SArray* pExprInfo);
...@@ -5094,11 +5240,33 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* ...@@ -5094,11 +5240,33 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
return NULL; return NULL;
} }
size_t numOfSources = taosArrayGetSize(pSources);
pInfo->pSources = taosArrayDup(pSources); pInfo->pSources = taosArrayDup(pSources);
assert(taosArrayGetSize(pInfo->pSources) > 0); pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) {
tfree(pInfo);
tfree(pOperator);
taosArrayDestroy(pInfo->pSources);
taosArrayDestroy(pInfo->pSourceDataInfo);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
for(int32_t i = 0; i < numOfSources; ++i) {
SSourceDataInfo dataInfo = {0};
dataInfo.status = DATA_NOT_READY;
dataInfo.pEx = pInfo;
dataInfo.index = i;
taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
}
size_t size = taosArrayGetSize(pExprInfo); size_t size = taosArrayGetSize(pExprInfo);
pInfo->pResult = createResultDataBlock(pExprInfo); pInfo->pResult = createResultDataBlock(pExprInfo);
pInfo->seqLoadData = true;
tsem_init(&pInfo->ready, 0, 0);
pOperator->name = "ExchangeOperator"; pOperator->name = "ExchangeOperator";
pOperator->operatorType = OP_Exchange; pOperator->operatorType = OP_Exchange;
...@@ -5132,6 +5300,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* ...@@ -5132,6 +5300,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
} }
} }
#endif #endif
return pOperator; return pOperator;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册