提交 863d30e5 编写于 作者: L liuyao

opt tq read

上级 2533b5c5
...@@ -42,6 +42,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ ...@@ -42,6 +42,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
TSKEY delMark); TSKEY delMark);
void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState);
bool needClearDiskBuff(SStreamFileState* pFileState);
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
......
...@@ -469,25 +469,28 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD ...@@ -469,25 +469,28 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD
pBlock->info.version = pReader->msg2.ver; pBlock->info.version = pReader->msg2.ver;
if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || pReader->cachedSchemaSuid != suid) { if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || pReader->cachedSchemaSuid != suid) {
taosMemoryFree(pReader->pSchema); if(pReader->cachedSchemaVer != sversion) {
pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1); taosMemoryFree(pReader->pSchema);
if (pReader->pSchema == NULL) { pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1);
tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 if (pReader->pSchema == NULL) {
"), version %d, possibly dropped table", tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64
pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion); "), version %d, possibly dropped table",
pReader->cachedSchemaSuid = 0; pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion);
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; pReader->cachedSchemaSuid = 0;
return -1; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
} return -1;
}
tDeleteSSchemaWrapper(pReader->pSchemaWrapper); tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) { if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
pReader->cachedSchemaSuid = 0; pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1; return -1;
}
pReader->cachedSchemaVer = sversion;
} }
STSchema* pTschema = pReader->pSchema; STSchema* pTschema = pReader->pSchema;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "streamInc.h" #include "streamInc.h"
#define STREAM_EXEC_MAX_BATCH_NUM 1024 #define STREAM_EXEC_MAX_BATCH_NUM 4096
bool streamTaskShouldStop(const SStreamStatus* pStatus) { bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
......
...@@ -363,7 +363,10 @@ int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { ...@@ -363,7 +363,10 @@ int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
int32_t streamStateClear(SStreamState* pState) { int32_t streamStateClear(SStreamState* pState) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
streamFileStateClear(pState->pFileState); streamFileStateClear(pState->pFileState);
return streamStateClear_rocksdb(pState); if (needClearDiskBuff(pState->pFileState)) {
streamStateClear_rocksdb(pState);
}
return 0;
#else #else
SWinKey key = {.ts = 0, .groupId = 0}; SWinKey key = {.ts = 0, .groupId = 0};
streamStatePut(pState, &key, NULL, 0); streamStatePut(pState, &key, NULL, 0);
......
...@@ -154,6 +154,10 @@ void streamFileStateClear(SStreamFileState* pFileState) { ...@@ -154,6 +154,10 @@ void streamFileStateClear(SStreamFileState* pFileState) {
clearExpiredRowBuff(pFileState, 0, true); clearExpiredRowBuff(pFileState, 0, true);
} }
bool needClearDiskBuff(SStreamFileState* pFileState) {
return pFileState->flushMark > 0;
}
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) { void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
uint64_t i = 0; uint64_t i = 0;
SListIter iter = {0}; SListIter iter = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册