提交 ff8cb09f 编写于 作者: dengyihao's avatar dengyihao

refactor backend

上级 95d81604
......@@ -133,6 +133,10 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
/***compare func **/
typedef struct SStateChekpoint {
char* taskName;
int64_t checkpointId;
} SStateChekpoint;
// todo refactor
typedef struct SStateKey {
SWinKey key;
......
......@@ -85,4 +85,8 @@ void streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key);
#endif
\ No newline at end of file
......@@ -16,6 +16,25 @@
#include "streamBackendRocksdb.h"
#include "tcommon.h"
#include "tlog.h"
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
//
return memcmp(aBuf, bBuf, aLen);
}
int defaultKeyEncode(void* k, char* buf) {
int len = strlen((char*)k);
memcpy(buf, (char*)k, len);
return len;
}
int defaultKeyDecode(void* k, char* buf) {
int len = strlen(buf);
memcpy(k, buf, len);
return len;
}
int defaultKeyToString(void* k, char* buf) {
// just to debug
return sprintf(buf, "key: %s", (char*)k);
}
//
// SStateKey
// |--groupid--|---ts------|--opNum----|
......@@ -261,37 +280,56 @@ int parKeyToString(void* k, char* buf) {
return n;
}
const char* cfName[] = {"default", "fill", "sess", "func", "parname", "partag"};
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
typedef int (*EncodeFunc)(void* key, char* buf);
typedef int (*DecodeFunc)(void* key, char* buf);
typedef int (*ToStringFunc)(void* key, char* buf);
////typedef int32_t (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
////typedef const char* (*BackendCmpNameFunc)(void* statue);
typedef const char* (*CompareName)(void* statue);
typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
typedef void (*DestroyFunc)(void* state);
const char* compareDefaultName(void* name);
const char* compareStateName(void* name);
const char* compareWinKeyName(void* name);
const char* compareSessionKeyName(void* name);
const char* compareFuncKeyName(void* name);
const char* compareParKeyName(void* name);
const char* comparePartagKeyName(void* name);
typedef struct {
const char* key;
int32_t len;
int idx;
BackendCmpFunc cmpFunc;
EncodeFunc enFunc;
DecodeFunc deFunc;
ToStringFunc toStrFunc;
CompareName cmpName;
DestroyFunc detroyFunc;
} SCfInit;
SCfInit ginitDict[] = {
{"default", strlen("default"), 0, stateKeyEncode, stateKeyDecode, stateKeyToString},
{"fill", strlen("fill"), 1, winKeyEncode, winKeyDecode, winKeyToString},
{"sess", strlen("sess"), 2, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString},
{"func", strlen("func"), 3, tupleKeyEncode, tupleKeyDecode, tupleKeyToString},
{"parname", strlen("parname"), 4, parKeyEncode, parKeyDecode, parKeyToString},
{"partag", strlen("partag"), 5, parKeyEncode, parKeyDecode, parKeyToString},
{"default", strlen("default"), 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString,
compareDefaultName},
{"state", strlen("state"), 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName},
{"fill", strlen("fill"), 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName},
{"sess", strlen("sess"), 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode,
stateSessionKeyToString, compareSessionKeyName},
{"func", strlen("func"), 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName},
{"parname", strlen("parname"), 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName},
{"partag", strlen("partag"), 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName},
};
const char* compareStateName(void* name) { return cfName[0]; }
const char* compareWinKeyName(void* name) { return cfName[1]; }
const char* compareSessionKey(void* name) { return cfName[2]; }
const char* compareFuncKey(void* name) { return cfName[3]; }
const char* compareParKey(void* name) { return cfName[4]; }
const char* comparePartagKey(void* name) { return cfName[5]; }
const char* compareDefaultName(void* name) { return ginitDict[0].key; }
const char* compareStateName(void* name) { return ginitDict[1].key; }
const char* compareWinKeyName(void* name) { return ginitDict[2].key; }
const char* compareSessionKeyName(void* name) { return ginitDict[3].key; }
const char* compareFuncKeyName(void* name) { return ginitDict[4].key; }
const char* compareParKeyName(void* name) { return ginitDict[5].key; }
const char* comparePartagKeyName(void* name) { return ginitDict[6].key; }
void destroyFunc(void* stata) { return; }
int streamInitBackend(SStreamState* pState, char* path) {
......@@ -309,11 +347,12 @@ int streamInitBackend(SStreamState* pState, char* path) {
rocksdb_options_set_write_buffer_size(opts, 128 << 20);
char* err = NULL;
int cfLen = sizeof(cfName) / sizeof(cfName[0]);
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
for (int i = 0; i < cfLen; i++) {
cfOpt[i] = rocksdb_options_create_copy(opts);
// refactor later
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20);
rocksdb_block_based_options_set_block_cache(tableOpt, cache);
......@@ -328,31 +367,12 @@ int streamInitBackend(SStreamState* pState, char* path) {
};
rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**));
rocksdb_comparator_t* stateCompare = rocksdb_comparator_create(NULL, destroyFunc, stateKeyDBComp, compareStateName);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[0], stateCompare);
pCompare[0] = stateCompare;
rocksdb_comparator_t* fillCompare = rocksdb_comparator_create(NULL, destroyFunc, winKeyDBComp, compareWinKeyName);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[1], fillCompare);
pCompare[1] = fillCompare;
rocksdb_comparator_t* sessCompare =
rocksdb_comparator_create(NULL, destroyFunc, stateSessionKeyDBComp, compareSessionKey);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[2], sessCompare);
pCompare[2] = sessCompare;
rocksdb_comparator_t* funcCompare = rocksdb_comparator_create(NULL, destroyFunc, tupleKeyDBComp, compareFuncKey);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[3], funcCompare);
pCompare[3] = funcCompare;
rocksdb_comparator_t* parnameCompare = rocksdb_comparator_create(NULL, destroyFunc, parKeyDBComp, compareParKey);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[4], parnameCompare);
pCompare[4] = parnameCompare;
rocksdb_comparator_t* partagCompare = rocksdb_comparator_create(NULL, destroyFunc, parKeyDBComp, comparePartagKey);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[5], partagCompare);
pCompare[5] = partagCompare;
for (int i = 0; i < cfLen; i++) {
SCfInit* cf = &ginitDict[i];
rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->detroyFunc, cf->cmpFunc, cf->cmpName);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare);
pCompare[i] = compare;
}
rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*));
rocksdb_t* db = rocksdb_open_column_families(opts, path, cfLen, cfName, cfOpt, cfHandle, &err);
......@@ -373,7 +393,7 @@ void streamCleanBackend(SStreamState* pState) {
qInfo("rocksdb already free");
return;
}
int cfLen = sizeof(cfName) / sizeof(cfName[0]);
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
for (int i = 0; i < cfLen; i++) {
rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]);
rocksdb_options_destroy(pState->pTdbState->cfOpts[i]);
......@@ -540,7 +560,7 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v
int code = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_PUT_ROCKSDB(pState, "default", &sKey, value, vLen);
STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, value, vLen);
return code;
}
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
......@@ -581,17 +601,34 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, val, (size_t)vlen);
return 0;
}
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "default", &key, pVal, pVLen);
return code;
}
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "default", &key, pVal, pVLen);
return code;
}
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "default", &key);
return code;
}
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_GET_ROCKSDB(pState, "default", &sKey, pVal, pVLen);
STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, pVal, pVLen);
return code;
}
// todo refactor
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int code = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_DEL_ROCKSDB(pState, "default", &sKey);
STREAM_STATE_DEL_ROCKSDB(pState, "state", &sKey);
return code;
}
......@@ -638,7 +675,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
sKeyStr, sLen, eKeyStr, eLen, &err);
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, eLen);
if (err != NULL) {
qWarn("failed to delete range cf(default) err: %s, start: %s, end:%s", err, toStringStart, toStringEnd);
qWarn("failed to delete range cf(state) err: %s, start: %s, end:%s", err, toStringStart, toStringEnd);
taosMemoryFree(err);
}
......@@ -653,7 +690,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
// streamStatePut_rocksdb(pState, &key, s, strlen(s));
// rocksdb_readoptions_t* opt = NULL;
// rocksdb_iterator_t* iter = streamStateIterCreate(pState, "default", NULL, &opt);
// rocksdb_iterator_t* iter = streamStateIterCreate(pState, "state", NULL, &opt);
// rocksdb_iter_seek(iter, buf, sLen);
// char* err = NULL;
......@@ -804,7 +841,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
if (pCur == NULL) return NULL;
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
......@@ -973,7 +1010,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
}
pCur->number = pState->number;
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
......
......@@ -57,7 +57,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
if (!pFileState) {
goto _error;
}
pFileState->maxRowCount = TMAX( (uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->usedBuffs = tdListNew(POINTER_BYTES);
pFileState->freeBuffs = tdListNew(POINTER_BYTES);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
......@@ -342,12 +342,11 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
}
SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number};
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, &sKey, pPos->pRowBuff, pFileState->rowSize);
code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize);
}
if (streamStateGetBatchSize(batch) > 0) {
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
}
streamStateDestroyBatch(batch);
if (flushState) {
int32_t len = 0;
......@@ -357,6 +356,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
streamStatePut_rocksdb(pFileState->pFileStore, &key, buff, len);
taosMemoryFree(buff);
}
streamStateDestroyBatch(batch);
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册