提交 da9f33a4 编写于 作者: H Haojun Liao

fix: fix error in set function ptr.

上级 4a42f6b9
......@@ -87,9 +87,9 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
}
int32_t numOfChildEp = taosArrayGetSize(pTask->childEpInfo);
SReadHandle mgHandle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState };
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState };
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, 0);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0);
ASSERT(pTask->exec.pExecutor);
streamSetupTrigger(pTask);
......
......@@ -804,9 +804,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
}
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
SReadHandle mgHandle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState};
SReadHandle handle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState};
initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, vgId);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
if (pTask->exec.pExecutor == NULL) {
return -1;
}
......
......@@ -443,7 +443,6 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
pFillSup->pAllColInfo = destroyFillColumnInfo(pFillSup->pAllColInfo, pFillSup->numOfFillCols, pFillSup->numOfAllCols);
tSimpleHashCleanup(pFillSup->pResMap);
pFillSup->pResMap = NULL;
ASSERT(0);
releaseOutputBuf(NULL, NULL, (SResultRow*)pFillSup->cur.pRowVal, &pFillSup->pAPI->stateStore); //?????
pFillSup->cur.pRowVal = NULL;
cleanupExprSupp(&pFillSup->notFillExprSup);
......@@ -493,7 +492,6 @@ static void resetFillWindow(SResultRowData* pRowData) {
void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup, void* pState, SStorageAPI* pAPI) {
resetFillWindow(&pFillSup->prev);
ASSERT(0);
releaseOutputBuf(NULL, NULL, (SResultRow*)pFillSup->cur.pRowVal, &pAPI->stateStore); //???
resetFillWindow(&pFillSup->cur);
resetFillWindow(&pFillSup->next);
......@@ -1335,7 +1333,7 @@ static int32_t initResultBuf(SStreamFillSupporter* pFillSup) {
}
static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNode, SInterval* pInterval,
SExprInfo* pFillExprInfo, int32_t numOfFillCols) {
SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI) {
SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
if (!pFillSup) {
return NULL;
......@@ -1348,6 +1346,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
pFillSup->type = convertFillType(pPhyFillNode->mode);
pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols;
pFillSup->interval = *pInterval;
pFillSup->pAPI = pAPI;
int32_t code = initResultBuf(pFillSup);
if (code != TSDB_CODE_SUCCESS) {
......@@ -1427,7 +1426,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
SInterval* pInterval = &((SStreamIntervalOperatorInfo*)downstream->info)->interval;
int32_t numOfFillCols = 0;
SExprInfo* pFillExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &numOfFillCols);
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols);
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI);
if (!pInfo->pFillSup) {
goto _error;
}
......
......@@ -1070,9 +1070,9 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou
pInfo->groupId = groupCol[rowIndex];
}
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t version) {
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t ver) {
pTableScanInfo->base.cond.twindows = *pWin;
pTableScanInfo->base.cond.endVersion = version;
pTableScanInfo->base.cond.endVersion = ver;
pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1;
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
......@@ -1182,17 +1182,19 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
win.ekey = TMAX(win.ekey, endData[*pRowIndex]);
continue;
}
if (win.skey == endData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
win.skey = TMIN(win.skey, startData[*pRowIndex]);
continue;
}
ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
!(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
break;
}
ASSERT(0);
// resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion);
STableScanInfo* pTScanInfo = pInfo->pTableScanOp->info;
resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion);
pInfo->pTableScanOp->status = OP_OPENED;
return true;
}
......@@ -1650,13 +1652,13 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
ASSERT(0);
// qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64,
// pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey,
// pInfo->tqReader->pWalReader->curVersion);
tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
return pResult;
}
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
......@@ -1673,7 +1675,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
while (1) {
bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id);
ASSERT(0);
SSDataBlock* pRes = NULL;
struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);
......@@ -1757,8 +1758,7 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
ASSERT(0);
// pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
checkUpdateData(pInfo, true, pBlock, true);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);
if (pInfo->pUpdateDataRes->info.rows > 0) {
......@@ -1934,8 +1934,7 @@ FETCH_NEXT_BLOCK:
pBlock->info.calWin.ekey = INT64_MAX;
pBlock->info.dataLoad = 1;
if (pInfo->pUpdateInfo) {
ASSERT(0);
// pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
}
blockDataUpdateTsWindow(pBlock, 0);
switch (pBlock->info.type) {
......@@ -3097,9 +3096,9 @@ _error:
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
static void destoryTableCountScanOperator(void* param);
static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid);
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI);
static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
SSDataBlock* pRes, char* dbName);
SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI);
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
......@@ -3385,11 +3384,11 @@ static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountSca
}
if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid);
buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid, pAPI);
pInfo->currGrpIdx++;
} else if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName);
buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName, pAPI);
pInfo->currGrpIdx++;
} else {
......@@ -3434,7 +3433,7 @@ static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanO
}
static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
SSDataBlock* pRes, char* dbName) {
SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI) {
char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
if (pSupp->groupByDbName) {
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
......@@ -3442,18 +3441,18 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, S
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
pRes->info.id.groupId = groupId;
int64_t ntbNum = 0;//metaGetNtbNum(pInfo->readHandle.vnode);
ASSERT(0);
if (ntbNum != 0) {
fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes);
int64_t numOfTables = 0;//metaGetNtbNum(pInfo->readHandle.vnode);
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, NULL, &numOfTables);
if (numOfTables != 0) {
fillTableCountScanDataBlock(pSupp, dbName, "", numOfTables, pRes);
}
}
static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) {
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI) {
char stbName[TSDB_TABLE_NAME_LEN] = {0};
ASSERT(0);
// metaGetTableSzNameByUid(pInfo->readHandle.vnode, stbUid, stbName);
pAPI->metaFn.getTableNameByUid(pInfo->readHandle.vnode, stbUid, stbName);
char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
if (pSupp->groupByDbName) {
......
......@@ -2755,7 +2755,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock);
pInfo->pState = taosMemoryCalloc(1, sizeof(void));
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
......@@ -2917,8 +2917,10 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx,
return TSDB_CODE_OUT_OF_MEMORY;
}
pSup->stateStore = *pStore;
initDummyFunction(pSup->pDummyCtx, pCtx, numOfOutput);
pSup->pState = taosMemoryCalloc(1, sizeof(void));
pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pSup->pState) = *pState;
pSup->stateStore.streamStateSetNumber(pSup->pState, -1);
......@@ -2940,7 +2942,6 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx,
return terrno;
}
pSup->stateStore = *pStore;
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir);
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册