提交 91ae07b6 编写于 作者: H Haojun Liao

fix(tmq): set the table list correctly.

上级 09df16f6
......@@ -175,7 +175,6 @@ struct SExecTaskInfo {
int64_t version; // used for stream to record wal version, why not move to sschemainfo
SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo;
// STableListInfo* pTableInfoList; // this is a table list
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
......@@ -324,7 +323,7 @@ typedef struct STableScanBase {
int32_t dataBlockLoadFlag;
SLimitInfo limitInfo;
// there are more than one table list exists in one task, if only one vnode exists.
STableListInfo* pTableInfoList;
STableListInfo* pTableListInfo;
} STableScanBase;
typedef struct STableScanInfo {
......@@ -370,7 +369,7 @@ typedef struct STagScanInfo {
SColMatchInfo matchInfo;
int32_t curPos;
SReadHandle readHandle;
STableListInfo* pTableInfoList;
STableListInfo* pTableListInfo;
} STagScanInfo;
typedef enum EStreamScanMode {
......@@ -518,6 +517,16 @@ typedef struct SOptrBasicInfo {
bool mergeResultBlock;
} SOptrBasicInfo;
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
STableQueryInfo* current;
uint64_t groupId;
SGroupResInfo groupResInfo;
SExprSupp scalarExprSup;
bool groupKeyOptimized;
} SAggOperatorInfo;
typedef struct SIntervalAggOperatorInfo {
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
......@@ -734,6 +743,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int3
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
......@@ -837,9 +847,11 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
char* buildTaskId(uint64_t taskId, uint64_t queryId);
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo);
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, STableListInfo* pTableListInfo, SReadHandle* readHandle);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
......@@ -853,7 +865,6 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName);
......
......@@ -404,7 +404,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
}
}
STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableInfoList;
STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableListInfo;
taosWLockLatch(&pTaskInfo->lock);
for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
......@@ -485,9 +485,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
assert(pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
taosThreadOnce(&initPoolOnce, initRefPool);
qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
......@@ -507,7 +505,12 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
if (handle) {
void* pSinkParam = NULL;
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
SArray* pInfoList = getTableListInfo(*pTask);
STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0);
taosArrayDestroy(pInfoList);
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTableListInfo, readHandle);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
goto _error;
......@@ -1083,7 +1086,6 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
STableListInfo* pTableListInfo = NULL;//pTaskInfo->pTableInfoList;
const char* id = GET_TASKID(pTaskInfo);
pTaskInfo->streamInfo.prepareStatus = *pOffset;
......@@ -1095,19 +1097,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (subType == TOPIC_SUB_TYPE__COLUMN) {
pOperator->status = OP_OPENED;
// TODO add more check
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream != 1) {
qError("invalid operator, number of downstream:%d, %s", pOperator->numOfDownstream, id);
return -1;
}
pOperator = pOperator->pDownstream[0];
}
pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
SStreamScanInfo* pInfo = pOperator->info;
STableScanInfo* pScanInfo = pInfo->pTableScanOp->info;
STableScanBase* pScanBaseInfo = &pScanInfo->base;
STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo;
if (pOffset->type == TMQ_OFFSET__LOG) {
tsdbReaderClose(pScanBaseInfo->dataReader);
......@@ -1191,9 +1186,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}
} else { // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext;
SOperatorInfo* p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id);
STableListInfo* pTableListInfo = ((STableScanInfo*)(p->info))->base.pTableListInfo;
if (setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id);
return -1;
......
......@@ -676,7 +676,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
}
if (pBlock->info.id.uid) {
pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableInfoList, pBlock->info.id.uid);
pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
}
uint32_t status = 0;
......@@ -771,7 +771,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// scan table one by one sequentially
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
int32_t numOfTables = 0;//tableListGetSize(pTaskInfo->pTableInfoList);
int32_t numOfTables = 0;//tableListGetSize(pTaskInfo->pTableListInfo);
STableKeyInfo tInfo = {0};
while (1) {
......@@ -784,7 +784,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
pInfo->currentTable++;
taosRLockLatch(&pTaskInfo->lock);
numOfTables = tableListGetSize(pInfo->base.pTableInfoList);
numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
if (pInfo->currentTable >= numOfTables) {
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
......@@ -792,7 +792,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL;
}
tInfo = *(STableKeyInfo*) tableListGetInfo(pInfo->base.pTableInfoList, pInfo->currentTable);
tInfo = *(STableKeyInfo*) tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
taosRUnLockLatch(&pTaskInfo->lock);
tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1);
......@@ -804,14 +804,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
}
} else { // scan table group by group sequentially
if (pInfo->currentGroupId == -1) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableInfoList)) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
setOperatorCompleted(pOperator);
return NULL;
}
int32_t num = 0;
STableKeyInfo* pList = NULL;
tableListGetGroupList(pInfo->base.pTableInfoList, pInfo->currentGroupId, &pList, &num);
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
ASSERT(pInfo->base.dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
......@@ -830,7 +830,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return result;
}
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableInfoList)) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
setOperatorCompleted(pOperator);
return NULL;
}
......@@ -841,7 +841,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
int32_t num = 0;
STableKeyInfo* pList = NULL;
tableListGetGroupList(pInfo->base.pTableInfoList, pInfo->currentGroupId, &pList, &num);
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
tsdbSetTableList(pInfo->base.dataReader, pList, num);
tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
......@@ -876,7 +876,7 @@ static void destroyTableScanBase(STableScanBase* pBase) {
taosArrayDestroy(pBase->matchInfo.pList);
}
tableListDestroy(pBase->pTableInfoList);
tableListDestroy(pBase->pTableListInfo);
taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
cleanupExprSupp(&pBase->pseudoSup);
}
......@@ -945,7 +945,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols;
pInfo->base.pTableInfoList = pTableListInfo;
pInfo->base.pTableListInfo = pTableListInfo;
pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
code = terrno;
......@@ -1065,7 +1065,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
if (hasNext) {
/*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableInfoList, pBlock->info.id.uid);
pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
}
tsdbReaderClose(pReader);
......@@ -1087,7 +1087,7 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
return getTableGroupId(pTableScanInfo->base.pTableInfoList, uid);
return getTableGroupId(pTableScanInfo->base.pTableListInfo, uid);
}
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
......@@ -1562,7 +1562,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
pInfo->pRes->info.version = pBlock->info.version;
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableInfoList, pBlock->info.id.uid);
pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
// todo extract method
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
......@@ -2326,6 +2326,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
destroyOperatorInfo(pStreamScan->pTableScanOp);
}
if (pStreamScan->tqReader) {
tqCloseReader(pStreamScan->tqReader);
}
......@@ -2359,6 +2360,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pInfo == NULL || pOperator == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error;
}
......@@ -2372,6 +2374,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
int32_t code =
extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
tableListDestroy(pTableListInfo);
goto _error;
}
......@@ -2391,11 +2394,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
if (pSubTableExpr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error;
}
pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
tableListDestroy(pTableListInfo);
goto _error;
}
}
......@@ -2405,10 +2411,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
if (pTagExpr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error;
}
if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error;
}
}
......@@ -2416,6 +2424,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
if (pInfo->pBlockLists == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error;
}
......@@ -2459,16 +2468,18 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
// set the extract column id to streamHandle
tqReaderSetColIdList(pInfo->tqReader, pColIds);
SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableInfoList);
SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
if (code != 0) {
taosArrayDestroy(tableIdList);
goto _error;
}
taosArrayDestroy(tableIdList);
memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
} else {
taosArrayDestroy(pColIds);
tableListDestroy(pTableListInfo);
pColIds = NULL;
}
......@@ -2503,7 +2514,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
__optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
......@@ -2534,7 +2545,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
int32_t size = tableListGetSize(pInfo->pTableInfoList);
int32_t size = tableListGetSize(pInfo->pTableListInfo);
if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
......@@ -2546,7 +2557,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = tableListGetInfo(pInfo->pTableInfoList, pInfo->curPos);
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
int32_t code = metaGetTableEntryByUid(&mr, item->uid);
tDecoderClear(&mr.coder);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2607,7 +2618,7 @@ static void destroyTagScanOperatorInfo(void* param) {
STagScanInfo* pInfo = (STagScanInfo*)param;
pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosArrayDestroy(pInfo->matchInfo.pList);
pInfo->pTableInfoList = tableListDestroy(pInfo->pTableInfoList);
pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
taosMemoryFreeClear(param);
}
......@@ -2634,7 +2645,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
goto _error;
}
pInfo->pTableInfoList = pTableListInfo;
pInfo->pTableListInfo = pTableListInfo;
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
......@@ -2668,7 +2679,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
int64_t st = taosGetTimestampUs();
void* p = tableListGetInfo(pInfo->base.pTableInfoList, readIdx + pInfo->tableStartIndex);
void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle;
if (NULL == source->dataReader || !source->multiReader) {
......@@ -2725,7 +2736,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
continue;
}
pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableInfoList, pBlock->info.id.uid);
pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
......@@ -2781,10 +2792,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
{
size_t numOfTables = tableListGetSize(pInfo->base.pTableInfoList);
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
int32_t i = pInfo->tableStartIndex + 1;
for (; i < numOfTables; ++i) {
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableInfoList, i);
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
if (tableKeyInfo->groupId != pInfo->groupId) {
break;
}
......@@ -2918,7 +2929,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code);
}
size_t tableListSize = tableListGetSize(pInfo->base.pTableInfoList);
size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;
......@@ -2927,7 +2938,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
return NULL;
}
pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableInfoList, pInfo->tableStartIndex))->groupId;
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator);
}
......@@ -2952,7 +2963,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
}
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId = tableListGetInfo(pInfo->base.pTableInfoList, pInfo->tableStartIndex)->groupId;
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
startGroupTableMergeScan(pOperator);
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
......@@ -3050,7 +3061,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->base.limitInfo.limit.limit = -1;
pInfo->base.limitInfo.slimit.limit = -1;
pInfo->base.pTableInfoList = pTableListInfo;
pInfo->base.pTableListInfo = pTableListInfo;
pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec();
......
......@@ -2175,13 +2175,6 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S
}
}
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId);
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
return p1 == NULL;
}
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) {
if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
SWinKey key = {.ts = pWin->skey, .groupId = groupId};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册