提交 659e429b 编写于 作者: H Haojun Liao

feature(query): support show table block distribution.

上级 0e77d4dc
...@@ -116,7 +116,7 @@ typedef void *tsdbReaderT; ...@@ -116,7 +116,7 @@ typedef void *tsdbReaderT;
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3 #define BLOCK_LOAD_TABLE_RR_ORDER 3
tsdbReaderT *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
uint64_t taskId); uint64_t taskId);
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId, tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
void *pMemRef); void *pMemRef);
......
...@@ -119,7 +119,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSu ...@@ -119,7 +119,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSu
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp); SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId); uint64_t taskId);
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef); void* pMemRef);
......
...@@ -500,7 +500,7 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle ...@@ -500,7 +500,7 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId) { uint64_t taskId) {
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
if (pTsdbReadHandle == NULL) { if (pTsdbReadHandle == NULL) {
...@@ -508,7 +508,7 @@ tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableLi ...@@ -508,7 +508,7 @@ tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableLi
} }
if (emptyQueryTimewindow(pTsdbReadHandle)) { if (emptyQueryTimewindow(pTsdbReadHandle)) {
return (tsdbReaderT*)pTsdbReadHandle; return (tsdbReaderT)pTsdbReadHandle;
} }
// todo apply the lastkey of table check to avoid to load header file // todo apply the lastkey of table check to avoid to load header file
......
...@@ -369,6 +369,8 @@ typedef struct SSysTableScanInfo { ...@@ -369,6 +369,8 @@ typedef struct SSysTableScanInfo {
typedef struct SBlockDistInfo { typedef struct SBlockDistInfo {
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
void* pHandle; void* pHandle;
SReadHandle readHandle;
uint64_t uid; // table uid
} SBlockDistInfo; } SBlockDistInfo;
// todo remove this // todo remove this
...@@ -735,7 +737,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo ...@@ -735,7 +737,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup); STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup);
......
...@@ -502,6 +502,9 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt ...@@ -502,6 +502,9 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
SFunctParam* pFuncParam = &pOneExpr->base.pParam[j]; SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
int32_t slotId = pFuncParam->pCol->slotId; int32_t slotId = pFuncParam->pCol->slotId;
if (slotId >= taosArrayGetSize(pBlock->pDataBlock)) {
slotId = taosArrayGetSize(pBlock->pDataBlock) - 1;
}
pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId); pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
pInput->totalRows = pBlock->info.rows; pInput->totalRows = pBlock->info.rows;
pInput->numOfRows = pBlock->info.rows; pInput->numOfRows = pBlock->info.rows;
...@@ -2812,7 +2815,8 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan ...@@ -2812,7 +2815,8 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
// todo add more information about exchange operation // todo add more information about exchange operation
int32_t type = pOperator->operatorType; int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN) { type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN ||
type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN) {
*order = TSDB_ORDER_ASC; *order = TSDB_ORDER_ASC;
*scanFlag = MAIN_SCAN; *scanFlag = MAIN_SCAN;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2840,7 +2844,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -2840,7 +2844,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SAggOperatorInfo* pAggInfo = pOperator->info; SAggOperatorInfo* pAggInfo = pOperator->info;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
...@@ -4087,6 +4090,46 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4087,6 +4090,46 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} }
return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*) pPhyNode;
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
int32_t code = tsdbGetAllTableList(pHandle->meta, pBlockNode->uid, pTableListInfo->pTableList);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = terrno;
return NULL;
}
} else { // Create one table group.
STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid};
taosArrayPush(pTableListInfo->pTableList, &info);
}
SQueryTableDataCond cond = {0};
{
cond.order = TSDB_ORDER_ASC;
cond.numOfCols = 1;
cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
if (cond.colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
cond.colList->colId = 1;
cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP;
cond.colList->bytes = sizeof(TSKEY);
cond.numOfTWindows = 1;
cond.twindows = taosMemoryCalloc(1, sizeof(STimeWindow));
cond.twindows[0] = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
cond.suid = pBlockNode->suid;
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
}
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
cleanupQueryTableDataCond(&cond);
return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pTaskInfo);
} else { } else {
ASSERT(0); ASSERT(0);
} }
...@@ -4284,8 +4327,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { ...@@ -4284,8 +4327,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) {
int32_t code = int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond);
getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <vnode.h>
#include "filter.h" #include "filter.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
...@@ -590,12 +591,12 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* ...@@ -590,12 +591,12 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
pInfo->dataReader = pReadHandle; pInfo->dataReader = pReadHandle;
// pInfo->prevGroupId = -1; // pInfo->prevGroupId = -1;
pOperator->name = "TableSeqScanOperator"; pOperator->name = "TableSeqScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL);
return pOperator; return pOperator;
...@@ -606,16 +607,48 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { ...@@ -606,16 +607,48 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
STableScanInfo* pTableScanInfo = pOperator->info; SBlockDistInfo* pBlockScanInfo = pOperator->info;
STableBlockDistInfo blockDistInfo = {0}; STableBlockDistInfo blockDistInfo = {0};
blockDistInfo.maxRows = INT_MIN; blockDistInfo.maxRows = INT_MIN;
blockDistInfo.minRows = INT_MAX; blockDistInfo.minRows = INT_MAX;
tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &blockDistInfo); SMetaReader mr = {0};
blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader); metaReaderInit(&mr, pBlockScanInfo->readHandle.meta, 0);
metaGetTableEntryByUid(&mr, pBlockScanInfo->uid);
if (mr.me.type == TSDB_SUPER_TABLE) {
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
int32_t rowLen = 0;
for(int32_t i = 0; i < numOfCols; ++i) {
rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
}
blockDistInfo.rowSize = rowLen;
} else if (mr.me.type == TSDB_CHILD_TABLE) {
uint64_t suid = mr.me.ctbEntry.suid;
metaGetTableEntryByUid(&mr, suid);
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
int32_t rowLen = 0;
for(int32_t i = 0; i < numOfCols; ++i) {
rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
}
blockDistInfo.rowSize = rowLen;
} else if (mr.me.type == TSDB_NORMAL_TABLE) {
int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols;
int32_t rowLen = 0;
for(int32_t i = 0; i < numOfCols; ++i) {
rowLen += mr.me.ntbEntry.schemaRow.pSchema[i].bytes;
}
blockDistInfo.rowSize = rowLen;
}
metaReaderClear(&mr);
SSDataBlock* pBlock = pTableScanInfo->pResBlock; tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle);
SSDataBlock* pBlock = pBlockScanInfo->pResBlock;
pBlock->info.rows = 1; pBlock->info.rows = 1;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
...@@ -625,6 +658,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { ...@@ -625,6 +658,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
varDataSetLen(p, len); varDataSetLen(p, len);
colInfoDataEnsureCapacity(pColInfo, 0, 1);
colDataAppend(pColInfo, 0, p, false); colDataAppend(pColInfo, 0, p, false);
taosMemoryFree(p); taosMemoryFree(p);
...@@ -637,7 +671,7 @@ static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -637,7 +671,7 @@ static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) {
blockDataDestroy(pDistInfo->pResBlock); blockDataDestroy(pDistInfo->pResBlock);
} }
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo)); SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -645,7 +679,9 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* ...@@ -645,7 +679,9 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
goto _error; goto _error;
} }
pInfo->pHandle = dataReader; pInfo->pHandle = dataReader;
pInfo->readHandle = *readHandle;
pInfo->uid = uid;
pInfo->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); pInfo->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
...@@ -653,13 +689,14 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* ...@@ -653,13 +689,14 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = 1024; infoData.info.bytes = 1024;
pInfo->pResBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
taosArrayPush(pInfo->pResBlock->pDataBlock, &infoData); taosArrayPush(pInfo->pResBlock->pDataBlock, &infoData);
pOperator->name = "DataBlockInfoScanOperator"; pOperator->name = "DataBlockDistScanOperator";
// pOperator->operatorType = OP_TableBlockInfoScan; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL,
......
...@@ -192,6 +192,8 @@ int32_t twaFunction(SqlFunctionCtx *pCtx); ...@@ -192,6 +192,8 @@ int32_t twaFunction(SqlFunctionCtx *pCtx);
int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t blockDistFunction(SqlFunctionCtx *pCtx); int32_t blockDistFunction(SqlFunctionCtx *pCtx);
int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
......
...@@ -2523,6 +2523,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2523,6 +2523,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateBlockDistFunc, .translateFunc = translateBlockDistFunc,
.getEnvFunc = getBlockDistFuncEnv, .getEnvFunc = getBlockDistFuncEnv,
.initFunc = blockDistSetup,
.processFunc = blockDistFunction, .processFunc = blockDistFunction,
.finalizeFunc = blockDistFinalize .finalizeFunc = blockDistFinalize
}, },
......
...@@ -5008,7 +5008,19 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -5008,7 +5008,19 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return functionFinalize(pCtx, pBlock); return functionFinalize(pCtx, pBlock);
} }
bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false;
}
STableBlockDistInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pInfo->minRows = INT32_MAX;
return true;
}
int32_t blockDistFunction(SqlFunctionCtx* pCtx) { int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
const int32_t BLOCK_DIST_RESULT_ROWS = 24;
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
...@@ -5026,6 +5038,11 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { ...@@ -5026,6 +5038,11 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
pDistInfo->totalRows += p1.totalRows; pDistInfo->totalRows += p1.totalRows;
pDistInfo->numOfFiles += p1.numOfFiles; pDistInfo->numOfFiles += p1.numOfFiles;
pDistInfo->defMinRows = p1.defMinRows;
pDistInfo->defMaxRows = p1.defMaxRows;
pDistInfo->rowSize = p1.rowSize;
pDistInfo->numOfSmallBlocks = p1.numOfSmallBlocks;
if (pDistInfo->minRows > p1.minRows) { if (pDistInfo->minRows > p1.minRows) {
pDistInfo->minRows = p1.minRows; pDistInfo->minRows = p1.minRows;
} }
...@@ -5037,7 +5054,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { ...@@ -5037,7 +5054,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i]; pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i];
} }
pResInfo->numOfRes = 1; pResInfo->numOfRes = BLOCK_DIST_RESULT_ROWS; // default output rows
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5049,7 +5066,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist ...@@ -5049,7 +5066,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1; if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1;
if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1; if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfBlocks) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1;
if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1; if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1;
...@@ -5080,7 +5097,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo ...@@ -5080,7 +5097,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1;
if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1; if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfBlocks) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1;
if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1; if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1;
...@@ -5102,32 +5119,29 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo ...@@ -5102,32 +5119,29 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
char* pData = GET_ROWCELL_INTERBUF(pResInfo); STableBlockDistInfo* pData = GET_ROWCELL_INTERBUF(pResInfo);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
int32_t row = 0; int32_t row = 0;
STableBlockDistInfo info = {0};
tDeserializeBlockDistInfo(varDataVal(pData), varDataLen(pData), &info);
char st[256] = {0}; char st[256] = {0};
double totalRawSize = pData->totalRows * pData->rowSize;
int32_t len = int32_t len =
sprintf(st + VARSTR_HEADER_SIZE, "Blocks=[%d] Size=[%.3fKb] Average_Block_size=[%.3fKb] Compression_Ratio=[%.3f]", sprintf(st + VARSTR_HEADER_SIZE, "Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]",
info.numOfBlocks, info.totalSize / 1024.0, info.totalSize / (info.numOfBlocks * 1024.0), pData->numOfBlocks, pData->totalSize / 1024.0, ((double)pData->totalSize) / pData->numOfBlocks,
info.totalSize / (info.totalRows * info.rowSize * 1.0)); pData->totalSize * 100 / totalRawSize, '%');
varDataSetLen(st, len); varDataSetLen(st, len);
colDataAppend(pColInfo, row++, st, false); colDataAppend(pColInfo, row++, st, false);
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Rows=[%ld] MinRows=[%d] MaxRows=[%d] Averge_Rows=[%ld] Inmem_Rows=[%d]", len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Rows=[%"PRId64"] Inmem_Rows=[%d] MinRows=[%d] MaxRows=[%d] Average_Rows=[%"PRId64"]",
info.totalRows, info.minRows, info.maxRows, info.totalRows / info.numOfBlocks, info.numOfInmemRows); pData->totalRows, pData->numOfInmemRows, pData->minRows, pData->maxRows, pData->totalRows / pData->numOfBlocks);
varDataSetLen(st, len); varDataSetLen(st, len);
colDataAppend(pColInfo, row++, st, false); colDataAppend(pColInfo, row++, st, false);
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", info.numOfTables, len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables,
info.numOfFiles, 0); pData->numOfFiles, 0);
varDataSetLen(st, len); varDataSetLen(st, len);
colDataAppend(pColInfo, row++, st, false); colDataAppend(pColInfo, row++, st, false);
...@@ -5139,40 +5153,56 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -5139,40 +5153,56 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t maxVal = 0; int32_t maxVal = 0;
int32_t minVal = INT32_MAX; int32_t minVal = INT32_MAX;
for (int32_t i = 0; i < sizeof(info.blockRowsHisto) / sizeof(info.blockRowsHisto[0]); ++i) { for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) {
if (maxVal < info.blockRowsHisto[i]) { if (maxVal < pData->blockRowsHisto[i]) {
maxVal = info.blockRowsHisto[i]; maxVal = pData->blockRowsHisto[i];
} }
if (minVal > info.blockRowsHisto[i]) { if (minVal > pData->blockRowsHisto[i]) {
minVal = info.blockRowsHisto[i]; minVal = pData->blockRowsHisto[i];
} }
} }
int32_t delta = maxVal - minVal; int32_t delta = maxVal - minVal;
int32_t step = delta / 50; int32_t step = delta / 50;
if (step == 0) {
step = 1;
}
int32_t numOfBuckets = sizeof(pData->blockRowsHisto) / sizeof(pData->blockRowsHisto[0]);
int32_t bucketRange = (pData->maxRows - pData->minRows) / numOfBuckets;
int32_t numOfBuckets = sizeof(info.blockRowsHisto) / sizeof(info.blockRowsHisto[0]); bool singleModel = false;
int32_t bucketRange = (info.maxRows - info.minRows) / numOfBuckets; if (bucketRange == 0) {
singleModel = true;
step = 20;
bucketRange = (pData->defMaxRows - pData->defMinRows) / numOfBuckets;
}
for (int32_t i = 0; i < 20; ++i) { for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) {
len += sprintf(st + VARSTR_HEADER_SIZE, "%04d |", info.defMinRows + bucketRange * (i + 1)); len = sprintf(st + VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * (i + 1));
int32_t num = 0;
if (singleModel && pData->blockRowsHisto[i] > 0) {
num = 20;
} else {
num = (pData->blockRowsHisto[i] + step - 1) / step;
}
int32_t num = (info.blockRowsHisto[i] + step - 1) / step;
for (int32_t j = 0; j < num; ++j) { for (int32_t j = 0; j < num; ++j) {
int32_t x = sprintf(st + VARSTR_HEADER_SIZE + len, "%c", '|'); int32_t x = sprintf(st + VARSTR_HEADER_SIZE + len, "%c", '|');
len += x; len += x;
} }
double v = info.blockRowsHisto[i] * 100.0 / info.numOfBlocks; double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks;
len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.3f%c)", info.blockRowsHisto[i], v, '%'); len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%');
printf("%s\n", st); printf("%s\n", st);
varDataSetLen(st, len); varDataSetLen(st, len);
colDataAppend(pColInfo, row++, st, false); colDataAppend(pColInfo, row++, st, false);
} }
return row; return TSDB_CODE_SUCCESS;
} }
typedef struct SDerivInfo { typedef struct SDerivInfo {
......
...@@ -419,6 +419,21 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p ...@@ -419,6 +419,21 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p
return collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pFromTable, AUTH_TYPE_WRITE); return collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pFromTable, AUTH_TYPE_WRITE);
} }
static int32_t collectMetaKeyFromShowBlockDist(SCollectMetaKeyCxt* pCxt, SShowTableDistributedStmt* pStmt) {
SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId};
strcpy(name.dbname, pStmt->dbName);
strcpy(name.tname, pStmt->tableName);
int32_t code = catalogRemoveTableMeta(pCxt->pParseCxt->pCatalog, &name);
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
return code;
}
static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
pCxt->pStmt = pStmt; pCxt->pStmt = pStmt;
switch (nodeType(pStmt)) { switch (nodeType(pStmt)) {
...@@ -497,6 +512,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { ...@@ -497,6 +512,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_DELETE_STMT: case QUERY_NODE_DELETE_STMT:
return collectMetaKeyFromDelete(pCxt, (SDeleteStmt*)pStmt); return collectMetaKeyFromDelete(pCxt, (SDeleteStmt*)pStmt);
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT:
return collectMetaKeyFromShowBlockDist(pCxt, (SShowTableDistributedStmt*)pStmt);
default: default:
break; break;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册