提交 d1e40463 编写于 作者: Y yihaoDeng

add block dist info

上级 971c25a1
......@@ -41,7 +41,7 @@
#define COLUMN_INDEX_INITIAL_VAL (-3)
#define COLUMN_INDEX_INITIALIZER \
{ COLUMN_INDEX_INITIAL_VAL, COLUMN_INDEX_INITIAL_VAL }
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_TBNAME_COLUMN_INDEX))
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_BLOCK_DIST_COLUMN_INDEX))
#define TBNAME_LIST_SEP ","
typedef struct SColumnList { // todo refactor
......@@ -1706,6 +1706,9 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
SSchema colSchema = tGetTableNameColumnSchema();
tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, TSDB_COL_TAG);
} else if (index.columnIndex == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema colSchema = tGetBlockDistColumnSchema();
tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_PRJ, &index, &colSchema, TSDB_COL_TAG);
} else {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
......@@ -2381,6 +2384,14 @@ static bool isTablenameToken(SStrToken* token) {
return (strncasecmp(TSQL_TBNAME_L, tmpToken.z, tmpToken.n) == 0 && tmpToken.n == strlen(TSQL_TBNAME_L));
}
static bool isTableBlockDistToken(SStrToken* token) {
SStrToken tmpToken = *token;
SStrToken tableToken = {0};
extractTableNameFromToken(&tmpToken, &tableToken);
return (strncasecmp(TSQL_BLOCK_DIST, tmpToken.z, tmpToken.n) == 0 && tmpToken.n == strlen(TSQL_BLOCK_DIST_L));
}
static int16_t doGetColumnIndex(SQueryInfo* pQueryInfo, int32_t index, SStrToken* pToken) {
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, index)->pTableMeta;
......@@ -2410,6 +2421,8 @@ int32_t doGetColumnIndexByName(SSqlCmd* pCmd, SStrToken* pToken, SQueryInfo* pQu
if (isTablenameToken(pToken)) {
pIndex->columnIndex = TSDB_TBNAME_COLUMN_INDEX;
} else if (isTableBlockDistToken(pToken)) {
pIndex->columnIndex = TSDB_BLOCK_DIST_COLUMN_INDEX;
} else if (strncasecmp(pToken->z, DEFAULT_PRIMARY_TIMESTAMP_COL_NAME, pToken->n) == 0) {
pIndex->columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX;
} else {
......@@ -2650,8 +2663,7 @@ int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (!validateIpAddress(pDnodeIp->z, pDnodeIp->n)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
}
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -1091,6 +1091,8 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
// set the correct columnIndex index
if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
} else if (pColIndex->columnIndex == TSDB_BLOCK_DIST_COLUMN_INDEX) {
pExpr->colInfo.colId = TSDB_BLOCK_DIST_COLUMN_INDEX;
} else if (pColIndex->columnIndex <= TSDB_UD_COLUMN_INDEX) {
pExpr->colInfo.colId = pColIndex->columnIndex;
} else {
......@@ -1507,7 +1509,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t
return false;
}
if (colId == TSDB_TBNAME_COLUMN_INDEX || (colId <= TSDB_UD_COLUMN_INDEX && numOfParams == 2)) {
if (colId == TSDB_TBNAME_COLUMN_INDEX || colId == TSDB_BLOCK_DIST_COLUMN_INDEX || (colId <= TSDB_UD_COLUMN_INDEX && numOfParams == 2)) {
return true;
}
......
......@@ -31,6 +31,8 @@ void extractTableNameFromToken(SStrToken *pToken, SStrToken* pTable);
SSchema tGetTableNameColumnSchema();
SSchema tGetBlockDistColumnSchema();
SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name);
bool tscValidateTableNameLength(size_t len);
......
......@@ -58,6 +58,14 @@ SSchema tGetTableNameColumnSchema() {
tstrncpy(s.name, TSQL_TBNAME_L, TSDB_COL_NAME_LEN);
return s;
}
SSchema tGetBlockDistColumnSchema() {
SSchema s = {0};
s.bytes = TSDB_MAX_BINARY_LEN;;
s.type = TSDB_DATA_TYPE_BINARY;
s.colId = TSDB_BLOCK_DIST_COLUMN_INDEX;
tstrncpy(s.name, TSQL_BLOCK_DIST_L, TSDB_COL_NAME_LEN);
return s;
}
SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name) {
SSchema s = {0};
......
......@@ -235,7 +235,9 @@ do { \
#define TSDB_MAX_REPLICA 5
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_BLOCK_DIST_COLUMN_INDEX (-2)
#define TSDB_UD_COLUMN_INDEX (-100)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
#define TSDB_MIN_CACHE_BLOCK_SIZE 1
......
......@@ -232,13 +232,30 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
void *qinfo, SMemRef* pRef);
/**
* get num of rows in mem table
*
* @param pHandle
* @return row size
*/
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle);
/**
* move to next block if exists
* move to next block if exists
*
* @param pQueryHandle
* @return
*/
bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle);
/**
* move to next block if exists but not merge data in memtable
*
* @param pQueryHandle
* @return
*/
bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT *pQueryHandle);
/**
* Get current data block information
......
......@@ -194,6 +194,7 @@ typedef struct SQueryRuntimeEnv {
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
bool queryBlockDist; // if query data block distribution
int32_t interBufSize; // intermediate buffer sizse
int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
......
......@@ -28,6 +28,7 @@
#include "queryLog.h"
#include "tlosertree.h"
#include "ttype.h"
#include "tcompare.h"
#define MAX_ROWS_PER_RESBUF_PAGE ((1u<<12) - 1)
......@@ -90,6 +91,13 @@ typedef struct {
STSCursor cur;
} SQueryStatusInfo;
typedef struct {
SArray *dataBlockInfos;
int64_t firstSeekTimeUs;
int64_t numOfRowsInMemTable;
char *result;
} STableBlockDist;
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = rand();
......@@ -1907,6 +1915,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) { // todo refactor
SSchema s = tGetTableNameColumnSchema();
pCtx->inputBytes = s.bytes;
pCtx->inputType = s.type;
} else if (pIndex->colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema s = tGetBlockDistColumnSchema();
pCtx->inputBytes = s.bytes;
pCtx->inputType = s.type;
} else {
......@@ -4381,7 +4393,57 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc
qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
}
static void freeTableBlockDist(STableBlockDist *pTableBlockDist) {
if (pTableBlockDist != NULL) {
taosArrayDestroy(pTableBlockDist->dataBlockInfos);
free(pTableBlockDist->result);
free(pTableBlockDist);
}
}
static int32_t getPercentileFromSortedArray(const SArray* pArray, float rate) {
size_t len = taosArrayGetSize(pArray);
if (len == 0) {
return 0;
}
assert(rate >= 0 && rate <= 1.0);
int idx = (int32_t)((len - 1) * rate);
return ((SDataBlockInfo *)(taosArrayGet(pArray, idx)))->rows;
}
static int32_t compareBlockInfo(const void *pLeft, const void *pRight) {
int32_t left = ((SDataBlockInfo *)pLeft)->rows;
int32_t right = ((SDataBlockInfo *)pRight)->rows;
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
static void generateBlockDistResult(STableBlockDist *pTableBlockDist) {
if (pTableBlockDist == NULL) {
return;
}
int64_t min = INT64_MAX, max = INT64_MIN, avg = 0;
SArray* blockInfos= pTableBlockDist->dataBlockInfos;
int64_t totalRows = 0, totalBlocks = taosArrayGetSize(blockInfos);
for (size_t i = 0; i < taosArrayGetSize(blockInfos); i++) {
SDataBlockInfo *blockInfo = taosArrayGet(blockInfos, i);
int64_t rows = blockInfo->rows;
min = MIN(min, rows);
max = MAX(max, rows);
totalRows += rows;
}
avg = totalBlocks > 0 ? (int32_t)(((totalRows * 1.0)/totalBlocks)) : 0;
taosArraySort(blockInfos, compareBlockInfo);
sprintf(pTableBlockDist->result,
"summery: \n\t 5th=[%d], 25th=[%d], 50th=[%d],75th=[%d], 95th=[%d], 99th=[%d] \n\t min=[%ld], max=[%ld], avg = [%ld] \n\t totalRows=[%ld], totalBlocks=[%ld] \n\t seekHeaderTimeCost=[%ld(us)] \n\t rowsInMem=[%ld]",
getPercentileFromSortedArray(blockInfos, 0.05), getPercentileFromSortedArray(blockInfos, 0.25), getPercentileFromSortedArray(blockInfos, 0.50),
getPercentileFromSortedArray(blockInfos, 0.75), getPercentileFromSortedArray(blockInfos, 0.95), getPercentileFromSortedArray(blockInfos, 0.99),
min, max, avg,
totalRows, totalBlocks,
pTableBlockDist->firstSeekTimeUs,
pTableBlockDist->numOfRowsInMemTable);
}
void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -5836,6 +5898,58 @@ static void tableQueryImpl(SQInfo *pQInfo) {
pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st);
assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1);
}
static void buildTableBlockDistResult(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->pos = 0;
STableBlockDist *pTableBlockDist = calloc(1, sizeof(STableBlockDist));
pTableBlockDist->dataBlockInfos = taosArrayInit(512, sizeof(SDataBlockInfo));
pTableBlockDist->result = malloc(512);
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
SSchema blockDistSchema = tGetBlockDistColumnSchema();
int64_t startTime = taosGetTimestampUs();
while (tsdbNextDataBlockWithoutMerge(pQueryHandle)) {
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
freeTableBlockDist(pTableBlockDist);
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
if (pTableBlockDist->firstSeekTimeUs == 0) {
pTableBlockDist->firstSeekTimeUs = taosGetTimestampUs() - startTime;
}
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
taosArrayPush(pTableBlockDist->dataBlockInfos, &blockInfo);
}
if (terrno != TSDB_CODE_SUCCESS) {
freeTableBlockDist(pTableBlockDist);
longjmp(pRuntimeEnv->env, terrno);
}
pTableBlockDist->numOfRowsInMemTable = tsdbGetNumOfRowsInMemTable(pQueryHandle);
generateBlockDistResult(pTableBlockDist);
int type = -1;
assert(pQuery->numOfOutput == 1);
SExprInfo* pExprInfo = pQuery->pExpr1;
for (int32_t j = 0; j < pQuery->numOfOutput; j++) {
if (pExprInfo[j].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
type = blockDistSchema.type;
}
assert(type == TSDB_DATA_TYPE_BINARY);
STR_TO_VARSTR(pQuery->sdata[j]->data, pTableBlockDist->result);
}
freeTableBlockDist(pTableBlockDist);
pQuery->rec.rows = 1;
setQueryStatus(pQuery, QUERY_COMPLETED);
return;
}
static void stableQueryImpl(SQInfo *pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
......@@ -5864,7 +5978,10 @@ static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pE
if (TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) {
if (pExprMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
return TSDB_TBNAME_COLUMN_INDEX;
} else if (pExprMsg->colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
return TSDB_BLOCK_DIST_COLUMN_INDEX;
}
while(j < pQueryMsg->numOfTags) {
if (pExprMsg->colInfo.colId == pTagCols[j].colId) {
......@@ -6321,6 +6438,10 @@ static int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t num
SSchema s = tGetTableNameColumnSchema();
type = s.type;
bytes = s.bytes;
} else if (pExprs[i].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema s = tGetBlockDistColumnSchema();
type = s.type;
bytes = s.bytes;
} else if (pExprs[i].base.colInfo.colId <= TSDB_UD_COLUMN_INDEX) {
// it is a user-defined constant value column
assert(pExprs[i].base.functionId == TSDB_FUNC_PRJ);
......@@ -6334,7 +6455,7 @@ static int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t num
} else {
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
if (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag)) {
if (j < TSDB_TBNAME_COLUMN_INDEX || j >= pQueryMsg->numOfTags) {
if (j < TSDB_BLOCK_DIST_COLUMN_INDEX || j >= pQueryMsg->numOfTags) {
return TSDB_CODE_QRY_INVALID_MSG;
}
} else {
......@@ -6504,7 +6625,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
}
}
assert(f < pQuery->numOfTags || pColIndex->colId == TSDB_TBNAME_COLUMN_INDEX);
assert(f < pQuery->numOfTags || pColIndex->colId == TSDB_TBNAME_COLUMN_INDEX || pColIndex->colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
}
}
}
......@@ -6707,9 +6828,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
index += 1;
}
}
colIdCheck(pQuery);
pQInfo->runtimeEnv.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
return pQInfo;
......@@ -7219,6 +7341,8 @@ bool qTableQuery(qinfo_t qinfo) {
buildTagQueryResult(pQInfo);
} else if (pQInfo->runtimeEnv.stableQuery) {
stableQueryImpl(pQInfo);
} else if (pQInfo->runtimeEnv.queryBlockDist){
buildTableBlockDistResult(pQInfo);
} else {
tableQueryImpl(pQInfo);
}
......
......@@ -210,6 +210,36 @@ static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) {
pQueryHandle->pMemRef = NULL;
}
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(pQueryHandle->activeIndex < size && pQueryHandle->activeIndex >= 0 && size >= 1);
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
int64_t rows = 0;
SMemRef* pMemRef = pQueryHandle->pMemRef;
if (pMemRef == NULL) { return rows; }
STableData* pMem = NULL;
STableData* pIMem = NULL;
SMemTable *pMemT = (SMemTable *)(pMemRef->mem);
SMemTable *pIMemT = (SMemTable *)(pMemRef->imem);
if (pMemT && pCheckInfo->tableId.tid < pMemT->maxTables) {
pMem = pMemT->tData[pCheckInfo->tableId.tid];
rows += (pMem && pMem->uid == pCheckInfo->tableId.uid) ? pMem->numOfRows: 0;
}
if (pIMemT && pCheckInfo->tableId.tid < pIMemT->maxTables) {
pIMem = pIMemT->tData[pCheckInfo->tableId.tid];
rows += (pIMem && pIMem->uid == pCheckInfo->tableId.uid) ? pIMem->numOfRows: 0;
}
return rows;
}
static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta) {
size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList);
assert(sizeOfGroup >= 1 && pMeta != NULL);
......@@ -2218,6 +2248,84 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
return ret;
}
bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT* pHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
int64_t stime = taosGetTimestampUs();
int64_t elapsedTime = stime;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(numOfTables > 0);
if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) {
SMemRef* pMemRef = pQueryHandle->pMemRef;
tsdbMayTakeMemSnapshot(pQueryHandle);
bool ret = getNeighborRows(pQueryHandle);
tsdbMayUnTakeMemSnapshot(pQueryHandle);
// restore the pMemRef
pQueryHandle->pMemRef = pMemRef;
return ret;
} else if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST && pQueryHandle->cachelastrow) {
// the last row is cached in buffer, return it directly.
// here note that the pQueryHandle->window must be the TS_INITIALIZER
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
SQueryFilePos* cur = &pQueryHandle->cur;
SDataRow pRow = NULL;
TSKEY key = TSKEY_INITIAL_VAL;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
if (++pQueryHandle->activeIndex < numOfTables) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
if (ret != TSDB_CODE_SUCCESS) {
return false;
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj);
tfree(pRow);
// update the last key value
pCheckInfo->lastKey = key + step;
cur->rows = 1; // only one row
cur->lastKey = key + step;
cur->mixBlock = true;
cur->win.skey = key;
cur->win.ekey = key;
return true;
}
return false;
}
if (pQueryHandle->checkFiles) {
// check if the query range overlaps with the file data block
bool exists = true;
int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
if (code != TSDB_CODE_SUCCESS) {
pQueryHandle->activeIndex = 0;
pQueryHandle->checkFiles = false;
return false;
}
if (exists) {
pQueryHandle->cost.checkForNextTime += (taosGetTimestampUs() - stime);
return exists;
}
pQueryHandle->activeIndex = 0;
pQueryHandle->checkFiles = false;
}
elapsedTime = taosGetTimestampUs() - stime;
pQueryHandle->cost.checkForNextTime += elapsedTime;
return false;
}
/*
* 1. no data at all (pTable->lastKey = TSKEY_INITIAL_VAL), just return TSKEY_INITIAL_VAL
* 2. has data but not loaded, just return lastKey but not set pRes
......
......@@ -27,6 +27,9 @@ extern "C" {
#define TSQL_TBNAME "TBNAME"
#define TSQL_TBNAME_L "tbname"
#define TSQL_BLOCK_DIST "_BLOCK_DIST"
#define TSQL_BLOCK_DIST_L "_block_dist"
// used to denote the minimum unite in sql parsing
typedef struct SStrToken {
uint32_t n;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册