提交 e7b37486 编写于 作者: dengyihao's avatar dengyihao

vnode snapshot read

上级 247d3ac7
...@@ -196,11 +196,6 @@ void streamBackendCleanup(void* arg) { ...@@ -196,11 +196,6 @@ void streamBackendCleanup(void* arg) {
return; return;
} }
/*
* checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
* checkpointInUse: |--cp2--|--cp4--|, checkpointDir in checkpointInUse do replicate trans, cannot del until
* replication is finished
*/
int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) { int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) {
SStreamMeta* pMeta = arg; SStreamMeta* pMeta = arg;
taosWLockLatch(&pMeta->checkpointDirLock); taosWLockLatch(&pMeta->checkpointDirLock);
...@@ -218,12 +213,16 @@ int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) { ...@@ -218,12 +213,16 @@ int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) {
taosWUnLockLatch(&pMeta->checkpointDirLock); taosWUnLockLatch(&pMeta->checkpointDirLock);
return 0; return 0;
} }
/*
* checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
* checkpointInUse: |--cp2--|--cp4--|
* checkpointInUse is doing translation, cannot del until
* replication is finished
*/
int32_t delObsoleteCheckpoint(void* arg, const char* path) { int32_t delObsoleteCheckpoint(void* arg, const char* path) {
SStreamMeta* pMeta = arg; SStreamMeta* pMeta = arg;
taosWLockLatch(&pMeta->checkpointDirLock); taosWLockLatch(&pMeta->checkpointDirLock);
int64_t checkpointId = pMeta->checkpointTs;
taosArrayPush(pMeta->checkpointSaved, &checkpointId);
SArray* checkpointDel = taosArrayInit(10, sizeof(int64_t)); SArray* checkpointDel = taosArrayInit(10, sizeof(int64_t));
SArray* checkpointDup = taosArrayInit(10, sizeof(int64_t)); SArray* checkpointDup = taosArrayInit(10, sizeof(int64_t));
...@@ -241,13 +240,16 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { ...@@ -241,13 +240,16 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
} }
} }
} else { } else {
for (int i = taosArrayGetSize(pMeta->checkpointSaved); i >= 0; i--) { int32_t sz = taosArrayGetSize(pMeta->checkpointSaved);
int32_t dsz = sz - pMeta->checkpointCap; // del size
for (int i = 0; i < dsz; i++) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i);
if (taosArrayGetSize(checkpointDup) < pMeta->checkpointCap) { taosArrayPush(checkpointDel, &id);
taosArrayPush(checkpointDup, &id); }
} else { for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
taosArrayPush(checkpointDel, &id); int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i);
} taosArrayPush(checkpointDup, &id);
} }
} }
taosArrayDestroy(pMeta->checkpointSaved); taosArrayDestroy(pMeta->checkpointSaved);
...@@ -301,6 +303,10 @@ int32_t streamBackendDoCheckpoint(void* arg, const char* path) { ...@@ -301,6 +303,10 @@ int32_t streamBackendDoCheckpoint(void* arg, const char* path) {
} }
rocksdb_checkpoint_object_destroy(cp); rocksdb_checkpoint_object_destroy(cp);
} }
taosWLockLatch(&pMeta->checkpointDirLock);
taosArrayPush(pMeta->checkpointSaved, &checkpointId);
taosWUnLockLatch(&pMeta->checkpointDirLock);
delObsoleteCheckpoint(arg, path); delObsoleteCheckpoint(arg, path);
_ERROR: _ERROR:
taosReleaseRef(streamBackendId, backendRid); taosReleaseRef(streamBackendId, backendRid);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册