提交 955f8f9f 编写于 作者: L liuyao

feat:optimize get patitionby name

上级 1f63859e
......@@ -17,6 +17,7 @@
#include "rocksdb/c.h"
#include "tdbInt.h"
#include "tsimplehash.h"
#include "tstreamFileState.h"
#ifdef __cplusplus
......@@ -54,9 +55,10 @@ typedef struct STdbState {
// incremental state storage
typedef struct {
STdbState* pTdbState;
STdbState* pTdbState;
SStreamFileState* pFileState;
int32_t number;
int32_t number;
SSHashObj* parNameMap;
} SStreamState;
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
......
......@@ -714,9 +714,6 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo
void cleanupAggSup(SAggSupporter* pAggSup);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
......
......@@ -1218,33 +1218,6 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
return 0;
}
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSDataBlock* pBlock = pbInfo->pRes;
// set output datablock version
pBlock->info.version = pTaskInfo->version;
blockDataCleanup(pBlock);
if (!hasRemainResults(pGroupResInfo)) {
return;
}
// clear the existed group id
pBlock->info.id.groupId = 0;
ASSERT(!pbInfo->mergeResultBlock);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
void* tbname = NULL;
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
}
streamFreeVal(tbname);
}
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......
......@@ -1130,6 +1130,7 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t
}
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
qInfo("do stream range scan. windows index:%d", *pRowIndex);
while (1) {
SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pTableScanOp);
......
......@@ -2627,11 +2627,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
if (!pInfo->pUpdated) {
pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES);
pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES);
}
if (!pInfo->pUpdatedMap) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn);
}
while (1) {
......@@ -4864,11 +4864,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (!pInfo->pUpdated) {
pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES);
pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES);
}
if (!pInfo->pUpdatedMap) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn);
}
while (1) {
......
......@@ -15,7 +15,7 @@
#include "streamInc.h"
#define STREAM_EXEC_MAX_BATCH_NUM 100
#define STREAM_EXEC_MAX_BATCH_NUM 1024
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
int32_t code;
......
......@@ -26,6 +26,8 @@
#include "tcompare.h"
#include "ttimer.h"
#define MAX_TABLE_NAME_NUM 100000
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
if (pWin1->groupId > pWin2->groupId) {
return 1;
......@@ -133,6 +135,8 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
qWarn("open stream state2, %s", statePath);
pState->pTdbState->pOwner = pTask;
pState->pFileState = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
pState->parNameMap = tSimpleHashInit(1024, hashFn);
return pState;
#else
......@@ -1020,7 +1024,14 @@ _end:
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
qWarn("try to write to cf parname");
#ifdef USE_ROCKSDB
return streamStatePutParName_rocksdb(pState, groupId, tbname);
if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
streamStatePutParName_rocksdb(pState, groupId, tbname);
}
return TSDB_CODE_SUCCESS;
}
tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
return TSDB_CODE_SUCCESS;
#else
return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
pState->pTdbState->txn);
......@@ -1029,7 +1040,16 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
#ifdef USE_ROCKSDB
return streamStateGetParName_rocksdb(pState, groupId, pVal);
void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t));
if (!pStr) {
if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
return streamStateGetParName_rocksdb(pState, groupId, pVal);
}
return TSDB_CODE_FAILED;
}
*pVal = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
memcpy(*pVal, pStr, TSDB_TABLE_NAME_LEN);
return TSDB_CODE_SUCCESS;
#else
int32_t len;
return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
......
......@@ -56,10 +56,12 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
if (!pFileState) {
goto _error;
}
pFileState->maxRowCount = TMAX( (uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->usedBuffs = tdListNew(POINTER_BYTES);
pFileState->freeBuffs = tdListNew(POINTER_BYTES);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pFileState->rowBuffMap = tSimpleHashInit(1024, hashFn);
int32_t cap = TMIN(10240, pFileState->maxRowCount);
pFileState->rowBuffMap = tSimpleHashInit(cap, hashFn);
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) {
goto _error;
}
......@@ -69,7 +71,6 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->checkPointVersion = 1;
pFileState->pFileStore = pFile;
pFileState->getTs = fp;
pFileState->maxRowCount = TMAX( (uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->curRowCount = 0;
pFileState->deleteMark = delMark;
pFileState->flushMark = -1;
......@@ -125,7 +126,9 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
ASSERT(pPos->pRowBuff != NULL);
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
pPos->pRowBuff = NULL;
tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen);
if (!all) {
tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen);
}
destroyRowBuffPos(pPos);
tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(pNode);
......@@ -155,6 +158,7 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin
i++;
}
}
qInfo("do stream state flush %d rows to disck. is used: %d", listNEles(pFlushList), used);
}
int32_t flushRowBuff(SStreamFileState* pFileState) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册