提交 4a42f6b9 编写于 作者: H Haojun Liao

fix: fix error in api call.

上级 a9b18d20
...@@ -396,7 +396,7 @@ int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, in ...@@ -396,7 +396,7 @@ int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, in
int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo); int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
void (*getBasicInfo)(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables);// vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta); void (*getBasicInfo)(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables);// vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
int64_t (*getNumOfRowsInMem)(); int64_t (*getNumOfRowsInMem)(void* pVnode);
/** /**
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list); int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
......
...@@ -33,6 +33,7 @@ void mndPostProcessQueryMsg(SRpcMsg *pMsg) { ...@@ -33,6 +33,7 @@ void mndPostProcessQueryMsg(SRpcMsg *pMsg) {
int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
int32_t code = -1; int32_t code = -1;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb}; SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb};
mTrace("msg:%p, in query queue is processing", pMsg); mTrace("msg:%p, in query queue is processing", pMsg);
......
...@@ -277,6 +277,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -277,6 +277,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
} }
SReadHandle handle = { .vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState }; SReadHandle handle = { .vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState };
initStorageAPI(&handle.api);
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode)); pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode));
if (!pRSmaInfo->taskInfo[idx]) { if (!pRSmaInfo->taskInfo[idx]) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
...@@ -849,6 +851,8 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t ...@@ -849,6 +851,8 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
SReadHandle handle = { .vnode = pVnode, .initTqReader = 1 }; SReadHandle handle = { .vnode = pVnode, .initTqReader = 1 };
initStorageAPI(&handle.api);
if (ASSERTS(!dstTaskInfo, "dstTaskInfo:%p is not NULL", dstTaskInfo)) { if (ASSERTS(!dstTaskInfo, "dstTaskInfo:%p is not NULL", dstTaskInfo)) {
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
......
...@@ -671,6 +671,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -671,6 +671,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle->pRef = pRef; pHandle->pRef = pRef;
SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver}; SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
initStorageAPI(&handle.api);
pHandle->snapshotVer = ver; pHandle->snapshotVer = ver;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
......
...@@ -65,6 +65,7 @@ void initMetadataAPI(SStoreMeta* pMeta) { ...@@ -65,6 +65,7 @@ void initMetadataAPI(SStoreMeta* pMeta) {
pMeta->getBasicInfo = vnodeGetInfo; pMeta->getBasicInfo = vnodeGetInfo;
pMeta->getNumOfChildTables = metaGetStbStats; pMeta->getNumOfChildTables = metaGetStbStats;
// pMeta->getNumOfRowsInMem = tsdbGetNumOfRowsInMemTable;
pMeta->getChildTableList = vnodeGetCtbIdList; pMeta->getChildTableList = vnodeGetCtbIdList;
......
...@@ -250,7 +250,9 @@ static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -250,7 +250,9 @@ static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
uint8_t *pCont; uint8_t *pCont;
SEncoder *pCoder = &(SEncoder){0}; SEncoder *pCoder = &(SEncoder){0};
SDeleteRes res = {0}; SDeleteRes res = {0};
SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
initStorageAPI(&handle.api);
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
if (code) goto _exit; if (code) goto _exit;
......
...@@ -1814,9 +1814,13 @@ void destroySysScanOperator(void* param) { ...@@ -1814,9 +1814,13 @@ void destroySysScanOperator(void* param) {
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) { strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) {
pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
}
pInfo->pCur = NULL; pInfo->pCur = NULL;
} }
if (pInfo->pIdx) { if (pInfo->pIdx) {
taosArrayDestroy(pInfo->pIdx->uids); taosArrayDestroy(pInfo->pIdx->uids);
taosMemoryFree(pInfo->pIdx); taosMemoryFree(pInfo->pIdx);
...@@ -2200,7 +2204,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { ...@@ -2200,7 +2204,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
} }
pAPI->tsdReader.tsdReaderGetDataBlockDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); pAPI->tsdReader.tsdReaderGetDataBlockDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
blockDistInfo.numOfInmemRows = (int32_t) pAPI->metaFn.getNumOfRowsInMem(pBlockScanInfo->pHandle); blockDistInfo.numOfInmemRows = (int32_t) pAPI->tsdReader.tsdReaderGetNumOfInMemRows(pBlockScanInfo->pHandle);
SSDataBlock* pBlock = pBlockScanInfo->pResBlock; SSDataBlock* pBlock = pBlockScanInfo->pResBlock;
......
...@@ -2906,7 +2906,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin ...@@ -2906,7 +2906,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
} }
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap, int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap,
SStreamState* pState, int32_t keySize, int16_t keyType) { SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore) {
pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput); pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput);
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
pSup->gap = gap; pSup->gap = gap;
...@@ -2939,6 +2939,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, ...@@ -2939,6 +2939,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx,
qError("Init stream agg supporter failed since %s, tempDir:%s", terrstr(), tsTempDir); qError("Init stream agg supporter failed since %s, tempDir:%s", terrstr(), tsTempDir);
return terrno; return terrno;
} }
pSup->stateStore = *pStore;
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir); int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir);
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].saveHandle.pBuf = pSup->pResultBuf; pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
...@@ -3600,7 +3602,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -3600,7 +3602,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
} }
code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap, code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap,
pTaskInfo->streamInfo.pState, 0, 0); pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4149,7 +4151,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4149,7 +4151,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes; int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes;
int16_t type = pColNode->node.resType.type; int16_t type = pColNode->node.resType.type;
code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
type); type, &pTaskInfo->storageAPI.stateStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册