diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 54e4b186202bab26295605b3a5f81d9e1e949417..3418a62f0fce5bddf6b90640141aaff9d4babda4 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -123,6 +123,7 @@ typedef struct STsdbReader STsdbReader; #define LASTROW_RETRIEVE_TYPE_ALL 0x1 #define LASTROW_RETRIEVE_TYPE_SINGLE 0x2 +int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid); int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader, const char *idstr); void tsdbReaderClose(STsdbReader *pReader); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 8f888fc624ce13a4ca87ccbedd0918c42051b49f..8d77a41c8a00e0cf3ac0e00b4afeb2b547e94365 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -27,7 +27,6 @@ #include "tdatablock.h" #include "tdb.h" #include "tencode.h" -#include "tref.h" #include "tfs.h" #include "tglobal.h" #include "tjson.h" @@ -37,6 +36,7 @@ #include "tlrucache.h" #include "tmallocator.h" #include "tmsgcb.h" +#include "tref.h" #include "tskiplist.h" #include "tstream.h" #include "ttime.h" @@ -156,6 +156,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list); SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, const char* stbFullName, int32_t vgId); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4e01449c5532df74cfcaf43b37a3732bd6617fc2..77366481d10c52972aa71af9255cc32f057f2347 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3053,6 +3053,15 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e return TSDB_CODE_SUCCESS; } +int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) { + // if (pReader->pTableCheckInfo) taosArrayDestroy(pReader->pTableCheckInfo); + // pReader->pTableCheckInfo = createCheckInfoFromUid(pReader, uid); + // if (pReader->pTableCheckInfo == NULL) { + // return TSDB_CODE_TDB_OUT_OF_MEMORY; + // } + return TDB_CODE_SUCCESS; +} + /** * @brief Get all suids since suid * diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 58205168fdbfd43b62c591f888ae8d39269f15ad..8f61a676583277dc6adfada92e7610e6bd286caf 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -258,7 +258,7 @@ enum { }; typedef struct STableScanInfo { - void* dataReader; + STsdbReader* dataReader; SReadHandle readHandle; SFileBlockLoadRecorder readRecorder; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c27aa3cfecba01d998776d3389f3260c75175bd2..a1ea712bbf32fe974677abb6e8973847e9ab8434 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1245,7 +1245,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); if (pHandle->tqReader) { pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER; - pSTInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0); + tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, &pSTInfo->dataReader, 0); } if (pSTInfo->interval.interval > 0) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 1e5f33508e941210f3f0d9fe103b313056a96fb6..dcada39f09f166b0e3dbfe12b2808002d86c1fe6 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -451,6 +451,10 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) { return TSDB_CODE_SUCCESS; } +int32_t schHandleCommitCallback(void *param, SDataBuf *pMsg, int32_t code) { + return schHandleCallback(param, pMsg, code); +} + int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bool isHb, SSchTrans *trans, void **pParam) { if (!isHb) {