提交 914d3918 编写于 作者: dengyihao's avatar dengyihao

fix tag filter

上级 2d37aeab
......@@ -10,32 +10,48 @@
const char DBPath[] = "rocksdb_c_simple_example";
const char DBBackupPath[] = "/tmp/rocksdb_c_simple_example_backup";
static const int32_t endian_test_var = 1;
#define IS_LITTLE_ENDIAN() (*(uint8_t *)(&endian_test_var) != 0)
#define TD_RT_ENDIAN() (IS_LITTLE_ENDIAN() ? TD_LITTLE_ENDIAN : TD_BIG_ENDIAN)
#define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b)))
static void *taosDecodeFixedU64(const void *buf, uint64_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
((uint8_t *)value)[7] = ((uint8_t *)buf)[0];
((uint8_t *)value)[6] = ((uint8_t *)buf)[1];
((uint8_t *)value)[5] = ((uint8_t *)buf)[2];
((uint8_t *)value)[4] = ((uint8_t *)buf)[3];
((uint8_t *)value)[3] = ((uint8_t *)buf)[4];
((uint8_t *)value)[2] = ((uint8_t *)buf)[5];
((uint8_t *)value)[1] = ((uint8_t *)buf)[6];
((uint8_t *)value)[0] = ((uint8_t *)buf)[7];
}
return POINTER_SHIFT(buf, sizeof(*value));
}
// ---- Fixed U64
static int32_t taosEncodeFixedU64(void **buf, uint64_t value) {
if (buf != NULL) {
((uint8_t *)(*buf))[0] = value & 0xff;
((uint8_t *)(*buf))[1] = (value >> 8) & 0xff;
((uint8_t *)(*buf))[2] = (value >> 16) & 0xff;
((uint8_t *)(*buf))[3] = (value >> 24) & 0xff;
((uint8_t *)(*buf))[4] = (value >> 32) & 0xff;
((uint8_t *)(*buf))[5] = (value >> 40) & 0xff;
((uint8_t *)(*buf))[6] = (value >> 48) & 0xff;
((uint8_t *)(*buf))[7] = (value >> 56) & 0xff;
if (IS_LITTLE_ENDIAN()) {
memcpy(*buf, &value, sizeof(value));
} else {
((uint8_t *)(*buf))[0] = value & 0xff;
((uint8_t *)(*buf))[1] = (value >> 8) & 0xff;
((uint8_t *)(*buf))[2] = (value >> 16) & 0xff;
((uint8_t *)(*buf))[3] = (value >> 24) & 0xff;
((uint8_t *)(*buf))[4] = (value >> 32) & 0xff;
((uint8_t *)(*buf))[5] = (value >> 40) & 0xff;
((uint8_t *)(*buf))[6] = (value >> 48) & 0xff;
((uint8_t *)(*buf))[7] = (value >> 56) & 0xff;
}
*buf = POINTER_SHIFT(*buf, sizeof(value));
}
return (int32_t)sizeof(value);
}
static void *taosDecodeFixedU64(const void *buf, uint64_t *value) {
((uint8_t *)value)[7] = ((uint8_t *)buf)[0];
((uint8_t *)value)[6] = ((uint8_t *)buf)[1];
((uint8_t *)value)[5] = ((uint8_t *)buf)[2];
((uint8_t *)value)[4] = ((uint8_t *)buf)[3];
((uint8_t *)value)[3] = ((uint8_t *)buf)[4];
((uint8_t *)value)[2] = ((uint8_t *)buf)[5];
((uint8_t *)value)[1] = ((uint8_t *)buf)[6];
((uint8_t *)value)[0] = ((uint8_t *)buf)[7];
return POINTER_SHIFT(buf, sizeof(*value));
return (int32_t)sizeof(value);
}
typedef struct KV {
......@@ -45,8 +61,6 @@ typedef struct KV {
int kvSerial(KV *kv, char *buf) {
int len = 0;
buf[0] = 'a';
buf += 1;
len += taosEncodeFixedU64((void **)&buf, kv->k1);
len += taosEncodeFixedU64((void **)&buf, kv->k2);
return len;
......@@ -60,8 +74,8 @@ int kvDBComp(void *state, const char *aBuf, size_t aLen, const char *bBu
char *p1 = (char *)aBuf;
char *p2 = (char *)bBuf;
p1 += 1;
p2 += 1;
// p1 += 1;
// p2 += 1;
p1 = taosDecodeFixedU64(p1, &w1.k1);
p2 = taosDecodeFixedU64(p2, &w2.k1);
......@@ -84,7 +98,7 @@ int kvDBComp(void *state, const char *aBuf, size_t aLen, const char *bBu
}
int kvDeserial(KV *kv, char *buf) {
char *p1 = (char *)buf;
p1 += 1;
// p1 += 1;
p1 = taosDecodeFixedU64(p1, &kv->k1);
p1 = taosDecodeFixedU64(p1, &kv->k2);
......@@ -107,11 +121,15 @@ int main(int argc, char const *argv[]) {
const rocksdb_options_t **cfOpt = malloc(len * sizeof(rocksdb_options_t *));
for (int i = 0; i < len; i++) {
cfOpt[i] = opt;
cfOpt[i] = rocksdb_options_create_copy(opt);
if (i != 0) {
rocksdb_comparator_t *comp = rocksdb_comparator_create(NULL, NULL, kvDBComp, kvDBName);
rocksdb_options_set_comparator((rocksdb_options_t *)cfOpt[i], comp);
}
}
rocksdb_column_family_handle_t **cfHandle = malloc(len * sizeof(rocksdb_column_family_handle_t *));
db = rocksdb_open_column_families(opt, "test", len, cfName, cfOpt, cfHandle, &err);
db = rocksdb_open_column_families(opt, path, len, cfName, cfOpt, cfHandle, &err);
{
rocksdb_readoptions_t *rOpt = rocksdb_readoptions_create();
......@@ -119,10 +137,6 @@ int main(int argc, char const *argv[]) {
char *v = rocksdb_get_cf(db, rOpt, cfHandle[0], "key", strlen("key"), &vlen, &err);
printf("Get value %s, and len = %d\n", v, (int)vlen);
char *v1 = rocksdb_get_cf(db, rOpt, cfHandle[1], "key", strlen("key"), &vlen, &err);
printf("Get value %s, and len = %d\n", v1, (int)vlen);
rocksdb_readoptions_destroy(rOpt);
}
rocksdb_writeoptions_t *wOpt = rocksdb_writeoptions_create();
......@@ -133,6 +147,40 @@ int main(int argc, char const *argv[]) {
rocksdb_readoptions_t *rOpt = rocksdb_readoptions_create();
size_t vlen = 0;
{
rocksdb_writeoptions_t *wOpt = rocksdb_writeoptions_create();
rocksdb_writebatch_t *wBatch = rocksdb_writebatch_create();
for (int i = 0; i < 100; i++) {
char buf[128] = {0};
KV kv = {.k1 = (100 - i) % 26, .k2 = i % 26};
kvSerial(&kv, buf);
rocksdb_writebatch_put_cf(wBatch, cfHandle[1], buf, sizeof(kv), "value", strlen("value"));
}
rocksdb_write(db, wOpt, wBatch, &err);
}
{
char buf[128] = {0};
KV kv = {.k1 = 0, .k2 = 0};
kvSerial(&kv, buf);
char *v = rocksdb_get_cf(db, rOpt, cfHandle[1], buf, sizeof(kv), &vlen, &err);
printf("Get value %s, and len = %d, xxxx\n", v, (int)vlen);
rocksdb_iterator_t *iter = rocksdb_create_iterator_cf(db, rOpt, cfHandle[1]);
rocksdb_iter_seek_to_first(iter);
int i = 0;
while (rocksdb_iter_valid(iter)) {
size_t klen, vlen;
const char *key = rocksdb_iter_key(iter, &klen);
const char *value = rocksdb_iter_value(iter, &vlen);
KV kv;
kvDeserial(&kv, (char *)key);
printf("kv1: %d\t kv2: %d, len:%d, value = %s\n", (int)(kv.k1), (int)(kv.k2), (int)(klen), value);
i++;
rocksdb_iter_next(iter);
}
rocksdb_iter_destroy(iter);
printf("iterator count %d\n", i);
}
char *v = rocksdb_get_cf(db, rOpt, cfHandle[0], "key", strlen("key"), &vlen, &err);
printf("Get value %s, and len = %d\n", v, (int)vlen);
......
......@@ -34,6 +34,8 @@ typedef struct STdbState {
rocksdb_t* rocksdb;
rocksdb_column_family_handle_t** pHandle;
rocksdb_writeoptions_t* wopts;
rocksdb_readoptions_t* ropts;
// rocksdb_column_family_handle_t* fillStateDB;
// rocksdb_column_family_handle_t* sessStateDB;
// rocksdb_column_family_handle_t* funcStateDB;
......@@ -65,6 +67,8 @@ int32_t streamStateAbort(SStreamState* pState);
void streamStateDestroy(SStreamState* pState);
typedef struct {
rocksdb_iterator_t* iter;
TBC* pCur;
int64_t number;
} SStreamStateCur;
......@@ -124,6 +128,23 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen);
int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen);
/***compare func **/
// todo refactor
typedef struct SStateKey {
SWinKey key;
int64_t opNum;
} SStateKey;
typedef struct SStateSessionKey {
SSessionKey key;
int64_t opNum;
} SStateSessionKey;
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2);
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2);
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2);
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2);
#if 0
char* streamStateSessionDump(SStreamState* pState);
char* streamStateIntervalDump(SStreamState* pState);
......
......@@ -113,6 +113,12 @@ if(${BUILD_WITH_INVERTEDINDEX})
add_definitions(-DUSE_INVERTED_INDEX)
endif(${BUILD_WITH_INVERTEDINDEX})
if(${BUILD_WITH_ROCKSDB})
add_definitions(-DUSE_ROCKSDB)
endif(${BUILD_WITH_ROCKSDB})
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
......@@ -25,18 +25,7 @@
#include "tcompare.h"
#include "ttimer.h"
// todo refactor
typedef struct SStateKey {
SWinKey key;
int64_t opNum;
} SStateKey;
typedef struct SStateSessionKey {
SSessionKey key;
int64_t opNum;
} SStateSessionKey;
static inline int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
if (pWin1->groupId > pWin2->groupId) {
return 1;
} else if (pWin1->groupId < pWin2->groupId) {
......@@ -52,7 +41,7 @@ static inline int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKe
return 0;
}
static inline int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
if (pWin1->groupId > pWin2->groupId) {
return 1;
} else if (pWin1->groupId < pWin2->groupId) {
......@@ -74,7 +63,7 @@ static inline int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey*
return 0;
}
static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;
......@@ -87,7 +76,7 @@ static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void*
return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
}
static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
SStateKey* pWin1 = (SStateKey*)pKey1;
SStateKey* pWin2 = (SStateKey*)pKey2;
......@@ -112,191 +101,6 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2,
return 0;
}
//
// SStateKey
// |--groupid--|---ts------|--opNum----|
// |--uint64_t-|-uint64_t--|--int64_t--|
//
//
//
int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
SStateKey key1, key2;
memset(&key1, 0, sizeof(key1));
memset(&key2, 0, sizeof(key2));
char* p1 = (char*)aBuf;
char* p2 = (char*)bBuf;
p1 = taosDecodeFixedU64(p1, &key1.key.groupId);
p2 = taosDecodeFixedU64(p2, &key2.key.groupId);
p1 = taosDecodeFixedI64(p1, &key1.key.ts);
p2 = taosDecodeFixedI64(p2, &key2.key.ts);
taosDecodeFixedI64(p1, &key1.opNum);
taosDecodeFixedI64(p2, &key2.opNum);
return stateKeyCmpr(&key1, sizeof(key1), &key2, sizeof(key2));
}
int stateKeySerial(SStateKey* key, char* buf) {
int len = 0;
len += taosEncodeFixedU64((void**)&buf, key->key.groupId);
len += taosEncodeFixedI64((void**)&buf, key->key.ts);
len += taosEncodeFixedU64((void**)&buf, key->opNum);
return len;
}
//
// SStateSessionKey
// |-----------SSessionKey----------|
// |-----STimeWindow-----|
// |---skey--|---ekey----|--groupId-|--opNum--|
// |---int64-|--int64_t--|--uint64--|--int64_t|
// |
//
int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
SStateSessionKey w1, w2;
memset(&w1, 0, sizeof(w1));
memset(&w2, 0, sizeof(w2));
char* p1 = (char*)aBuf;
char* p2 = (char*)bBuf;
p1 = taosDecodeFixedI64(p1, &w1.key.win.skey);
p2 = taosDecodeFixedI64(p2, &w2.key.win.skey);
p1 = taosDecodeFixedI64(p1, &w1.key.win.ekey);
p2 = taosDecodeFixedI64(p2, &w2.key.win.ekey);
p1 = taosDecodeFixedU64(p1, &w1.key.groupId);
p2 = taosDecodeFixedU64(p2, &w2.key.groupId);
p1 = taosDecodeFixedI64(p1, &w1.opNum);
p2 = taosDecodeFixedI64(p2, &w2.opNum);
return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
}
int stateSessionKeySerial(SStateSessionKey* sess, char* buf) {
int len = 0;
len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey);
len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey);
len += taosEncodeFixedU64((void**)&buf, sess->key.groupId);
len += taosEncodeFixedI64((void**)&buf, sess->opNum);
return len;
}
/**
* SWinKey
* |------groupId------|-----ts------|
* |------uint64-------|----int64----|
*/
int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
SWinKey w1, w2;
memset(&w1, 0, sizeof(w1));
memset(&w2, 0, sizeof(w2));
char* p1 = (char*)aBuf;
char* p2 = (char*)bBuf;
p1 = taosDecodeFixedU64(p1, &w1.groupId);
p2 = taosDecodeFixedU64(p2, &w2.groupId);
p1 = taosDecodeFixedI64(p1, &w1.ts);
p2 = taosDecodeFixedI64(p2, &w2.ts);
return winKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
}
int winKeySerial(SWinKey* key, char* buf) {
int len = 0;
len += taosEncodeFixedU64((void**)&buf, key->groupId);
len += taosEncodeFixedI64((void**)&buf, key->ts);
return len;
}
/*
* STupleKey
* |---groupId---|---ts---|---exprIdx---|
* |---uint64--|---int64--|---int32-----|
*/
int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
STupleKey w1, w2;
memset(&w1, 0, sizeof(w1));
memset(&w2, 0, sizeof(w2));
char* p1 = (char*)aBuf;
char* p2 = (char*)bBuf;
p1 = taosDecodeFixedU64(p1, &w1.groupId);
p2 = taosDecodeFixedU64(p2, &w2.groupId);
p1 = taosDecodeFixedI64(p1, &w1.ts);
p2 = taosDecodeFixedI64(p2, &w2.ts);
p1 = taosDecodeFixedI32(p1, &w1.exprIdx);
p2 = taosDecodeFixedI32(p2, &w2.exprIdx);
return STupleKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
}
int tupleKeySerial(STupleKey* key, char* buf) {
int len = 0;
len += taosEncodeFixedU64((void**)&buf, key->groupId);
len += taosEncodeFixedI64((void**)&buf, key->ts);
len += taosEncodeFixedI32((void**)&buf, key->exprIdx);
return len;
}
const char* cfName[] = {"default", "fill", "sess", "func", "parname", "partag"};
const char* compareStateName(void* name) { return NULL; }
int streamInitBackend(SStreamState* pState, char* path) {
rocksdb_options_t* opts = rocksdb_options_create();
rocksdb_options_increase_parallelism(opts, 4);
rocksdb_options_optimize_level_style_compaction(opts, 0);
// create the DB if it's not already present
rocksdb_options_set_create_if_missing(opts, 1);
rocksdb_options_set_create_missing_column_families(opts, 1);
char* err = NULL;
int cfLen = sizeof(cfName) / sizeof(cfName[0]);
const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
for (int i = 0; i < cfLen; i++) {
cfOpt[i] = rocksdb_options_create_copy(opts);
}
rocksdb_comparator_t* fillCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[1], fillCompare);
rocksdb_comparator_t* sessCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[2], sessCompare);
rocksdb_comparator_t* funcCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[3], funcCompare);
rocksdb_comparator_t* parnameCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[4], parnameCompare);
rocksdb_comparator_t* partagCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[5], partagCompare);
rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*));
rocksdb_t* db = rocksdb_open_column_families(opts, "rocksdb", cfLen, cfName, cfOpt, cfHandle, &err);
pState->pTdbState->rocksdb = db;
pState->pTdbState->pHandle = cfHandle;
return 0;
}
void streamCleanBackend(SStreamState* pState) {
int cfLen = sizeof(cfName) / sizeof(cfName[0]);
for (int i = 0; i < cfLen; i++) {
rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]);
}
rocksdb_close(pState->pTdbState->rocksdb);
}
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
if (pState == NULL) {
......@@ -407,7 +211,7 @@ _err:
void streamStateClose(SStreamState* pState) {
#ifdef USE_ROCKSDB
streamCleanBackend(pState);
#else
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
......@@ -744,6 +548,7 @@ void streamStateFreeCur(SStreamStateCur* pCur) {
return;
}
tdbTbcClose(pCur->pCur);
rocksdb_iter_destroy(pCur->iter);
taosMemoryFree(pCur);
}
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册