提交 3abdbec8 编写于 作者: dengyihao's avatar dengyihao

add ttl to stream state key

上级 ae3730c3
......@@ -26,67 +26,65 @@
int streamInitBackend(SStreamState* pState, char* path);
void streamCleanBackend(SStreamState* pState);
void streamStateDestroy_rocksdb(SStreamState* pState);
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen);
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key);
// state cf
int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key);
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key);
int32_t streamStateClear_rocksdb(SStreamState* pState);
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key);
// func cf
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen);
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key);
// session cf
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key);
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey);
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key);
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen);
int32_t streamStateSessionClear_rocksdb(SStreamState* pState);
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
int32_t streamStateSessionClear_rocksdb(SStreamState* pState);
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key);
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key);
// fill cf
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen);
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen);
// partag cf
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen);
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen);
// parname cf
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]);
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal);
void streamStateDestroy_rocksdb(SStreamState* pState);
void* streamStateCreateBatch();
int32_t streamStateGetBatchSize(void* pBatch);
void streamStateClearBatch(void* pBatch);
void streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
// default cf
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key);
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
void* streamDefaultIterCreate_rocksdb(SStreamState* pState);
int32_t streamDefaultIterValid_rocksdb(void* iter);
......@@ -95,5 +93,13 @@ void streamDefaultIterNext_rocksdb(void* iter);
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len);
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len);
// batch func
void* streamStateCreateBatch();
int32_t streamStateGetBatchSize(void* pBatch);
void streamStateClearBatch(void* pBatch);
void streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif
\ No newline at end of file
......@@ -17,6 +17,15 @@
#include "streamBackendRocksdb.h"
#include "tcommon.h"
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
int streamGetInit(const char* funcName);
// |key|-----value------|
// |key|ttl|len|userData|
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
int ret = memcmp(aBuf, bBuf, aLen);
if (ret == 0) {
......@@ -312,8 +321,8 @@ int32_t streamValueToString(void* k, char* buf) {
n += sprintf(buf + n, "data:%s]", key->data);
return n;
}
/*1: stale, 0: no stale*/
/*1: stale, 0: no stale*/
int32_t streaValueIsStale(void* k, int64_t ts) {
SStreamValue* key = k;
if (key->unixTimestamp < ts) {
......@@ -325,8 +334,6 @@ int32_t streaValueIsStale(void* k, int64_t ts) {
typedef struct {
void* tableOpt;
void* lru; // global or not
void* filteFactory;
} rocksdbCfParam;
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
......@@ -363,6 +370,7 @@ typedef struct {
DecodeValueFunc deValueFunc;
} SCfInit;
int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) {
SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)};
......@@ -375,15 +383,30 @@ int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) {
*dest = p;
return len;
}
/*
* ret >= 0 : found valid value
* ret < 0 : error or timeout
*/
int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) {
SStreamValue key = {0};
char* p = value;
int64_t now = taosGetTimestampMs();
p = taosDecodeFixedI64(p, &key.unixTimestamp);
p = taosDecodeFixedI32(p, &key.len);
p = taosDecodeBinary(p, (void**)&(key.data), key.len);
*ttl = key.unixTimestamp;
*dest = key.data;
if (key.unixTimestamp != 0 && key.unixTimestamp < now) {
taosMemoryFree(key.data);
*dest = NULL;
return -1;
}
if (ttl != NULL) {
*ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now;
}
if (dest != NULL) {
*dest = key.data;
} else {
taosMemoryFree(key.data);
}
return key.len;
}
SCfInit ginitDict[] = {
......@@ -550,6 +573,10 @@ void streamCleanBackend(SStreamState* pState) {
pState->pTdbState->rocksdb = NULL;
}
void streamStateDestroy_rocksdb(SStreamState* pState) {
// only close db
streamCleanBackend(pState);
}
int streamGetInit(const char* funcName) {
size_t len = strlen(funcName);
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
......@@ -617,47 +644,52 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
taosMemoryFree(ttlV); \
} while (0);
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL) { \
qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \
if (err != NULL) taosMemoryFree(err); \
code = -1; \
} else { \
char * p = NULL, *end = NULL; \
int64_t ttl; \
int32_t vlen = ginitDict[i].deValueFunc(val, len, &ttl, &p); \
if (pVal != NULL) { \
*pVal = p; \
} else { \
taosMemoryFree(p); \
} \
if (vLen != NULL) *vLen = vlen; \
} \
if (err != NULL) { \
taosMemoryFree(err); \
qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \
code = -1; \
} else { \
if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \
} \
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL) { \
qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \
if (err != NULL) taosMemoryFree(err); \
code = -1; \
} else { \
char * p = NULL, *end = NULL; \
int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \
if (len < 0) { \
qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \
code = -1; \
} else { \
qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \
} \
if (pVal != NULL) { \
*pVal = p; \
} else { \
taosMemoryFree(p); \
} \
if (vLen != NULL) *vLen = len; \
} \
if (err != NULL) { \
taosMemoryFree(err); \
qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \
code = -1; \
} else { \
if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \
} \
} while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
......@@ -687,22 +719,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
} \
} while (0);
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, vLen);
return code;
}
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "func", key, pVal, pVLen);
return 0;
}
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "func", key);
return 0;
}
// state cf
int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
int code = 0;
......@@ -710,203 +727,18 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v
STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, vLen);
return code;
}
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
char* err = NULL;
rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err);
if (err != NULL) {
qError("streamState failed to write batch, err:%s", err);
taosMemoryFree(err);
return -1;
}
return 0;
}
void* streamStateCreateBatch() {
rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create();
return pBatch;
}
int32_t streamStateGetBatchSize(void* pBatch) {
if (pBatch == NULL) return -1;
return rocksdb_writebatch_count((rocksdb_writebatch_t*)pBatch);
}
void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen) {
int i = streamGetInit(cfName);
if (i < 0) {
qError("streamState failed to put to cf name:%s", cfName);
return -1;
}
char buf[128] = {0};
int32_t klen = ginitDict[i].enFunc((void*)key, buf);
char* ttlV = NULL;
int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV);
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
taosMemoryFree(ttlV);
return 0;
}
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "default", &key, pVal, pVLen);
return code;
}
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "default", &key, pVal, pVLen);
return code;
}
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "default", &key);
return code;
}
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
return pCur;
}
int32_t streamDefaultIterValid_rocksdb(void* iter) {
SStreamStateCur* pCur = iter;
bool val = rocksdb_iter_valid(pCur->iter);
return val ? 0 : -1;
}
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
SStreamStateCur* pCur = iter;
rocksdb_iter_seek(pCur->iter, key, strlen(key));
}
void streamDefaultIterNext_rocksdb(void* iter) {
SStreamStateCur* pCur = iter;
rocksdb_iter_next(pCur->iter);
}
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) {
SStreamStateCur* pCur = iter;
return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len);
}
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
SStreamStateCur* pCur = iter;
return (char*)rocksdb_iter_value(pCur->iter, (size_t*)len);
}
// typedef struct {
// char* start;
// char* end;
// void* result;
// } StreamFilterArg;
// typedef int (*streamfilterFunc)(StreamFilterArg* arg);
// int32_t streamDefaultIterFilter_rocksdb(SStreamState* pState, streamfilterFunc filterFunc, StreamFilterArg* arg) {
// int code = 0;
// char* err = NULL;
// rocksdb_snapshot_t* snapshot = NULL;
// rocksdb_readoptions_t* readopts = NULL;
// rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
// if (pIter == NULL) {
// return -1;
// }
// char* start = arg->start;
// char* end = arg->end;
// SArray* result = arg->result;
// rocksdb_iter_seek(pIter, start, strlen(start));
// while (rocksdb_iter_valid(pIter)) {
// const char* key = rocksdb_iter_key(pIter, NULL);
// if (end != NULL && strcmp(key, end) > 0) {
// break;
// }
// if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
// int64_t checkPoint = 0;
// // if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
// // taosArrayPush(result, &checkPoint);
// // }
// } else {
// break;
// }
// rocksdb_iter_next(pIter);
// }
// rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot);
// rocksdb_readoptions_destroy(readopts);
// rocksdb_iter_destroy(pIter);
// return code;
// }
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
int code = 0;
char* err = NULL;
rocksdb_snapshot_t* snapshot = NULL;
rocksdb_readoptions_t* readopts = NULL;
rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
if (pIter == NULL) {
return -1;
}
rocksdb_iter_seek(pIter, start, strlen(start));
while (rocksdb_iter_valid(pIter)) {
const char* key = rocksdb_iter_key(pIter, NULL);
if (end != NULL && strcmp(key, end) > 0) {
break;
}
if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
int64_t checkPoint = 0;
if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
taosArrayPush(result, &checkPoint);
}
} else {
break;
}
rocksdb_iter_next(pIter);
}
rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot);
rocksdb_readoptions_destroy(readopts);
rocksdb_iter_destroy(pIter);
return code;
}
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, pVal, pVLen);
return code;
}
// todo refactor
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int code = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_DEL_ROCKSDB(pState, "state", &sKey);
return code;
}
// todo refactor
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen);
return code;
}
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen);
return code;
}
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "fill", key);
return code;
}
// todo refactor
int32_t streamStateClear_rocksdb(SStreamState* pState) {
qDebug("streamStateClear_rocksdb");
......@@ -976,124 +808,121 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
return 0;
}
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
int code = 0;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen);
if (code == -1) {
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
if (!pCur) {
return -1;
}
rocksdb_iter_next(pCur->iter);
return 0;
}
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
qDebug("streamStateGetFirst_rocksdb");
SWinKey tmp = {.ts = 0, .groupId = 0};
streamStatePut_rocksdb(pState, &tmp, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0);
streamStateFreeCur(pCur);
streamStateDel_rocksdb(pState, &tmp);
return code;
}
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qDebug("streamStateGetGroupKVByCur_rocksdb");
if (!pCur) {
return -1;
}
uint64_t groupId = pKey->groupId;
int32_t c = 0;
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
stateSessionKeyDecode(&curKey, (char*)iKey);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
// qWarn("streamState failed to seek key prev
// %s", toString);
streamStateFreeCur(pCur);
return NULL;
int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
if (code == 0) {
if (pKey->groupId == groupId) {
return 0;
}
}
return pCur;
return -1;
}
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
qDebug("streamStateAddIfNotExist_rocksdb");
int32_t size = *pVLen;
if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) {
return 0;
}
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
pCur->number = pState->number;
*pVal = taosMemoryMalloc(size);
memset(*pVal, 0, size);
return 0;
}
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
qDebug("streamStateCurPrev_rocksdb");
if (!pCur) return -1;
char buf[128] = {0};
rocksdb_iter_prev(pCur->iter);
return 0;
}
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qDebug("streamStateGetKVByCur_rocksdb");
if (!pCur) return -1;
SStateKey tkey;
SStateKey* pKtmp = &tkey;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
if (rocksdb_iter_valid(pCur->iter)) {
size_t tlen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
stateKeyDecode((void*)pKtmp, keyStr);
if (pKtmp->opNum != pCur->number) {
return -1;
}
size_t vlen = 0;
if (pVal != NULL) *pVal = (char*)rocksdb_iter_value(pCur->iter, &vlen);
if (pVLen != NULL) *pVLen = vlen;
*pKey = pKtmp->key;
return 0;
}
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
stateSessionKeyDecode(&curKey, (char*)iKey);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur;
rocksdb_iter_next(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
return -1;
}
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) {
qDebug("streamStateGetAndCheckCur_rocksdb");
SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
if (pCur) {
int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0);
if (code == 0) return pCur;
streamStateFreeCur(pCur);
return NULL;
}
return pCur;
return NULL;
}
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyNext_rocksdb");
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateSeekKeyNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
pCur->number = pState->number;
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
int len = stateSessionKeyEncode(&sKey, buf);
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
int len = stateKeyEncode((void*)&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
}
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
stateSessionKeyDecode(&curKey, (char*)iKey);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur;
rocksdb_iter_next(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
if (rocksdb_iter_valid(pCur->iter)) {
SStateKey curKey;
size_t kLen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
stateKeyDecode((void*)&curKey, keyStr);
if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) {
return pCur;
}
rocksdb_iter_next(pCur->iter);
return pCur;
}
return pCur;
streamStateFreeCur(pCur);
return NULL;
}
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
qDebug("streamStateAddIfNotExist_rocksdb");
int32_t size = *pVLen;
if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) {
return 0;
}
*pVal = taosMemoryMalloc(size);
memset(*pVal, 0, size);
return 0;
}
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateGetCur_rocksdb");
int32_t code = 0;
......@@ -1112,6 +941,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey);
return pCur;
}
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
......@@ -1140,116 +970,159 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
return NULL;
}
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) {
qDebug("streamStateGetAndCheckCur_rocksdb");
SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
if (pCur) {
int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0);
if (code == 0) return pCur;
streamStateFreeCur(pCur);
// func cf
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, vLen);
return code;
}
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "func", key, pVal, pVLen);
return 0;
}
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "func", key);
return 0;
}
// session cf
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
int code = 0;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen);
if (code == -1) {
}
return NULL;
return code;
}
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qDebug("streamStateGetKVByCur_rocksdb");
if (!pCur) return -1;
SStateKey tkey;
SStateKey* pKtmp = &tkey;
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
qDebug("streamStateSessionGet_rocksdb");
int code = 0;
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key);
SSessionKey resKey = *key;
void* tmp = NULL;
int32_t vLen = 0;
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
if (code == 0) {
if (pVLen != NULL) *pVLen = vLen;
if (rocksdb_iter_valid(pCur->iter)) {
size_t tlen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
stateKeyDecode((void*)pKtmp, keyStr);
if (pKtmp->opNum != pCur->number) {
return -1;
if (key->win.skey != resKey.win.skey) {
code = -1;
} else {
*key = resKey;
*pVal = taosMemoryCalloc(1, *pVLen);
memcpy(*pVal, tmp, *pVLen);
}
size_t vlen = 0;
if (pVal != NULL) *pVal = (char*)rocksdb_iter_value(pCur->iter, &vlen);
if (pVLen != NULL) *pVLen = vlen;
*pKey = pKtmp->key;
return 0;
}
return -1;
streamStateFreeCur(pCur);
// impl later
return code;
}
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillGetCur_rocksdb");
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) {
int code = 0;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey);
return code;
}
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
if (pCur == NULL) return NULL;
char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
}
int32_t c = 0;
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
stateSessionKeyDecode(&curKey, (char*)iKey);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
// qWarn("streamState failed to seek key prev
// %s", toString);
streamStateFreeCur(pCur);
return NULL;
}
return pCur;
}
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
pCur->number = pState->number;
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
}
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
stateSessionKeyDecode(&curKey, (char*)iKey);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur;
if (rocksdb_iter_valid(pCur->iter)) {
size_t kLen;
SWinKey curKey;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
winKeyDecode((void*)&curKey, keyStr);
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) {
return pCur;
}
rocksdb_iter_next(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
streamStateFreeCur(pCur);
return NULL;
return pCur;
}
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qDebug("streamStateFillGetKVByCur_rocksdb");
if (!pCur) {
return -1;
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
SWinKey winKey;
if (rocksdb_iter_valid(pCur->iter)) {
size_t tlen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
winKeyDecode(&winKey, keyStr);
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
pCur->number = pState->number;
size_t vlen = 0;
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
if (pVal != NULL) {
char* dst = taosMemoryCalloc(1, vlen);
memcpy(dst, valStr, vlen);
*pVal = dst;
}
if (pVLen != NULL) *pVLen = vlen;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
} else {
return -1;
}
*pKey = winKey;
return 0;
}
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qDebug("streamStateGetGroupKVByCur_rocksdb");
if (!pCur) {
return -1;
char buf[128] = {0};
int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
}
uint64_t groupId = pKey->groupId;
int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
if (code == 0) {
if (pKey->groupId == groupId) {
return 0;
}
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
stateSessionKeyDecode(&curKey, (char*)iKey);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur;
rocksdb_iter_next(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
return -1;
}
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
qDebug("streamStateGetFirst_rocksdb");
SWinKey tmp = {.ts = 0, .groupId = 0};
streamStatePut_rocksdb(pState, &tmp, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0);
streamStateFreeCur(pCur);
streamStateDel_rocksdb(pState, &tmp);
return code;
return pCur;
}
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
qDebug("streamStateSessionGetKVByCur_rocksdb");
......@@ -1266,11 +1139,14 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
stateSessionKeyDecode((void*)&ktmp, (char*)curKey);
SStateSessionKey* pKTmp = &ktmp;
const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vLen);
if (pVal != NULL) {
*pVal = (char*)val;
const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vLen);
char* val = NULL;
int32_t len = decodeValueFunc((void*)vval, vLen, NULL, &val);
if (len < 0) {
return -1;
}
if (pVLen != NULL) *pVLen = vLen;
if (pVal != NULL) *pVal = (char*)val;
if (pVLen != NULL) *pVLen = len;
if (pKTmp->opNum != pCur->number) {
return -1;
......@@ -1281,39 +1157,81 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
*pKey = pKTmp->key;
return 0;
}
// fill cf
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
int code = 0;
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateSeekKeyNext_rocksdb");
STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen);
return code;
}
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen);
return code;
}
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "fill", key);
return code;
}
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
if (pCur == NULL) return NULL;
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
int len = stateKeyEncode((void*)&sKey, buf);
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
}
if (rocksdb_iter_valid(pCur->iter)) {
SStateKey curKey;
size_t kLen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
stateKeyDecode((void*)&curKey, keyStr);
if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) {
size_t kLen;
SWinKey curKey;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
winKeyDecode((void*)&curKey, keyStr);
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) {
return pCur;
}
rocksdb_iter_next(pCur->iter);
return pCur;
}
streamStateFreeCur(pCur);
return NULL;
}
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qDebug("streamStateFillGetKVByCur_rocksdb");
if (!pCur) {
return -1;
}
SWinKey winKey;
if (rocksdb_iter_valid(pCur->iter)) {
size_t tlen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
winKeyDecode(&winKey, keyStr);
size_t vlen = 0;
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
if (pVal != NULL) {
char* dst = taosMemoryCalloc(1, vlen);
memcpy(dst, valStr, vlen);
*pVal = dst;
}
if (pVLen != NULL) *pVLen = vlen;
} else {
return -1;
}
*pKey = winKey;
return 0;
}
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
......@@ -1376,20 +1294,6 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
streamStateFreeCur(pCur);
return NULL;
}
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
qDebug("streamStateCurPrev_rocksdb");
if (!pCur) return -1;
rocksdb_iter_prev(pCur->iter);
return 0;
}
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
if (!pCur) {
return -1;
}
rocksdb_iter_next(pCur->iter);
return 0;
}
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
qDebug("streamStateSessionGetKeyByRange_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
......@@ -1446,36 +1350,6 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
return -1;
}
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
qDebug("streamStateSessionGet_rocksdb");
int code = 0;
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key);
SSessionKey resKey = *key;
void* tmp = NULL;
int32_t vLen = 0;
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
if (code == 0) {
if (pVLen != NULL) *pVLen = vLen;
if (key->win.skey != resKey.win.skey) {
code = -1;
} else {
*key = resKey;
*pVal = taosMemoryCalloc(1, *pVLen);
memcpy(*pVal, tmp, *pVLen);
}
}
streamStateFreeCur(pCur);
// impl later
return code;
}
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) {
int code = 0;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey);
return code;
}
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen) {
qDebug("streamStateSessionAddIfNotExist_rocksdb");
......@@ -1526,6 +1400,27 @@ _end:
streamStateFreeCur(pCur);
return res;
}
int32_t streamStateSessionClear_rocksdb(SStreamState* pState) {
qDebug("streamStateSessionClear_rocksdb");
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key);
while (1) {
SSessionKey delKey = {0};
void* buf = NULL;
int32_t size = 0;
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size);
if (code == 0 && size > 0) {
memset(buf, 0, size);
streamStateSessionPut_rocksdb(pState, &delKey, buf, size);
} else {
break;
}
streamStateCurNext_rocksdb(pState, pCur);
}
streamStateFreeCur(pCur);
return -1;
}
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
qDebug("streamStateStateAddIfNotExist_rocksdb");
......@@ -1583,27 +1478,7 @@ _end:
return res;
}
int32_t streamStateSessionClear_rocksdb(SStreamState* pState) {
qDebug("streamStateSessionClear_rocksdb");
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key);
while (1) {
SSessionKey delKey = {0};
void* buf = NULL;
int32_t size = 0;
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size);
if (code == 0 && size > 0) {
memset(buf, 0, size);
streamStateSessionPut_rocksdb(pState, &delKey, buf, size);
} else {
break;
}
streamStateCurNext_rocksdb(pState, pCur);
}
streamStateFreeCur(pCur);
return -1;
}
// partag cf
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);
......@@ -1615,7 +1490,7 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void
STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, tagVal, tagLen);
return code;
}
// parname cfg
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN);
......@@ -1628,7 +1503,136 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
return code;
}
void streamStateDestroy_rocksdb(SStreamState* pState) {
// only close db
streamCleanBackend(pState);
// default cf
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "default", &key, pVal, pVLen);
return code;
}
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "default", &key, pVal, pVLen);
return code;
}
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "default", &key);
return code;
}
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
int code = 0;
char* err = NULL;
rocksdb_snapshot_t* snapshot = NULL;
rocksdb_readoptions_t* readopts = NULL;
rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
if (pIter == NULL) {
return -1;
}
rocksdb_iter_seek(pIter, start, strlen(start));
while (rocksdb_iter_valid(pIter)) {
const char* key = rocksdb_iter_key(pIter, NULL);
int32_t vlen = 0;
const char* vval = rocksdb_iter_value(pIter, (size_t*)&vlen);
char* val = NULL;
int32_t len = decodeValueFunc((void*)vval, vlen, NULL, NULL);
if (len < 0) {
rocksdb_iter_next(pIter);
continue;
}
if (end != NULL && strcmp(key, end) > 0) {
break;
}
if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
int64_t checkPoint = 0;
if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
taosArrayPush(result, &checkPoint);
}
} else {
break;
}
rocksdb_iter_next(pIter);
}
rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot);
rocksdb_readoptions_destroy(readopts);
rocksdb_iter_destroy(pIter);
return code;
}
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
return pCur;
}
int32_t streamDefaultIterValid_rocksdb(void* iter) {
SStreamStateCur* pCur = iter;
bool val = rocksdb_iter_valid(pCur->iter);
return val ? 0 : -1;
}
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
SStreamStateCur* pCur = iter;
rocksdb_iter_seek(pCur->iter, key, strlen(key));
}
void streamDefaultIterNext_rocksdb(void* iter) {
SStreamStateCur* pCur = iter;
rocksdb_iter_next(pCur->iter);
}
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) {
SStreamStateCur* pCur = iter;
return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len);
}
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
SStreamStateCur* pCur = iter;
int32_t vlen = 0;
char* dst = NULL;
const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vlen);
if (decodeValueFunc((void*)vval, vlen, NULL, &dst) < 0) {
return NULL;
}
return dst;
}
// batch func
void* streamStateCreateBatch() {
rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create();
return pBatch;
}
int32_t streamStateGetBatchSize(void* pBatch) {
if (pBatch == NULL) return 0;
return rocksdb_writebatch_count(pBatch);
}
void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen) {
int i = streamGetInit(cfName);
if (i < 0) {
qError("streamState failed to put to cf name:%s", cfName);
return -1;
}
char buf[128] = {0};
int32_t klen = ginitDict[i].enFunc((void*)key, buf);
char* ttlV = NULL;
int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV);
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
taosMemoryFree(ttlV);
return 0;
}
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
char* err = NULL;
rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err);
if (err != NULL) {
qError("streamState failed to write batch, err:%s", err);
taosMemoryFree(err);
return -1;
}
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册