diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 47bfc47b532b83d05e6be64b5e985c64dc28278f..404d81465b811dc15e45b75e11d56dc8ca3cd1fe 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -202,11 +202,6 @@ typedef struct { int8_t reserved; } STaskSinkFetch; -enum { - TASK_SOURCE__SCAN = 1, - TASK_SOURCE__PIPE, -}; - enum { TASK_EXEC__NONE = 1, TASK_EXEC__PIPE, @@ -225,11 +220,6 @@ enum { TASK_SINK__FETCH, }; -enum { - TASK_INPUT_TYPE__SUMBIT_BLOCK = 1, - TASK_INPUT_TYPE__DATA_BLOCK, -}; - enum { TASK_TRIGGER_STATUS__IN_ACTIVE = 1, TASK_TRIGGER_STATUS__ACTIVE, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 8dd8a29c865163e86e1004667c77a722b2201fd7..f6d8ea51c4695fd9953567c07548066c95a3d23c 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -389,7 +389,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1; - tsNumOfVnodeStreamThreads = tsNumOfCores; + tsNumOfVnodeStreamThreads = tsNumOfCores / 4; tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1; diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 3aebb1c6ba1174a26516abe7c52298bb61642639..e4b27292bb7b084a4b546111d026cfbf91cc823e 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -151,8 +151,8 @@ int walMetaDeserialize(SWal* pWal, const char* bytes); // meta section end // seek section -int walChangeWrite(SWal* pWal, int64_t ver); -int walInitWriteFile(SWal* pWal); +int64_t walChangeWrite(SWal* pWal, int64_t ver); +int walInitWriteFile(SWal* pWal); // seek section end int64_t walGetSeq(); diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 87ab155065021cb902739ef18c8925afc04c2f07..1196914daea766159f20b507d2bfb908d6c309e0 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -74,7 +74,7 @@ int walInitWriteFile(SWal* pWal) { return 0; } -int walChangeWrite(SWal* pWal, int64_t ver) { +int64_t walChangeWrite(SWal* pWal, int64_t ver) { int code; TdFilePtr pIdxTFile, pLogTFile; char fnameStr[WAL_FILE_LEN]; diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index eaf43ba7d77531cf4f0b7cc1a62e4497fd6de36e..ad571a9e822cb4363ad13db7c1b8d60b2cc3272d 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -208,6 +208,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) { taosCloseFile(&pIdxFile); taosCloseFile(&pLogFile); + taosFsyncFile(pWal->pLogFile); + taosFsyncFile(pWal->pIdxFile); + walSaveMeta(pWal); // unlock diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 8275072748123a4f4fa28b0e8401b967b0d2b099..aee84a0d55336c63840d1a5df887da7752592841 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -832,8 +832,9 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { if (pNode) { SHashEntry *pe = pHashObj->hashList[slot]; - uint16_t prevRef = atomic_load_16(&pNode->refCount); + /*uint16_t prevRef = atomic_load_16(&pNode->refCount);*/ uint16_t afterRef = atomic_add_fetch_16(&pNode->refCount, 1); +#if 0 ASSERT(prevRef < afterRef); // the reference count value is overflow, which will cause the delete node operation immediately. @@ -845,6 +846,8 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { } else { data = GET_HASH_NODE_DATA(pNode); } +#endif + data = GET_HASH_NODE_DATA(pNode); if (afterRef >= MAX_WARNING_REF_COUNT) { uWarn("hash entry ref count is abnormally high: %d", afterRef);