提交 ed4aaade 编写于 作者: dengyihao's avatar dengyihao

add backend

上级 7db12bb3
......@@ -18,6 +18,7 @@
#include "os.h"
#include "tarray.h"
#include "tdef.h"
#include "tlist.h"
......@@ -27,29 +28,33 @@ extern "C" {
typedef struct SStreamFileState SStreamFileState;
typedef struct SRowBuffPos {
void* pRowBuff;
void* pKey;
bool beFlushed;
bool beUsed;
void* pRowBuff;
void* pKey;
bool beFlushed;
bool beUsed;
} SRowBuffPos;
typedef SList SStreamSnapshot;
typedef TSKEY (*GetTsFun)(void*);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile,
TSKEY delMark);
void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState);
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal);
void releaseRowBuffPos(SRowBuffPos* pBuff);
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
void releaseRowBuffPos(SRowBuffPos* pBuff);
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState);
int32_t recoverSnapshot(SStreamFileState* pFileState);
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState);
int32_t recoverSnapshot(SStreamFileState* pFileState);
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId);
#ifdef __cplusplus
}
......
......@@ -89,4 +89,5 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key);
int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif
\ No newline at end of file
......@@ -458,9 +458,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
rocksdb_readoptions_t** readOpt) {
int idx = streamGetInit(cfName);
//*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
if (snapshot != NULL) {
*snapshot = NULL;
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
}
rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create();
......@@ -638,6 +637,39 @@ int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
return code;
}
int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
int code = 0;
char* err = NULL;
rocksdb_snapshot_t* snapshot = NULL;
rocksdb_readoptions_t* readopts = NULL;
rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
if (pIter == NULL) {
return -1;
}
rocksdb_iter_seek(pIter, start, strlen(start));
while (rocksdb_iter_valid(pIter)) {
const char* key = rocksdb_iter_key(pIter, NULL);
if (end != NULL && strcmp(key, end) > 0) {
break;
}
if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
int64_t checkPoint = 0;
if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
taosArrayPush(result, &checkPoint);
}
} else {
break;
}
rocksdb_iter_next(pIter);
}
rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot);
rocksdb_readoptions_destroy(readopts);
rocksdb_iter_destroy(pIter);
return code;
}
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number};
......
......@@ -371,7 +371,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
char keyBuf[128] = {0};
char valBuf[64] = {0};
int32_t len = 0;
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, INT64_MIN);
memcpy(keyBuf, taskKey, strlen(taskKey));
sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId);
streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, strlen(valBuf));
......@@ -382,7 +382,16 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
streamStateDestroyBatch(batch);
return code;
}
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
const char* taskKey = "streamFileState";
char keyBuf[128] = {0};
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, checkpointId);
return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
}
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) {
const char* taskKey = "streamFileState";
return streamDefaultIter_rocksdb(pFileState->pFileStore, taskKey, NULL, list);
}
int32_t recoverSnapshot(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS;
const char* taskKey = "streamFileState";
......@@ -391,7 +400,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
char buf[128] = {0};
void* val = NULL;
int32_t len = 0;
sprintf(buf, "%s:%" PRId64 "", taskKey, INT64_MIN);
memcpy(buf, taskKey, strlen(taskKey));
code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
if (code != 0) {
return TSDB_CODE_FAILED;
......@@ -412,7 +421,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
sscanf(val, "%" PRId64 "", &ts);
taosMemoryFree(val);
if (ts < pFileState->flushMark) {
// forceRemoveCheckPoint(pFileState->pFileStore, i);
forceRemoveCheckpoint(pFileState, i);
break;
} else {
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册