提交 c27d01a5 编写于 作者: dengyihao's avatar dengyihao

Merge branch 'enh/addTtlToStream' into enh/rocksdbSstateMerge

...@@ -27,21 +27,10 @@ extern "C" { ...@@ -27,21 +27,10 @@ extern "C" {
#ifndef _STREAM_STATE_H_ #ifndef _STREAM_STATE_H_
#define _STREAM_STATE_H_ #define _STREAM_STATE_H_
typedef struct { // void* streamBackendInit(const char* path);
rocksdb_t* db; // void streamBackendCleanup(void* arg);
rocksdb_writeoptions_t* writeOpts; // SListNode* streamBackendAddCompare(void* backend, void* arg);
rocksdb_readoptions_t* readOpts; // void streamBackendDelCompare(void* backend, void* arg);
rocksdb_options_t* dbOpt;
void* param;
void* env;
rocksdb_cache_t* cache;
TdThreadMutex mutex;
SList* list;
} SBackendHandle;
void* streamBackendInit(const char* path);
void streamBackendCleanup(void* arg);
SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2); typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2);
typedef struct STdbState { typedef struct STdbState {
...@@ -55,8 +44,9 @@ typedef struct STdbState { ...@@ -55,8 +44,9 @@ typedef struct STdbState {
void* param; void* param;
void* env; void* env;
SListNode* pComparNode; SListNode* pComparNode;
SBackendHandle* pBackendHandle; void* pBackendHandle;
char idstr[48]; char idstr[48];
void* compactFactory;
TDB* db; TDB* db;
TTB* pStateDb; TTB* pStateDb;
...@@ -168,11 +158,11 @@ typedef struct SStateSessionKey { ...@@ -168,11 +158,11 @@ typedef struct SStateSessionKey {
int64_t opNum; int64_t opNum;
} SStateSessionKey; } SStateSessionKey;
typedef struct streamValue { typedef struct SStreamValue {
int64_t unixTimestamp; int64_t unixTimestamp;
int32_t len; int32_t len;
char data[0]; char* data;
} streamValue; } SStreamValue;
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2); int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2);
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2); int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2);
......
...@@ -17,7 +17,9 @@ ...@@ -17,7 +17,9 @@
#define _STREAM_BACKEDN_ROCKSDB_H_ #define _STREAM_BACKEDN_ROCKSDB_H_
#include "executor.h" #include "executor.h"
#include "streamInc.h"
#include "rocksdb/c.h"
// #include "streamInc.h"
#include "streamState.h" #include "streamState.h"
#include "tcoding.h" #include "tcoding.h"
#include "tcommon.h" #include "tcommon.h"
...@@ -28,57 +30,83 @@ typedef struct SCfComparator { ...@@ -28,57 +30,83 @@ typedef struct SCfComparator {
rocksdb_comparator_t** comp; rocksdb_comparator_t** comp;
int32_t numOfComp; int32_t numOfComp;
} SCfComparator; } SCfComparator;
typedef struct {
rocksdb_t* db;
rocksdb_writeoptions_t* writeOpts;
rocksdb_readoptions_t* readOpts;
rocksdb_options_t* dbOpt;
void* param;
void* env;
rocksdb_cache_t* cache;
TdThreadMutex mutex;
rocksdb_compactionfilterfactory_t* filterFactory;
SList* list;
} SBackendHandle;
void* streamBackendInit(const char* path);
void streamBackendCleanup(void* arg);
SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
int streamStateOpenBackend(void* backend, SStreamState* pState); int streamStateOpenBackend(void* backend, SStreamState* pState);
void streamStateCloseBackend(SStreamState* pState, bool remove); void streamStateCloseBackend(SStreamState* pState, bool remove);
void streamStateDestroyCompar(void* arg); void streamStateDestroyCompar(void* arg);
// void streamStateRemoveBackend(SStreamState* pState);
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen); // state cf
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key);
int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key); int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key);
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key);
int32_t streamStateClear_rocksdb(SStreamState* pState); int32_t streamStateClear_rocksdb(SStreamState* pState);
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key);
// func cf
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen);
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key);
// session cf
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen); int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key);
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey); int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey);
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key);
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen); int32_t* pVLen);
int32_t streamStateSessionClear_rocksdb(SStreamState* pState);
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); // fill cf
int32_t streamStateSessionClear_rocksdb(SStreamState* pState); int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key); int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key);
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
// partag cf
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen); int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen);
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen); int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen);
// parname cf
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]); int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]);
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal); int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal);
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove); void streamStateDestroy_rocksdb(SStreamState* pState, bool remove);
void* streamStateCreateBatch(); void* streamStateCreateBatch();
...@@ -89,10 +117,10 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr ...@@ -89,10 +117,10 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
void* val, int32_t vlen); void* val, int32_t vlen);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
// default cf
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key);
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
void* streamDefaultIterCreate_rocksdb(SStreamState* pState); void* streamDefaultIterCreate_rocksdb(SStreamState* pState);
int32_t streamDefaultIterValid_rocksdb(void* iter); int32_t streamDefaultIterValid_rocksdb(void* iter);
...@@ -101,5 +129,13 @@ void streamDefaultIterNext_rocksdb(void* iter); ...@@ -101,5 +129,13 @@ void streamDefaultIterNext_rocksdb(void* iter);
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len); char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len);
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len); char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len);
// batch func
void* streamStateCreateBatch();
int32_t streamStateGetBatchSize(void* pBatch);
void streamStateClearBatch(void* pBatch);
void streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif #endif
\ No newline at end of file
...@@ -13,10 +13,145 @@ ...@@ -13,10 +13,145 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "rocksdb/c.h" // #include "streamStateRocksdb.h"
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "tcommon.h" #include "tcommon.h"
typedef struct SCompactFilteFactory {
void* status;
} SCompactFilteFactory;
void destroyCompactFilteFactory(void* arg);
void destroyCompactFilte(void* arg);
const char* compactFilteFactoryName(void* arg);
const char* compactFilteName(void* arg);
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed);
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx);
void* streamBackendInit(const char* path);
void streamBackendCleanup(void* arg);
SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
typedef struct {
void* tableOpt;
} RocksdbCfParam;
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
typedef int (*EncodeFunc)(void* key, char* buf);
typedef int (*DecodeFunc)(void* key, char* buf);
typedef int (*ToStringFunc)(void* key, char* buf);
typedef const char* (*CompareName)(void* statue);
typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
typedef void (*DestroyFunc)(void* state);
typedef int32_t (*EncodeValueFunc)(void* value, int32_t vlen, int64_t ttl, char** dest);
typedef int32_t (*DecodeValueFunc)(void* value, int32_t vlen, int64_t* ttl, char** dest);
const char* compareDefaultName(void* name);
const char* compareStateName(void* name);
const char* compareWinKeyName(void* name);
const char* compareSessionKeyName(void* name);
const char* compareFuncKeyName(void* name);
const char* compareParKeyName(void* name);
const char* comparePartagKeyName(void* name);
void* streamBackendInit(const char* path) {
SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle));
pHandle->list = tdListNew(sizeof(SCfComparator));
taosThreadMutexInit(&pHandle->mutex, NULL);
rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();
rocksdb_env_set_low_priority_background_threads(env, 4);
rocksdb_env_set_high_priority_background_threads(env, 2);
rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20);
rocksdb_options_t* opts = rocksdb_options_create();
rocksdb_options_set_env(opts, env);
rocksdb_options_set_create_if_missing(opts, 1);
rocksdb_options_set_create_missing_column_families(opts, 1);
rocksdb_options_set_write_buffer_size(opts, 128 << 20);
rocksdb_options_set_max_total_wal_size(opts, 128 << 20);
rocksdb_options_set_recycle_log_file_num(opts, 6);
rocksdb_options_set_max_write_buffer_number(opts, 3);
pHandle->env = env;
pHandle->dbOpt = opts;
pHandle->cache = cache;
pHandle->filterFactory = rocksdb_compactionfilterfactory_create(
NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
rocksdb_options_set_compaction_filter_factory(pHandle->dbOpt, pHandle->filterFactory);
char* err = NULL;
pHandle->db = rocksdb_open(opts, path, &err);
if (err != NULL) {
qError("failed to open rocksdb, path:%s, reason:%s", path, err);
taosMemoryFreeClear(err);
goto _EXIT;
}
return (void*)pHandle;
_EXIT:
rocksdb_options_destroy(opts);
rocksdb_cache_destroy(cache);
rocksdb_env_destroy(env);
taosThreadMutexDestroy(&pHandle->mutex);
rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
tdListFree(pHandle->list);
free(pHandle);
return NULL;
}
void streamBackendCleanup(void* arg) {
SBackendHandle* pHandle = (SBackendHandle*)arg;
rocksdb_close(pHandle->db);
rocksdb_options_destroy(pHandle->dbOpt);
rocksdb_env_destroy(pHandle->env);
rocksdb_cache_destroy(pHandle->cache);
taosThreadMutexDestroy(&pHandle->mutex);
SListNode* head = tdListPopHead(pHandle->list);
while (head != NULL) {
streamStateDestroyCompar(head->data);
taosMemoryFree(head);
head = tdListPopHead(pHandle->list);
}
// rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
tdListFree(pHandle->list);
taosMemoryFree(pHandle);
return;
}
SListNode* streamBackendAddCompare(void* backend, void* arg) {
SBackendHandle* pHandle = (SBackendHandle*)backend;
SListNode* node = NULL;
taosThreadMutexLock(&pHandle->mutex);
node = tdListAdd(pHandle->list, arg);
taosThreadMutexUnlock(&pHandle->mutex);
return node;
}
void streamBackendDelCompare(void* backend, void* arg) {
SBackendHandle* pHandle = (SBackendHandle*)backend;
SListNode* node = NULL;
taosThreadMutexLock(&pHandle->mutex);
node = tdListPopNode(pHandle->list, arg);
taosThreadMutexUnlock(&pHandle->mutex);
if (node) {
streamStateDestroyCompar(node->data);
taosMemoryFree(node);
}
}
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); }
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
int streamGetInit(const char* funcName);
// |key|-----value------|
// |key|ttl|len|userData|
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
int ret = memcmp(aBuf, bBuf, aLen); int ret = memcmp(aBuf, bBuf, aLen);
if (ret == 0) { if (ret == 0) {
...@@ -30,6 +165,16 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, ...@@ -30,6 +165,16 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf,
return ret; return ret;
} }
} }
int streamStateValueIsStale(char* vv) {
int64_t ts = 0;
taosDecodeFixedI64(vv, &ts);
return ts < taosGetTimestampSec() ? 1 : 0;
}
int iterValueIsStale(rocksdb_iterator_t* iter) {
size_t len;
char* v = (char*)rocksdb_iter_value(iter, &len);
return streamStateValueIsStale(v);
}
int defaultKeyEncode(void* k, char* buf) { int defaultKeyEncode(void* k, char* buf) {
int len = strlen((char*)k); int len = strlen((char*)k);
memcpy(buf, (char*)k, len); memcpy(buf, (char*)k, len);
...@@ -290,14 +435,14 @@ int parKeyToString(void* k, char* buf) { ...@@ -290,14 +435,14 @@ int parKeyToString(void* k, char* buf) {
} }
int stremaValueEncode(void* k, char* buf) { int stremaValueEncode(void* k, char* buf) {
int len = 0; int len = 0;
streamValue* key = k; SStreamValue* key = k;
len += taosEncodeFixedI64((void**)&buf, key->unixTimestamp); len += taosEncodeFixedI64((void**)&buf, key->unixTimestamp);
len += taosEncodeFixedI32((void**)&buf, key->len); len += taosEncodeFixedI32((void**)&buf, key->len);
len += taosEncodeBinary((void**)&buf, key->data, key->len); len += taosEncodeBinary((void**)&buf, key->data, key->len);
return len; return len;
} }
int streamValueDecode(void* k, char* buf) { int streamValueDecode(void* k, char* buf) {
streamValue* key = k; SStreamValue* key = k;
char* p = buf; char* p = buf;
p = taosDecodeFixedI64(p, &key->unixTimestamp); p = taosDecodeFixedI64(p, &key->unixTimestamp);
p = taosDecodeFixedI32(p, &key->len); p = taosDecodeFixedI32(p, &key->len);
...@@ -305,44 +450,27 @@ int streamValueDecode(void* k, char* buf) { ...@@ -305,44 +450,27 @@ int streamValueDecode(void* k, char* buf) {
return p - buf; return p - buf;
} }
int32_t streamValueToString(void* k, char* buf) { int32_t streamValueToString(void* k, char* buf) {
streamValue* key = k; SStreamValue* key = k;
int n = 0; int n = 0;
n += sprintf(buf + n, "[unixTimestamp:%" PRIi64 ",", key->unixTimestamp); n += sprintf(buf + n, "[unixTimestamp:%" PRIi64 ",", key->unixTimestamp);
n += sprintf(buf + n, "len:%d,", key->len); n += sprintf(buf + n, "len:%d,", key->len);
n += sprintf(buf + n, "data:%s]", key->data); n += sprintf(buf + n, "data:%s]", key->data);
return n; return n;
} }
/*1: stale, 0: no stale*/
/*1: stale, 0: no stale*/
int32_t streaValueIsStale(void* k, int64_t ts) { int32_t streaValueIsStale(void* k, int64_t ts) {
streamValue* key = k; SStreamValue* key = k;
if (key->unixTimestamp < ts) { if (key->unixTimestamp < ts) {
return 1; return 1;
} }
return 0; return 0;
} }
typedef struct { void destroyFunc(void* arg) {
void* tableOpt; (void)arg;
} RocksdbCfParam; return;
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; }
typedef int (*EncodeFunc)(void* key, char* buf);
typedef int (*DecodeFunc)(void* key, char* buf);
typedef int (*ToStringFunc)(void* key, char* buf);
typedef const char* (*CompareName)(void* statue);
typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
typedef void (*DestroyFunc)(void* state);
const char* compareDefaultName(void* name);
const char* compareStateName(void* name);
const char* compareWinKeyName(void* name);
const char* compareSessionKeyName(void* name);
const char* compareFuncKeyName(void* name);
const char* compareParKeyName(void* name);
const char* comparePartagKeyName(void* name);
void destroyFunc(void* stata) { return; }
typedef struct { typedef struct {
const char* key; const char* key;
...@@ -354,30 +482,126 @@ typedef struct { ...@@ -354,30 +482,126 @@ typedef struct {
ToStringFunc toStrFunc; ToStringFunc toStrFunc;
CompareName cmpName; CompareName cmpName;
DestroyFunc detroyFunc; DestroyFunc detroyFunc;
EncodeValueFunc enValueFunc;
DecodeValueFunc deValueFunc;
} SCfInit; } SCfInit;
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUBFIX) sprintf(name, "%s_%s", idstr, (SUBFIX)); #define GEN_COLUMN_FAMILY_NAME(name, idstr, SUBFIX) sprintf(name, "%s_%s", idstr, (SUBFIX));
int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) {
SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)};
char* p = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int32_t) + key.len);
char* buf = p;
int32_t len = 0;
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
len += taosEncodeFixedI32((void**)&buf, key.len);
len += taosEncodeBinary((void**)&buf, (char*)value, vlen);
*dest = p;
return len;
}
/*
* ret >= 0 : found valid value
* ret < 0 : error or timeout
*/
int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) {
SStreamValue key = {0};
char* p = value;
int64_t now = taosGetTimestampMs();
p = taosDecodeFixedI64(p, &key.unixTimestamp);
p = taosDecodeFixedI32(p, &key.len);
p = taosDecodeBinary(p, (void**)&(key.data), key.len);
if (key.unixTimestamp != 0 && key.unixTimestamp < now) {
taosMemoryFree(key.data);
*dest = NULL;
return -1;
}
if (ttl != NULL) {
*ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now;
}
if (dest != NULL) {
*dest = key.data;
} else {
taosMemoryFree(key.data);
}
return key.len;
}
SCfInit ginitDict[] = { SCfInit ginitDict[] = {
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
destroyFunc}, destroyFunc, encodeValueFunc, decodeValueFunc},
{"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc}, {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc,
{"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc}, encodeValueFunc, decodeValueFunc},
{"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc},
{"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString, {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
compareSessionKeyName, destroyFunc}, compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc},
{"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc}, {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc,
{"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc}, encodeValueFunc, decodeValueFunc},
{"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc}, {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc},
{"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc},
}; };
const char* compareDefaultName(void* name) { return ginitDict[0].key; } const char* compareDefaultName(void* arg) {
const char* compareStateName(void* name) { return ginitDict[1].key; } (void)arg;
const char* compareWinKeyName(void* name) { return ginitDict[2].key; } return ginitDict[0].key;
const char* compareSessionKeyName(void* name) { return ginitDict[3].key; } }
const char* compareFuncKeyName(void* name) { return ginitDict[4].key; } const char* compareStateName(void* arg) {
const char* compareParKeyName(void* name) { return ginitDict[5].key; } (void)arg;
const char* comparePartagKeyName(void* name) { return ginitDict[6].key; } return ginitDict[1].key;
}
const char* compareWinKeyName(void* arg) {
(void)arg;
return ginitDict[2].key;
}
const char* compareSessionKeyName(void* arg) {
(void)arg;
return ginitDict[3].key;
}
const char* compareFuncKeyName(void* arg) {
(void)arg;
return ginitDict[4].key;
}
const char* compareParKeyName(void* arg) {
(void)arg;
return ginitDict[5].key;
}
const char* comparePartagKeyName(void* arg) {
(void)arg;
return ginitDict[6].key;
}
void destroyCompactFilteFactory(void* arg) {
if (arg == NULL) return;
}
const char* compactFilteFactoryName(void* arg) {
SCompactFilteFactory* state = arg;
return "stream_compact_filter";
}
void destroyCompactFilte(void* arg) { (void)arg; }
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) {
int64_t unixTime = taosGetTimestampMs();
SStreamValue value;
memset(&value, 0, sizeof(value));
streamValueDecode(&value, (char*)val);
taosMemoryFree(value.data);
if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) {
return 1;
}
return 0;
}
const char* compactFilteName(void* arg) { return "stream_filte"; }
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
SCompactFilteFactory* state = arg;
rocksdb_compactionfilter_t* filter =
rocksdb_compactionfilter_create(NULL, destroyCompactFilte, compactFilte, compactFilteName);
return filter;
}
int streamStateOpenBackend(void* backend, SStreamState* pState) { int streamStateOpenBackend(void* backend, SStreamState* pState) {
qInfo("start to open backend, %p, %d-%d", pState, pState->streamId, pState->taskId); qInfo("start to open backend, %p, %d-%d", pState, pState->streamId, pState->taskId);
...@@ -390,7 +614,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { ...@@ -390,7 +614,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
RocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam)); RocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
for (int i = 0; i < cfLen; i++) { for (int i = 0; i < cfLen; i++) {
cfOpt[i] = rocksdb_options_create(); cfOpt[i] = rocksdb_options_create_copy(handle->dbOpt);
// refactor later // refactor later
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
...@@ -436,6 +660,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { ...@@ -436,6 +660,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
return 0; return 0;
} }
void streamStateCloseBackend(SStreamState* pState, bool remove) { void streamStateCloseBackend(SStreamState* pState, bool remove) {
char* status[] = {"close", "drop"}; char* status[] = {"close", "drop"};
qInfo("start to %s backend, %p, %d-%d", status[remove == false ? 0 : 1], pState, pState->streamId, pState->taskId); qInfo("start to %s backend, %p, %d-%d", status[remove == false ? 0 : 1], pState, pState->streamId, pState->taskId);
...@@ -494,6 +719,7 @@ void streamStateDestroyCompar(void* arg) { ...@@ -494,6 +719,7 @@ void streamStateDestroyCompar(void* arg) {
} }
taosMemoryFree(comp->comp); taosMemoryFree(comp->comp);
} }
int streamGetInit(const char* funcName) { int streamGetInit(const char* funcName) {
size_t len = strlen(funcName); size_t len = strlen(funcName);
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
...@@ -520,12 +746,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -520,12 +746,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
if (snapshot != NULL) { if (snapshot != NULL) {
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
} }
rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create();
*readOpt = rOpt; *readOpt = rOpt;
rocksdb_readoptions_set_snapshot(rOpt, *snapshot); rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
rocksdb_readoptions_set_fill_cache(rOpt, 0); rocksdb_readoptions_set_fill_cache(rOpt, 0);
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]); return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]);
...@@ -538,7 +762,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -538,7 +762,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
char* err = NULL; \ char* err = NULL; \
int i = streamGetInit(funcname); \ int i = streamGetInit(funcname); \
if (i < 0) { \ if (i < 0) { \
qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \ code = -1; \
break; \ break; \
} \ } \
...@@ -548,16 +772,17 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -548,16 +772,17 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \ rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)value, (size_t)vLen, &err); \ char* ttlV = NULL; \
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
if (err != NULL) { \ if (err != NULL) { \
taosMemoryFree(err); \ taosMemoryFree(err); \
qDebug("streamState str: %s failed to write to %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ qDebug("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
err); \
code = -1; \ code = -1; \
} else { \ } else { \
qDebug("streamState str:%s succ to write to %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \ qDebug("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \
vLen); \
} \ } \
taosMemoryFree(ttlV); \
} while (0); } while (0);
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ #define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
...@@ -567,7 +792,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -567,7 +792,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
char* err = NULL; \ char* err = NULL; \
int i = streamGetInit(funcname); \ int i = streamGetInit(funcname); \
if (i < 0) { \ if (i < 0) { \
qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \ code = -1; \
break; \ break; \
} \ } \
...@@ -580,22 +805,32 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -580,22 +805,32 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
size_t len = 0; \ size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL) { \ if (val == NULL) { \
qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \ qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \
funcname); \
if (err != NULL) taosMemoryFree(err); \ if (err != NULL) taosMemoryFree(err); \
code = -1; \ code = -1; \
} else { \ } else { \
if (pVal != NULL) *pVal = val; \ char * p = NULL, *end = NULL; \
int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \
if (len < 0) { \
qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \
code = -1; \
} else { \
qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \
} \
if (pVal != NULL) { \
*pVal = p; \
} else { \
taosMemoryFree(p); \
} \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = len; \ if (vLen != NULL) *vLen = len; \
} \ } \
if (err != NULL) { \ if (err != NULL) { \
taosMemoryFree(err); \ taosMemoryFree(err); \
qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \
err); \
code = -1; \ code = -1; \
} else { \ } else { \
if (code == 0) \ if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \
qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \
} \ } \
} while (0); } while (0);
...@@ -627,180 +862,26 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -627,180 +862,26 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
} \ } \
} while (0); } while (0);
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { // state cf
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "func", key, value, vLen);
return code;
}
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "func", key, pVal, pVLen);
return 0;
}
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "func", key);
return 0;
}
int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
int code = 0; int code = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, value, vLen); STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, vLen);
return code;
}
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
char* err = NULL;
rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err);
if (err != NULL) {
qError("streamState failed to write batch, err:%s", err);
taosMemoryFree(err);
return -1;
}
return 0;
}
void* streamStateCreateBatch() {
rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create();
return pBatch;
}
int32_t streamStateGetBatchSize(void* pBatch) {
if (pBatch == NULL) return -1;
return rocksdb_writebatch_count((rocksdb_writebatch_t*)pBatch);
}
void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen) {
int i = streamGetInit(cfName);
if (i < 0) {
qError("streamState failed to put to cf name:%s", cfName);
return -1;
}
char buf[128] = {0};
int32_t klen = ginitDict[i].enFunc((void*)key, buf);
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, val, (size_t)vlen);
return 0;
}
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "default", &key, pVal, pVLen);
return code;
}
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "default", &key, pVal, pVLen);
return code;
}
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "default", &key);
return code;
}
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
return pCur;
}
int32_t streamDefaultIterValid_rocksdb(void* iter) {
SStreamStateCur* pCur = iter;
bool val = rocksdb_iter_valid(pCur->iter);
return val ? 0 : -1;
}
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
SStreamStateCur* pCur = iter;
rocksdb_iter_seek(pCur->iter, key, strlen(key));
}
void streamDefaultIterNext_rocksdb(void* iter) {
SStreamStateCur* pCur = iter;
rocksdb_iter_next(pCur->iter);
}
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) {
SStreamStateCur* pCur = iter;
return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len);
}
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
SStreamStateCur* pCur = iter;
return (char*)rocksdb_iter_value(pCur->iter, (size_t*)len);
}
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
int code = 0;
char* err = NULL;
rocksdb_snapshot_t* snapshot = NULL;
rocksdb_readoptions_t* readopts = NULL;
rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
if (pIter == NULL) {
return -1;
}
rocksdb_iter_seek(pIter, start, strlen(start));
while (rocksdb_iter_valid(pIter)) {
const char* key = rocksdb_iter_key(pIter, NULL);
if (end != NULL && strcmp(key, end) > 0) {
break;
}
if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
int64_t checkPoint = 0;
if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
taosArrayPush(result, &checkPoint);
}
} else {
break;
}
rocksdb_iter_next(pIter);
}
rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot);
rocksdb_readoptions_destroy(readopts);
rocksdb_iter_destroy(pIter);
return code; return code;
} }
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0; int code = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, pVal, pVLen); STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, pVal, pVLen);
return code; return code;
} }
// todo refactor
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) { int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int code = 0; int code = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_DEL_ROCKSDB(pState, "state", &sKey); STREAM_STATE_DEL_ROCKSDB(pState, "state", &sKey);
return code; return code;
} }
// todo refactor
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen);
return code;
}
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen);
return code;
}
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "fill", key);
return code;
}
// todo refactor
int32_t streamStateClear_rocksdb(SStreamState* pState) { int32_t streamStateClear_rocksdb(SStreamState* pState) {
qDebug("streamStateClear_rocksdb"); qDebug("streamStateClear_rocksdb");
...@@ -822,131 +903,137 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { ...@@ -822,131 +903,137 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
char* err = NULL; char* err = NULL;
rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1], rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1],
sKeyStr, sLen, eKeyStr, eLen, &err); sKeyStr, sLen, eKeyStr, eLen, &err);
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, eLen); // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
// eLen);
if (err != NULL) { if (err != NULL) {
qWarn("failed to delete range cf(state) err: %s, start: %s, end:%s", err, toStringStart, toStringEnd); qWarn(
"failed to delete range cf(state) err: %s, "
"start: %s, end:%s",
err, toStringStart, toStringEnd);
taosMemoryFree(err); taosMemoryFree(err);
} }
return 0; return 0;
} }
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { if (!pCur) {
int code = 0; return -1;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen);
if (code == -1) {
} }
rocksdb_iter_next(pCur->iter);
return 0;
}
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
qDebug("streamStateGetFirst_rocksdb");
SWinKey tmp = {.ts = 0, .groupId = 0};
streamStatePut_rocksdb(pState, &tmp, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0);
streamStateFreeCur(pCur);
streamStateDel_rocksdb(pState, &tmp);
return code; return code;
} }
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
char buf[128] = {0}; int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; qDebug("streamStateGetGroupKVByCur_rocksdb");
int len = stateSessionKeyEncode(&sKey, buf); if (!pCur) {
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { return -1;
streamStateFreeCur(pCur);
return NULL;
} }
uint64_t groupId = pKey->groupId;
int32_t c = 0; int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
size_t klen; if (code == 0) {
const char* iKey = rocksdb_iter_key(pCur->iter, &klen); if (pKey->groupId == groupId) {
SStateSessionKey curKey = {0}; return 0;
stateSessionKeyDecode(&curKey, (char*)iKey);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
// qWarn("streamState failed to seek key prev %s", toString);
streamStateFreeCur(pCur);
return NULL;
} }
return pCur; }
return -1;
} }
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); qDebug("streamStateAddIfNotExist_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); int32_t size = *pVLen;
if (pCur == NULL) { if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) {
return NULL; return 0;
} }
pCur->db = pState->pTdbState->rocksdb; *pVal = taosMemoryMalloc(size);
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); memset(*pVal, 0, size);
pCur->number = pState->number; return 0;
}
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
qDebug("streamStateCurPrev_rocksdb");
if (!pCur) return -1;
char buf[128] = {0}; rocksdb_iter_prev(pCur->iter);
return 0;
}
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qDebug("streamStateGetKVByCur_rocksdb");
if (!pCur) return -1;
SStateKey tkey;
SStateKey* pKtmp = &tkey;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) {
int len = stateSessionKeyEncode(&sKey, buf); size_t tlen;
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
streamStateFreeCur(pCur); stateKeyDecode((void*)pKtmp, keyStr);
return NULL; if (pKtmp->opNum != pCur->number) {
return -1;
} }
size_t klen; size_t vlen = 0;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen); if (pVal != NULL) *pVal = (char*)rocksdb_iter_value(pCur->iter, &vlen);
SStateSessionKey curKey = {0}; if (pVLen != NULL) *pVLen = vlen;
stateSessionKeyDecode(&curKey, (char*)iKey); *pKey = pKtmp->key;
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur; return 0;
}
rocksdb_iter_next(pCur->iter); return -1;
if (!rocksdb_iter_valid(pCur->iter)) { }
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) {
qDebug("streamStateGetAndCheckCur_rocksdb");
SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
if (pCur) {
int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0);
if (code == 0) return pCur;
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL;
} }
return pCur; return NULL;
} }
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateSessionSeekKeyNext_rocksdb"); qDebug("streamStateSeekKeyNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
pCur->number = pState->number; pCur->number = pState->number;
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0}; char buf[128] = {0};
int len = stateSessionKeyEncode(&sKey, buf); int len = stateKeyEncode((void*)&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
// skip ttl expired data
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
rocksdb_iter_next(pCur->iter);
}
size_t klen; if (rocksdb_iter_valid(pCur->iter)) {
const char* iKey = rocksdb_iter_key(pCur->iter, &klen); SStateKey curKey;
SStateSessionKey curKey = {0}; size_t kLen;
stateSessionKeyDecode(&curKey, (char*)iKey); char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur; stateKeyDecode((void*)&curKey, keyStr);
if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) {
return pCur;
}
rocksdb_iter_next(pCur->iter); rocksdb_iter_next(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) { return pCur;
}
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
}
return pCur;
} }
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
qDebug("streamStateAddIfNotExist_rocksdb");
int32_t size = *pVLen;
if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) {
return 0;
}
*pVal = taosMemoryMalloc(size);
memset(*pVal, 0, size);
return 0;
}
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateGetCur_rocksdb"); qDebug("streamStateGetCur_rocksdb");
int32_t code = 0; int32_t code = 0;
...@@ -960,11 +1047,20 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK ...@@ -960,11 +1047,20 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
pCur->db = pState->pTdbState->rocksdb; pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
rocksdb_iter_prev(pCur->iter);
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);
}
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
pCur = NULL;
}
STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey); STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey);
return pCur; return pCur;
} }
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateGetCur_rocksdb"); qDebug("streamStateGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
...@@ -978,7 +1074,11 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* ...@@ -978,7 +1074,11 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
int len = stateKeyEncode((void*)&sKey, buf); int len = stateKeyEncode((void*)&sKey, buf);
rocksdb_iter_seek(pCur->iter, buf, len); rocksdb_iter_seek(pCur->iter, buf, len);
if (rocksdb_iter_valid(pCur->iter)) {
if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) {
size_t vlen;
char* val = (char*)rocksdb_iter_value(pCur->iter, &vlen);
if (!streamStateValueIsStale(val)) {
SStateKey curKey; SStateKey curKey;
size_t kLen = 0; size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
...@@ -989,120 +1089,176 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* ...@@ -989,120 +1089,176 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
return pCur; return pCur;
} }
} }
}
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { // func cf
qDebug("streamStateGetAndCheckCur_rocksdb"); int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); int code = 0;
if (pCur) { STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, vLen);
int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); return code;
if (code == 0) return pCur; }
streamStateFreeCur(pCur); int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "func", key, pVal, pVLen);
return 0;
}
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "func", key);
return 0;
}
// session cf
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
int code = 0;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen);
if (code == -1) {
} }
return NULL; return code;
} }
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
qDebug("streamStateGetKVByCur_rocksdb"); qDebug("streamStateSessionGet_rocksdb");
if (!pCur) return -1; int code = 0;
SStateKey tkey; SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key);
SStateKey* pKtmp = &tkey; SSessionKey resKey = *key;
void* tmp = NULL;
int32_t vLen = 0;
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
if (code == 0) {
if (pVLen != NULL) *pVLen = vLen;
if (rocksdb_iter_valid(pCur->iter)) { if (key->win.skey != resKey.win.skey) {
size_t tlen; code = -1;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); } else {
stateKeyDecode((void*)pKtmp, keyStr); *key = resKey;
if (pKtmp->opNum != pCur->number) { *pVal = taosMemoryCalloc(1, *pVLen);
return -1; memcpy(*pVal, tmp, *pVLen);
} }
size_t vlen = 0;
if (pVal != NULL) *pVal = (char*)rocksdb_iter_value(pCur->iter, &vlen);
if (pVLen != NULL) *pVLen = vlen;
*pKey = pKtmp->key;
return 0;
} }
return -1; streamStateFreeCur(pCur);
// impl later
return code;
} }
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) {
int code = 0;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey);
return code;
}
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
pCur->db = pState->pTdbState->rocksdb; pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt); pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
char buf[128] = {0}; char buf[128] = {0};
int len = winKeyEncode((void*)key, buf); SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
}
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
if (rocksdb_iter_valid(pCur->iter)) { int32_t c = 0;
size_t kLen; size_t klen;
SWinKey curKey; const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); SStateSessionKey curKey = {0};
winKeyDecode((void*)&curKey, keyStr); stateSessionKeyDecode(&curKey, (char*)iKey);
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) { if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
return pCur;
}
}
rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
// qWarn("streamState failed to seek key prev
// %s", toString);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
}
return pCur;
} }
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
qDebug("streamStateFillGetKVByCur_rocksdb"); qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
if (!pCur) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
return -1; if (pCur == NULL) {
return NULL;
} }
SWinKey winKey; pCur->db = pState->pTdbState->rocksdb;
if (rocksdb_iter_valid(pCur->iter)) { pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
size_t tlen; pCur->number = pState->number;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
winKeyDecode(&winKey, keyStr);
size_t vlen = 0; char buf[128] = {0};
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
if (pVal != NULL) { SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
char* dst = taosMemoryCalloc(1, vlen); int len = stateSessionKeyEncode(&sKey, buf);
memcpy(dst, valStr, vlen); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
*pVal = dst; streamStateFreeCur(pCur);
return NULL;
} }
if (pVLen != NULL) *pVLen = vlen; if (iterValueIsStale(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
stateSessionKeyDecode(&curKey, (char*)iKey);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur;
} else { rocksdb_iter_next(pCur->iter);
return -1; if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
} }
*pKey = winKey; return pCur;
return 0;
} }
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qDebug("streamStateGetGroupKVByCur_rocksdb"); SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
if (!pCur) { qDebug("streamStateSessionSeekKeyNext_rocksdb");
return -1; SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
} }
uint64_t groupId = pKey->groupId; pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
pCur->number = pState->number;
int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
if (code == 0) {
if (pKey->groupId == groupId) { char buf[128] = {0};
return 0; int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
} }
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) rocksdb_iter_next(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
} }
return -1; size_t klen;
} const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { SStateSessionKey curKey = {0};
qDebug("streamStateGetFirst_rocksdb"); stateSessionKeyDecode(&curKey, (char*)iKey);
SWinKey tmp = {.ts = 0, .groupId = 0}; if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur;
streamStatePut_rocksdb(pState, &tmp, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); rocksdb_iter_next(pCur->iter);
int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
streamStateDel_rocksdb(pState, &tmp); return NULL;
return code; }
return pCur;
} }
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
qDebug("streamStateSessionGetKVByCur_rocksdb"); qDebug("streamStateSessionGetKVByCur_rocksdb");
...@@ -1112,18 +1268,21 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* ...@@ -1112,18 +1268,21 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
SStateSessionKey ktmp = {0}; SStateSessionKey ktmp = {0};
size_t kLen = 0, vLen = 0; size_t kLen = 0, vLen = 0;
if (!rocksdb_iter_valid(pCur->iter)) { if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
return -1; return -1;
} }
const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen); const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen);
stateSessionKeyDecode((void*)&ktmp, (char*)curKey); stateSessionKeyDecode((void*)&ktmp, (char*)curKey);
SStateSessionKey* pKTmp = &ktmp; SStateSessionKey* pKTmp = &ktmp;
const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vLen); const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vLen);
if (pVal != NULL) { char* val = NULL;
*pVal = (char*)val; int32_t len = decodeValueFunc((void*)vval, vLen, NULL, &val);
if (len < 0) {
return -1;
} }
if (pVLen != NULL) *pVLen = vLen; if (pVal != NULL) *pVal = (char*)val;
if (pVLen != NULL) *pVLen = len;
if (pKTmp->opNum != pCur->number) { if (pKTmp->opNum != pCur->number) {
return -1; return -1;
...@@ -1134,39 +1293,86 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* ...@@ -1134,39 +1293,86 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
*pKey = pKTmp->key; *pKey = pKTmp->key;
return 0; return 0;
} }
// fill cf
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
int code = 0;
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen);
qDebug("streamStateSeekKeyNext_rocksdb"); return code;
}
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen);
return code;
}
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "fill", key);
return code;
}
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL; if (pCur == NULL) return NULL;
}
pCur->number = pState->number;
pCur->db = pState->pTdbState->rocksdb; pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0}; char buf[128] = {0};
int len = stateKeyEncode((void*)&sKey, buf); int len = winKeyEncode((void*)key, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
if (iterValueIsStale(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
if (rocksdb_iter_valid(pCur->iter)) { if (rocksdb_iter_valid(pCur->iter)) {
SStateKey curKey;
size_t kLen; size_t kLen;
SWinKey curKey;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
stateKeyDecode((void*)&curKey, keyStr); winKeyDecode((void*)&curKey, keyStr);
if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) { if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) {
return pCur; return pCur;
} }
rocksdb_iter_next(pCur->iter);
return pCur;
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qDebug("streamStateFillGetKVByCur_rocksdb");
if (!pCur) {
return -1;
}
SWinKey winKey;
if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
return -1;
}
size_t tlen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
winKeyDecode(&winKey, keyStr);
size_t vlen = 0;
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
char* dst = NULL;
int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst);
if (len < 0) {
return -1;
}
if (pVal != NULL) *pVal = (char*)dst;
if (pVLen != NULL) *pVLen = vlen;
*pKey = winKey;
return 0;
}
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyNext_rocksdb"); qDebug("streamStateFillSeekKeyNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
...@@ -1183,8 +1389,12 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const ...@@ -1183,8 +1389,12 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
// skip stale data
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
rocksdb_iter_next(pCur->iter);
}
{ if (rocksdb_iter_valid(pCur->iter)) {
SWinKey curKey; SWinKey curKey;
size_t kLen = 0; size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
...@@ -1214,8 +1424,11 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const ...@@ -1214,8 +1424,11 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
rocksdb_iter_prev(pCur->iter);
}
{ if (rocksdb_iter_valid(pCur->iter)) {
SWinKey curKey; SWinKey curKey;
size_t kLen = 0; size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
...@@ -1226,23 +1439,10 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const ...@@ -1226,23 +1439,10 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);
return pCur; return pCur;
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
qDebug("streamStateCurPrev_rocksdb");
if (!pCur) return -1;
rocksdb_iter_prev(pCur->iter);
return 0;
}
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
if (!pCur) {
return -1;
}
rocksdb_iter_next(pCur->iter);
return 0;
}
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
qDebug("streamStateSessionGetKeyByRange_rocksdb"); qDebug("streamStateSessionGetKeyByRange_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
...@@ -1299,36 +1499,6 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes ...@@ -1299,36 +1499,6 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
return -1; return -1;
} }
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
qDebug("streamStateSessionGet_rocksdb");
int code = 0;
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key);
SSessionKey resKey = *key;
void* tmp = NULL;
int32_t vLen = 0;
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
if (code == 0) {
if (pVLen != NULL) *pVLen = vLen;
if (key->win.skey != resKey.win.skey) {
code = -1;
} else {
*key = resKey;
*pVal = taosMemoryCalloc(1, *pVLen);
memcpy(*pVal, tmp, *pVLen);
}
}
streamStateFreeCur(pCur);
// impl later
return code;
}
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) {
int code = 0;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey);
return code;
}
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen) { int32_t* pVLen) {
qDebug("streamStateSessionAddIfNotExist_rocksdb"); qDebug("streamStateSessionAddIfNotExist_rocksdb");
...@@ -1379,6 +1549,27 @@ _end: ...@@ -1379,6 +1549,27 @@ _end:
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return res; return res;
} }
int32_t streamStateSessionClear_rocksdb(SStreamState* pState) {
qDebug("streamStateSessionClear_rocksdb");
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key);
while (1) {
SSessionKey delKey = {0};
void* buf = NULL;
int32_t size = 0;
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size);
if (code == 0 && size > 0) {
memset(buf, 0, size);
streamStateSessionPut_rocksdb(pState, &delKey, buf, size);
} else {
break;
}
streamStateCurNext_rocksdb(pState, pCur);
}
streamStateFreeCur(pCur);
return -1;
}
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
qDebug("streamStateStateAddIfNotExist_rocksdb"); qDebug("streamStateStateAddIfNotExist_rocksdb");
...@@ -1436,27 +1627,7 @@ _end: ...@@ -1436,27 +1627,7 @@ _end:
return res; return res;
} }
int32_t streamStateSessionClear_rocksdb(SStreamState* pState) { // partag cf
qDebug("streamStateSessionClear_rocksdb");
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key);
while (1) {
SSessionKey delKey = {0};
void* buf = NULL;
int32_t size = 0;
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size);
if (code == 0 && size > 0) {
memset(buf, 0, size);
streamStateSessionPut_rocksdb(pState, &delKey, buf, size);
} else {
break;
}
streamStateCurNext_rocksdb(pState, pCur);
}
streamStateFreeCur(pCur);
return -1;
}
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) { int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
int code = 0; int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen); STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);
...@@ -1468,10 +1639,10 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void ...@@ -1468,10 +1639,10 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void
STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, tagVal, tagLen); STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, tagVal, tagLen);
return code; return code;
} }
// parname cfg
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
int code = 0; int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, tbname, TSDB_TABLE_NAME_LEN); STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN);
return code; return code;
} }
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) { int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) {
...@@ -1481,7 +1652,135 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi ...@@ -1481,7 +1652,135 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
return code; return code;
} }
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
// only close db int code = 0;
streamStateCloseBackend(pState, remove); STREAM_STATE_PUT_ROCKSDB(pState, "default", &key, pVal, pVLen);
return code;
}
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "default", &key, pVal, pVLen);
return code;
}
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "default", &key);
return code;
}
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
int code = 0;
char* err = NULL;
rocksdb_snapshot_t* snapshot = NULL;
rocksdb_readoptions_t* readopts = NULL;
rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
if (pIter == NULL) {
return -1;
}
rocksdb_iter_seek(pIter, start, strlen(start));
while (rocksdb_iter_valid(pIter)) {
const char* key = rocksdb_iter_key(pIter, NULL);
int32_t vlen = 0;
const char* vval = rocksdb_iter_value(pIter, (size_t*)&vlen);
char* val = NULL;
int32_t len = decodeValueFunc((void*)vval, vlen, NULL, NULL);
if (len < 0) {
rocksdb_iter_next(pIter);
continue;
}
if (end != NULL && strcmp(key, end) > 0) {
break;
}
if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
int64_t checkPoint = 0;
if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
taosArrayPush(result, &checkPoint);
}
} else {
break;
}
rocksdb_iter_next(pIter);
}
rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot);
rocksdb_readoptions_destroy(readopts);
rocksdb_iter_destroy(pIter);
return code;
}
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
return pCur;
}
int32_t streamDefaultIterValid_rocksdb(void* iter) {
SStreamStateCur* pCur = iter;
bool val = rocksdb_iter_valid(pCur->iter);
return val ? 1 : 0;
}
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
SStreamStateCur* pCur = iter;
rocksdb_iter_seek(pCur->iter, key, strlen(key));
}
void streamDefaultIterNext_rocksdb(void* iter) {
SStreamStateCur* pCur = iter;
rocksdb_iter_next(pCur->iter);
}
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) {
SStreamStateCur* pCur = iter;
return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len);
}
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
SStreamStateCur* pCur = iter;
int32_t vlen = 0;
char* dst = NULL;
const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vlen);
if (decodeValueFunc((void*)vval, vlen, NULL, &dst) < 0) {
return NULL;
}
return dst;
}
// batch func
void* streamStateCreateBatch() {
rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create();
return pBatch;
}
int32_t streamStateGetBatchSize(void* pBatch) {
if (pBatch == NULL) return 0;
return rocksdb_writebatch_count(pBatch);
}
void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen) {
int i = streamGetInit(cfName);
if (i < 0) {
qError("streamState failed to put to cf name:%s", cfName);
return -1;
}
char buf[128] = {0};
int32_t klen = ginitDict[i].enFunc((void*)key, buf);
char* ttlV = NULL;
int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV);
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
taosMemoryFree(ttlV);
return 0;
}
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
char* err = NULL;
rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err);
if (err != NULL) {
qError("streamState failed to write batch, err:%s", err);
taosMemoryFree(err);
return -1;
}
return 0;
} }
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "executor.h" #include "executor.h"
#include "streamBackendRocksdb.h"
#include "streamInc.h" #include "streamInc.h"
#include "ttimer.h" #include "ttimer.h"
......
...@@ -26,88 +26,6 @@ ...@@ -26,88 +26,6 @@
#define MAX_TABLE_NAME_NUM 100000 #define MAX_TABLE_NAME_NUM 100000
void* streamBackendInit(const char* path) {
SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle));
pHandle->list = tdListNew(sizeof(SCfComparator));
taosThreadMutexInit(&pHandle->mutex, NULL);
rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();
rocksdb_env_set_low_priority_background_threads(env, 4);
rocksdb_env_set_high_priority_background_threads(env, 2);
rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20);
rocksdb_options_t* opts = rocksdb_options_create();
rocksdb_options_set_env(opts, env);
rocksdb_options_set_create_if_missing(opts, 1);
rocksdb_options_set_create_missing_column_families(opts, 1);
rocksdb_options_set_write_buffer_size(opts, 128 << 20);
rocksdb_options_set_max_total_wal_size(opts, 128 << 20);
rocksdb_options_set_recycle_log_file_num(opts, 6);
rocksdb_options_set_max_write_buffer_number(opts, 3);
pHandle->env = env;
pHandle->dbOpt = opts;
pHandle->cache = cache;
char* err = NULL;
pHandle->db = rocksdb_open(opts, path, &err);
if (err != NULL) {
qError("failed to open rocksdb, path:%s, reason:%s", path, err);
taosMemoryFreeClear(err);
goto _EXIT;
}
return pHandle;
_EXIT:
rocksdb_options_destroy(opts);
rocksdb_cache_destroy(cache);
rocksdb_env_destroy(env);
taosThreadMutexDestroy(&pHandle->mutex);
tdListFree(pHandle->list);
free(pHandle);
return NULL;
}
void streamBackendCleanup(void* arg) {
SBackendHandle* pHandle = (SBackendHandle*)arg;
rocksdb_close(pHandle->db);
rocksdb_options_destroy(pHandle->dbOpt);
rocksdb_env_destroy(pHandle->env);
rocksdb_cache_destroy(pHandle->cache);
taosThreadMutexDestroy(&pHandle->mutex);
SListNode* head = tdListPopHead(pHandle->list);
while (head != NULL) {
streamStateDestroyCompar(head->data);
taosMemoryFree(head);
head = tdListPopHead(pHandle->list);
}
tdListFree(pHandle->list);
taosMemoryFree(pHandle);
return;
}
SListNode* streamBackendAddCompare(void* backend, void* arg) {
SBackendHandle* pHandle = (SBackendHandle*)backend;
SListNode* node = NULL;
taosThreadMutexLock(&pHandle->mutex);
node = tdListAdd(pHandle->list, arg);
taosThreadMutexUnlock(&pHandle->mutex);
return node;
}
void streamBackendDelCompare(void* backend, void* arg) {
SBackendHandle* pHandle = (SBackendHandle*)backend;
SListNode* node = NULL;
taosThreadMutexLock(&pHandle->mutex);
node = tdListPopNode(pHandle->list, arg);
taosThreadMutexUnlock(&pHandle->mutex);
if (node) {
streamStateDestroyCompar(node->data);
taosMemoryFree(node);
}
}
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
if (pWin1->groupId > pWin2->groupId) { if (pWin1->groupId > pWin2->groupId) {
return 1; return 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册