提交 be5b72b2 编写于 作者: H Haojun Liao

enh(query): add cache for table meta entry in table scan.

上级 c1794e02
...@@ -153,7 +153,7 @@ bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) { ...@@ -153,7 +153,7 @@ bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) {
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
SMeta *pMeta = pReader->pMeta; SMeta *pMeta = pReader->pMeta;
int64_t version; int64_t version1;
// query uid.idx // query uid.idx
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) { if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
...@@ -161,8 +161,8 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { ...@@ -161,8 +161,8 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
return -1; return -1;
} }
version = ((SUidIdxVal *)pReader->pBuf)[0].version; version1 = ((SUidIdxVal *)pReader->pBuf)[0].version;
return metaGetTableEntryByVersion(pReader, version, uid); return metaGetTableEntryByVersion(pReader, version1, uid);
} }
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) { int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
......
...@@ -318,6 +318,7 @@ typedef struct STableScanInfo { ...@@ -318,6 +318,7 @@ typedef struct STableScanInfo {
int8_t scanMode; int8_t scanMode;
SAggOptrPushDownInfo pdInfo; SAggOptrPushDownInfo pdInfo;
int8_t assignBlockUid; int8_t assignBlockUid;
SLRUCache* pTableMetaEntryCache; // 100 by default
} STableScanInfo; } STableScanInfo;
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
...@@ -896,8 +897,8 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul ...@@ -896,8 +897,8 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
void doSetOperatorCompleted(SOperatorInfo* pOperator); void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr,
SSDataBlock* pBlock, int32_t rows, const char* idStr); SSDataBlock* pBlock, int32_t rows, const char* idStr, SLRUCache* pCache);
void cleanupAggSup(SAggSupporter* pAggSup); void cleanupAggSup(SAggSupporter* pAggSup);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
......
...@@ -495,51 +495,68 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction ...@@ -495,51 +495,68 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
SET_REVERSE_SCAN_FLAG(pTableScanInfo); SET_REVERSE_SCAN_FLAG(pTableScanInfo);
switchCtxOrder(pCtx, numOfOutput); switchCtxOrder(pCtx, numOfOutput);
// setupQueryRangeForReverseScan(pTableScanInfo);
pTableScanInfo->cond.order = TSDB_ORDER_DESC; pTableScanInfo->cond.order = TSDB_ORDER_DESC;
STimeWindow* pTWindow = &pTableScanInfo->cond.twindows; STimeWindow* pTWindow = &pTableScanInfo->cond.twindows;
TSWAP(pTWindow->skey, pTWindow->ekey); TSWAP(pTWindow->skey, pTWindow->ekey);
} }
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, typedef struct STableCachedVal {
SSDataBlock* pBlock, int32_t rows, const char* idStr) { const char* pName;
STag* pTags;
} STableCachedVal;
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr,
SSDataBlock* pBlock, int32_t rows, const char* idStr, SLRUCache* pCache) {
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (numOfPseudoExpr <= 0) { if (numOfExpr <= 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t code = 0;
// backup the rows // backup the rows
int32_t backupRows = pBlock->info.rows; int32_t backupRows = pBlock->info.rows;
pBlock->info.rows = rows; pBlock->info.rows = rows;
SMetaReader mr = {0}; STableCachedVal val = {0};
metaReaderInit(&mr, pHandle->meta, 0); bool needFreeReader = true;
int32_t code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
metaReaderReleaseLock(&mr);
if (code != TSDB_CODE_SUCCESS) { // 1. check if it is existed in meta cache
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); LRUHandle *h = taosLRUCacheLookup(pCache, &pBlock->info.uid, sizeof(pBlock->info.uid));
metaReaderClear(&mr); if (h == NULL) {
return terrno; SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0);
code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
metaReaderClear(&mr);
return terrno;
}
metaReaderReleaseLock(&mr);
val.pName = mr.me.name;
val.pTags = (STag*)mr.me.ctbEntry.pTags;
} else {
STableCachedVal* pVal = taosLRUCacheValue(pCache, h);
val = *pVal;
} }
for (int32_t j = 0; j < numOfPseudoExpr; ++j) { for (int32_t j = 0; j < numOfExpr; ++j) {
SExprInfo* pExpr = &pPseudoExpr[j]; const SExprInfo* pExpr1 = &pExpr[j];
int32_t dstSlotId = pExpr->base.resSchema.slotId; int32_t dstSlotId = pExpr1->base.resSchema.slotId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
colInfoDataCleanup(pColInfoData, pBlock->info.rows); colInfoDataCleanup(pColInfoData, pBlock->info.rows);
int32_t functionId = pExpr->pExpr->_function.functionId; int32_t functionId = pExpr1->pExpr->_function.functionId;
// this is to handle the tbname // this is to handle the tbname
if (fmIsScanPseudoColumnFunc(functionId)) { if (fmIsScanPseudoColumnFunc(functionId)) {
setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name); setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
} else { // these are tags } else { // these are tags
STagVal tagVal = {0}; STagVal tagVal = {0};
tagVal.cid = pExpr->base.pParam[0].pCol->colId; tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal); const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal);
char* data = NULL; char* data = NULL;
if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) { if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
...@@ -881,6 +898,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -881,6 +898,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pInfo->pTableMetaEntryCache = taosLRUCacheInit(100, -1, .5);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
getTableScannerExecInfo); getTableScannerExecInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册