未验证 提交 dde99cbb 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #6282 from taosdata/feature/td-4243-2

[TD-4243]<enhance>:remove _block_dist() special dedicated column index and reduce memory usage by approximate percentile
......@@ -41,11 +41,11 @@
#define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey))
// -1 is tbname column index, so here use the -3 as the initial value
#define COLUMN_INDEX_INITIAL_VAL (-3)
// -1 is tbname column index, so here use the -2 as the initial value
#define COLUMN_INDEX_INITIAL_VAL (-2)
#define COLUMN_INDEX_INITIALIZER \
{ COLUMN_INDEX_INITIAL_VAL, COLUMN_INDEX_INITIAL_VAL }
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_BLOCK_DIST_COLUMN_INDEX))
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_TBNAME_COLUMN_INDEX))
#define TBNAME_LIST_SEP ","
typedef struct SColumnList { // todo refactor
......@@ -1873,9 +1873,6 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
SSchema* colSchema = tGetTbnameColumnSchema();
tscAddFuncInSelectClause(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, colSchema, TSDB_COL_TAG, getNewResColId(pCmd));
} else if (index.columnIndex == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema colSchema = tGetBlockDistColumnSchema();
tscAddFuncInSelectClause(pQueryInfo, startPos, TSDB_FUNC_PRJ, &index, &colSchema, TSDB_COL_TAG, getNewResColId(pCmd));
} else {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
......@@ -2487,7 +2484,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
SColumnIndex index = {.tableIndex = 0, .columnIndex = TSDB_BLOCK_DIST_COLUMN_INDEX,};
SColumnIndex index = {.tableIndex = 0, .columnIndex = 0,};
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema s = {.name = "block_dist", .type = TSDB_DATA_TYPE_BINARY};
......@@ -2495,10 +2492,16 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
int16_t resType = 0;
int16_t bytes = 0;
getResultDataInfo(TSDB_DATA_TYPE_INT, 4, TSDB_FUNC_BLKINFO, 0, &resType, &bytes, &inter, 0, 0);
s.bytes = bytes;
s.type = (uint8_t)resType;
SExprInfo* pExpr = tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_BLKINFO, &index, &s, TSDB_COL_TAG, getNewResColId(pCmd));
SExprInfo* pExpr = tscExprInsert(pQueryInfo, 0, TSDB_FUNC_BLKINFO, &index, resType,
bytes, getNewResColId(pCmd), bytes, 0);
tstrncpy(pExpr->base.aliasName, s.name, sizeof(pExpr->base.aliasName));
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
insertResultField(pQueryInfo, 0, &ids, bytes, s.type, s.name, pExpr);
pExpr->base.numOfParams = 1;
pExpr->base.param[0].i64 = pTableMetaInfo->pTableMeta->tableInfo.rowSize;
pExpr->base.param[0].nType = TSDB_DATA_TYPE_BIGINT;
......@@ -2545,14 +2548,6 @@ static bool isTablenameToken(SStrToken* token) {
return (strncasecmp(TSQL_TBNAME_L, tmpToken.z, tmpToken.n) == 0 && tmpToken.n == strlen(TSQL_TBNAME_L));
}
static bool isTableBlockDistToken(SStrToken* token) {
SStrToken tmpToken = *token;
SStrToken tableToken = {0};
extractTableNameFromToken(&tmpToken, &tableToken);
return (strncasecmp(TSQL_BLOCK_DIST, tmpToken.z, tmpToken.n) == 0 && tmpToken.n == strlen(TSQL_BLOCK_DIST_L));
}
static int16_t doGetColumnIndex(SQueryInfo* pQueryInfo, int32_t index, SStrToken* pToken) {
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, index)->pTableMeta;
......@@ -2582,8 +2577,6 @@ int32_t doGetColumnIndexByName(SSqlCmd* pCmd, SStrToken* pToken, SQueryInfo* pQu
if (isTablenameToken(pToken)) {
pIndex->columnIndex = TSDB_TBNAME_COLUMN_INDEX;
} else if (isTableBlockDistToken(pToken)) {
pIndex->columnIndex = TSDB_BLOCK_DIST_COLUMN_INDEX;
} else if (strncasecmp(pToken->z, DEFAULT_PRIMARY_TIMESTAMP_COL_NAME, pToken->n) == 0) {
pIndex->columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX;
} else {
......
......@@ -527,7 +527,7 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
bool isBlockDistQuery(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscNumOfExprs(pQueryInfo);
SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
return (numOfExprs == 1 && pExpr->base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
return (numOfExprs == 1 && pExpr->base.functionId == TSDB_FUNC_BLKINFO);
}
void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
......@@ -2048,16 +2048,14 @@ SExprInfo* tscExprCreate(SQueryInfo* pQueryInfo, int16_t functionId, SColumnInde
p->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
p->colBytes = s->bytes;
p->colType = s->type;
} else if (pColIndex->columnIndex == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema s = tGetBlockDistColumnSchema();
p->colInfo.colId = TSDB_BLOCK_DIST_COLUMN_INDEX;
p->colBytes = s.bytes;
p->colType = s.type;
} else if (pColIndex->columnIndex <= TSDB_UD_COLUMN_INDEX) {
p->colInfo.colId = pColIndex->columnIndex;
p->colBytes = size;
p->colType = type;
} else if (functionId == TSDB_FUNC_BLKINFO) {
p->colInfo.colId = pColIndex->columnIndex;
p->colBytes = TSDB_MAX_BINARY_LEN;
p->colType = TSDB_DATA_TYPE_BINARY;
} else {
if (TSDB_COL_IS_TAG(colType)) {
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
......@@ -2553,7 +2551,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t
return false;
}
if (colId == TSDB_TBNAME_COLUMN_INDEX || colId == TSDB_BLOCK_DIST_COLUMN_INDEX || (colId <= TSDB_UD_COLUMN_INDEX && numOfParams == 2)) {
if (colId == TSDB_TBNAME_COLUMN_INDEX || (colId <= TSDB_UD_COLUMN_INDEX && numOfParams == 2)) {
return true;
}
......
......@@ -92,8 +92,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len);
void extractTableNameFromToken(SStrToken *pToken, SStrToken* pTable);
SSchema tGetBlockDistColumnSchema();
SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name);
bool tscValidateTableNameLength(size_t len);
......
......@@ -33,15 +33,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len) {
return strlen(prefix);
}
SSchema tGetBlockDistColumnSchema() {
SSchema s = {0};
s.bytes = TSDB_MAX_BINARY_LEN;;
s.type = TSDB_DATA_TYPE_BINARY;
s.colId = TSDB_BLOCK_DIST_COLUMN_INDEX;
tstrncpy(s.name, TSQL_BLOCK_DIST_L, TSDB_COL_NAME_LEN);
return s;
}
SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name) {
SSchema s = {0};
......
......@@ -244,7 +244,6 @@ do { \
#define TSDB_MAX_REPLICA 5
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_BLOCK_DIST_COLUMN_INDEX (-2)
#define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_RES_COL_ID (-5000)
......
......@@ -215,7 +215,7 @@ typedef struct SDataBlockInfo {
} SDataBlockInfo;
typedef struct SFileBlockInfo {
int32_t numOfRows;
int32_t numBlocksOfStep;
} SFileBlockInfo;
typedef struct {
......@@ -229,11 +229,15 @@ typedef struct {
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
#define TSDB_BLOCK_DIST_STEP_ROWS 16
typedef struct {
uint16_t rowSize;
uint16_t numOfFiles;
uint32_t numOfTables;
uint64_t totalSize;
uint64_t totalRows;
int32_t maxRows;
int32_t minRows;
int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable;
SArray *dataBlockInfos;
......
......@@ -19,6 +19,7 @@
#include "texpr.h"
#include "ttype.h"
#include "tsdb.h"
#include "tglobal.h"
#include "qAggMain.h"
#include "qFill.h"
......@@ -4828,51 +4829,81 @@ void blockInfo_func(SQLFunctionCtx* pCtx) {
pResInfo->hasResult = DATA_SET_FLAG;
}
static void mergeTableBlockDist(STableBlockDist* pDist, const STableBlockDist* pSrc) {
static void mergeTableBlockDist(SResultRowCellInfo* pResInfo, const STableBlockDist* pSrc) {
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
assert(pDist != NULL && pSrc != NULL);
pDist->numOfTables += pSrc->numOfTables;
pDist->numOfRowsInMemTable += pSrc->numOfRowsInMemTable;
pDist->numOfFiles += pSrc->numOfFiles;
pDist->totalSize += pSrc->totalSize;
pDist->totalRows += pSrc->totalRows;
if (pDist->dataBlockInfos == NULL) {
pDist->dataBlockInfos = taosArrayInit(4, sizeof(SFileBlockInfo));
if (pResInfo->hasResult == DATA_SET_FLAG) {
pDist->maxRows = MAX(pDist->maxRows, pSrc->maxRows);
pDist->minRows = MIN(pDist->minRows, pSrc->minRows);
} else {
pDist->maxRows = pSrc->maxRows;
pDist->minRows = pSrc->minRows;
int32_t numSteps = tsMaxRowsInFileBlock/TSDB_BLOCK_DIST_STEP_ROWS;
pDist->dataBlockInfos = taosArrayInit(numSteps, sizeof(SFileBlockInfo));
taosArraySetSize(pDist->dataBlockInfos, numSteps);
}
taosArrayAddBatch(pDist->dataBlockInfos, pSrc->dataBlockInfos->pData, (int32_t) taosArrayGetSize(pSrc->dataBlockInfos));
size_t steps = taosArrayGetSize(pDist->dataBlockInfos);
for (int32_t i = 0; i < steps; ++i) {
int32_t srcNumBlocks = ((SFileBlockInfo*)taosArrayGet(pSrc->dataBlockInfos, i))->numBlocksOfStep;
SFileBlockInfo* blockInfo = (SFileBlockInfo*)taosArrayGet(pDist->dataBlockInfos, i);
blockInfo->numBlocksOfStep += srcNumBlocks;
}
}
void block_func_merge(SQLFunctionCtx* pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
STableBlockDist info = {0};
int32_t len = *(int32_t*) pCtx->pInput;
blockDistInfoFromBinary(((char*)pCtx->pInput) + sizeof(int32_t), len, &info);
mergeTableBlockDist(pDist, &info);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
mergeTableBlockDist(pResInfo, &info);
pResInfo->numOfRes = 1;
pResInfo->hasResult = DATA_SET_FLAG;
}
static int32_t doGetPercentile(const SArray* pArray, double rate) {
int32_t len = (int32_t)taosArrayGetSize(pArray);
if (len <= 0) {
return 0;
void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32_t numOfPercents,
double* percents, int32_t* percentiles) {
if (totalBlocks == 0) {
for (int32_t i = 0; i < numOfPercents; ++i) {
percentiles[i] = 0;
}
return;
}
assert(rate >= 0 && rate <= 1.0);
int idx = (int32_t)((len - 1) * rate);
SArray *blocksInfos = pTableBlockDist->dataBlockInfos;
size_t numSteps = taosArrayGetSize(blocksInfos);
size_t cumulativeBlocks = 0;
return ((SFileBlockInfo *)(taosArrayGet(pArray, idx)))->numOfRows;
}
int percentIndex = 0;
for (int32_t indexStep = 0; indexStep < numSteps; ++indexStep) {
int32_t numStepBlocks = ((SFileBlockInfo *)taosArrayGet(blocksInfos, indexStep))->numBlocksOfStep;
if (numStepBlocks == 0) continue;
cumulativeBlocks += numStepBlocks;
static int compareBlockInfo(const void *pLeft, const void *pRight) {
int32_t left = ((SFileBlockInfo *)pLeft)->numOfRows;
int32_t right = ((SFileBlockInfo *)pRight)->numOfRows;
while (percentIndex < numOfPercents) {
double blockRank = totalBlocks * percents[percentIndex];
if (blockRank <= cumulativeBlocks) {
percentiles[percentIndex] = indexStep;
++percentIndex;
} else {
break;
}
}
}
if (left > right) return 1;
if (left < right) return -1;
return 0;
for (int32_t i = 0; i < numOfPercents; ++i) {
percentiles[i] = (percentiles[i]+1) * TSDB_BLOCK_DIST_STEP_ROWS - TSDB_BLOCK_DIST_STEP_ROWS/2;
}
}
void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
......@@ -4880,40 +4911,41 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
return;
}
int64_t min = INT64_MAX, max = INT64_MIN, avg = 0;
SArray* blockInfos= pTableBlockDist->dataBlockInfos;
int64_t totalRows = 0, totalBlocks = taosArrayGetSize(blockInfos);
SArray* blockInfos = pTableBlockDist->dataBlockInfos;
uint64_t totalRows = pTableBlockDist->totalRows;
size_t numSteps = taosArrayGetSize(blockInfos);
int64_t totalBlocks = 0;
int64_t min = -1, max = -1, avg = 0;
for (size_t i = 0; i < taosArrayGetSize(blockInfos); i++) {
for (int32_t i = 0; i < numSteps; i++) {
SFileBlockInfo *blockInfo = taosArrayGet(blockInfos, i);
int64_t rows = blockInfo->numOfRows;
min = MIN(min, rows);
max = MAX(max, rows);
totalRows += rows;
int64_t blocks = blockInfo->numBlocksOfStep;
totalBlocks += blocks;
}
avg = totalBlocks > 0 ? (int64_t)(totalRows/totalBlocks) : 0;
taosArraySort(blockInfos, compareBlockInfo);
min = totalBlocks > 0 ? pTableBlockDist->minRows : 0;
max = totalBlocks > 0 ? pTableBlockDist->maxRows : 0;
double percents[] = {0.05, 0.10, 0.20, 0.30, 0.40, 0.50, 0.60, 0.70, 0.80, 0.90, 0.95, 0.99};
int32_t percentiles[] = {-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1};
assert(sizeof(percents)/sizeof(double) == sizeof(percentiles)/sizeof(int32_t));
getPercentiles(pTableBlockDist, totalBlocks, sizeof(percents)/sizeof(double), percents, percentiles);
uint64_t totalLen = pTableBlockDist->totalSize;
int32_t rowSize = pTableBlockDist->rowSize;
double compRatio = (totalRows>0) ? ((double)(totalLen)/(rowSize*totalRows)) : 1;
int sz = sprintf(result + VARSTR_HEADER_SIZE,
"summary: \n\t "
"5th=[%d], 10th=[%d], 20th=[%d], 30th=[%d], 40th=[%d], 50th=[%d]\n\t "
"60th=[%d], 70th=[%d], 80th=[%d], 90th=[%d], 95th=[%d], 99th=[%d]\n\t "
"Min=[%"PRId64"(Rows)] Max=[%"PRId64"(Rows)] Avg=[%"PRId64"(Rows)] Stddev=[%.2f] \n\t "
"Rows=[%"PRId64"], Blocks=[%"PRId64"], Size=[%.3f(Kb)] Comp=[%.2f%%]\n\t "
"Rows=[%"PRIu64"], Blocks=[%"PRId64"], Size=[%.3f(Kb)] Comp=[%.2f]\n\t "
"RowsInMem=[%d] \n\t SeekHeaderTime=[%d(us)]",
doGetPercentile(blockInfos, 0.05), doGetPercentile(blockInfos, 0.10),
doGetPercentile(blockInfos, 0.20), doGetPercentile(blockInfos, 0.30),
doGetPercentile(blockInfos, 0.40), doGetPercentile(blockInfos, 0.50),
doGetPercentile(blockInfos, 0.60), doGetPercentile(blockInfos, 0.70),
doGetPercentile(blockInfos, 0.80), doGetPercentile(blockInfos, 0.90),
doGetPercentile(blockInfos, 0.95), doGetPercentile(blockInfos, 0.99),
percentiles[0], percentiles[1], percentiles[2], percentiles[3], percentiles[4], percentiles[5],
percentiles[6], percentiles[7], percentiles[8], percentiles[9], percentiles[10], percentiles[11],
min, max, avg, 0.0,
totalRows, totalBlocks, totalLen/1024.0, (double)(totalLen*100.0)/(rowSize*totalRows),
totalRows, totalBlocks, totalLen/1024.0, compRatio,
pTableBlockDist->numOfRowsInMemTable, pTableBlockDist->firstSeekTimeUs);
varDataSetLen(result, sz);
UNUSED(sz);
......
......@@ -935,7 +935,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
setArithParams((SArithmeticSupport*)pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock);
} else {
SColIndex* pCol = &pOperator->pExpr[i].base.colInfo;
if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || (pCol->colId == TSDB_BLOCK_DIST_COLUMN_INDEX) ||
if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || (pCtx[i].functionId == TSDB_FUNC_BLKINFO) ||
(TSDB_COL_IS_TAG(pCol->flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
......@@ -3771,7 +3771,7 @@ void queryCostStatis(SQInfo *pQInfo) {
//
// int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//
// qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QID(pRuntimeEnv),
// qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numBlocksOfStep:%d, numOfRes:%d, lastKey:%"PRId64, GET_QID(pRuntimeEnv),
// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
//}
......@@ -4347,7 +4347,12 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
STableBlockDist tableBlockDist = {0};
tableBlockDist.numOfTables = (int32_t)pOperator->pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
tableBlockDist.dataBlockInfos = taosArrayInit(512, sizeof(SFileBlockInfo));
int32_t numRowSteps = tsMaxRowsInFileBlock / TSDB_BLOCK_DIST_STEP_ROWS;
tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo));
taosArraySetSize(tableBlockDist.dataBlockInfos, numRowSteps);
tableBlockDist.maxRows = INT_MIN;
tableBlockDist.minRows = INT_MAX;
tsdbGetFileBlocksDistInfo(pTableScanInfo->pQueryHandle, &tableBlockDist);
tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->pQueryHandle);
......@@ -4429,7 +4434,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu
SColumnInfoData infoData = {{0}};
infoData.info.type = TSDB_DATA_TYPE_BINARY;
infoData.info.bytes = 1024;
infoData.info.colId = TSDB_BLOCK_DIST_COLUMN_INDEX;
infoData.info.colId = 0;
taosArrayPush(pInfo->block.pDataBlock, &infoData);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -5958,10 +5963,7 @@ static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *p
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
if (pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
return TSDB_TBNAME_COLUMN_INDEX;
} else if (pExpr->colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
return TSDB_BLOCK_DIST_COLUMN_INDEX;
}
while(j < pTableInfo->numOfTags) {
if (pExpr->colInfo.colId == pTagCols[j].colId) {
......@@ -6531,14 +6533,14 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
type = TSDB_DATA_TYPE_DOUBLE;
bytes = tDataTypes[type].bytes;
} else if (pExprs[i].base.functionId == TSDB_FUNC_BLKINFO) {
SSchema s = {.type=TSDB_DATA_TYPE_BINARY, .bytes=TSDB_MAX_BINARY_LEN};
type = s.type;
bytes = s.bytes;
} else if (pExprs[i].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX && pExprs[i].base.functionId == TSDB_FUNC_TAGPRJ) { // parse the normal column
SSchema* s = tGetTbnameColumnSchema();
type = s->type;
bytes = s->bytes;
} else if (pExprs[i].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema s = tGetBlockDistColumnSchema();
type = s.type;
bytes = s.bytes;
} else if (pExprs[i].base.colInfo.colId <= TSDB_UD_COLUMN_INDEX && pExprs[i].base.colInfo.colId > TSDB_RES_COL_ID) {
// it is a user-defined constant value column
assert(pExprs[i].base.functionId == TSDB_FUNC_PRJ);
......@@ -6551,7 +6553,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
} else {
int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols);
if (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag)) {
if (j < TSDB_BLOCK_DIST_COLUMN_INDEX || j >= pTableInfo->numOfTags) {
if (j < TSDB_TBNAME_COLUMN_INDEX || j >= pTableInfo->numOfTags) {
return TSDB_CODE_QRY_INVALID_MSG;
}
} else {
......@@ -6787,9 +6789,6 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
assert(f < pQueryAttr->numOfCols);
} else if (pColIndex->colId <= TSDB_UD_COLUMN_INDEX) {
// do nothing for user-defined constant value result columns
} else if (pColIndex->colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
pColIndex->colIndex = 0;// only one source column, so it must be 0;
assert(pQueryAttr->numOfOutput == 1);
} else {
int32_t f = 0;
for (f = 0; f < pQueryAttr->numOfTags; ++f) {
......@@ -6799,7 +6798,7 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
}
}
assert(f < pQueryAttr->numOfTags || pColIndex->colId == TSDB_TBNAME_COLUMN_INDEX || pColIndex->colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
assert(f < pQueryAttr->numOfTags || pColIndex->colId == TSDB_TBNAME_COLUMN_INDEX);
}
}
}
......@@ -6991,7 +6990,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
colIdCheck(pQueryAttr, pQInfo->qId);
// todo refactor
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.functionId == TSDB_FUNC_BLKINFO);
qDebug("qmsg:%p QInfo:0x%" PRIx64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
return pQInfo;
......
......@@ -581,6 +581,9 @@ void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw) {
tbufWriteUint32(bw, pDist->numOfTables);
tbufWriteUint16(bw, pDist->numOfFiles);
tbufWriteUint64(bw, pDist->totalSize);
tbufWriteUint64(bw, pDist->totalRows);
tbufWriteInt32(bw, pDist->maxRows);
tbufWriteInt32(bw, pDist->minRows);
tbufWriteUint32(bw, pDist->numOfRowsInMemTable);
tbufWriteUint64(bw, taosArrayGetSize(pDist->dataBlockInfos));
......@@ -616,13 +619,16 @@ void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDi
pDist->numOfTables = tbufReadUint32(&br);
pDist->numOfFiles = tbufReadUint16(&br);
pDist->totalSize = tbufReadUint64(&br);
pDist->totalRows = tbufReadUint64(&br);
pDist->maxRows = tbufReadInt32(&br);
pDist->minRows = tbufReadInt32(&br);
pDist->numOfRowsInMemTable = tbufReadUint32(&br);
int64_t numOfBlocks = tbufReadUint64(&br);
int64_t numSteps = tbufReadUint64(&br);
bool comp = tbufReadUint8(&br);
uint32_t compLen = tbufReadUint32(&br);
size_t originalLen = (size_t) (numOfBlocks*sizeof(SFileBlockInfo));
size_t originalLen = (size_t) (numSteps *sizeof(SFileBlockInfo));
char* outputBuf = NULL;
if (comp) {
......@@ -633,12 +639,12 @@ void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDi
int32_t orignalLen = tsDecompressString(compStr, compLen, 1, outputBuf,
(int32_t)originalLen , ONE_STAGE_COMP, NULL, 0);
assert(orignalLen == numOfBlocks*sizeof(SFileBlockInfo));
assert(orignalLen == numSteps *sizeof(SFileBlockInfo));
} else {
outputBuf = (char*) tbufReadBinary(&br, &originalLen);
}
pDist->dataBlockInfos = taosArrayFromList(outputBuf, (uint32_t) numOfBlocks, sizeof(SFileBlockInfo));
pDist->dataBlockInfos = taosArrayFromList(outputBuf, (uint32_t)numSteps, sizeof(SFileBlockInfo));
if (comp) {
tfree(outputBuf);
}
......
......@@ -2128,6 +2128,7 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle;
pTableBlockInfo->totalSize = 0;
pTableBlockInfo->totalRows = 0;
STsdbFS* pFileHandle = REPO_FS(pQueryHandle->pTsdb);
// find the start data block in file
......@@ -2201,7 +2202,12 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
pTableBlockInfo->totalSize += pBlock[j].len;
int32_t numOfRows = pBlock[j].numOfRows;
taosArrayPush(pTableBlockInfo->dataBlockInfos, &numOfRows);
pTableBlockInfo->totalRows += numOfRows;
if (numOfRows > pTableBlockInfo->maxRows) pTableBlockInfo->maxRows = numOfRows;
if (numOfRows < pTableBlockInfo->minRows) pTableBlockInfo->minRows = numOfRows;
int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
blockInfo->numBlocksOfStep++;
}
}
}
......
......@@ -106,6 +106,14 @@ void* taosArrayGetLast(const SArray* pArray);
*/
size_t taosArrayGetSize(const SArray* pArray);
/**
* set the size of array
* @param pArray
* @param size size of the array
* @return
*/
void taosArraySetSize(SArray* pArray, size_t size);
/**
* insert data into array
* @param pArray
......
......@@ -115,6 +115,11 @@ void* taosArrayGetLast(const SArray* pArray) {
size_t taosArrayGetSize(const SArray* pArray) { return pArray->size; }
void taosArraySetSize(SArray* pArray, size_t size) {
assert(size <= pArray->capacity);
pArray->size = size;
}
void* taosArrayInsert(SArray* pArray, size_t index, void* pData) {
if (pArray == NULL || pData == NULL) {
return NULL;
......
......@@ -33,6 +33,7 @@ run general/compute/percentile.sim
run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/block_dist.sim
run general/db/alter_option.sim
run general/db/alter_tables_d2.sim
run general/db/alter_tables_v1.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect
$dbPrefix = m_di_db
$tbPrefix = m_di_tb
$mtPrefix = m_di_mt
$ntPrefix = m_di_nt
$tbNum = 1
$rowNum = 2000
print =============== step1
$i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
$nt = $ntPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x )
$x = $x + 1
endw
$i = $i + 1
endw
sql create table $nt (ts timestamp, tbcol int)
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $nt values ($ms , $x )
$x = $x + 1
endw
sleep 100
print =============== step2
$i = 0
$tb = $tbPrefix . $i
sql select _block_dist() from $tb
if $rows != 1 then
print expect 1, actual:$rows
return -1
endi
print =============== step3
$i = 0
$mt = $mtPrefix . $i
sql select _block_dist() from $mt
if $rows != 1 then
print expect 1, actual:$rows
return -1
endi
print =============== step4
$i = 0
$nt = $ntPrefix . $i
sql select _block_dist() from $nt
if $rows != 1 then
print expect 1, actual:$rows
return -1
endi
print =============== clear
sql drop database $db
sql show databases
if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -14,3 +14,4 @@ run general/compute/percentile.sim
run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/block_dist.sim
......@@ -32,6 +32,7 @@ run general/compute/percentile.sim
run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/block_dist.sim
run general/db/alter_option.sim
run general/db/alter_tables_d2.sim
run general/db/alter_tables_v1.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册