diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index eba70b8098ecf2ab34c3fdc713032b608d985490..851d4d63c56728853c75b8c8425f017e98af862a 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -30,6 +30,7 @@ typedef struct SPlanContext { SNode* pAstRoot; bool topicQuery; bool streamQuery; + bool rSmaQuery; bool showRewrite; int8_t triggerType; int64_t watermark; diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 42951beca2e414611543d92e08b86d19f6247636..33af040915688fd83c4a82af3c89047be5d20dae 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -29,6 +29,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); +int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index b3583af1dccc3b91d38a896e513fcd9dbdcbd0aa..73583058f1fd599dba0608a1eff99d6702cb99cc 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -34,6 +34,54 @@ extern bool tsStreamSchedV; +int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen) { + SNode* pAst = NULL; + SQueryPlan* pPlan = NULL; + terrno = TSDB_CODE_SUCCESS; + + if (nodesStringToNode(ast, &pAst) < 0) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + goto END; + } + + SPlanContext cxt = { + .pAstRoot = pAst, + .topicQuery = false, + .streamQuery = true, + .rSmaQuery = true, + .triggerType = triggerType, + .watermark = watermark, + }; + if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + goto END; + } + + int32_t levelNum = LIST_LENGTH(pPlan->pSubplans); + if (levelNum != 1) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + goto END; + } + SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0); + + int32_t opNum = LIST_LENGTH(inner->pNodeList); + if (opNum != 1) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + goto END; + } + + SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); + if (qSubPlanToString(plan, pStr, pLen) < 0) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + goto END; + } + +END: + if (pAst) nodesDestroyNode(pAst); + if (pPlan) nodesDestroyNode(pPlan); + return terrno; +} + int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) { SCoder encoder; tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 834d11fc202f75e515962d012fc25ea0e4378ff5..2b52d333dad3193841ee107c36ccf2e5a68438f3 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -109,8 +109,7 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); -int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo); -SArray *tqRetrieveDataBlock(STqReadHandle *pHandle); +int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows); // need to reposition diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index cf62ea87145e58b9d99de2c73574c07503e73dfe..02ce6c4aad299c1c569b76a581691bbb91bd1a49 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -82,16 +82,7 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { return false; } -int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { - // currently only rows are used - - pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList); - pBlockInfo->rows = pHandle->pBlock->numOfRows; - // pBlockInfo->uid = pHandle->pBlock->uid; // the uid can not be assigned to pBlockData. - return 0; -} - -SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { +int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows) { /*int32_t sversion = pHandle->pBlock->sversion;*/ // TODO set to real sversion int32_t sversion = 0; @@ -112,7 +103,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { STSchema* pTschema = pHandle->pSchema; SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper; - int32_t numOfRows = pHandle->pBlock->numOfRows; + *pNumOfRows = pHandle->pBlock->numOfRows; /*int32_t numOfCols = pHandle->pSchema->numOfCols;*/ int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); @@ -120,10 +111,11 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { colNumNeed = pSchemaWrapper->nCols; } - SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData)); - if (pArray == NULL) { - return NULL; + *ppCols = taosArrayInit(colNumNeed, sizeof(SColumnInfoData)); + if (*ppCols == NULL) { + return -1; } + int32_t colMeta = 0; int32_t colNeed = 0; while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) { @@ -136,21 +128,24 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { colNeed++; } else { SColumnInfoData colInfo = {0}; - /*int sz = numOfRows * pColSchema->bytes;*/ colInfo.info.bytes = pColSchema->bytes; colInfo.info.colId = pColSchema->colId; colInfo.info.type = pColSchema->type; - if (colInfoDataEnsureCapacity(&colInfo, 0, numOfRows) < 0) { - taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); - return NULL; + if (colInfoDataEnsureCapacity(&colInfo, 0, *pNumOfRows) < 0) { + goto FAIL; } - taosArrayPush(pArray, &colInfo); + taosArrayPush(*ppCols, &colInfo); colMeta++; colNeed++; } } + int32_t colActual = taosArrayGetSize(*ppCols); + + // TODO in stream shuffle case, fetch groupId + *pGroupId = 0; + STSRowIter iter = {0}; tdSTSRowIterInit(&iter, pTschema); STSRow* row; @@ -159,22 +154,22 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { tdSTSRowIterReset(&iter, row); // get all wanted col of that block - int32_t colTot = taosArrayGetSize(pArray); - for (int32_t i = 0; i < colTot; i++) { - SColumnInfoData* pColData = taosArrayGet(pArray, i); + for (int32_t i = 0; i < colActual; i++) { + SColumnInfoData* pColData = taosArrayGet(*ppCols, i); SCellVal sVal = {0}; if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) { break; } - /*if (colDataAppend(pColData, curRow, sVal.val, false) < 0) {*/ if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) { - taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); - return NULL; + goto FAIL; } } curRow++; } - return pArray; + return 0; +FAIL: + taosArrayDestroy(*ppCols); + return -1; } void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f0d3d95b6adac69e5a288915101c528da3e2d8f2..b054bbfcb6eb77dd250510b82843e998c8bdb765 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -30,12 +30,11 @@ #include "query.h" #include "tcompare.h" #include "thash.h" -#include "vnode.h" #include "ttypes.h" +#include "vnode.h" #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) -#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) - +#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t i = 0; i < numOfOutput; ++i) { @@ -90,7 +89,7 @@ static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t ord } int64_t key = tw->skey, interval = pInterval->interval; - //convert key to second + // convert key to second key = convertTimePrecision(key, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000; if (pInterval->intervalUnit == 'y') { @@ -98,7 +97,7 @@ static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t ord } struct tm tm; - time_t t = (time_t)key; + time_t t = (time_t)key; taosLocalTime(&t, &tm); int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor); @@ -125,8 +124,8 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn // todo handle the time range case TSKEY sk = INT64_MIN; TSKEY ek = INT64_MAX; -// TSKEY sk = MIN(pQueryAttr->window.skey, pQueryAttr->window.ekey); -// TSKEY ek = MAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); + // TSKEY sk = MIN(pQueryAttr->window.skey, pQueryAttr->window.ekey); + // TSKEY ek = MAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); if (true) { getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey, &w); @@ -136,7 +135,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn return true; } - while(1) { // todo handle the desc order scan case + while (1) { // todo handle the desc order scan case getNextTimeWindow(pInterval, &w, TSDB_ORDER_ASC); if (w.skey > pBlockInfo->window.ekey) { break; @@ -148,31 +147,31 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn } } } else { -// getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w); -// assert(w.skey <= pBlockInfo->window.ekey); -// -// if (w.skey > pBlockInfo->window.skey) { -// return true; -// } -// -// while(1) { -// getNextTimeWindow(pQueryAttr, &w); -// if (w.ekey < pBlockInfo->window.skey) { -// break; -// } -// -// assert(w.skey < pBlockInfo->window.skey); -// if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) { -// return true; -// } -// } + // getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w); + // assert(w.skey <= pBlockInfo->window.ekey); + // + // if (w.skey > pBlockInfo->window.skey) { + // return true; + // } + // + // while(1) { + // getNextTimeWindow(pQueryAttr, &w); + // if (w.ekey < pBlockInfo->window.skey) { + // break; + // } + // + // assert(w.skey < pBlockInfo->window.skey); + // if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) { + // return true; + // } + // } } return false; } int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableScanInfo* pInfo = pOperator->info; STaskCostInfo* pCost = &pTaskInfo->cost; @@ -189,13 +188,13 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, taosMemoryFreeClear(pBlock->pBlockAgg); if (*status == FUNC_DATA_REQUIRED_FILTEROUT) { - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, - pBlockInfo->window.ekey, pBlockInfo->rows); + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pCost->filterOutBlocks += 1; return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { - qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, - pBlockInfo->window.ekey, pBlockInfo->rows); + qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pCost->skipBlocks += 1; return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { @@ -218,12 +217,12 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, } return TSDB_CODE_SUCCESS; - } else { // failed to load the block sma data, data block statistics does not exist, load data block instead + } else { // failed to load the block sma data, data block statistics does not exist, load data block instead *status = FUNC_DATA_REQUIRED_DATA_LOAD; } } - ASSERT (*status == FUNC_DATA_REQUIRED_DATA_LOAD); + ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); // todo filter data block according to the block sma data firstly #if 0 @@ -249,8 +248,8 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, doFilter(pTableScanInfo->pFilterNode, pBlock); if (pBlock->info.rows == 0) { pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, - pBlockInfo->window.ekey, pBlockInfo->rows); + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); } return TSDB_CODE_SUCCESS; @@ -348,9 +347,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTableScanInfo->scanFlag = REPEAT_SCAN; -// if (pResultRowInfo->size > 0) { -// pResultRowInfo->curPos = 0; -// } + // if (pResultRowInfo->size > 0) { + // pResultRowInfo->curPos = 0; + // } qDebug("%s start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); @@ -367,7 +366,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); if (pResultRowInfo->size > 0) { -// pResultRowInfo->curPos = pResultRowInfo->size - 1; + // pResultRowInfo->curPos = pResultRowInfo->size - 1; } p = doTableScanImpl(pOperator, newgroup); @@ -376,9 +375,10 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { return p; } -SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag, - int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, - SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, + int32_t dataLoadFlag, int32_t repeatTime, int32_t reverseTime, + SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, + SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) { assert(repeatTime > 0); STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); @@ -391,26 +391,26 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, return NULL; } - pInfo->interval = *pInterval; - pInfo->sampleRatio = sampleRatio; - pInfo->dataBlockLoadFlag= dataLoadFlag; - pInfo->pResBlock = pResBlock; - pInfo->pFilterNode = pCondition; - pInfo->dataReader = pTsdbReadHandle; - pInfo->times = repeatTime; - pInfo->reverseTimes = reverseTime; - pInfo->order = order; - pInfo->current = 0; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColMatchInfo; - pOperator->name = "TableScanOperator"; + pInfo->interval = *pInterval; + pInfo->sampleRatio = sampleRatio; + pInfo->dataBlockLoadFlag = dataLoadFlag; + pInfo->pResBlock = pResBlock; + pInfo->pFilterNode = pCondition; + pInfo->dataReader = pTsdbReadHandle; + pInfo->times = repeatTime; + pInfo->reverseTimes = reverseTime; + pInfo->order = order; + pInfo->current = 0; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColMatchInfo; + pOperator->name = "TableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfOutput = numOfOutput; - pOperator->getNextFn = doTableScan; - pOperator->pTaskInfo = pTaskInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfOutput = numOfOutput; + pOperator->getNextFn = doTableScan; + pOperator->pTaskInfo = pTaskInfo; static int32_t cost = 0; pOperator->cost.openCost = ++cost; @@ -456,67 +456,67 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator, bool* newgroup) { ++numRowSteps; } - tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo)); + tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo)); taosArraySetSize(tableBlockDist.dataBlockInfos, numRowSteps); tableBlockDist.maxRows = INT_MIN; tableBlockDist.minRows = INT_MAX; tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &tableBlockDist); - tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader); + tableBlockDist.numOfRowsInMemTable = (int32_t)tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader); SSDataBlock* pBlock = pTableScanInfo->pResBlock; - pBlock->info.rows = 1; + pBlock->info.rows = 1; pBlock->info.numOfCols = 1; -// SBufferWriter bw = tbufInitWriter(NULL, false); -// blockDistInfoToBinary(&tableBlockDist, &bw); + // SBufferWriter bw = tbufInitWriter(NULL, false); + // blockDistInfoToBinary(&tableBlockDist, &bw); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); -// int32_t len = (int32_t) tbufTell(&bw); -// pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t)); -// *(int32_t*) pColInfo->pData = len; -// memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len); -// -// tbufCloseWriter(&bw); + // int32_t len = (int32_t) tbufTell(&bw); + // pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t)); + // *(int32_t*) pColInfo->pData = len; + // memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len); + // + // tbufCloseWriter(&bw); -// SArray* g = GET_TABLEGROUP(pOperator->, 0); -// pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0); + // SArray* g = GET_TABLEGROUP(pOperator->, 0); + // pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0); pOperator->status = OP_EXEC_DONE; return pBlock; } SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) { - STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } - pInfo->dataReader = dataReader; -// pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); + pInfo->dataReader = dataReader; + // pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); SColumnInfoData infoData = {0}; - infoData.info.type = TSDB_DATA_TYPE_BINARY; + infoData.info.type = TSDB_DATA_TYPE_BINARY; infoData.info.bytes = 1024; infoData.info.colId = 0; -// taosArrayPush(pInfo->block.pDataBlock, &infoData); + // taosArrayPush(pInfo->block.pDataBlock, &infoData); - pOperator->name = "DataBlockInfoScanOperator"; + pOperator->name = "DataBlockInfoScanOperator"; // pOperator->operatorType = OP_TableBlockInfoScan; - pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doBlockInfoScan; + pOperator->blockingOptr = false; + pOperator->status = OP_NOT_OPENED; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doBlockInfoScan; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; return pOperator; - _error: +_error: taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); return NULL; @@ -558,29 +558,42 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) blockDataCleanup(pInfo->pRes); while (tqNextDataBlock(pInfo->readerHandle)) { - pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - terrno = pTaskInfo->code; - pOperator->status = OP_EXEC_DONE; - return NULL; - } + SArray* pCols = NULL; + uint64_t groupId; + int32_t numOfRows; + int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &numOfRows); - if (pBlockInfo->rows == 0) { - break; + if (code != TSDB_CODE_SUCCESS || numOfRows == 0) { + pTaskInfo->code = code; + return NULL; } - SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle); + pInfo->pRes->info.groupId = groupId; + pInfo->pRes->info.rows = numOfRows; int32_t numOfCols = pInfo->pRes->info.numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* p = taosArrayGet(pCols, i); - SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); + SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); if (!pColMatchInfo->output) { continue; } - ASSERT(pColMatchInfo->colId == p->info.colId); - taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, p); + bool colExists = false; + for (int32_t j = 0; j < taosArrayGetSize(pCols); ++j) { + SColumnInfoData* pResCol = taosArrayGet(pCols, j); + if (pResCol->info.colId == pColMatchInfo->colId) { + taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol); + colExists = true; + break; + } + } + + // the required column does not exists in submit block, let's set it to be all null value + if (!colExists) { + SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId); + colInfoDataEnsureCapacity(pDst, 0, pBlockInfo->rows); + colDataAppendNNULL(pDst, 0, pBlockInfo->rows); + } } if (pInfo->pRes->pDataBlock == NULL) { @@ -605,7 +618,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) } } -SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, + SArray* pTableIdList, SExecTaskInfo* pTaskInfo) { SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -618,7 +632,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* int32_t numOfOutput = taosArrayGetSize(pColList); SArray* pColIds = taosArrayInit(4, sizeof(int16_t)); - for(int32_t i = 0; i < numOfOutput; ++i) { + for (int32_t i = 0; i < numOfOutput; ++i) { int16_t* id = taosArrayGet(pColList, i); taosArrayPush(pColIds, id); } @@ -644,16 +658,16 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pInfo->readerHandle = streamReadHandle; pInfo->pRes = pResBlock; - pOperator->name = "StreamBlockScanOperator"; + pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doStreamBlockScan; - pOperator->closeFn = operatorDummyCloseFn; - pOperator->pTaskInfo = pTaskInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doStreamBlockScan; + pOperator->closeFn = operatorDummyCloseFn; + pOperator->pTaskInfo = pTaskInfo; return pOperator; } @@ -733,9 +747,9 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp; pRsp->numOfRows = htonl(pRsp->numOfRows); - pRsp->useconds = htobe64(pRsp->useconds); - pRsp->handle = htobe64(pRsp->handle); - pRsp->compLen = htonl(pRsp->compLen); + pRsp->useconds = htobe64(pRsp->useconds); + pRsp->handle = htobe64(pRsp->handle); + pRsp->compLen = htonl(pRsp->compLen); } else { operator->pTaskInfo->code = code; } @@ -777,7 +791,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { if (rowRes[j] == 0) { continue; } - + colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false); numOfRow += 1; } @@ -828,7 +842,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { SColumnInfoData* pColInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i); int64_t tmp = 0; char t[10] = {0}; - STR_TO_VARSTR(t, "_"); //TODO + STR_TO_VARSTR(t, "_"); // TODO if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { colDataAppend(pColInfoData, numOfRows, t, false); } else { @@ -926,12 +940,12 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB return NULL; } - pInfo->accountId = accountId; + pInfo->accountId = accountId; pInfo->showRewrite = showRewrite; - pInfo->pRes = pResBlock; - pInfo->capacity = 4096; - pInfo->pCondition = pCondition; - pInfo->scanCols = colList; + pInfo->pRes = pResBlock; + pInfo->capacity = 4096; + pInfo->pCondition = pCondition; + pInfo->scanCols = colList; // TODO remove it int32_t tableType = 0; @@ -986,9 +1000,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB tableType = TSDB_MGMT_TABLE_CONNS; } else if (strncasecmp(name, TSDB_INS_TABLE_QUERIES, tListLen(pName->tname)) == 0) { tableType = TSDB_MGMT_TABLE_QUERIES; - } else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, tListLen(pName->tname)) == 0) { + } else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, tListLen(pName->tname)) == 0) { tableType = TSDB_MGMT_TABLE_VNODES; - }else { + } else { ASSERT(0); } @@ -1025,15 +1039,15 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB #endif } - pOperator->name = "SysTableScanOperator"; + pOperator->name = "SysTableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->getNextFn = doSysTableScan; - pOperator->closeFn = destroySysScanOperator; - pOperator->pTaskInfo = pTaskInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->getNextFn = doSysTableScan; + pOperator->closeFn = destroySysScanOperator; + pOperator->pTaskInfo = pTaskInfo; return pOperator; }