diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 952f81e1f46fdd5d6d3e537921a8c2f3790a6925..a7f816bb1be2dfec3f848d01081ad90a91106e3d 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -17,9 +17,9 @@ #include "tq.h" struct STqOffsetStore { - char* fname; STQ* pTq; SHashObj* pHash; // SHashObj + int8_t needCommit; }; char* tqOffsetBuildFName(const char* path, int32_t fVer) { @@ -74,6 +74,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { return NULL; } pStore->pTq = pTq; + pStore->needCommit = 0; pTq->pOffsetStore = pStore; pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); @@ -100,6 +101,7 @@ STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) { } int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) { + pStore->needCommit = 1; return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)); } @@ -108,14 +110,23 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) { } int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { + if (!pStore->needCommit) return 0; // TODO file name should be with a newer version char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); - taosMemoryFree(fname); if (pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t sysErr = errno; + const char* sysErrStr = strerror(errno); + tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname, + sysErrStr); ASSERT(0); return -1; } + taosMemoryFree(fname); void* pIter = NULL; while (1) { pIter = taosHashIterate(pStore->pHash, pIter); @@ -152,5 +163,6 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { } // close and rename file taosCloseFile(&pFile); + pStore->needCommit = 0; return 0; }