提交 bb72f07a 编写于 作者: Y yihaoDeng

rm expire checkpoint

上级 503d7540
...@@ -417,8 +417,8 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { ...@@ -417,8 +417,8 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
sprintf(checkpointPath, "%s/%s", pMeta->path, "checkpoints"); sprintf(checkpointPath, "%s/%s", pMeta->path, "checkpoints");
if (!taosDirExist(checkpointPath)) { if (!taosDirExist(checkpointPath)) {
return 0;
// no checkpoint, nothing to load // no checkpoint, nothing to load
return 0;
} }
TdDirPtr pDir = taosOpenDir(checkpointPath); TdDirPtr pDir = taosOpenDir(checkpointPath);
...@@ -1053,7 +1053,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t ...@@ -1053,7 +1053,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
inst->dbOpt = handle->dbOpt; inst->dbOpt = handle->dbOpt;
rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); //rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*));
} else { } else {
inst = *pInst; inst = *pInst;
...@@ -1174,7 +1174,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { ...@@ -1174,7 +1174,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL); taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL);
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare); pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare);
rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); //rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1);
memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr));
int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper); int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper);
......
...@@ -96,12 +96,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -96,12 +96,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err; goto _err;
} }
pMeta->streamBackend = streamBackendInit(streamPath);
if (pMeta->streamBackend == NULL) {
goto _err;
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
pMeta->pTaskBackendUnique = pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t)); pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t));
...@@ -109,6 +103,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -109,6 +103,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->checkpointCap = 4; pMeta->checkpointCap = 4;
taosInitRWLatch(&pMeta->checkpointDirLock); taosInitRWLatch(&pMeta->checkpointDirLock);
pMeta->streamBackend = streamBackendInit(streamPath);
if (pMeta->streamBackend == NULL) {
goto _err;
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
code = streamBackendLoadCheckpointInfo(pMeta); code = streamBackendLoadCheckpointInfo(pMeta);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册