提交 d87bb5f3 编写于 作者: H Haojun Liao

refactor(query): set the block groupId for stream block.

上级 ab104d86
...@@ -61,7 +61,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle); ...@@ -61,7 +61,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle);
* @param type * @param type
* @return * @return
*/ */
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid);
/** /**
* Set multiple input data blocks for the stream scan. * Set multiple input data blocks for the stream scan.
...@@ -71,7 +71,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); ...@@ -71,7 +71,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type);
* @param type * @param type
* @return * @return
*/ */
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool assignUid);
/** /**
* Update the table id list, add or remove. * Update the table id list, add or remove.
......
...@@ -374,7 +374,7 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 ...@@ -374,7 +374,7 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
smaDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, taskInfo, suid); smaDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, taskInfo, suid);
qSetStreamInput(taskInfo, pMsg, inputType); qSetStreamInput(taskInfo, pMsg, inputType, true);
while (1) { while (1) {
SSDataBlock *output = NULL; SSDataBlock *output = NULL;
uint64_t ts; uint64_t ts;
......
...@@ -264,7 +264,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ ...@@ -264,7 +264,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
qTaskInfo_t task = pExec->task[workerId]; qTaskInfo_t task = pExec->task[workerId];
ASSERT(task); ASSERT(task);
qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
while (1) { while (1) {
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0; uint64_t ts = 0;
...@@ -510,7 +510,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -510,7 +510,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
qTaskInfo_t task = pExec->task[workerId]; qTaskInfo_t task = pExec->task[workerId];
ASSERT(task); ASSERT(task);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
while (1) { while (1) {
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0; uint64_t ts = 0;
......
...@@ -2015,7 +2015,7 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, ...@@ -2015,7 +2015,7 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg,
tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo, tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo,
suid); suid);
qSetStreamInput(taskInfo, pMsg, inputType); qSetStreamInput(taskInfo, pMsg, inputType, true);
while (1) { while (1) {
SSDataBlock *output = NULL; SSDataBlock *output = NULL;
uint64_t ts; uint64_t ts;
......
...@@ -371,6 +371,7 @@ typedef struct SessionWindowSupporter { ...@@ -371,6 +371,7 @@ typedef struct SessionWindowSupporter {
SStreamAggSupporter* pStreamAggSup; SStreamAggSupporter* pStreamAggSup;
int64_t gap; int64_t gap;
} SessionWindowSupporter; } SessionWindowSupporter;
typedef struct SStreamBlockScanInfo { typedef struct SStreamBlockScanInfo {
SArray* pBlockLists; // multiple SSDatablock. SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock SSDataBlock* pRes; // result SSDataBlock
...@@ -379,7 +380,6 @@ typedef struct SStreamBlockScanInfo { ...@@ -379,7 +380,6 @@ typedef struct SStreamBlockScanInfo {
int32_t blockType; // current block type int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned? int32_t validBlockIndex; // Is current data has returned?
SColumnInfo* pCols; // the output column info SColumnInfo* pCols; // the output column info
uint64_t numOfRows; // total scanned rows
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
void* streamBlockReader;// stream block reader handle void* streamBlockReader;// stream block reader handle
SArray* pColMatchInfo; // SArray* pColMatchInfo; //
...@@ -394,8 +394,9 @@ typedef struct SStreamBlockScanInfo { ...@@ -394,8 +394,9 @@ typedef struct SStreamBlockScanInfo {
SOperatorInfo* pOperatorDumy; SOperatorInfo* pOperatorDumy;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SCatchSupporter childAggSup; SCatchSupporter childAggSup;
SArray* childIds; SArray* childIds;
SessionWindowSupporter sessionSup; SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
} SStreamBlockScanInfo; } SStreamBlockScanInfo;
typedef struct SSysTableScanInfo { typedef struct SSysTableScanInfo {
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "tdatablock.h" #include "tdatablock.h"
#include "vnode.h" #include "vnode.h"
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid, char* id) {
ASSERT(pOperator != NULL); ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
...@@ -32,11 +32,12 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -32,11 +32,12 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, assignUid, id);
} else { } else {
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
pInfo->assignBlockUid = assignUid;
// the block type can not be changed in the streamscan operators // the block type can not be changed in the streamscan operators
if (pInfo->blockType == 0) { if (pInfo->blockType == 0) {
...@@ -67,11 +68,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -67,11 +68,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} }
} }
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
return qSetMultiStreamInput(tinfo, input, 1, type); return qSetMultiStreamInput(tinfo, input, 1, type, assignUid);
} }
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool assignUid) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
...@@ -82,7 +83,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO ...@@ -82,7 +83,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo)); int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, assignUid, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else { } else {
......
...@@ -878,6 +878,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -878,6 +878,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pInfo->pRes->info.uid = uid; pInfo->pRes->info.uid = uid;
pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.type = STREAM_NORMAL;
// for generating rollup SMA result, each time is an independent time serie.
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
if (pInfo->assignBlockUid) {
pInfo->pRes->info.groupId = uid;
}
int32_t numOfCols = pInfo->pRes->info.numOfCols; int32_t numOfCols = pInfo->pRes->info.numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
...@@ -918,7 +924,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -918,7 +924,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
// record the scan action. // record the scan action.
pInfo->numOfExec++; pInfo->numOfExec++;
pInfo->numOfRows += pBlockInfo->rows; pOperator->resultInfo.totalRows += pBlockInfo->rows;
if (rows == 0) { if (rows == 0) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
......
...@@ -141,13 +141,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -141,13 +141,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
ASSERT(pSubmit->type == STREAM_INPUT__DATA_SUBMIT); ASSERT(pSubmit->type == STREAM_INPUT__DATA_SUBMIT);
qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK); qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
} else if (pTask->inputType == STREAM_INPUT__DATA_BLOCK) { } else if (pTask->inputType == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)data; SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
SArray* blocks = pBlock->blocks; SArray* blocks = pBlock->blocks;
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false);
} }
// exec // exec
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册