提交 e313cf3b 编写于 作者: H Haojun Liao

[td-2985] refactor.

上级 bd2caded
......@@ -133,6 +133,7 @@ bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryTags(SQueryInfo* pQueryInfo);
bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryBlockInfo(SQueryInfo* pQueryInfo);
SSqlExpr* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType);
......
......@@ -100,6 +100,10 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr
} else if (functionId == TSDB_FUNC_APERCT) {
pCtx->param[0].i64 = pExpr->param[0].i64;
pCtx->param[0].nType = pExpr->param[0].nType;
} else if (functionId == TSDB_FUNC_BLKINFO) {
pCtx->param[0].i64 = pExpr->param[0].i64;
pCtx->param[0].nType = pExpr->param[0].nType;
pCtx->numOfParams = 1;
}
pCtx->interBufBytes = pExpr->interBytes;
......
......@@ -1539,6 +1539,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
if (pQueryInfo->colList == NULL) {
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
}
bool hasDistinct = false;
for (int32_t i = 0; i < pSelection->nExpr; ++i) {
int32_t outputIndex = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
......@@ -1547,25 +1548,26 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
if (hasDistinct == false) {
hasDistinct = (pItem->distinct == true);
}
// project on all fields
int32_t optr = pItem->pNode->nSQLOptr;
if (optr == TK_ALL || optr == TK_ID || optr == TK_STRING || optr == TK_INTEGER || optr == TK_FLOAT) {
// it is actually a function, but the function name is invalid
if (pItem->pNode->nSQLOptr == TK_ID && (pItem->pNode->colInfo.z == NULL && pItem->pNode->colInfo.n == 0)) {
int32_t optr = pItem->pNode->nSQLOptr;
if (optr == TK_FUNCTION) {
int32_t functId = isValidFunction(pItem->pNode->token.z, pItem->pNode->token.n);
if (functId < 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
// select table_name1.field_name1, table_name2.field_name2 from table_name1, table_name2
if (addProjectionExprAndResultField(pCmd, pQueryInfo, pItem) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
} else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_TBID) {
pItem->pNode->nSQLOptr = functId;
// sql function in selection clause, append sql function info in pSqlCmd structure sequentially
if (addExprAndResultField(pCmd, pQueryInfo, outputIndex, pItem, true) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
} else if (optr == TK_ALL || optr == TK_ID || optr == TK_STRING || optr == TK_INTEGER || optr == TK_FLOAT) {
// use the dynamic array list to decide if the function is valid or not
// select table_name1.field_name1, table_name2.field_name2 from table_name1, table_name2
if (addProjectionExprAndResultField(pCmd, pQueryInfo, pItem) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
} else if (pItem->pNode->nSQLOptr >= TK_PLUS && pItem->pNode->nSQLOptr <= TK_REM) {
int32_t code = handleArithmeticExpr(pCmd, clauseIndex, i, pItem);
if (code != TSDB_CODE_SUCCESS) {
......@@ -1589,7 +1591,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
// there is only one user-defined column in the final result field, add the timestamp column.
size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo) && !tscQueryBlockInfo(pQueryInfo)) {
addPrimaryTsColIntoResult(pQueryInfo);
}
......@@ -1876,7 +1878,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const char* msg9 = "invalid function";
switch (optr) {
case TK_COUNT: {
case TSDB_FUNC_COUNT: {
/* more than one parameter for count() function */
if (pItem->pNode->pParam != NULL && pItem->pNode->pParam->nExpr != 1) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
......@@ -1971,23 +1973,23 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_SUCCESS;
}
case TK_SUM:
case TK_AVG:
case TK_RATE:
case TK_IRATE:
case TK_SUM_RATE:
case TK_SUM_IRATE:
case TK_AVG_RATE:
case TK_AVG_IRATE:
case TK_TWA:
case TK_MIN:
case TK_MAX:
case TK_DIFF:
case TK_STDDEV:
case TK_LEASTSQUARES: {
case TSDB_FUNC_SUM:
case TSDB_FUNC_AVG:
case TSDB_FUNC_RATE:
case TSDB_FUNC_IRATE:
case TSDB_FUNC_SUM_RATE:
case TSDB_FUNC_SUM_IRATE:
case TSDB_FUNC_AVG_RATE:
case TSDB_FUNC_AVG_IRATE:
case TSDB_FUNC_TWA:
case TSDB_FUNC_MIN:
case TSDB_FUNC_MAX:
case TSDB_FUNC_DIFF:
case TSDB_FUNC_STDDEV:
case TSDB_FUNC_LEASTSQR: {
// 1. valid the number of parameters
if (pItem->pNode->pParam == NULL || (optr != TK_LEASTSQUARES && pItem->pNode->pParam->nExpr != 1) ||
(optr == TK_LEASTSQUARES && pItem->pNode->pParam->nExpr != 3)) {
if (pItem->pNode->pParam == NULL || (optr != TSDB_FUNC_LEASTSQR && pItem->pNode->pParam->nExpr != 1) ||
(optr == TSDB_FUNC_LEASTSQR && pItem->pNode->pParam->nExpr != 3)) {
/* no parameters or more than one parameter for function */
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -2029,7 +2031,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
// set the first column ts for diff query
if (optr == TK_DIFF) {
if (optr == TSDB_FUNC_DIFF) {
colIndex += 1;
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE,
......@@ -2046,7 +2048,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, resultType, resultSize, getNewResColId(pQueryInfo), resultSize, false);
if (optr == TK_LEASTSQUARES) {
if (optr == TSDB_FUNC_LEASTSQR) {
/* set the leastsquares parameters */
char val[8] = {0};
if (tVariantDump(&pParamElem[1].pNode->val, val, TSDB_DATA_TYPE_DOUBLE, true) < 0) {
......@@ -2082,11 +2084,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tscInsertPrimaryTsSourceColumn(pQueryInfo, &index);
return TSDB_CODE_SUCCESS;
}
case TK_FIRST:
case TK_LAST:
case TK_SPREAD:
case TK_LAST_ROW:
case TK_INTERP: {
case TSDB_FUNC_FIRST:
case TSDB_FUNC_LAST:
case TSDB_FUNC_SPREAD:
case TSDB_FUNC_LAST_ROW:
case TSDB_FUNC_INTERP: {
bool requireAllFields = (pItem->pNode->pParam == NULL);
int16_t functionID = 0;
......@@ -2162,7 +2164,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_TSC_INVALID_SQL;
}
if (optr == TK_LAST) { // todo refactor
if (optr == TSDB_FUNC_LAST) { // todo refactor
SSqlGroupbyExpr* pGroupBy = &pQueryInfo->groupbyExpr;
if (pGroupBy->numOfGroupCols > 0) {
for(int32_t k = 0; k < pGroupBy->numOfGroupCols; ++k) {
......@@ -2210,14 +2212,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
numOfFields += tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
}
return TSDB_CODE_SUCCESS;
}
}
case TK_TOP:
case TK_BOTTOM:
case TK_PERCENTILE:
case TK_APERCENTILE: {
case TSDB_FUNC_TOP:
case TSDB_FUNC_BOTTOM:
case TSDB_FUNC_PERCT:
case TSDB_FUNC_APERCT: {
// 1. valid the number of parameters
if (pItem->pNode->pParam == NULL || pItem->pNode->pParam->nExpr != 2) {
/* no parameters or more than one parameter for function */
......@@ -2265,7 +2266,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
char val[8] = {0};
SSqlExpr* pExpr = NULL;
if (optr == TK_PERCENTILE || optr == TK_APERCENTILE) {
if (optr == TSDB_FUNC_PERCT || optr == TSDB_FUNC_APERCT) {
tVariantDump(pVariant, val, TSDB_DATA_TYPE_DOUBLE, true);
double dp = GET_DOUBLE_VAL(val);
......@@ -2336,7 +2337,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_SUCCESS;
};
case TK_TBID: {
case TSDB_FUNC_TID_TAG: {
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
......@@ -2404,6 +2405,30 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_SUCCESS;
}
case TSDB_FUNC_BLKINFO: {
// no parameters or more than one parameter for function
if (pItem->pNode->pParam != NULL && pItem->pNode->pParam->nExpr != 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
SColumnIndex index = {.tableIndex = 0, .columnIndex = TSDB_BLOCK_DIST_COLUMN_INDEX,};
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema s = {.name = "block_dist", .type = TSDB_DATA_TYPE_BINARY};
int32_t inter = 0;
int16_t resType = 0;
int16_t bytes = 0;
getResultDataInfo(TSDB_DATA_TYPE_INT, 4, TSDB_FUNC_BLKINFO, 0, &resType, &bytes, &inter, 0, 0);
s.bytes = bytes;
s.type = resType;
SSqlExpr* pExpr = tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_BLKINFO, &index, &s, TSDB_COL_TAG);
pExpr->numOfParams = 1;
pExpr->param[0].i64 = pTableMetaInfo->pTableMeta->tableInfo.rowSize;
pExpr->param[0].nType = TSDB_DATA_TYPE_BIGINT;
return TSDB_CODE_SUCCESS;
}
default:
return TSDB_CODE_TSC_INVALID_SQL;
......@@ -2572,76 +2597,76 @@ int32_t getColumnIndexByName(SSqlCmd* pCmd, const SStrToken* pToken, SQueryInfo*
int32_t convertFunctionId(int32_t optr, int16_t* functionId) {
switch (optr) {
case TK_COUNT:
case TSDB_FUNC_COUNT:
*functionId = TSDB_FUNC_COUNT;
break;
case TK_SUM:
case TSDB_FUNC_SUM:
*functionId = TSDB_FUNC_SUM;
break;
case TK_AVG:
case TSDB_FUNC_AVG:
*functionId = TSDB_FUNC_AVG;
break;
case TK_RATE:
case TSDB_FUNC_RATE:
*functionId = TSDB_FUNC_RATE;
break;
case TK_IRATE:
case TSDB_FUNC_IRATE:
*functionId = TSDB_FUNC_IRATE;
break;
case TK_SUM_RATE:
case TSDB_FUNC_SUM_RATE:
*functionId = TSDB_FUNC_SUM_RATE;
break;
case TK_SUM_IRATE:
case TSDB_FUNC_SUM_IRATE:
*functionId = TSDB_FUNC_SUM_IRATE;
break;
case TK_AVG_RATE:
case TSDB_FUNC_AVG_RATE:
*functionId = TSDB_FUNC_AVG_RATE;
break;
case TK_AVG_IRATE:
case TSDB_FUNC_AVG_IRATE:
*functionId = TSDB_FUNC_AVG_IRATE;
break;
case TK_MIN:
case TSDB_FUNC_MIN:
*functionId = TSDB_FUNC_MIN;
break;
case TK_MAX:
case TSDB_FUNC_MAX:
*functionId = TSDB_FUNC_MAX;
break;
case TK_STDDEV:
case TSDB_FUNC_STDDEV:
*functionId = TSDB_FUNC_STDDEV;
break;
case TK_PERCENTILE:
case TSDB_FUNC_PERCT:
*functionId = TSDB_FUNC_PERCT;
break;
case TK_APERCENTILE:
case TSDB_FUNC_APERCT:
*functionId = TSDB_FUNC_APERCT;
break;
case TK_FIRST:
case TSDB_FUNC_FIRST:
*functionId = TSDB_FUNC_FIRST;
break;
case TK_LAST:
case TSDB_FUNC_LAST:
*functionId = TSDB_FUNC_LAST;
break;
case TK_LEASTSQUARES:
case TSDB_FUNC_LEASTSQR:
*functionId = TSDB_FUNC_LEASTSQR;
break;
case TK_TOP:
case TSDB_FUNC_TOP:
*functionId = TSDB_FUNC_TOP;
break;
case TK_BOTTOM:
case TSDB_FUNC_BOTTOM:
*functionId = TSDB_FUNC_BOTTOM;
break;
case TK_DIFF:
case TSDB_FUNC_DIFF:
*functionId = TSDB_FUNC_DIFF;
break;
case TK_SPREAD:
case TSDB_FUNC_SPREAD:
*functionId = TSDB_FUNC_SPREAD;
break;
case TK_TWA:
case TSDB_FUNC_TWA:
*functionId = TSDB_FUNC_TWA;
break;
case TK_INTERP:
case TSDB_FUNC_INTERP:
*functionId = TSDB_FUNC_INTERP;
break;
case TK_LAST_ROW:
case TSDB_FUNC_LAST_ROW:
*functionId = TSDB_FUNC_LAST_ROW;
break;
default:
......@@ -3176,7 +3201,7 @@ static int32_t tSQLExprNodeToString(tSQLExpr* pExpr, char** str) {
} else if (pExpr->nSQLOptr >= TK_BOOL && pExpr->nSQLOptr <= TK_STRING) { // value
*str += tVariantToString(&pExpr->val, *str);
} else if (pExpr->nSQLOptr >= TK_COUNT && pExpr->nSQLOptr <= TK_AVG_IRATE) {
} else if (pExpr->nSQLOptr >= TSDB_FUNC_COUNT && pExpr->nSQLOptr <= TSDB_FUNC_BLKINFO) {
/*
* arithmetic expression of aggregation, such as count(ts) + count(ts) *2
*/
......@@ -3579,7 +3604,7 @@ static int32_t validateSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryInfo* pQuer
pList->ids[pList->num++] = index;
} else if (pExpr->nSQLOptr == TK_FLOAT && (isnan(pExpr->val.dKey) || isinf(pExpr->val.dKey))) {
return TSDB_CODE_TSC_INVALID_SQL;
} else if (pExpr->nSQLOptr >= TK_COUNT && pExpr->nSQLOptr <= TK_AVG_IRATE) {
} else if (pExpr->nSQLOptr >= TSDB_FUNC_COUNT && pExpr->nSQLOptr <= TSDB_FUNC_AVG_IRATE) {
if (*type == NON_ARITHMEIC_EXPR) {
*type = AGG_ARIGHTMEIC;
} else if (*type == NORMAL_ARITHMETIC) {
......@@ -3679,7 +3704,7 @@ static bool isValidExpr(tSQLExpr* pLeft, tSQLExpr* pRight, int32_t optr) {
*
* However, columnA < 4+12 is valid
*/
if (pLeft->nSQLOptr >= TK_COUNT && pLeft->nSQLOptr <= TK_AVG_IRATE) {
if (pLeft->nSQLOptr >= TSDB_FUNC_COUNT && pLeft->nSQLOptr <= TSDB_FUNC_AVG_IRATE) {
return false;
}
......@@ -3687,7 +3712,7 @@ static bool isValidExpr(tSQLExpr* pLeft, tSQLExpr* pRight, int32_t optr) {
return true;
}
if (pRight->nSQLOptr >= TK_COUNT && pRight->nSQLOptr <= TK_AVG_IRATE) {
if (pRight->nSQLOptr >= TSDB_FUNC_COUNT && pRight->nSQLOptr <= TSDB_FUNC_AVG_IRATE) {
return false;
}
......@@ -6848,7 +6873,7 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS
tVariantAssign((*pExpr)->pVal, &pSqlExpr->val);
return TSDB_CODE_SUCCESS;
} else if (pSqlExpr->nSQLOptr >= TK_COUNT && pSqlExpr->nSQLOptr <= TK_AVG_IRATE) {
} else if (pSqlExpr->nSQLOptr >= TSDB_FUNC_COUNT && pSqlExpr->nSQLOptr <= TSDB_FUNC_BLKINFO) {
// arithmetic expression on the results of aggregation functions
*pExpr = calloc(1, sizeof(tExprNode));
(*pExpr)->nodeType = TSQL_NODE_COL;
......
......@@ -700,7 +700,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo) && !tscQueryBlockInfo(pQueryInfo)) {
tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols,
tscGetNumOfColumns(pTableMeta));
......
......@@ -97,6 +97,22 @@ bool tscQueryTags(SQueryInfo* pQueryInfo) {
return true;
}
bool tscQueryBlockInfo(SQueryInfo* pQueryInfo) {
int32_t numOfCols = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
int32_t functId = pExpr->functionId;
// "select count(tbname)" query
if (functId == TSDB_FUNC_BLKINFO) {
return true;
}
}
return false;
}
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (pQueryInfo == NULL) {
return false;
......
......@@ -182,7 +182,6 @@ typedef struct SDataBlockInfo {
} SDataBlockInfo;
typedef struct SFileBlockInfo {
int32_t len;
int32_t numOfRows;
} SFileBlockInfo;
......@@ -197,6 +196,16 @@ typedef struct {
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
typedef struct {
uint16_t rowSize;
uint16_t numOfFiles;
uint32_t numOfTables;
uint64_t totalSize;
int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable;
SArray *dataBlockInfos;
} STableBlockDist;
/**
* Get the data block iterator, starting from position according to the query condition
*
......@@ -346,7 +355,7 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond);
void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList);
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, SArray* pBlockInfo);
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo);
/**
* get the statistics of repo usage
......
......@@ -188,44 +188,18 @@
#define TK_STATEMENT 169
#define TK_TRIGGER 170
#define TK_VIEW 171
#define TK_COUNT 172
#define TK_SUM 173
#define TK_AVG 174
#define TK_MIN 175
#define TK_MAX 176
#define TK_FIRST 177
#define TK_LAST 178
#define TK_TOP 179
#define TK_BOTTOM 180
#define TK_STDDEV 181
#define TK_PERCENTILE 182
#define TK_APERCENTILE 183
#define TK_LEASTSQUARES 184
#define TK_HISTOGRAM 185
#define TK_DIFF 186
#define TK_SPREAD 187
#define TK_TWA 188
#define TK_INTERP 189
#define TK_LAST_ROW 190
#define TK_RATE 191
#define TK_IRATE 192
#define TK_SUM_RATE 193
#define TK_SUM_IRATE 194
#define TK_AVG_RATE 195
#define TK_AVG_IRATE 196
#define TK_TBID 197
#define TK_SEMI 198
#define TK_NONE 199
#define TK_PREV 200
#define TK_LINEAR 201
#define TK_IMPORT 202
#define TK_METRIC 203
#define TK_TBNAME 204
#define TK_JOIN 205
#define TK_METRICS 206
#define TK_INSERT 207
#define TK_INTO 208
#define TK_VALUES 209
#define TK_SEMI 172
#define TK_NONE 173
#define TK_PREV 174
#define TK_LINEAR 175
#define TK_IMPORT 176
#define TK_METRIC 177
#define TK_TBNAME 178
#define TK_JOIN 179
#define TK_METRICS 180
#define TK_INSERT 181
#define TK_INTO 182
#define TK_VALUES 183
#define TK_SPACE 300
......@@ -236,6 +210,7 @@
#define TK_BIN 305 // bin format data 0b111
#define TK_FILE 306
#define TK_QUESTION 307 // denoting the placeholder of "?",when invoking statement bind query
#define TK_FUNCTION 308
#endif
......
......@@ -26,6 +26,7 @@ extern "C" {
#include "taosdef.h"
#include "trpc.h"
#include "tvariant.h"
#include "tsdb.h"
#define TSDB_FUNC_INVALID_ID -1
#define TSDB_FUNC_COUNT 0
......@@ -70,15 +71,17 @@ extern "C" {
#define TSDB_FUNC_AVG_IRATE 34
#define TSDB_FUNC_TID_TAG 35
#define TSDB_FUNC_HISTOGRAM 36
#define TSDB_FUNC_HLL 37
#define TSDB_FUNC_MODE 38
#define TSDB_FUNC_SAMPLE 39
#define TSDB_FUNC_CEIL 40
#define TSDB_FUNC_FLOOR 41
#define TSDB_FUNC_ROUND 42
#define TSDB_FUNC_MAVG 43
#define TSDB_FUNC_CSUM 44
#define TSDB_FUNC_BLKINFO 36
#define TSDB_FUNC_HISTOGRAM 37
#define TSDB_FUNC_HLL 38
#define TSDB_FUNC_MODE 39
#define TSDB_FUNC_SAMPLE 40
#define TSDB_FUNC_CEIL 41
#define TSDB_FUNC_FLOOR 42
#define TSDB_FUNC_ROUND 43
#define TSDB_FUNC_MAVG 44
#define TSDB_FUNC_CSUM 45
#define TSDB_FUNCSTATE_SO 0x1u // single output
......@@ -221,6 +224,7 @@ typedef struct SAggFunctionInfo {
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable);
int32_t isValidFunction(const char* name, int32_t len);
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
#define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0)
......@@ -242,6 +246,10 @@ typedef struct STwaInfo {
STimeWindow win;
} STwaInfo;
struct SBufferWriter;
void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw);
void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist);
/* global sql function array */
extern struct SAggFunctionInfo aAggs[];
......
......@@ -280,15 +280,16 @@ enum OPERATOR_TYPE_E {
OP_DataBlocksOptScan = 2,
OP_TableSeqScan = 3,
OP_TagScan = 4,
OP_Aggregate = 5,
OP_Arithmetic = 6,
OP_Groupby = 7,
OP_Limit = 8,
OP_Offset = 9,
OP_TimeInterval = 10,
OP_Fill = 11,
OP_MultiTableAggregate = 12,
OP_MultiTableTimeInterval = 13,
OP_TableBlockInfoScan= 5,
OP_Aggregate = 6,
OP_Arithmetic = 7,
OP_Groupby = 8,
OP_Limit = 9,
OP_Offset = 10,
OP_TimeInterval = 11,
OP_Fill = 12,
OP_MultiTableAggregate = 13,
OP_MultiTableTimeInterval = 14,
};
typedef struct SOperatorInfo {
......
......@@ -79,16 +79,6 @@ void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen);
SArray* interResFromBinary(const char* data, int32_t len);
void freeInterResult(void* param);
typedef struct {
int64_t numOfTables;
SArray *dataBlockInfos;
int64_t firstSeekTimeUs;
int64_t numOfRowsInMemTable;
} STableBlockDist;
void blockDistInfoToBinary(SBufferWriter* bw, STableBlockDist* pDist);
void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist);
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo);
......
......@@ -805,6 +805,5 @@ cmd ::= KILL QUERY INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); s
%fallback ID ABORT AFTER ASC ATTACH BEFORE BEGIN CASCADE CLUSTER CONFLICT COPY DATABASE DEFERRED
DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD
LIKE MATCH KEY OF OFFSET RAISE REPLACE RESTRICT ROW STATEMENT TRIGGER VIEW ALL
COUNT SUM AVG MIN MAX FIRST LAST TOP BOTTOM STDDEV PERCENTILE APERCENTILE LEASTSQUARES HISTOGRAM DIFF
SPREAD TWA INTERP LAST_ROW RATE IRATE SUM_RATE SUM_IRATE AVG_RATE AVG_IRATE TBID NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT
NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT
METRIC TBNAME JOIN METRICS STABLE NULL INSERT INTO VALUES.
......@@ -18,6 +18,7 @@
#include "taosmsg.h"
#include "texpr.h"
#include "ttype.h"
#include "tsdb.h"
#include "qAggMain.h"
#include "qFill.h"
......@@ -26,11 +27,9 @@
#include "qTsbuf.h"
#include "queryLog.h"
//#define GET_INPUT_DATA_LIST(x) (((char *)((x)->pInput)) + ((x)->startOffset) * ((x)->inputBytes))
#define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes)
//#define GET_TS_LIST(x) ((TSKEY*)&((x)->ptsList[(x)->startOffset]))
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
......@@ -190,6 +189,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE);
*interBytes = 0;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_BLKINFO) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = 16384;
*interBytes = 0;
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_COUNT) {
......@@ -354,6 +358,22 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
}
// TODO use hash table
int32_t isValidFunction(const char* name, int32_t len) {
for(int32_t i = 0; i <= TSDB_FUNC_BLKINFO; ++i) {
int32_t nameLen = strlen(aAggs[i].name);
if (len != nameLen) {
continue;
}
if (strncasecmp(aAggs[i].name, name, len) == 0) {
return i;
}
}
return -1;
}
// set the query flag to denote that query is completed
static void no_next_step(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
......@@ -4600,10 +4620,126 @@ static void sumrate_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx);
}
void blockInfo_func(SQLFunctionCtx* pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
int32_t len = *(int32_t*) pCtx->pInput;
blockDistInfoFromBinary(pCtx->pInput + sizeof(int32_t), len, pDist);
pDist->rowSize = pCtx->param[0].i64;
/////////////////////////////////////////////////////////////////////////////////////////////
memcpy(pCtx->pOutput, pCtx->pInput, sizeof(int32_t) + len);
pResInfo->numOfRes = 1;
pResInfo->hasResult = DATA_SET_FLAG;
}
static void mergeTableBlockDist(STableBlockDist* pDist, const STableBlockDist* pSrc) {
assert(pDist != NULL && pSrc != NULL);
pDist->numOfTables += pSrc->numOfTables;
pDist->numOfRowsInMemTable += pSrc->numOfRowsInMemTable;
pDist->numOfFiles += pSrc->numOfFiles;
pDist->totalSize += pSrc->totalSize;
if (pDist->dataBlockInfos == NULL) {
pDist->dataBlockInfos = taosArrayInit(4, sizeof(SFileBlockInfo));
}
taosArrayPushBatch(pDist->dataBlockInfos, pSrc->dataBlockInfos->pData, taosArrayGetSize(pSrc->dataBlockInfos));
}
void block_func_merge(SQLFunctionCtx* pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
STableBlockDist info = {0};
int32_t len = *(int32_t*) pCtx->pInput;
blockDistInfoFromBinary(pCtx->pInput + sizeof(int32_t), len, &info);
mergeTableBlockDist(pDist, &info);
}
static int32_t doGetPercentile(const SArray* pArray, double rate) {
int32_t len = (int32_t)taosArrayGetSize(pArray);
if (len <= 0) {
return 0;
}
assert(rate >= 0 && rate <= 1.0);
int idx = (int32_t)((len - 1) * rate);
return ((SFileBlockInfo *)(taosArrayGet(pArray, idx)))->numOfRows;
}
static int compareBlockInfo(const void *pLeft, const void *pRight) {
int32_t left = ((SFileBlockInfo *)pLeft)->numOfRows;
int32_t right = ((SFileBlockInfo *)pRight)->numOfRows;
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
if (pTableBlockDist == NULL) {
return;
}
int64_t min = INT64_MAX, max = INT64_MIN, avg = 0;
SArray* blockInfos= pTableBlockDist->dataBlockInfos;
int64_t totalRows = 0, totalBlocks = taosArrayGetSize(blockInfos);
for (size_t i = 0; i < taosArrayGetSize(blockInfos); i++) {
SFileBlockInfo *blockInfo = taosArrayGet(blockInfos, i);
int64_t rows = blockInfo->numOfRows;
min = MIN(min, rows);
max = MAX(max, rows);
totalRows += rows;
}
avg = totalBlocks > 0 ? (int64_t)(totalRows/totalBlocks) : 0;
taosArraySort(blockInfos, compareBlockInfo);
uint64_t totalLen = pTableBlockDist->totalSize;
int32_t rowSize = pTableBlockDist->rowSize;
int sz = sprintf(result + VARSTR_HEADER_SIZE,
"summary: \n\t "
"5th=[%d], 10th=[%d], 20th=[%d], 30th=[%d], 40th=[%d], 50th=[%d]\n\t "
"60th=[%d], 70th=[%d], 80th=[%d], 90th=[%d], 95th=[%d], 99th=[%d]\n\t "
"Min=[%"PRId64"(Rows)] Max=[%"PRId64"(Rows)] Avg=[%"PRId64"(Rows)] Stddev=[%.2f] \n\t "
"Rows=[%"PRId64"], Blocks=[%"PRId64"], Size=[%.3f(Kb)] Comp=[%.2f%%]\n\t "
"RowsInMem=[%d] \n\t SeekHeaderTime=[%d(us)]",
doGetPercentile(blockInfos, 0.05), doGetPercentile(blockInfos, 0.10),
doGetPercentile(blockInfos, 0.20), doGetPercentile(blockInfos, 0.30),
doGetPercentile(blockInfos, 0.40), doGetPercentile(blockInfos, 0.50),
doGetPercentile(blockInfos, 0.60), doGetPercentile(blockInfos, 0.70),
doGetPercentile(blockInfos, 0.80), doGetPercentile(blockInfos, 0.90),
doGetPercentile(blockInfos, 0.95), doGetPercentile(blockInfos, 0.99),
min, max, avg, 0.0,
totalRows, totalBlocks, totalLen/1024.0, (double)(totalLen*100.0)/(rowSize*totalRows),
pTableBlockDist->numOfRowsInMemTable, pTableBlockDist->firstSeekTimeUs);
varDataSetLen(result, sz);
UNUSED(sz);
}
void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
pDist->rowSize = pCtx->param[0].i64;
generateBlockDistResult(pDist, pCtx->pOutput);
// cannot set the numOfIteratedElems again since it is set during previous iteration
pResInfo->numOfRes = 1;
pResInfo->hasResult = DATA_SET_FLAG;
doFinalizer(pCtx);
}
/////////////////////////////////////////////////////////////////////////////////////////////
/*
* function compatible list.
* tag and ts are not involved in the compatibility check
......@@ -4622,8 +4758,8 @@ int32_t functionCompatList[] = {
4, -1, -1, 1, 1, 1, 1, 1, 1, -1,
// tag, colprj, tagprj, arithmetic, diff, first_dist, last_dist, interp rate irate
1, 1, 1, 1, -1, 1, 1, 5, 1, 1,
// sum_rate, sum_irate, avg_rate, avg_irate
1, 1, 1, 1,
// sum_rate, sum_irate, avg_rate, avg_irate, tid_tag, blk_info
1, 1, 1, 1, 6, 7
};
SAggFunctionInfo aAggs[] = {{
......@@ -5133,4 +5269,18 @@ SAggFunctionInfo aAggs[] = {{
noop1,
noop1,
dataBlockRequired,
} };
},
{
// 35
"_block_dist", // return table id and the corresponding tags for join match and subscribe
TSDB_FUNC_BLKINFO,
TSDB_FUNC_BLKINFO,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STABLE,
function_setup,
blockInfo_func,
noop2,
no_next_step,
blockinfo_func_finalizer,
block_func_merge,
dataBlockRequired,
}};
......@@ -26,11 +26,7 @@
#include "queryLog.h"
#include "tlosertree.h"
#include "ttype.h"
/**
* check if the primary column is load by default, otherwise, the program will
* forced to load primary column explicitly.
*/
#include "tscompression.h"
#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN)
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
......@@ -177,6 +173,7 @@ static SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S
static SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
......@@ -1050,12 +1047,13 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
setArithParams((SArithmeticSupport*)pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock);
} else {
SColIndex* pCol = &pOperator->pExpr[i].base.colInfo;
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || pCol->colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
// in case of the block distribution query, the inputBytes is not a constant value.
pCtx[i].pInput = p->pData;
assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type && pCtx[i].inputBytes == p->info.bytes);
assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type);// && pCtx[i].inputBytes == p->info.bytes);
uint32_t status = aAggs[pCtx[i].functionId].status;
if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) {
......@@ -3736,6 +3734,7 @@ static void freeTableBlockDist(STableBlockDist *pTableBlockDist) {
}
}
#if 0
static int32_t getPercentileFromSortedArray(const SArray* pArray, double rate) {
int32_t len = (int32_t)taosArrayGetSize(pArray);
if (len <= 0) {
......@@ -3746,42 +3745,17 @@ static int32_t getPercentileFromSortedArray(const SArray* pArray, double rate) {
return ((SDataBlockInfo *)(taosArrayGet(pArray, idx)))->rows;
}
static int compareBlockInfo(const void *pLeft, const void *pRight) {
int32_t left = ((SDataBlockInfo *)pLeft)->rows;
int32_t right = ((SDataBlockInfo *)pRight)->rows;
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
}
static void generateBlockDistResult(STableBlockDist *pTableBlockDist) {
if (pTableBlockDist == NULL) {
return;
}
#if 0
int64_t min = INT64_MAX, max = INT64_MIN, avg = 0;
SArray* blockInfos= pTableBlockDist->dataBlockInfos;
int64_t totalRows = 0, totalBlocks = taosArrayGetSize(blockInfos);
for (size_t i = 0; i < taosArrayGetSize(blockInfos); i++) {
SFileBlockInfo *blockInfo = taosArrayGet(blockInfos, i);
int64_t rows = blockInfo->numOfRows;
min = MIN(min, rows);
max = MAX(max, rows);
totalRows += rows;
}
avg = totalBlocks > 0 ? (int64_t)(totalRows/totalBlocks) : 0;
taosArraySort(blockInfos, compareBlockInfo);
#endif
// int sz = sprintf(pTableBlockDist->result,
// "summary: \n\t 5th=[%d], 25th=[%d], 50th=[%d],75th=[%d], 95th=[%d], 99th=[%d] \n\t min=[%"PRId64"], max=[%"PRId64"], avg = [%"PRId64"] \n\t totalRows=[%"PRId64"], totalBlocks=[%"PRId64"] \n\t seekHeaderTimeCost=[%"PRId64"(us)] \n\t rowsInMem=[%"PRId64"]",
// getPercentileFromSortedArray(blockInfos, 0.05), getPercentileFromSortedArray(blockInfos, 0.25), getPercentileFromSortedArray(blockInfos, 0.50),
// getPercentileFromSortedArray(blockInfos, 0.75), getPercentileFromSortedArray(blockInfos, 0.95), getPercentileFromSortedArray(blockInfos, 0.99),
// min, max, avg,
// totalRows, totalBlocks,
// pTableBlockDist->firstSeekTimeUs,
// pTableBlockDist->numOfRowsInMemTable);
}
//void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
// SQuery *pQuery = pRuntimeEnv->pQuery;
//
......@@ -4145,6 +4119,8 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
// TODO refactor.
pRuntimeEnv->resultInfo.capacity = 4096;
pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput);
} else if (pQuery->queryBlockDist) {
pRuntimeEnv->pTableScanner = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
} else if (isTsCompQuery(pQuery) || isPointInterpoQuery(pQuery)) {
pRuntimeEnv->pTableScanner = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, isPointInterpoQuery(pQuery));
} else if (needReverseScan(pQuery)) {
......@@ -4408,22 +4384,40 @@ static SSDataBlock* doSeqTableBlocksScan(void* param) {
static SSDataBlock* doBlockInfoScan(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo*)param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
STableScanInfo *pTableScanInfo = pOperator->info;
STableBlockDist tableBlockDist = {0};
tableBlockDist.numOfTables = pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
tableBlockDist.dataBlockInfos = taosArrayInit(512, sizeof(SFileBlockInfo));
tsdbGetFileBlocksDistInfo(pTableScanInfo->pQueryHandle, tableBlockDist.dataBlockInfos);
tsdbGetFileBlocksDistInfo(pTableScanInfo->pQueryHandle, &tableBlockDist);
tableBlockDist.numOfRowsInMemTable = tsdbGetNumOfRowsInMemTable(pTableScanInfo->pQueryHandle);
SSDataBlock* pBlock = &pTableScanInfo->block;
pBlock->info.rows = 1;
pBlock->info.numOfCols = 1;
SBufferWriter bw = tbufInitWriter(NULL, false);
blockDistInfoToBinary(&tableBlockDist, &bw);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
int32_t len = (int32_t) tbufTell(&bw);
pColInfo->pData = malloc(len + sizeof(int32_t));
*(int32_t*) pColInfo->pData = len;
memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len);
tbufCloseWriter(&bw);
SArray* g = GET_TABLEGROUP(pOperator->pRuntimeEnv, 0);
pOperator->pRuntimeEnv->pQuery->current = taosArrayGetP(g, 0);
pOperator->status = OP_EXEC_DONE;
return pBlock;
}
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) {
......@@ -4471,6 +4465,32 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
return pOperator;
}
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
SColumnInfoData infoData = {{0}};
infoData.info.type = TSDB_DATA_TYPE_BINARY;
infoData.info.bytes = 1024;
infoData.info.colId = TSDB_BLOCK_DIST_COLUMN_INDEX;
taosArrayPush(pInfo->block.pDataBlock, &infoData);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableBlockInfoScanOperator";
pOperator->operatorType = OP_TableBlockInfoScan;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols;
pOperator->exec = doBlockInfoScan;
return pOperator;
}
void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) {
assert(pTableScanInfo != NULL && pDownstream != NULL);
......@@ -4634,10 +4654,6 @@ static SSDataBlock* doSTableAggregate(void* param) {
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->resultRowInfo);
if (isTsCompQuery(pQuery)) {
finalizeQueryResult_rv(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
}
updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo,
pInfo->rowCellInfoOffset);
......@@ -5415,7 +5431,7 @@ void buildTableBlockDistResult(SQInfo *pQInfo) {
SSchema blockDistSchema = tGetBlockDistColumnSchema();
// int64_t startTime = taosGetTimestampUs();
tsdbGetFileBlocksDistInfo(pQueryHandle, pTableBlockDist->dataBlockInfos);
// tsdbGetFileBlocksDistInfo(pQueryHandle, pTableBlockDist->dataBlockInfos, );
pTableBlockDist->numOfRowsInMemTable = tsdbGetNumOfRowsInMemTable(pQueryHandle);
// generateBlockDistResult(pTableBlockDist);
......@@ -5524,7 +5540,8 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx
if ((pFuncMsg->functionId == TSDB_FUNC_TAGPRJ) ||
(pFuncMsg->functionId == TSDB_FUNC_TID_TAG && pFuncMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) ||
(pFuncMsg->functionId == TSDB_FUNC_COUNT && pFuncMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX)) {
(pFuncMsg->functionId == TSDB_FUNC_COUNT && pFuncMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) ||
(pFuncMsg->functionId == TSDB_FUNC_BLKINFO)) {
continue;
}
......@@ -6166,6 +6183,9 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
assert(f < pQuery->numOfCols);
} else if (pColIndex->colId <= TSDB_UD_COLUMN_INDEX) {
// do nothing for user-defined constant value result columns
} else if (pColIndex->colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
pColIndex->colIndex = 0;// only one source column, so it must be 0;
assert(pQuery->numOfOutput == 1);
} else {
int32_t f = 0;
for (f = 0; f < pQuery->numOfTags; ++f) {
......
......@@ -158,19 +158,22 @@ tSQLExpr *tSqlExprIdValueCreate(SStrToken *pToken, int32_t optrType) {
* function name is denoted by pFunctionToken
*/
tSQLExpr *tSqlExprCreateFunction(tSQLExprList *pList, SStrToken *pFuncToken, SStrToken *endToken, int32_t optType) {
if (pFuncToken == NULL) return NULL;
if (pFuncToken == NULL) {
return NULL;
}
assert(optType == TK_ID);
tSQLExpr *pExpr = calloc(1, sizeof(tSQLExpr));
pExpr->nSQLOptr = optType;
pExpr->nSQLOptr = TK_FUNCTION;
pExpr->pParam = pList;
int32_t len = (int32_t)((endToken->z + endToken->n) - pFuncToken->z);
pExpr->operand.z = pFuncToken->z;
pExpr->operand.n = len; // raw field name
pExpr->operand.type = pFuncToken->type;
pExpr->token = pExpr->operand;
pExpr->token = (*pFuncToken);
return pExpr;
}
......
......@@ -200,25 +200,6 @@ static SKeyword keywordTable[] = {
{"TRIGGER", TK_TRIGGER},
{"VIEW", TK_VIEW},
{"ALL", TK_ALL},
{"COUNT", TK_COUNT},
{"SUM", TK_SUM},
{"AVG", TK_AVG},
{"MIN", TK_MIN},
{"MAX", TK_MAX},
{"FIRST", TK_FIRST},
{"LAST", TK_LAST},
{"TOP", TK_TOP},
{"BOTTOM", TK_BOTTOM},
{"STDDEV", TK_STDDEV},
{"PERCENTILE", TK_PERCENTILE},
{"APERCENTILE", TK_APERCENTILE},
{"LEASTSQUARES", TK_LEASTSQUARES},
{"HISTOGRAM", TK_HISTOGRAM},
{"DIFF", TK_DIFF},
{"SPREAD", TK_SPREAD},
{"TWA", TK_TWA},
{"INTERP", TK_INTERP},
{"LAST_ROW", TK_LAST_ROW},
{"SEMI", TK_SEMI},
{"NONE", TK_NONE},
{"PREV", TK_PREV},
......@@ -228,17 +209,10 @@ static SKeyword keywordTable[] = {
{"TBNAME", TK_TBNAME},
{"JOIN", TK_JOIN},
{"METRICS", TK_METRICS},
{"TBID", TK_TBID},
{"STABLE", TK_STABLE},
{"FILE", TK_FILE},
{"VNODES", TK_VNODES},
{"UNION", TK_UNION},
{"RATE", TK_RATE},
{"IRATE", TK_IRATE},
{"SUM_RATE", TK_SUM_RATE},
{"SUM_IRATE", TK_SUM_IRATE},
{"AVG_RATE", TK_AVG_RATE},
{"AVG_IRATE", TK_AVG_IRATE},
{"CACHELAST", TK_CACHELAST},
{"DISTINCT", TK_DISTINCT},
};
......
......@@ -22,6 +22,7 @@
#include "tbuffer.h"
#include "tlosertree.h"
#include "queryLog.h"
#include "tscompression.h"
typedef struct SCompSupporter {
STableQueryInfo **pTableQueryInfo;
......@@ -243,7 +244,6 @@ void* destroyResultRowPool(SResultRowPool* p) {
return NULL;
}
// TODO refactor
void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen) {
uint32_t numOfGroup = (uint32_t) taosArrayGetSize(pRes);
tbufWriteUint32(bw, numOfGroup);
......@@ -581,25 +581,70 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu
return TSDB_CODE_SUCCESS;
}
/*
* typedef struct {
int64_t numOfTables;
SArray *dataBlockInfos;
int64_t firstSeekTimeUs;
int64_t numOfRowsInMemTable;
} STableBlockDist;
*
*/
SBufferWriter* blockDistInfoToBinary(STableBlockDist* pDist) {
SBufferWriter bw = tbufInitWriter(NULL, false);
void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw) {
tbufWriteUint32(bw, pDist->numOfTables);
tbufWriteUint16(bw, pDist->numOfFiles);
tbufWriteUint64(bw, pDist->totalSize);
tbufWriteUint32(bw, pDist->numOfRowsInMemTable);
tbufWriteUint64(bw, taosArrayGetSize(pDist->dataBlockInfos));
// compress the binary string
char* p = TARRAY_GET_START(pDist->dataBlockInfos);
// compress extra bytes
size_t x = taosArrayGetSize(pDist->dataBlockInfos) * pDist->dataBlockInfos->elemSize;
char* tmp = malloc(x + 2);
tbufWriteUint64(&bw, pDist->numOfTables);
tbufWriteUint64(&bw, pDist->numOfRowsInMemTable);
tbufWriteUint64(&bw, taosArrayGetSize(pDist->dataBlockInfos));
bool comp = false;
int32_t len = tsCompressString(p, x, 1, tmp, x, ONE_STAGE_COMP, NULL, 0);
if (len == -1 || len >= x) { // compress failed, do not compress this binary data
comp = false;
len = x;
} else {
comp = true;
}
pDist->dataBlockInfos->pData
tbufWriteUint8(bw, comp);
tbufWriteUint32(bw, len);
if (comp) {
tbufWriteBinary(bw, tmp, len);
} else {
tbufWriteBinary(bw, p, len);
}
tfree(tmp);
}
void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist) {
SBufferReader br = tbufInitReader(data, len, false);
pDist->numOfTables = tbufReadUint32(&br);
pDist->numOfFiles = tbufReadUint16(&br);
pDist->totalSize = tbufReadUint64(&br);
pDist->numOfRowsInMemTable = tbufReadUint32(&br);
int32_t numOfBlocks = tbufReadUint64(&br);
bool comp = tbufReadUint8(&br);
uint32_t compLen = tbufReadUint32(&br);
size_t originalLen = numOfBlocks*sizeof(SFileBlockInfo);
char* outputBuf = NULL;
if (comp) {
outputBuf = malloc(originalLen);
size_t actualLen = compLen;
const char* compStr = tbufReadBinary(&br, &actualLen);
int32_t orignalLen = tsDecompressString(compStr, compLen, 1, outputBuf,
originalLen , ONE_STAGE_COMP, NULL, 0);
assert(orignalLen == numOfBlocks*sizeof(SFileBlockInfo));
} else {
outputBuf = (char*) tbufReadBinary(&br, &originalLen);
}
pDist->dataBlockInfos = taosArrayFromList(outputBuf, numOfBlocks, sizeof(SFileBlockInfo));
if (comp) {
tfree(outputBuf);
}
}
......@@ -238,8 +238,8 @@ bool qTableQuery(qinfo_t qinfo) {
buildTagQueryResult(pQInfo);
} else if (pQInfo->query.stableQuery) {
stableQueryImpl(pQInfo);
} else if (pQInfo->query.queryBlockDist){
buildTableBlockDistResult(pQInfo);
// } else if (pQInfo->query.queryBlockDist){
// buildTableBlockDistResult(pQInfo);
} else {
tableQueryImpl(pQInfo);
}
......
此差异已折叠。
......@@ -353,7 +353,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
}
tsdbMayTakeMemSnapshot(pQueryHandle);
assert(pCond != NULL && pCond->numOfCols > 0 && pMemRef != NULL);
// In case of data block info retrieval, the pCond->numOfCols is 0.
assert(pCond != NULL && pCond->numOfCols >= 0 && pMemRef != NULL);
if (ASCENDING_TRAVERSE(pCond->order)) {
assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey);
......@@ -384,7 +386,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
pQueryHandle->statis[i].colId = colInfo.info.colId;
}
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
if (pCond->numOfCols > 0) {
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
}
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
assert(pMeta != NULL);
......@@ -2006,9 +2010,10 @@ static void moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) {
cur->blockCompleted = false;
}
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, SArray* pBlockInfo) {
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle;
pTableBlockInfo->totalSize = 0;
STsdbFS* pFileHandle = REPO_FS(pQueryHandle->pTsdb);
// find the start data block in file
......@@ -2021,8 +2026,9 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, SArray* pBlockI
tsdbFSIterSeek(&pQueryHandle->fileIter, fid);
tsdbUnLockFS(pFileHandle);
int32_t code = TSDB_CODE_SUCCESS;
pTableBlockInfo->numOfFiles += 1;
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfBlocks = 0;
int32_t numOfTables = (int32_t)taosArrayGetSize(pQueryHandle->pTableCheckInfo);
STimeWindow win = TSWINDOW_INITIALIZER;
......@@ -2048,6 +2054,7 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, SArray* pBlockI
break;
}
pTableBlockInfo->numOfFiles += 1;
if (tsdbSetAndOpenReadFSet(&pQueryHandle->rhelper, pQueryHandle->pFileGroup) < 0) {
tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb));
code = terrno;
......@@ -2072,16 +2079,15 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, SArray* pBlockI
continue;
}
SFileBlockInfo info = {0};
for (int32_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
SBlock* pBlock = pCheckInfo->pCompInfo->blocks;
for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) {
info.numOfRows = pBlock[j].numOfRows;
info.len = pBlock[j].len;
pTableBlockInfo->totalSize += pBlock[j].len;
taosArrayPush(pBlockInfo, &info);
int32_t numOfRows = pBlock[j].numOfRows;
taosArrayPush(pTableBlockInfo->dataBlockInfos, &numOfRows);
}
}
}
......
......@@ -25,7 +25,8 @@ extern "C" {
#define TARRAY_MIN_SIZE 8
#define TARRAY_GET_ELEM(array, index) ((void*)((char*)((array)->pData) + (index) * (array)->elemSize))
#define TARRAY_ELEM_IDX(array, ele) (POINTER_DISTANCE(ele, (array)->pData) / (array)->elemSize)
#define TARRAY_ELEM_IDX(array, ele) (POINTER_DISTANCE(ele, (array)->pData) / (array)->elemSize)
#define TARRAY_GET_START(array) ((array)->pData)
typedef struct SArray {
size_t size;
......
......@@ -73,14 +73,14 @@ int main( int argc, char** argv ) {
}
*/
typedef struct {
typedef struct SBufferReader {
bool endian;
const char* data;
size_t pos;
size_t size;
} SBufferReader;
typedef struct {
typedef struct SBufferWriter {
bool endian;
char* data;
size_t pos;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册