未验证 提交 c8f9707d 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18203 from taosdata/feature/stream

enh(stream): add tbname map cache
...@@ -154,8 +154,8 @@ typedef struct { ...@@ -154,8 +154,8 @@ typedef struct {
} SSchemaInfo; } SSchemaInfo;
typedef struct { typedef struct {
int32_t operatorType; int32_t operatorType;
int64_t refId; int64_t refId;
} SExchangeOpStopInfo; } SExchangeOpStopInfo;
typedef struct { typedef struct {
...@@ -261,10 +261,10 @@ typedef struct SLimitInfo { ...@@ -261,10 +261,10 @@ typedef struct SLimitInfo {
} SLimitInfo; } SLimitInfo;
typedef struct SExchangeInfo { typedef struct SExchangeInfo {
SArray* pSources; SArray* pSources;
SArray* pSourceDataInfo; SArray* pSourceDataInfo;
tsem_t ready; tsem_t ready;
void* pTransporter; void* pTransporter;
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that // SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator // passed by downstream operator
SArray* pResultBlockList; SArray* pResultBlockList;
...@@ -275,7 +275,7 @@ typedef struct SExchangeInfo { ...@@ -275,7 +275,7 @@ typedef struct SExchangeInfo {
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
uint64_t self; uint64_t self;
SLimitInfo limitInfo; SLimitInfo limitInfo;
int64_t openedTs; // start exec time stamp int64_t openedTs; // start exec time stamp
} SExchangeInfo; } SExchangeInfo;
typedef struct SScanInfo { typedef struct SScanInfo {
...@@ -310,9 +310,9 @@ typedef struct { ...@@ -310,9 +310,9 @@ typedef struct {
} SAggOptrPushDownInfo; } SAggOptrPushDownInfo;
typedef struct STableMetaCacheInfo { typedef struct STableMetaCacheInfo {
SLRUCache* pTableMetaEntryCache; // 100 by default SLRUCache* pTableMetaEntryCache; // 100 by default
uint64_t metaFetch; uint64_t metaFetch;
uint64_t cacheHit; uint64_t cacheHit;
} STableMetaCacheInfo; } STableMetaCacheInfo;
typedef struct STableScanInfo { typedef struct STableScanInfo {
...@@ -343,7 +343,7 @@ typedef struct STableMergeScanInfo { ...@@ -343,7 +343,7 @@ typedef struct STableMergeScanInfo {
int32_t tableEndIndex; int32_t tableEndIndex;
bool hasGroupId; bool hasGroupId;
uint64_t groupId; uint64_t groupId;
SArray* queryConds; // array of queryTableDataCond SArray* queryConds; // array of queryTableDataCond
STsdbReader* pReader; STsdbReader* pReader;
SReadHandle readHandle; SReadHandle readHandle;
int32_t bufPageSize; int32_t bufPageSize;
...@@ -358,7 +358,7 @@ typedef struct STableMergeScanInfo { ...@@ -358,7 +358,7 @@ typedef struct STableMergeScanInfo {
int64_t numOfRows; int64_t numOfRows;
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo; SResultRowInfo* pResultRowInfo;
int32_t* rowEntryInfoOffset; int32_t* rowEntryInfoOffset;
SExprInfo* pExpr; SExprInfo* pExpr;
...@@ -504,6 +504,7 @@ typedef struct SStreamScanInfo { ...@@ -504,6 +504,7 @@ typedef struct SStreamScanInfo {
STimeWindow updateWin; STimeWindow updateWin;
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
SSDataBlock* pUpdateDataRes; SSDataBlock* pUpdateDataRes;
SHashObj* pGroupIdTbNameMap;
// status for tmq // status for tmq
SNodeList* pGroupTags; SNodeList* pGroupTags;
SNode* pTagCond; SNode* pTagCond;
...@@ -550,10 +551,10 @@ typedef struct SSysTableScanInfo { ...@@ -550,10 +551,10 @@ typedef struct SSysTableScanInfo {
} SSysTableScanInfo; } SSysTableScanInfo;
typedef struct SBlockDistInfo { typedef struct SBlockDistInfo {
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
STsdbReader* pHandle; STsdbReader* pHandle;
SReadHandle readHandle; SReadHandle readHandle;
uint64_t uid; // table uid uint64_t uid; // table uid
} SBlockDistInfo; } SBlockDistInfo;
// todo remove this // todo remove this
...@@ -663,9 +664,9 @@ typedef struct SGroupbyOperatorInfo { ...@@ -663,9 +664,9 @@ typedef struct SGroupbyOperatorInfo {
SAggSupporter aggSup; SAggSupporter aggSup;
SArray* pGroupCols; // group by columns, SArray<SColumn> SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys> SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
bool isInit; // denote if current val is initialized or not bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SExprSupp scalarSup; SExprSupp scalarSup;
} SGroupbyOperatorInfo; } SGroupbyOperatorInfo;
...@@ -890,14 +891,14 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter ...@@ -890,14 +891,14 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
void doDestroyExchangeOperatorInfo(void* param); void doDestroyExchangeOperatorInfo(void* param);
void setOperatorCompleted(SOperatorInfo* pOperator); void setOperatorCompleted(SOperatorInfo* pOperator);
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, void* pInfo, void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
SExecTaskInfo* pTaskInfo); void* pInfo, SExecTaskInfo* pTaskInfo);
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo * pCache); int32_t rows, const char* idStr, STableMetaCacheInfo* pCache);
void cleanupAggSup(SAggSupporter* pAggSup); void cleanupAggSup(SAggSupporter* pAggSup);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
...@@ -992,7 +993,7 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo); ...@@ -992,7 +993,7 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo);
void queryCostStatis(SExecTaskInfo* pTaskInfo); void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
void destroyOperatorInfo(SOperatorInfo* pOperator); void destroyOperatorInfo(SOperatorInfo* pOperator);
int32_t getMaximumIdleDurationSec(); int32_t getMaximumIdleDurationSec();
/* /*
...@@ -1038,6 +1039,7 @@ void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKE ...@@ -1038,6 +1039,7 @@ void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKE
uint64_t* pGp, void* pTbName); uint64_t* pGp, void* pTbName);
void printDataBlock(SSDataBlock* pBlock, const char* flag); void printDataBlock(SSDataBlock* pBlock, const char* flag);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
...@@ -1061,7 +1063,7 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul ...@@ -1061,7 +1063,7 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult); int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize); int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo); int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -1515,10 +1515,17 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS ...@@ -1515,10 +1515,17 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
uint64_t srcUid = srcUidData[i]; uint64_t srcUid = srcUidData[i];
uint64_t groupId = srcGp[i]; uint64_t groupId = srcGp[i];
char* tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
if (groupId == 0) { if (groupId == 0) {
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version); groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
} }
appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId, NULL); if (pInfo->tbnameCalSup.pExprInfo) {
char* parTbname = taosHashGet(pInfo->pGroupIdTbNameMap, &groupId, sizeof(int64_t));
memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
varDataSetLen(tbname, strlen(varDataVal(tbname)));
}
appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
tbname[0] == 0 ? NULL : tbname);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1562,9 +1569,16 @@ static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock* ...@@ -1562,9 +1569,16 @@ static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock*
blockDataDestroy(pSrcBlock); blockDataDestroy(pSrcBlock);
} }
static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) { void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
if (pTbNameCalSup == NULL || pTbNameCalSup->numOfExprs == 0) return; if (pTbNameCalSup == NULL || pTbNameCalSup->numOfExprs == 0) return;
if (pBlock == NULL || pBlock->info.rows == 0) return; if (pBlock == NULL || pBlock->info.rows == 0) return;
if (pBlock->info.groupId) {
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
if (tbname != NULL) {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
}
}
SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0); SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
ASSERT(pSrcBlock->info.rows == 1); ASSERT(pSrcBlock->info.rows == 1);
...@@ -1592,6 +1606,11 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) { ...@@ -1592,6 +1606,11 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) {
pBlock->info.parTbName[0] = 0; pBlock->info.parTbName[0] = 0;
} }
if (pBlock->info.groupId) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN);
}
blockDataDestroy(pSrcBlock); blockDataDestroy(pSrcBlock);
blockDataDestroy(pResBlock); blockDataDestroy(pResBlock);
} }
...@@ -1713,7 +1732,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1713,7 +1732,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
blockDataFreeRes((SSDataBlock*)pBlock); blockDataFreeRes((SSDataBlock*)pBlock);
calBlockTbName(&pInfo->tbnameCalSup, pInfo->pRes); calBlockTbName(pInfo, pInfo->pRes);
return 0; return 0;
} }
...@@ -1960,7 +1979,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1960,7 +1979,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp); SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
if (pBlock != NULL) { if (pBlock != NULL) {
calBlockTbName(&pInfo->tbnameCalSup, pBlock); calBlockTbName(pInfo, pBlock);
updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex); updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
qDebug("stream recover scan get block, rows %d", pBlock->info.rows); qDebug("stream recover scan get block, rows %d", pBlock->info.rows);
return pBlock; return pBlock;
...@@ -2081,7 +2100,7 @@ FETCH_NEXT_BLOCK: ...@@ -2081,7 +2100,7 @@ FETCH_NEXT_BLOCK:
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false); checkUpdateData(pInfo, true, pSDB, false);
// printDataBlock(pSDB, "stream scan update"); // printDataBlock(pSDB, "stream scan update");
calBlockTbName(&pInfo->tbnameCalSup, pSDB); calBlockTbName(pInfo, pSDB);
return pSDB; return pSDB;
} }
blockDataCleanup(pInfo->pUpdateDataRes); blockDataCleanup(pInfo->pUpdateDataRes);
...@@ -2386,6 +2405,7 @@ static void destroyStreamScanOperatorInfo(void* param) { ...@@ -2386,6 +2405,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
} }
cleanupExprSupp(&pStreamScan->tbnameCalSup); cleanupExprSupp(&pStreamScan->tbnameCalSup);
taosHashCleanup(pStreamScan->pGroupIdTbNameMap);
updateInfoDestroy(pStreamScan->pUpdateInfo); updateInfoDestroy(pStreamScan->pUpdateInfo);
blockDataDestroy(pStreamScan->pRes); blockDataDestroy(pStreamScan->pRes);
...@@ -2443,6 +2463,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2443,6 +2463,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) { if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
goto _error; goto _error;
} }
pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
} }
if (pTableScanNode->pTags != NULL) { if (pTableScanNode->pTags != NULL) {
......
...@@ -328,7 +328,7 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -328,7 +328,7 @@ int32_t walEndSnapshot(SWal *pWal) {
pInfo--; pInfo--;
} }
if (POINTER_DISTANCE(pInfo, pWal->fileInfoSet->pData) > 0) { if (POINTER_DISTANCE(pInfo, pWal->fileInfoSet->pData) > 0) {
wDebug("vgId:%d, wal end remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer); wDebug("vgId:%d, wal end remove for %" PRId64, pWal->cfg.vgId, pInfo->firstVer);
} else { } else {
wDebug("vgId:%d, wal no remove", pWal->cfg.vgId); wDebug("vgId:%d, wal no remove", pWal->cfg.vgId);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册