提交 fe9ca49d 编写于 作者: L liuyao

feat:optimize get patitionby name

......@@ -83,7 +83,7 @@ int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key);
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
bool streamStateCheck(SStreamState* pState, const SWinKey* key);
bool streamStateCheck(SStreamState* pState, const SWinKey* key);
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
int32_t streamStateClear(SStreamState* pState);
......
......@@ -77,4 +77,12 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t grou
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]);
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal);
void streamStateDestroy_rocksdb(SStreamState* pState);
void* streamStateCreateBatch();
int32_t streamStateGetBatchSize(void* pBatch);
void streamStateClearBatch(void* pBatch);
void streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
#endif
\ No newline at end of file
......@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "rocksdb/c.h"
#include "streamBackendRocksdb.h"
#include "tcommon.h"
#include "tlog.h"
......@@ -544,6 +543,44 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v
STREAM_STATE_PUT_ROCKSDB(pState, "default", &sKey, value, vLen);
return code;
}
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
char* err = NULL;
rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err);
if (err != NULL) {
qError("streamState failed to write batch, err:%s", err);
taosMemoryFree(err);
return -1;
}
return 0;
}
void* streamStateCreateBatch() {
rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create();
return pBatch;
}
int32_t streamStateGetBatchSize(void* pBatch) {
if (pBatch == NULL) return -1;
return rocksdb_writebatch_count((rocksdb_writebatch_t*)pBatch);
}
void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen) {
int i = streamGetInit(cfName);
if (i < 0) {
qError("streamState failed to put to cf name:%s", cfName);
return -1;
}
char buf[128] = {0};
int32_t klen = ginitDict[i].enFunc((void*)key, buf);
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, val, (size_t)vlen);
return 0;
}
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number};
......
......@@ -15,14 +15,13 @@
#include "tstreamFileState.h"
#include "streamBackendRocksdb.h"
#include "taos.h"
#include "thash.h"
#include "tsimplehash.h"
#include "streamBackendRocksdb.h"
#define FLUSH_RATIO 0.2
#define FLUSH_NUM 4
#define FLUSH_RATIO 0.2
#define FLUSH_NUM 4
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024);
struct SStreamFileState {
......@@ -44,7 +43,8 @@ struct SStreamFileState {
typedef SRowBuffPos SRowBuffInfo;
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark) {
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile,
TSKEY delMark) {
if (memSize <= 0) {
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
}
......@@ -71,6 +71,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->checkPointVersion = 1;
pFileState->pFileStore = pFile;
pFileState->getTs = fp;
pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->curRowCount = 0;
pFileState->deleteMark = delMark;
pFileState->flushMark = -1;
......@@ -122,7 +123,7 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
if (all || (pFileState->getTs(pPos->pKey) < ts) ) {
if (all || (pFileState->getTs(pPos->pKey) < ts)) {
ASSERT(pPos->pRowBuff != NULL);
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
pPos->pRowBuff = NULL;
......@@ -142,7 +143,7 @@ void streamFileStateClear(SStreamFileState* pFileState) {
}
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
uint64_t i = 0;
uint64_t i = 0;
SListIter iter = {0};
tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
......@@ -250,7 +251,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
TSKEY ts = pFileState->getTs(pKey);
if (ts > pFileState->maxTs - pFileState->deleteMark && ts < pFileState->flushMark) {
int32_t len = 0;
void *pVal = NULL;
void* pVal = NULL;
int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &pVal, &len);
if (code == TSDB_CODE_SUCCESS) {
memcpy(pNewPos->pRowBuff, pVal, len);
......@@ -287,7 +288,7 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
}
int32_t len = 0;
void *pBuff = NULL;
void* pBuff = NULL;
streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, &pBuff, &len);
memcpy(pPos->pRowBuff, pBuff, len);
taosMemoryFree(pBuff);
......@@ -304,9 +305,7 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
return false;
}
void releaseRowBuffPos(SRowBuffPos* pBuff) {
pBuff->beUsed = false;
}
void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; }
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
......@@ -320,26 +319,39 @@ void streamFileStateDecode(SStreamFileState* pFileState, void* pBuff, int32_t le
void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) {
*pLen = sizeof(TSKEY);
(*pVal) = taosMemoryCalloc(1, *pLen);
void* buff = *pVal;
void* buff = *pVal;
taosEncodeFixedI64(&buff, pFileState->flushMark);
}
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
SListIter iter = {0};
tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
const int32_t BATCH_LIMIT = 128;
SListNode* pNode = NULL;
void* batch = streamStateCreateBatch();
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
ASSERT(pPos->pRowBuff && pFileState->rowSize > 0);
code = streamStatePut_rocksdb(pFileState->pFileStore, pPos->pKey, pPos->pRowBuff, pFileState->rowSize);
if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
streamStateClearBatch(batch);
}
code =
streamStatePutBatch(pFileState->pFileStore, "default", batch, pPos->pKey, pPos->pRowBuff, pFileState->rowSize);
}
if (streamStateGetBatchSize(batch) > 0) {
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
}
streamStateDestroyBatch(batch);
if (flushState) {
int32_t len = 0;
void* buff = NULL;
void* buff = NULL;
streamFileStateEncode(pFileState, &buff, &len);
SWinKey key = {.ts = -1, .groupId = 0}; // dengyihao
SWinKey key = {.ts = -1, .groupId = 0}; // dengyihao
streamStatePut_rocksdb(pFileState->pFileStore, &key, buff, len);
taosMemoryFree(buff);
}
......@@ -348,8 +360,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t recoverSnapshot(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS;
SWinKey stkey = {.ts = -1, .groupId = 0}; // dengyihao
void* pStVal = NULL;
SWinKey stkey = {.ts = -1, .groupId = 0}; // dengyihao
void* pStVal = NULL;
int32_t len = 0;
code = streamStateGet_rocksdb(pFileState->pFileStore, &stkey, &pStVal, &len);
if (code == TSDB_CODE_SUCCESS) {
......@@ -358,7 +370,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
return TSDB_CODE_FAILED;
}
SWinKey key = {.groupId = 0, .ts = 0};
SWinKey key = {.groupId = 0, .ts = 0};
SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key);
if (!pCur) {
return TSDB_CODE_FAILED;
......@@ -371,10 +383,10 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
if (pFileState->curRowCount == pFileState->maxRowCount) {
break;
}
void* pVal = NULL;
int32_t pVLen = 0;
void* pVal = NULL;
int32_t pVLen = 0;
SRowBuffPos* pNewPos = getNewRowPos(pFileState);
code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void **)&pVal, &pVLen);
code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &pVLen);
if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
destroyRowBuffPos(pNewPos);
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册