From 863d30e5a9a70d60de4bbda3056e533be6354cd6 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 26 Apr 2023 18:58:19 +0800 Subject: [PATCH] opt tq read --- include/libs/stream/tstreamFileState.h | 1 + source/dnode/vnode/src/tq/tqRead.c | 39 ++++++++++++----------- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamState.c | 5 ++- source/libs/stream/src/tstreamFileState.c | 4 +++ 5 files changed, 31 insertions(+), 20 deletions(-) diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 25267be354..d50f0e0a31 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -42,6 +42,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ TSKEY delMark); void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState); +bool needClearDiskBuff(SStreamFileState* pFileState); int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 2cda12c0e1..baa1a0e4a5 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -469,25 +469,28 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD pBlock->info.version = pReader->msg2.ver; if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || pReader->cachedSchemaSuid != suid) { - taosMemoryFree(pReader->pSchema); - pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1); - if (pReader->pSchema == NULL) { - tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 - "), version %d, possibly dropped table", - pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion); - pReader->cachedSchemaSuid = 0; - terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; - return -1; - } + if(pReader->cachedSchemaVer != sversion) { + taosMemoryFree(pReader->pSchema); + pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1); + if (pReader->pSchema == NULL) { + tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 + "), version %d, possibly dropped table", + pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion); + pReader->cachedSchemaSuid = 0; + terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; + return -1; + } - tDeleteSSchemaWrapper(pReader->pSchemaWrapper); - pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); - if (pReader->pSchemaWrapper == NULL) { - tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", - pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); - pReader->cachedSchemaSuid = 0; - terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; - return -1; + tDeleteSSchemaWrapper(pReader->pSchemaWrapper); + pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); + if (pReader->pSchemaWrapper == NULL) { + tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", + pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); + pReader->cachedSchemaSuid = 0; + terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; + return -1; + } + pReader->cachedSchemaVer = sversion; } STSchema* pTschema = pReader->pSchema; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e738b19d05..e1488a8a6f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -15,7 +15,7 @@ #include "streamInc.h" -#define STREAM_EXEC_MAX_BATCH_NUM 1024 +#define STREAM_EXEC_MAX_BATCH_NUM 4096 bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 791d3013bf..c7d85ac885 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -363,7 +363,10 @@ int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { int32_t streamStateClear(SStreamState* pState) { #ifdef USE_ROCKSDB streamFileStateClear(pState->pFileState); - return streamStateClear_rocksdb(pState); + if (needClearDiskBuff(pState->pFileState)) { + streamStateClear_rocksdb(pState); + } + return 0; #else SWinKey key = {.ts = 0, .groupId = 0}; streamStatePut(pState, &key, NULL, 0); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 5c059765f1..b7401ec5d9 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -154,6 +154,10 @@ void streamFileStateClear(SStreamFileState* pFileState) { clearExpiredRowBuff(pFileState, 0, true); } +bool needClearDiskBuff(SStreamFileState* pFileState) { + return pFileState->flushMark > 0; +} + void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) { uint64_t i = 0; SListIter iter = {0}; -- GitLab