未验证 提交 8f0ecd46 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17961 from taosdata/feature/stream

enh(tmq): offset commit only when needed
...@@ -17,9 +17,9 @@ ...@@ -17,9 +17,9 @@
#include "tq.h" #include "tq.h"
struct STqOffsetStore { struct STqOffsetStore {
char* fname;
STQ* pTq; STQ* pTq;
SHashObj* pHash; // SHashObj<subscribeKey, offset> SHashObj* pHash; // SHashObj<subscribeKey, offset>
int8_t needCommit;
}; };
char* tqOffsetBuildFName(const char* path, int32_t fVer) { char* tqOffsetBuildFName(const char* path, int32_t fVer) {
...@@ -74,6 +74,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { ...@@ -74,6 +74,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
return NULL; return NULL;
} }
pStore->pTq = pTq; pStore->pTq = pTq;
pStore->needCommit = 0;
pTq->pOffsetStore = pStore; pTq->pOffsetStore = pStore;
pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
...@@ -100,6 +101,7 @@ STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) { ...@@ -100,6 +101,7 @@ STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
} }
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) { int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
pStore->needCommit = 1;
return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)); return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
} }
...@@ -108,14 +110,23 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) { ...@@ -108,14 +110,23 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) {
} }
int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
if (!pStore->needCommit) return 0;
// TODO file name should be with a newer version // TODO file name should be with a newer version
char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
taosMemoryFree(fname);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname,
sysErrStr);
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
taosMemoryFree(fname);
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pStore->pHash, pIter); pIter = taosHashIterate(pStore->pHash, pIter);
...@@ -152,5 +163,6 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { ...@@ -152,5 +163,6 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
} }
// close and rename file // close and rename file
taosCloseFile(&pFile); taosCloseFile(&pFile);
pStore->needCommit = 0;
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册