提交 eab6f84b 编写于 作者: dengyihao's avatar dengyihao

Merge branch 'enh/rocksRevert' of https://github.com/taosdata/TDengine into enh/rocksRevert

...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "rocksdb/c.h" #include "rocksdb/c.h"
#include "tdbInt.h" #include "tdbInt.h"
#include "tsimplehash.h"
#include "tstreamFileState.h" #include "tstreamFileState.h"
#ifdef __cplusplus #ifdef __cplusplus
...@@ -57,6 +58,7 @@ typedef struct { ...@@ -57,6 +58,7 @@ typedef struct {
STdbState* pTdbState; STdbState* pTdbState;
SStreamFileState* pFileState; SStreamFileState* pFileState;
int32_t number; int32_t number;
SSHashObj* parNameMap;
} SStreamState; } SStreamState;
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
......
...@@ -714,9 +714,6 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo ...@@ -714,9 +714,6 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo
void cleanupAggSup(SAggSupporter* pAggSup); void cleanupAggSup(SAggSupporter* pAggSup);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf); SDiskbasedBuf* pBuf);
......
...@@ -1218,33 +1218,6 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS ...@@ -1218,33 +1218,6 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
return 0; return 0;
} }
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSDataBlock* pBlock = pbInfo->pRes;
// set output datablock version
pBlock->info.version = pTaskInfo->version;
blockDataCleanup(pBlock);
if (!hasRemainResults(pGroupResInfo)) {
return;
}
// clear the existed group id
pBlock->info.id.groupId = 0;
ASSERT(!pbInfo->mergeResultBlock);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
void* tbname = NULL;
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
}
streamFreeVal(tbname);
}
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf) { SDiskbasedBuf* pBuf) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......
...@@ -1130,6 +1130,7 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t ...@@ -1130,6 +1130,7 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t
} }
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
qInfo("do stream range scan. windows index:%d", *pRowIndex);
while (1) { while (1) {
SSDataBlock* pResult = NULL; SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pTableScanOp); pResult = doTableScan(pInfo->pTableScanOp);
......
...@@ -2627,11 +2627,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2627,11 +2627,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
if (!pInfo->pUpdated) { if (!pInfo->pUpdated) {
pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES); pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES);
} }
if (!pInfo->pUpdatedMap) { if (!pInfo->pUpdatedMap) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn); pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn);
} }
while (1) { while (1) {
...@@ -4864,11 +4864,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4864,11 +4864,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
if (!pInfo->pUpdated) { if (!pInfo->pUpdated) {
pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES); pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES);
} }
if (!pInfo->pUpdatedMap) { if (!pInfo->pUpdatedMap) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn); pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn);
} }
while (1) { while (1) {
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "streamInc.h" #include "streamInc.h"
#define STREAM_EXEC_MAX_BATCH_NUM 100 #define STREAM_EXEC_MAX_BATCH_NUM 1024
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
int32_t code; int32_t code;
......
...@@ -26,6 +26,8 @@ ...@@ -26,6 +26,8 @@
#include "tcompare.h" #include "tcompare.h"
#include "ttimer.h" #include "ttimer.h"
#define MAX_TABLE_NAME_NUM 100000
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
if (pWin1->groupId > pWin2->groupId) { if (pWin1->groupId > pWin2->groupId) {
return 1; return 1;
...@@ -133,6 +135,8 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int ...@@ -133,6 +135,8 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
qWarn("open stream state2, %s", statePath); qWarn("open stream state2, %s", statePath);
pState->pTdbState->pOwner = pTask; pState->pTdbState->pOwner = pTask;
pState->pFileState = NULL; pState->pFileState = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
pState->parNameMap = tSimpleHashInit(1024, hashFn);
return pState; return pState;
#else #else
...@@ -1020,7 +1024,14 @@ _end: ...@@ -1020,7 +1024,14 @@ _end:
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
qWarn("try to write to cf parname"); qWarn("try to write to cf parname");
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
return streamStatePutParName_rocksdb(pState, groupId, tbname); if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
streamStatePutParName_rocksdb(pState, groupId, tbname);
}
return TSDB_CODE_SUCCESS;
}
tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
return TSDB_CODE_SUCCESS;
#else #else
return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
pState->pTdbState->txn); pState->pTdbState->txn);
...@@ -1029,7 +1040,16 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char ...@@ -1029,7 +1040,16 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) { int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
return streamStateGetParName_rocksdb(pState, groupId, pVal); void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t));
if (!pStr) {
if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
return streamStateGetParName_rocksdb(pState, groupId, pVal);
}
return TSDB_CODE_FAILED;
}
*pVal = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
memcpy(*pVal, pStr, TSDB_TABLE_NAME_LEN);
return TSDB_CODE_SUCCESS;
#else #else
int32_t len; int32_t len;
return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len); return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
......
...@@ -57,10 +57,12 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ ...@@ -57,10 +57,12 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
if (!pFileState) { if (!pFileState) {
goto _error; goto _error;
} }
pFileState->maxRowCount = TMAX( (uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->usedBuffs = tdListNew(POINTER_BYTES); pFileState->usedBuffs = tdListNew(POINTER_BYTES);
pFileState->freeBuffs = tdListNew(POINTER_BYTES); pFileState->freeBuffs = tdListNew(POINTER_BYTES);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pFileState->rowBuffMap = tSimpleHashInit(1024, hashFn); int32_t cap = TMIN(10240, pFileState->maxRowCount);
pFileState->rowBuffMap = tSimpleHashInit(cap, hashFn);
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) { if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) {
goto _error; goto _error;
} }
...@@ -126,7 +128,9 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { ...@@ -126,7 +128,9 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
ASSERT(pPos->pRowBuff != NULL); ASSERT(pPos->pRowBuff != NULL);
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff)); tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
pPos->pRowBuff = NULL; pPos->pRowBuff = NULL;
tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen); if (!all) {
tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen);
}
destroyRowBuffPos(pPos); destroyRowBuffPos(pPos);
tdListPopNode(pFileState->usedBuffs, pNode); tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(pNode); taosMemoryFreeClear(pNode);
...@@ -156,6 +160,7 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin ...@@ -156,6 +160,7 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin
i++; i++;
} }
} }
qInfo("do stream state flush %d rows to disck. is used: %d", listNEles(pFlushList), used);
} }
int32_t flushRowBuff(SStreamFileState* pFileState) { int32_t flushRowBuff(SStreamFileState* pFileState) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册