提交 d5c0ca31 编写于 作者: dengyihao's avatar dengyihao

opt batch write

上级 a3b8ca27
...@@ -122,12 +122,17 @@ char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len); ...@@ -122,12 +122,17 @@ char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len);
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len); char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len);
// batch func // batch func
int streamStateGetCfIdx(SStreamState* pState, const char* funcName);
void* streamStateCreateBatch(); void* streamStateCreateBatch();
int32_t streamStateGetBatchSize(void* pBatch); int32_t streamStateGetBatchSize(void* pBatch);
void streamStateClearBatch(void* pBatch); void streamStateClearBatch(void* pBatch);
void streamStateDestroyBatch(void* pBatch); void streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl); void* val, int32_t vlen, int64_t ttl);
int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl, void* tmpBuf);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif #endif
\ No newline at end of file
...@@ -210,7 +210,6 @@ void streamBackendDelCompare(void* backend, void* arg) { ...@@ -210,7 +210,6 @@ void streamBackendDelCompare(void* backend, void* arg) {
} }
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); } void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); }
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
int streamGetInit(SStreamState* pState, const char* funcName);
// |key|-----value------| // |key|-----value------|
// |key|ttl|len|userData| // |key|ttl|len|userData|
...@@ -557,14 +556,20 @@ typedef struct { ...@@ -557,14 +556,20 @@ typedef struct {
int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) { int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) {
SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)}; SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)};
int32_t len = 0;
char* p = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int32_t) + key.len); if (*dest == NULL) {
char* buf = p; char* p = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int32_t) + key.len);
int32_t len = 0; char* buf = p;
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
len += taosEncodeFixedI32((void**)&buf, key.len); len += taosEncodeFixedI32((void**)&buf, key.len);
len += taosEncodeBinary((void**)&buf, (char*)value, vlen); len += taosEncodeBinary((void**)&buf, (char*)value, vlen);
*dest = p; *dest = p;
} else {
char* buf = *dest;
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
len += taosEncodeFixedI32((void**)&buf, key.len);
len += taosEncodeBinary((void**)&buf, (char*)value, vlen);
}
return len; return len;
} }
/* /*
...@@ -713,7 +718,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t ...@@ -713,7 +718,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt); rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt);
params[i].tableOpt = tableOpt; params[i].tableOpt = tableOpt;
int idx = streamGetInit(NULL, funcname); int idx = streamStateGetCfIdx(NULL, funcname);
SCfInit* cfPara = &ginitDict[idx]; SCfInit* cfPara = &ginitDict[idx];
rocksdb_comparator_t* compare = rocksdb_comparator_t* compare =
...@@ -744,7 +749,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t ...@@ -744,7 +749,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
char idstr[128] = {0}; char idstr[128] = {0};
sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId); sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId);
int idx = streamGetInit(NULL, funcname); int idx = streamStateGetCfIdx(NULL, funcname);
RocksdbCfInst* inst = NULL; RocksdbCfInst* inst = NULL;
RocksdbCfInst** pInst = taosHashGet(handle->cfInst, idstr, strlen(idstr) + 1); RocksdbCfInst** pInst = taosHashGet(handle->cfInst, idstr, strlen(idstr) + 1);
...@@ -955,7 +960,7 @@ void streamStateDestroyCompar(void* arg) { ...@@ -955,7 +960,7 @@ void streamStateDestroyCompar(void* arg) {
taosMemoryFree(comp->comp); taosMemoryFree(comp->comp);
} }
int streamGetInit(SStreamState* pState, const char* funcName) { int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
int idx = -1; int idx = -1;
size_t len = strlen(funcName); size_t len = strlen(funcName);
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
...@@ -1002,7 +1007,7 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len ...@@ -1002,7 +1007,7 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len
} }
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot, rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot,
rocksdb_readoptions_t** readOpt) { rocksdb_readoptions_t** readOpt) {
int idx = streamGetInit(pState, cfName); int idx = streamStateGetCfIdx(pState, cfName);
if (snapshot != NULL) { if (snapshot != NULL) {
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
...@@ -1022,7 +1027,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -1022,7 +1027,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = 0; \ code = 0; \
char buf[128] = {0}; \ char buf[128] = {0}; \
char* err = NULL; \ char* err = NULL; \
int i = streamGetInit(pState, funcname); \ int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \ if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \ qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \ code = -1; \
...@@ -1053,7 +1058,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -1053,7 +1058,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = 0; \ code = 0; \
char buf[128] = {0}; \ char buf[128] = {0}; \
char* err = NULL; \ char* err = NULL; \
int i = streamGetInit(pState, funcname); \ int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \ if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \ qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \ code = -1; \
...@@ -1101,7 +1106,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -1101,7 +1106,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = 0; \ code = 0; \
char buf[128] = {0}; \ char buf[128] = {0}; \
char* err = NULL; \ char* err = NULL; \
int i = streamGetInit(pState, funcname); \ int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \ if (i < 0) { \
qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
code = -1; \ code = -1; \
...@@ -2041,7 +2046,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_ ...@@ -2041,7 +2046,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl) { void* val, int32_t vlen, int64_t ttl) {
int i = streamGetInit(pState, cfName); int i = streamStateGetCfIdx(pState, cfName);
if (i < 0) { if (i < 0) {
qError("streamState failed to put to cf name:%s", cfName); qError("streamState failed to put to cf name:%s", cfName);
...@@ -2057,6 +2062,21 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr ...@@ -2057,6 +2062,21 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
taosMemoryFree(ttlV); taosMemoryFree(ttlV);
return 0; return 0;
} }
int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl, void* tmpBuf) {
char buf[128] = {0};
int32_t klen = ginitDict[cfIdx].enFunc((void*)key, buf);
char* ttlV = tmpBuf;
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[cfIdx].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
if (tmpBuf == NULL) {
taosMemoryFree(ttlV);
}
return 0;
}
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
char* err = NULL; char* err = NULL;
rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err); rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err);
......
...@@ -350,6 +350,11 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -350,6 +350,11 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
const int32_t BATCH_LIMIT = 256; const int32_t BATCH_LIMIT = 256;
SListNode* pNode = NULL; SListNode* pNode = NULL;
int idx = streamStateGetCfIdx(pFileState->pFileStore, "state");
int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 1;
char* buf = taosMemoryCalloc(1, len);
void* batch = streamStateCreateBatch(); void* batch = streamStateCreateBatch();
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) { while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
...@@ -360,9 +365,13 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -360,9 +365,13 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
} }
SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number}; SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number};
code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize, 0); code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, &sKey, pPos->pRowBuff, pFileState->rowSize,
0, buf);
memset(buf, 0, len);
qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code); qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
} }
taosMemoryFree(buf);
if (streamStateGetBatchSize(batch) > 0) { if (streamStateGetBatchSize(batch) > 0) {
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册