提交 d7e86aca 编写于 作者: dengyihao's avatar dengyihao

fix case failure

上级 f7a7743a
...@@ -45,7 +45,7 @@ typedef struct STdbState { ...@@ -45,7 +45,7 @@ typedef struct STdbState {
void* env; void* env;
SListNode* pComparNode; SListNode* pComparNode;
void* pBackendHandle; void* pBackendHandle;
char idstr[48]; char idstr[64];
void* compactFactory; void* compactFactory;
TDB* db; TDB* db;
......
...@@ -42,6 +42,8 @@ typedef struct { ...@@ -42,6 +42,8 @@ typedef struct {
TdThreadMutex mutex; TdThreadMutex mutex;
rocksdb_compactionfilterfactory_t* filterFactory; rocksdb_compactionfilterfactory_t* filterFactory;
SList* list; SList* list;
TdThreadMutex cfMutex;
SHashObj* cfInst;
} SBackendHandle; } SBackendHandle;
void* streamBackendInit(const char* path); void* streamBackendInit(const char* path);
......
...@@ -21,6 +21,20 @@ typedef struct SCompactFilteFactory { ...@@ -21,6 +21,20 @@ typedef struct SCompactFilteFactory {
void* status; void* status;
} SCompactFilteFactory; } SCompactFilteFactory;
typedef struct {
rocksdb_t* db;
rocksdb_column_family_handle_t** pHandle;
rocksdb_writeoptions_t* wOpt;
rocksdb_readoptions_t* rOpt;
rocksdb_options_t** cfOpt;
rocksdb_options_t* dbOpt;
void* param;
void* pBackendHandle;
SListNode* pCompareNode;
} RocksdbCfInst;
RocksdbCfInst* streamStateOpenBackendCf(void* backend, char* name, char* idstr);
void destroyCompactFilteFactory(void* arg); void destroyCompactFilteFactory(void* arg);
void destroyCompactFilte(void* arg); void destroyCompactFilte(void* arg);
const char* compactFilteFactoryName(void* arg); const char* compactFilteFactoryName(void* arg);
...@@ -52,9 +66,12 @@ const char* compareParKeyName(void* name); ...@@ -52,9 +66,12 @@ const char* compareParKeyName(void* name);
const char* comparePartagKeyName(void* name); const char* comparePartagKeyName(void* name);
void* streamBackendInit(const char* path) { void* streamBackendInit(const char* path) {
qDebug("init stream backend");
SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle)); SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle));
pHandle->list = tdListNew(sizeof(SCfComparator)); pHandle->list = tdListNew(sizeof(SCfComparator));
taosThreadMutexInit(&pHandle->mutex, NULL); taosThreadMutexInit(&pHandle->mutex, NULL);
taosThreadMutexInit(&pHandle->cfMutex, NULL);
pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();
rocksdb_env_set_low_priority_background_threads(env, 4); rocksdb_env_set_low_priority_background_threads(env, 4);
...@@ -79,12 +96,44 @@ void* streamBackendInit(const char* path) { ...@@ -79,12 +96,44 @@ void* streamBackendInit(const char* path) {
NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName); NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
rocksdb_options_set_compaction_filter_factory(pHandle->dbOpt, pHandle->filterFactory); rocksdb_options_set_compaction_filter_factory(pHandle->dbOpt, pHandle->filterFactory);
char* err = NULL; char* err = NULL;
pHandle->db = rocksdb_open(opts, path, &err); size_t nCf = 0;
if (err != NULL) {
qError("failed to open rocksdb, path:%s, reason:%s", path, err); char** cfs = rocksdb_list_column_families(opts, path, &nCf, &err);
if (nCf == 0 || err != NULL) {
taosMemoryFreeClear(err); taosMemoryFreeClear(err);
// goto _EXIT; pHandle->db = rocksdb_open(opts, path, &err);
if (err != NULL) {
qError("failed to open rocksdb, path:%s, reason:%s", path, err);
taosMemoryFreeClear(err);
}
} else {
int64_t streamId;
int32_t taskId, dummpy = 0;
SHashObj* tbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
for (size_t i = 0; i < nCf; i++) {
char* cf = cfs[i];
char suffix[64] = {0};
if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, suffix)) {
char idstr[128] = {0};
sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId);
if (taosHashGet(tbl, idstr, strlen(idstr)) != NULL) {
taosHashPut(tbl, idstr, strlen(idstr), &dummpy, sizeof(dummpy));
}
} else {
continue;
}
}
void* pIter = taosHashIterate(tbl, NULL);
while (pIter != NULL) {
size_t keyLen = 0;
char* key = taosHashGetKey(pIter, &keyLen);
RocksdbCfInst* inst = streamStateOpenBackendCf(pHandle, (char*)path, key);
taosHashPut(pHandle->cfInst, key, keyLen, &inst, sizeof(void*));
taosHashIterate(tbl, pIter);
}
taosHashCleanup(tbl);
} }
return (void*)pHandle; return (void*)pHandle;
...@@ -93,6 +142,8 @@ _EXIT: ...@@ -93,6 +142,8 @@ _EXIT:
rocksdb_cache_destroy(cache); rocksdb_cache_destroy(cache);
rocksdb_env_destroy(env); rocksdb_env_destroy(env);
taosThreadMutexDestroy(&pHandle->mutex); taosThreadMutexDestroy(&pHandle->mutex);
taosThreadMutexDestroy(&pHandle->cfMutex);
taosHashCleanup(pHandle->cfInst);
rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
tdListFree(pHandle->list); tdListFree(pHandle->list);
free(pHandle); free(pHandle);
...@@ -124,6 +175,8 @@ void streamBackendCleanup(void* arg) { ...@@ -124,6 +175,8 @@ void streamBackendCleanup(void* arg) {
} }
// rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); // rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
tdListFree(pHandle->list); tdListFree(pHandle->list);
taosThreadMutexDestroy(&pHandle->cfMutex);
taosHashCleanup(pHandle->cfInst);
taosMemoryFree(pHandle); taosMemoryFree(pHandle);
...@@ -493,7 +546,7 @@ typedef struct { ...@@ -493,7 +546,7 @@ typedef struct {
} SCfInit; } SCfInit;
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUBFIX) sprintf(name, "%s_%s", idstr, (SUBFIX)); #define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) { int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) {
SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)}; SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)};
...@@ -616,11 +669,92 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_c ...@@ -616,11 +669,92 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_c
return filter; return filter;
} }
RocksdbCfInst* streamStateOpenBackendCf(void* backend, char* name, char* idstr) {
// qInfo("start to open backend cf, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId);
SBackendHandle* handle = backend;
char* err = NULL;
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
char** cfNames = taosMemoryCalloc(cfLen, sizeof(char*));
for (int i = 0; i < cfLen; i++) {
cfNames[i] = taosMemoryCalloc(1, 128);
GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[i].key);
}
RocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
for (int i = 0; i < cfLen; i++) {
cfOpt[i] = rocksdb_options_create_copy(handle->dbOpt);
// refactor later
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt);
param[i].tableOpt = tableOpt;
};
rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**));
for (int i = 0; i < cfLen; i++) {
SCfInit* cf = &ginitDict[i];
rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->detroyFunc, cf->cmpFunc, cf->cmpName);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare);
pCompare[i] = compare;
}
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, cfLen, (const char* const*)cfNames,
(const rocksdb_options_t* const*)cfOpt, cfHandle, &err);
if (err != NULL) {
qError("failed to open rocksdb cf, reason:%s", err);
taosMemoryFree(err);
} else {
qDebug("succ to open rocksdb cf, reason:%s", err);
}
RocksdbCfInst* inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst));
inst->db = db;
inst->pHandle = cfHandle;
inst->wOpt = rocksdb_writeoptions_create();
inst->rOpt = rocksdb_readoptions_create();
inst->cfOpt = (rocksdb_options_t**)cfOpt;
inst->dbOpt = handle->dbOpt;
inst->param = param;
inst->pBackendHandle = handle;
handle->db = db;
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
inst->pCompareNode = streamBackendAddCompare(handle, &compare);
rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
return inst;
}
int streamStateOpenBackend(void* backend, SStreamState* pState) { int streamStateOpenBackend(void* backend, SStreamState* pState) {
qInfo("start to open backend, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); qInfo("start to open backend, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId);
SBackendHandle* handle = backend; SBackendHandle* handle = backend;
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId);
taosThreadMutexLock(&handle->cfMutex);
RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr));
if (ppInst != NULL && *ppInst != NULL) {
RocksdbCfInst* inst = *ppInst;
pState->pTdbState->rocksdb = inst->db;
pState->pTdbState->pHandle = inst->pHandle;
pState->pTdbState->writeOpts = inst->wOpt;
pState->pTdbState->readOpts = inst->rOpt;
pState->pTdbState->cfOpts = inst->cfOpt;
pState->pTdbState->dbOpt = handle->dbOpt;
pState->pTdbState->param = inst->param;
pState->pTdbState->pBackendHandle = handle;
pState->pTdbState->pComparNode = inst->pCompareNode;
taosThreadMutexUnlock(&handle->cfMutex);
return 0;
}
taosThreadMutexUnlock(&handle->cfMutex);
char* err = NULL; char* err = NULL;
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
...@@ -650,7 +784,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { ...@@ -650,7 +784,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
} }
rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*)); rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*));
for (int i = 0; i < cfLen; i++) { for (int i = 0; i < cfLen; i++) {
char buf[64] = {0}; char buf[128] = {0};
GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[i].key); GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[i].key);
cfHandle[i] = rocksdb_create_column_family(handle->db, cfOpt[i], buf, &err); cfHandle[i] = rocksdb_create_column_family(handle->db, cfOpt[i], buf, &err);
if (err != NULL) { if (err != NULL) {
...@@ -670,12 +804,21 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { ...@@ -670,12 +804,21 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare); pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare);
rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); // rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
qInfo("succ to open backend, %p, 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); qInfo("succ to open backend, %p, 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId);
return 0; return 0;
} }
void streamStateCloseBackend(SStreamState* pState, bool remove) { void streamStateCloseBackend(SStreamState* pState, bool remove) {
SBackendHandle* pHandle = pState->pTdbState->pBackendHandle;
taosThreadMutexLock(&pHandle->cfMutex);
RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr));
if (ppInst != NULL && *ppInst != NULL) {
RocksdbCfInst* inst = *ppInst;
taosMemoryFree(inst);
taosHashRemove(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr));
}
taosThreadMutexUnlock(&pHandle->cfMutex);
char* status[] = {"close", "drop"}; char* status[] = {"close", "drop"};
qInfo("start to %s backend, %p, 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pState->streamId, qInfo("start to %s backend, %p, 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pState->streamId,
pState->taskId); pState->taskId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册