未验证 提交 84eb77e1 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #18590 from taosdata/szhou/table-count-scan

enhancement: table count scan
...@@ -489,8 +489,8 @@ typedef struct STableCountScanSupp { ...@@ -489,8 +489,8 @@ typedef struct STableCountScanSupp {
bool groupByDbName; bool groupByDbName;
bool groupByStbName; bool groupByStbName;
char dbName[TSDB_DB_NAME_LEN]; char dbNameFilter[TSDB_DB_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN]; char stbNameFilter[TSDB_TABLE_NAME_LEN];
} STableCountScanSupp; } STableCountScanSupp;
...@@ -498,16 +498,10 @@ typedef struct STableCountScanOperatorInfo { ...@@ -498,16 +498,10 @@ typedef struct STableCountScanOperatorInfo {
SReadHandle readHandle; SReadHandle readHandle;
SSDataBlock* pRes; SSDataBlock* pRes;
SName tableName;
SNodeList* groupTags;
SNodeList* scanCols;
SNodeList* pseudoCols;
STableCountScanSupp supp; STableCountScanSupp supp;
int32_t currGrpIdx; int32_t currGrpIdx;
SArray* stbUidList; // when group by db_name and stable_name SArray* stbUidList; // when group by db_name and/or stable_name
} STableCountScanOperatorInfo; } STableCountScanOperatorInfo;
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
......
...@@ -486,10 +486,11 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int ...@@ -486,10 +486,11 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid); code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) { if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
tstrerror(terrno), idStr); pBlock->info.id.uid, tstrerror(terrno), idStr);
} else { } else {
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno), idStr); qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
idStr);
} }
metaReaderClear(&mr); metaReaderClear(&mr);
return terrno; return terrno;
...@@ -1413,8 +1414,8 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock ...@@ -1413,8 +1414,8 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
NULL); NULL);
if (closedWin && pInfo->partitionSup.needCalc) { if (closedWin && pInfo->partitionSup.needCalc) {
gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId); gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId, appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid,
NULL); &gpId, NULL);
} }
} }
} }
...@@ -2432,7 +2433,8 @@ static void destroyTagScanOperatorInfo(void* param) { ...@@ -2432,7 +2433,8 @@ static void destroyTagScanOperatorInfo(void* param) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -2880,6 +2882,21 @@ _error: ...@@ -2880,6 +2882,21 @@ _error:
// TableCountScanOperator // TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator); static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
static void destoryTableCountScanOperator(void* param); static void destoryTableCountScanOperator(void* param);
static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid);
static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
SSDataBlock* pRes, char* dbName);
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
size_t perfdbTableNum);
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
size_t infodbTableNum, size_t perfdbTableNum);
static const char* GROUP_TAG_DB_NAME = "db_name"; static const char* GROUP_TAG_DB_NAME = "db_name";
static const char* GROUP_TAG_STABLE_NAME = "stable_name"; static const char* GROUP_TAG_STABLE_NAME = "stable_name";
...@@ -2941,8 +2958,8 @@ int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCoun ...@@ -2941,8 +2958,8 @@ int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCoun
} }
} }
} else { } else {
strncpy(supp->dbName, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN); strncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
strncpy(supp->stbName, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN); strncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3024,7 +3041,7 @@ void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* ...@@ -3024,7 +3041,7 @@ void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char*
if (pSupp->stbNameSlotId != -1) { if (pSupp->stbNameSlotId != -1) {
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId); SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
if (strlen(stbName) != 0) { if (strlen(stbName) != 0) {
char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
strncpy(varDataVal(varStbName), stbName, strlen(stbName)); strncpy(varDataVal(varStbName), stbName, strlen(stbName));
varDataSetLen(varStbName, strlen(stbName)); varDataSetLen(varStbName, strlen(stbName));
colDataAppend(colInfoData, 0, varStbName, false); colDataAppend(colInfoData, 0, varStbName, false);
...@@ -3050,33 +3067,43 @@ static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountSc ...@@ -3050,33 +3067,43 @@ static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountSc
getPerfDbMeta(NULL, &perfdbTableNum); getPerfDbMeta(NULL, &perfdbTableNum);
if (pSupp->groupByDbName) { if (pSupp->groupByDbName) {
if (pInfo->currGrpIdx == 0) { buildSysDbGroupedTableCount(pOperator, pInfo, pSupp, pRes, infodbTableNum, perfdbTableNum);
uint64_t groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
pRes->info.id.groupId = groupId;
fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
} else if (pInfo->currGrpIdx == 1) {
uint64_t groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB));
pRes->info.id.groupId = groupId;
fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
} else {
setOperatorCompleted(pOperator);
return NULL;
}
pInfo->currGrpIdx++;
return (pRes->info.rows > 0) ? pRes : NULL; return (pRes->info.rows > 0) ? pRes : NULL;
} else { } else {
if (strcmp(pSupp->dbName, TSDB_INFORMATION_SCHEMA_DB) == 0) { buildSysDbFilterTableCount(pOperator, pSupp, pRes, infodbTableNum, perfdbTableNum);
fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
} else if (strcmp(pSupp->dbName, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
} else if (strlen(pSupp->dbName) == 0) {
fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
}
setOperatorCompleted(pOperator);
return (pRes->info.rows > 0) ? pRes : NULL; return (pRes->info.rows > 0) ? pRes : NULL;
} }
} }
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
size_t infodbTableNum, size_t perfdbTableNum) {
if (strcmp(pSupp->dbNameFilter, TSDB_INFORMATION_SCHEMA_DB) == 0) {
fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
} else if (strcmp(pSupp->dbNameFilter, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
} else if (strlen(pSupp->dbNameFilter) == 0) {
fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
}
setOperatorCompleted(pOperator);
}
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
size_t perfdbTableNum) {
if (pInfo->currGrpIdx == 0) {
uint64_t groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
pRes->info.id.groupId = groupId;
fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
} else if (pInfo->currGrpIdx == 1) {
uint64_t groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB));
pRes->info.id.groupId = groupId;
fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
} else {
setOperatorCompleted(pOperator);
}
pInfo->currGrpIdx++;
}
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) { static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableCountScanOperatorInfo* pInfo = pOperator->info; STableCountScanOperatorInfo* pInfo = pOperator->info;
...@@ -3091,89 +3118,110 @@ static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) { ...@@ -3091,89 +3118,110 @@ static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
return buildSysDbTableCount(pOperator, pInfo); return buildSysDbTableCount(pOperator, pInfo);
} }
return buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
}
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes) {
const char* db = NULL; const char* db = NULL;
int32_t vgId = 0; int32_t vgId = 0;
char dbName[TSDB_DB_NAME_LEN] = {0}; char dbName[TSDB_DB_NAME_LEN] = {0};
{ // get dbname
// get dbname vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId); SName sn = {0};
SName sn = {0}; tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB); tNameGetDbName(&sn, dbName);
tNameGetDbName(&sn, dbName);
}
if (pSupp->groupByDbName) { if (pSupp->groupByDbName) {
if (pSupp->groupByStbName) { buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
if (pInfo->stbUidList == NULL) { } else {
pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t)); buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) { }
qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr()); return pRes->info.rows > 0 ? pRes : NULL;
} }
}
if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) { static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx); STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
if (pSupp->groupByStbName) {
char stbName[TSDB_TABLE_NAME_LEN] = {0}; if (pInfo->stbUidList == NULL) {
metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName); pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
char fullStbName[TSDB_TABLE_FNAME_LEN] = {0}; qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
pRes->info.id.groupId = groupId;
SMetaStbStats stats = {0};
metaGetStbStats(pInfo->readHandle.meta, stbUid, &stats);
int64_t ctbNum = stats.ctbNum;
fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
pInfo->currGrpIdx++;
} else if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
pRes->info.id.groupId = groupId;
int64_t ntbNum = metaGetNtbNum(pInfo->readHandle.meta);
fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes);
pInfo->currGrpIdx++;
} else {
setOperatorCompleted(pOperator);
return NULL;
} }
}
if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid);
pInfo->currGrpIdx++;
} else if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName);
pInfo->currGrpIdx++;
} else { } else {
uint64_t groupId = calcGroupId(dbName, strlen(dbName));
pRes->info.id.groupId = groupId;
int64_t dbTableCount = metaGetTbNum(pInfo->readHandle.meta);
fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
} else { } else {
if (strlen(pSupp->dbName) != 0) { uint64_t groupId = calcGroupId(dbName, strlen(dbName));
if (strlen(pSupp->stbName) != 0) { pRes->info.id.groupId = groupId;
tb_uid_t uid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbName); int64_t dbTableCount = metaGetTbNum(pInfo->readHandle.meta);
SMetaStbStats stats = {0}; fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
metaGetStbStats(pInfo->readHandle.meta, uid, &stats); setOperatorCompleted(pOperator);
int64_t ctbNum = stats.ctbNum; }
fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbName, ctbNum, pRes); }
} else {
int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta); static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes); STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
} if (strlen(pSupp->dbNameFilter) != 0) {
if (strlen(pSupp->stbNameFilter) != 0) {
tb_uid_t uid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbNameFilter);
SMetaStbStats stats = {0};
metaGetStbStats(pInfo->readHandle.meta, uid, &stats);
int64_t ctbNum = stats.ctbNum;
fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, ctbNum, pRes);
} else { } else {
int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta); int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes); fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
} }
setOperatorCompleted(pOperator); } else {
int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
} }
return pRes->info.rows > 0 ? pRes : NULL; setOperatorCompleted(pOperator);
}
static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
SSDataBlock* pRes, char* dbName) {
char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
pRes->info.id.groupId = groupId;
int64_t ntbNum = metaGetNtbNum(pInfo->readHandle.meta);
fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes);
}
static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) {
char stbName[TSDB_TABLE_NAME_LEN] = {0};
metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName);
char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
pRes->info.id.groupId = groupId;
SMetaStbStats stats = {0};
metaGetStbStats(pInfo->readHandle.meta, stbUid, &stats);
int64_t ctbNum = stats.ctbNum;
fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
} }
static void destoryTableCountScanOperator(void* param) { static void destoryTableCountScanOperator(void* param) {
STableCountScanOperatorInfo* pTableCountScanInfo = param; STableCountScanOperatorInfo* pTableCountScanInfo = param;
blockDataDestroy(pTableCountScanInfo->pRes); blockDataDestroy(pTableCountScanInfo->pRes);
nodesDestroyList(pTableCountScanInfo->groupTags);
taosArrayDestroy(pTableCountScanInfo->stbUidList); taosArrayDestroy(pTableCountScanInfo->stbUidList);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册