未验证 提交 7d8ca438 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #5401 from taosdata/feature/qrefactor

Feature/qrefactor
......@@ -133,6 +133,7 @@ bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryTags(SQueryInfo* pQueryInfo);
bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryBlockInfo(SQueryInfo* pQueryInfo);
SSqlExpr* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType);
......
......@@ -100,6 +100,10 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr
} else if (functionId == TSDB_FUNC_APERCT) {
pCtx->param[0].i64 = pExpr->param[0].i64;
pCtx->param[0].nType = pExpr->param[0].nType;
} else if (functionId == TSDB_FUNC_BLKINFO) {
pCtx->param[0].i64 = pExpr->param[0].i64;
pCtx->param[0].nType = pExpr->param[0].nType;
pCtx->numOfParams = 1;
}
pCtx->interBufBytes = pExpr->interBytes;
......@@ -951,10 +955,10 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp
// todo extract function
int64_t actualETime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey: pQueryInfo->window.skey;
tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
void** pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalMerge->resColModel->capacity);
pResPages[i] = calloc(1, pField->bytes * pLocalMerge->resColModel->capacity);
}
while (1) {
......@@ -966,7 +970,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp
if (pQueryInfo->limit.offset > 0) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
memmove(pResPages[i]->data, pResPages[i]->data + pField->bytes * pQueryInfo->limit.offset,
memmove(pResPages[i], ((char*)pResPages[i]) + pField->bytes * pQueryInfo->limit.offset,
(size_t)(newRows * pField->bytes));
}
}
......@@ -1010,7 +1014,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp
int32_t offset = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i]->data, (size_t)(pField->bytes * pRes->numOfRows));
memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i], (size_t)(pField->bytes * pRes->numOfRows));
offset += pField->bytes;
}
......
此差异已折叠。
......@@ -497,8 +497,6 @@ int tscProcessSql(SSqlObj *pSql) {
return pSql->res.code;
}
} else if (pCmd->command >= TSDB_SQL_LOCAL) {
//pSql->epSet = tscMgmtEpSet;
// } else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql);
}
......@@ -705,7 +703,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo) && !tscQueryBlockInfo(pQueryInfo)) {
tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols,
tscGetNumOfColumns(pTableMeta));
......@@ -835,13 +833,31 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag);
if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) {
pSqlFuncExpr->colType = htons(pExpr->resType);
pSqlFuncExpr->colBytes = htons(pExpr->resBytes);
} else if (pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
SSchema *s = tGetTbnameColumnSchema();
pSqlFuncExpr->colType = htons(s->type);
pSqlFuncExpr->colBytes = htons(s->bytes);
} else if (pExpr->colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema s = tGetBlockDistColumnSchema();
pSqlFuncExpr->colType = htons(s.type);
pSqlFuncExpr->colBytes = htons(s.bytes);
} else {
SSchema* s = tscGetColumnSchemaById(pTableMeta, pExpr->colInfo.colId);
pSqlFuncExpr->colType = htons(s->type);
pSqlFuncExpr->colBytes = htons(s->bytes);
}
pSqlFuncExpr->functionId = htons(pExpr->functionId);
pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
pSqlFuncExpr->resColId = htons(pExpr->resColId);
pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
// todo add log
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);
......@@ -866,6 +882,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
for (int32_t i = 0; i < output; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
SSqlExpr *pExpr = pField->pSqlExpr;
// this should be switched to projection query
if (pExpr != NULL) {
// the queried table has been removed and a new table with the same name has already been created already
// return error msg
......@@ -879,27 +897,25 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_TSC_INVALID_SQL;
}
pSqlFuncExpr1->colInfo.colId = htons(pExpr->colInfo.colId);
pSqlFuncExpr1->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
pSqlFuncExpr1->colInfo.flag = htons(pExpr->colInfo.flag);
pSqlFuncExpr1->functionId = htons(pExpr->functionId);
pSqlFuncExpr1->numOfParams = htons(pExpr->numOfParams);
pMsg += sizeof(SSqlFuncMsg);
pSqlFuncExpr1->numOfParams = 0; // no params for projection query
pSqlFuncExpr1->functionId = htons(TSDB_FUNC_PRJ);
pSqlFuncExpr1->colInfo.colId = htons(pExpr->resColId);
pSqlFuncExpr1->colInfo.flag = htons(TSDB_COL_NORMAL);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
// todo add log
pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
pSqlFuncExpr1->arg[j].argBytes = htons(pExpr->param[j].nLen);
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
pMsg += pExpr->param[j].nLen;
} else {
pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64);
bool assign = false;
for (int32_t f = 0; f < tscSqlExprNumOfExprs(pQueryInfo); ++f) {
SSqlExpr *pe = tscSqlExprGet(pQueryInfo, f);
if (pe == pExpr) {
pSqlFuncExpr1->colInfo.colIndex = htons(f);
pSqlFuncExpr1->colType = htons(pe->resType);
pSqlFuncExpr1->colBytes = htons(pe->resBytes);
assign = true;
break;
}
}
assert(assign);
pMsg += sizeof(SSqlFuncMsg);
pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
} else {
assert(pField->pArithExprInfo != NULL);
......
......@@ -503,9 +503,19 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSqlCmd *pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single tabel subscription
pQueryInfo->window.skey = ((SSubscriptionProgress*)taosArrayGet(pSub->progress, 0))->key;
tscDebug("subscribe:%s set subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey);
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single table subscription
size_t size = taosArrayGetSize(pSub->progress);
TSKEY s = INT64_MAX;
for(int32_t i = 0; i < size; ++i) {
TSKEY k = ((SSubscriptionProgress*)taosArrayGet(pSub->progress, i))->key;
if (s > k) {
s = k;
}
}
pQueryInfo->window.skey = s;
tscDebug("subscribe:%s set next round subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey);
}
if (pSub->pTimer == NULL) {
......
......@@ -74,14 +74,14 @@ static bool allSubqueryDone(SSqlObj *pParentSql) {
SSubqueryState *subState = &pParentSql->subState;
//lock in caller
tscDebug("%p total subqueries: %d", pParentSql, subState->numOfSub);
for (int i = 0; i < subState->numOfSub; i++) {
if (0 == subState->states[i]) {
tscDebug("%p subquery:%p,%d is NOT finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub);
tscDebug("%p subquery:%p, index: %d NOT finished, abort query completion check", pParentSql, pParentSql->pSubs[i], i);
done = false;
break;
} else {
tscDebug("%p subquery:%p,%d is finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub);
tscDebug("%p subquery:%p, index: %d finished", pParentSql, pParentSql->pSubs[i], i);
}
}
......@@ -453,7 +453,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pSubQueryInfo->tsBuf = NULL;
// free result for async object will also free sqlObj
assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one resutl columns
assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one result columns
taos_free_result(pPrevSub);
SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
......@@ -507,6 +507,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
int16_t funcId = pExpr->functionId;
// add the invisible timestamp column
if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
(funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {
......@@ -847,6 +848,8 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// todo, the type may not include TSDB_QUERY_TYPE_TAG_FILTER_QUERY
assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
......@@ -2643,6 +2646,11 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
// clear the limit/offset info, since it should not be sent to vnode to be executed.
pQueryInfo->limit.limit = -1;
pQueryInfo->limit.offset = 0;
assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub);
// launch subquery for each vnode, so the subquery index equals to the vgroupIndex.
......@@ -3102,30 +3110,6 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
}
}
static UNUSED_FUNC void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
SSqlRes *pRes = &pSql->res;
if (pRes->tsrow[columnIndex] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
if (pRes->buffer[columnIndex] == NULL) {
pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE);
}
/* string terminated char for binary data*/
memset(pRes->buffer[columnIndex], 0, pField->bytes + TSDB_NCHAR_SIZE);
int32_t length = taosUcs4ToMbs(pRes->tsrow[columnIndex], pRes->length[columnIndex], pRes->buffer[columnIndex]);
if ( length >= 0 ) {
pRes->tsrow[columnIndex] = (unsigned char*)pRes->buffer[columnIndex];
pRes->length[columnIndex] = length;
} else {
tscError("%p charset:%s to %s. val:%s convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)pRes->tsrow[columnIndex]);
pRes->tsrow[columnIndex] = NULL;
pRes->length[columnIndex] = 0;
}
}
}
char *getArithmeticInputSrc(void *param, const char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *) param;
......
......@@ -97,6 +97,22 @@ bool tscQueryTags(SQueryInfo* pQueryInfo) {
return true;
}
bool tscQueryBlockInfo(SQueryInfo* pQueryInfo) {
int32_t numOfCols = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
int32_t functId = pExpr->functionId;
// "select count(tbname)" query
if (functId == TSDB_FUNC_BLKINFO) {
return true;
}
}
return false;
}
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (pQueryInfo == NULL) {
return false;
......@@ -1725,7 +1741,12 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX;
pQueryInfo->resColumnId= -1000;
pQueryInfo->resColumnId = -1000;
pQueryInfo->limit.limit = -1;
pQueryInfo->limit.offset = 0;
pQueryInfo->slimit.limit = -1;
pQueryInfo->slimit.offset = 0;
}
int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
......
......@@ -33,7 +33,7 @@ typedef struct SDataStatis {
typedef struct SColumnInfoData {
SColumnInfo info;
void* pData; // the corresponding block data in memory
char* pData; // the corresponding block data in memory
} SColumnInfoData;
typedef struct SResPair {
......
......@@ -252,7 +252,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_IN_EXEC TAOS_DEF_ERROR_CODE(0, 0x0709) //"Multiple retrieval of this query")
#define TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW TAOS_DEF_ERROR_CODE(0, 0x070A) //"Too many time window in query")
#define TSDB_CODE_QRY_NOT_ENOUGH_BUFFER TAOS_DEF_ERROR_CODE(0, 0x070B) //"Query buffer limit has reached")
#define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistance in replica")
#define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica")
// grant
......
......@@ -394,7 +394,7 @@ typedef struct SColIndex {
int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
uint16_t flag; // denote if it is a tag or a normal column
char name[TSDB_COL_NAME_LEN];
char name[TSDB_COL_NAME_LEN]; // TODO remove it
} SColIndex;
/* sql function msg, to describe the message to vnode about sql function
......@@ -402,7 +402,10 @@ typedef struct SColIndex {
typedef struct SSqlFuncMsg {
int16_t functionId;
int16_t numOfParams;
int16_t resColId; // result column id, id of the current output column
int16_t colType;
int16_t colBytes;
SColIndex colInfo;
struct ArgElem {
......
......@@ -158,13 +158,18 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR
typedef void *TsdbQueryHandleT; // Use void to hide implementation details
// query condition to build vnode iterator
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
// query condition to build multi-table data block iterator
typedef struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo *colList;
bool loadExternalRows; // load external rows or not
int32_t type; // data block load type:
} STsdbQueryCond;
typedef struct SMemRef {
......@@ -181,17 +186,31 @@ typedef struct SDataBlockInfo {
int32_t tid;
} SDataBlockInfo;
typedef struct SFileBlockInfo {
int32_t numOfRows;
} SFileBlockInfo;
typedef struct {
void *pTable;
TSKEY lastKey;
} STableKeyInfo;
typedef struct {
size_t numOfTables;
uint32_t numOfTables;
SArray * pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
typedef struct {
uint16_t rowSize;
uint16_t numOfFiles;
uint32_t numOfTables;
uint64_t totalSize;
int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable;
SArray *dataBlockInfos;
} STableBlockDist;
/**
* Get the data block iterator, starting from position according to the query condition
*
......@@ -252,16 +271,7 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle);
* @param pQueryHandle
* @return
*/
bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle);
/**
* move to next block if exists but not merge data in memtable
*
* @param pQueryHandle
* @return
*/
bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT *pQueryHandle);
SArray* tsdbGetExternalRow(TsdbQueryHandleT *pHandle, SMemRef* pMemRef, int16_t type);
bool tsdbNextDataBlock(TsdbQueryHandleT pQueryHandle);
/**
* Get current data block information
......@@ -306,7 +316,7 @@ int32_t tsdbQuerySTableByTagCond(STsdbRepo *tsdb, uint64_t uid, TSKEY key, const
SColIndex *pColIndex, int32_t numOfCols);
/**
* destory the created table group list, which is generated by tag query
* destroy the created table group list, which is generated by tag query
* @param pGroupList
*/
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
......@@ -336,6 +346,12 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo *tsdb, SArray *pTableIdList, STabl
*/
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond);
void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList);
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo);
/**
* get the statistics of repo usage
* @param repo. point to the tsdbrepo
......
......@@ -191,52 +191,18 @@
#define TK_STATEMENT 172
#define TK_TRIGGER 173
#define TK_VIEW 174
#define TK_COUNT 175
#define TK_SUM 176
#define TK_AVG 177
#define TK_MIN 178
#define TK_MAX 179
#define TK_FIRST 180
#define TK_LAST 181
#define TK_TOP 182
#define TK_BOTTOM 183
#define TK_STDDEV 184
#define TK_PERCENTILE 185
#define TK_APERCENTILE 186
#define TK_LEASTSQUARES 187
#define TK_HISTOGRAM 188
#define TK_DIFF 189
#define TK_SPREAD 190
#define TK_TWA 191
#define TK_INTERP 192
#define TK_LAST_ROW 193
#define TK_RATE 194
#define TK_IRATE 195
#define TK_SUM_RATE 196
#define TK_SUM_IRATE 197
#define TK_AVG_RATE 198
#define TK_AVG_IRATE 199
#define TK_TBID 200
#define TK_SEMI 201
#define TK_NONE 202
#define TK_PREV 203
#define TK_LINEAR 204
#define TK_IMPORT 205
#define TK_METRIC 206
#define TK_TBNAME 207
#define TK_JOIN 208
#define TK_METRICS 209
#define TK_INSERT 210
#define TK_INTO 211
#define TK_VALUES 212
#define TK_SEMI 175
#define TK_NONE 176
#define TK_PREV 177
#define TK_LINEAR 178
#define TK_IMPORT 179
#define TK_METRIC 180
#define TK_TBNAME 181
#define TK_JOIN 182
#define TK_METRICS 183
#define TK_INSERT 184
#define TK_INTO 185
#define TK_VALUES 186
#define TK_SPACE 300
......
......@@ -174,7 +174,7 @@ bool isValidDataType(int32_t type);
void setVardataNull(char* val, int32_t type);
void setNull(char *val, int32_t type, int32_t bytes);
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems);
void* getNullValue(int32_t type);
void *getNullValue(int32_t type);
void assignVal(char *val, const char *src, int32_t len, int32_t type);
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf);
......
......@@ -26,6 +26,7 @@ extern "C" {
#include "taosdef.h"
#include "trpc.h"
#include "tvariant.h"
#include "tsdb.h"
#define TSDB_FUNC_INVALID_ID -1
#define TSDB_FUNC_COUNT 0
......@@ -70,15 +71,17 @@ extern "C" {
#define TSDB_FUNC_AVG_IRATE 34
#define TSDB_FUNC_TID_TAG 35
#define TSDB_FUNC_HISTOGRAM 36
#define TSDB_FUNC_HLL 37
#define TSDB_FUNC_MODE 38
#define TSDB_FUNC_SAMPLE 39
#define TSDB_FUNC_CEIL 40
#define TSDB_FUNC_FLOOR 41
#define TSDB_FUNC_ROUND 42
#define TSDB_FUNC_MAVG 43
#define TSDB_FUNC_CSUM 44
#define TSDB_FUNC_BLKINFO 36
#define TSDB_FUNC_HISTOGRAM 37
#define TSDB_FUNC_HLL 38
#define TSDB_FUNC_MODE 39
#define TSDB_FUNC_SAMPLE 40
#define TSDB_FUNC_CEIL 41
#define TSDB_FUNC_FLOOR 42
#define TSDB_FUNC_ROUND 43
#define TSDB_FUNC_MAVG 44
#define TSDB_FUNC_CSUM 45
#define TSDB_FUNCSTATE_SO 0x1u // single output
......@@ -214,13 +217,14 @@ typedef struct SAggFunctionInfo {
void (*xFinalize)(SQLFunctionCtx *pCtx);
void (*mergeFunc)(SQLFunctionCtx *pCtx);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
} SAggFunctionInfo;
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable);
int32_t isValidFunction(const char* name, int32_t len);
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
#define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0)
......@@ -242,12 +246,16 @@ typedef struct STwaInfo {
STimeWindow win;
} STwaInfo;
struct SBufferWriter;
void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw);
void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist);
/* global sql function array */
extern struct SAggFunctionInfo aAggs[];
extern int32_t functionCompatList[]; // compatible check array list
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval);
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const char *maxval);
/**
* the numOfRes should be kept, since it may be used later
......@@ -258,14 +266,14 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const cha
(_r)->initialized = false; \
} while (0)
static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, uint32_t bufLen) {
static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, int32_t bufLen) {
pResInfo->initialized = true; // the this struct has been initialized flag
pResInfo->complete = false;
pResInfo->hasResult = false;
pResInfo->numOfRes = 0;
memset(GET_ROWCELL_INTERBUF(pResInfo), 0, (size_t)bufLen);
memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen);
}
#ifdef __cplusplus
......
......@@ -12,8 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QUERYEXECUTOR_H
#define TDENGINE_QUERYEXECUTOR_H
#ifndef TDENGINE_QEXECUTOR_H
#define TDENGINE_QEXECUTOR_H
#include "os.h"
......@@ -37,30 +37,24 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
#define SET_STABLE_QUERY_OVER(_q) ((_q)->tableIndex = (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
#define IS_STASBLE_QUERY_OVER(_q) ((_q)->tableIndex >= (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
#define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows)
enum {
// when query starts to execute, this status will set
QUERY_NOT_COMPLETED = 0x1u,
/* result output buffer is full, current query is paused.
* this status is only exist in group-by clause and diff/add/division/multiply/ query.
*/
QUERY_RESBUF_FULL = 0x2u,
/* query is over
* 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
* 2. when all data within queried time window, it is also denoted as query_completed
*/
QUERY_COMPLETED = 0x4u,
QUERY_COMPLETED = 0x2u,
/* when the result is not completed return to client, this status will be
* usually used in case of interval query with interpolation option
*/
QUERY_OVER = 0x8u,
QUERY_OVER = 0x4u,
};
typedef struct SResultRowPool {
......@@ -86,13 +80,13 @@ typedef struct SSqlGroupbyExpr {
typedef struct SResultRow {
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
int32_t rowId:29; // row index in buffer page
int32_t offset:29; // row index in buffer page
bool startInterp; // the time window start timestamp has done the interpolation already.
bool endInterp; // the time window end timestamp has done the interpolation already.
bool closed; // this result status: closed or opened
uint32_t numOfRows; // number of rows of current time window
SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo
union {STimeWindow win; char* key;}; // start key of current time window
union {STimeWindow win; char* key;}; // start key of current result row
} SResultRow;
typedef struct SGroupResInfo {
......@@ -106,12 +100,11 @@ typedef struct SGroupResInfo {
* If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate.
*/
typedef struct SResultRec {
typedef struct SRspResultInfo {
int64_t total; // total generated result size in rows
int64_t rows; // current result set size in rows
int64_t capacity; // capacity of current result output buffer
int32_t capacity; // capacity of current result output buffer
int32_t threshold; // result size threshold in rows.
} SResultRec;
} SRspResultInfo;
typedef struct SResultRowInfo {
SResultRow** pResult; // result list
......@@ -138,7 +131,6 @@ typedef struct SSingleColumnFilterInfo {
typedef struct STableQueryInfo {
TSKEY lastKey;
int32_t groupIndex; // group id in table list
int16_t queryRangeSet; // denote if the query range is set, only available for interval query
tVariant tag;
STimeWindow win;
STSCursor cur;
......@@ -179,72 +171,77 @@ typedef struct {
SArray* pResult; // SArray<SStddevInterResult>
} SInterResult;
typedef struct SSDataBlock {
SDataStatis *pBlockStatis;
SArray *pDataBlock;
SDataBlockInfo info;
} SSDataBlock;
typedef struct SQuery {
SLimitVal limit;
bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag
bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
bool queryBlockDist; // if query data block distribution
bool stabledev; // super table stddev query
int32_t interBufSize; // intermediate buffer sizse
SOrderVal order;
int16_t numOfCols;
int16_t numOfTags;
SOrderVal order;
STimeWindow window;
SInterval interval;
int16_t precision;
int16_t numOfOutput;
int16_t fillType;
int16_t checkResultBuf; // check if the buffer is full during scan each block
SLimitVal limit;
int32_t srcRowSize; // todo extract struct
int32_t resultRowSize;
int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query.
int32_t maxSrcColumnSize;
int32_t tagLen; // tag value length of current query
SSqlGroupbyExpr* pGroupbyExpr;
SExprInfo* pExpr1;
SExprInfo* pExpr2;
int32_t numOfExpr2;
SColumnInfo* colList;
SColumnInfo* tagColList;
int32_t numOfFilterCols;
int64_t* fillVal;
uint32_t status; // query status
SResultRec rec;
int32_t pos;
tFilePage** sdata;
STableQueryInfo* current;
int32_t numOfCheckedBlocks; // number of check data blocks
SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query.
SSingleColumnFilterInfo* pFilterInfo;
STableQueryInfo* current;
void* tsdb;
SMemRef memRef;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
int32_t vgId;
} SQuery;
typedef SSDataBlock* (*__operator_fn_t)(void* param);
typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num);
struct SOperatorInfo;
typedef struct SQueryRuntimeEnv {
jmp_buf env;
SQuery* pQuery;
SQLFunctionCtx* pCtx;
int32_t numOfRowsPerPage;
uint16_t* offset;
uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo;
SResultRowInfo resultRowInfo;
SQueryCostInfo summary;
uint32_t status; // query status
void* qinfo;
uint8_t scanFlag; // denotes reversed scan of data or not
void* pQueryHandle;
void* pSecQueryHandle; // another thread for
bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag
bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
bool queryBlockDist; // if query data block distribution
bool stabledev; // super table stddev query
int32_t interBufSize; // intermediate buffer sizse
int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
SResultRowPool* pool; // window result object pool
int32_t* rowCellInfoOffset;// offset value for each row result cell info
char** prevRow;
SArray* prevResult; // intermediate result, SArray<SInterResult>
......@@ -253,8 +250,56 @@ typedef struct SQueryRuntimeEnv {
char* tagVal; // tag value of current data block
SArithmeticSupport *sasArray;
SSDataBlock *outputBuf;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
struct SOperatorInfo *proot;
struct SOperatorInfo *pTableScanner; // table scan operator
SGroupResInfo groupResInfo;
int64_t currentOffset; // dynamic offset value
SRspResultInfo resultInfo;
SHashObj *pTableRetrieveTsMap;
} SQueryRuntimeEnv;
enum {
OP_IN_EXECUTING = 1,
OP_RES_TO_RETURN = 2,
OP_EXEC_DONE = 3,
};
enum OPERATOR_TYPE_E {
OP_TableScan = 1,
OP_DataBlocksOptScan = 2,
OP_TableSeqScan = 3,
OP_TagScan = 4,
OP_TableBlockInfoScan= 5,
OP_Aggregate = 6,
OP_Arithmetic = 7,
OP_Groupby = 8,
OP_Limit = 9,
OP_Offset = 10,
OP_TimeInterval = 11,
OP_Fill = 12,
OP_MultiTableAggregate = 13,
OP_MultiTableTimeInterval = 14,
};
typedef struct SOperatorInfo {
uint8_t operatorType;
bool blockingOptr; // block operator or not
uint8_t status; // denote if current operator is completed
int32_t numOfOutput; // number of columns of the current operator results
char *name; // name, used to show the query execution plan
void *info; // extension attribution
SExprInfo *pExpr;
SQueryRuntimeEnv *pRuntimeEnv;
struct SOperatorInfo *upstream;
__operator_fn_t exec;
__optr_cleanup_fn_t cleanup;
} SOperatorInfo;
enum {
QUERY_RESULT_NOT_READY = 1,
QUERY_RESULT_READY = 2,
......@@ -264,21 +309,9 @@ typedef struct SQInfo {
void* signature;
int32_t code; // error code to returned to client
int64_t owner; // if it is in execution
void* tsdb;
SMemRef memRef;
int32_t vgId;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
SQueryRuntimeEnv runtimeEnv;
SHashObj* arrTableIdInfo;
int32_t groupIndex;
/*
* the query is executed position on which meter of the whole list.
* when the index reaches the last one of the list, it means the query is completed.
*/
int32_t tableIndex;
SGroupResInfo groupResInfo;
SQueryRuntimeEnv runtimeEnv;
SQuery query;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
pthread_mutex_t lock; // used to synchronize the rsp/query threads
......@@ -287,6 +320,7 @@ typedef struct SQInfo {
void* rspContext; // response context
int64_t startExecTs; // start to exec timestamp
char* sql; // query sql string
SQueryCostInfo summary;
} SQInfo;
typedef struct SQueryParam {
......@@ -305,10 +339,85 @@ typedef struct SQueryParam {
SSqlGroupbyExpr *pGroupbyExpr;
} SQueryParam;
typedef struct STableScanInfo {
void *pQueryHandle;
int32_t numOfBlocks;
int32_t numOfSkipped;
int32_t numOfBlockStatis;
int64_t numOfRows;
int32_t order; // scan order
int32_t times; // repeat counts
int32_t current;
int32_t reverseTimes; // 0 by default
SQLFunctionCtx *pCtx; // next operator query context
SResultRowInfo *pResultRowInfo;
int32_t *rowCellInfoOffset;
SExprInfo *pExpr;
SSDataBlock block;
bool loadExternalRows; // load external rows (prev & next rows)
int32_t numOfOutput;
int64_t elapsedTime;
int32_t tableIndex;
} STableScanInfo;
typedef struct STagScanInfo {
SColumnInfo* pCols;
SSDataBlock* pRes;
int32_t totalTables;
int32_t currentIndex;
} STagScanInfo;
typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo;
int32_t *rowCellInfoOffset; // offset value for each row result cell info
SQLFunctionCtx *pCtx;
SSDataBlock *pRes;
} SOptrBasicInfo;
typedef struct SOptrBasicInfo STableIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
uint32_t seed;
} SAggOperatorInfo;
typedef struct SArithOperatorInfo {
SOptrBasicInfo binfo;
int32_t bufCapacity;
uint32_t seed;
} SArithOperatorInfo;
typedef struct SLimitOperatorInfo {
int64_t limit;
int64_t total;
} SLimitOperatorInfo;
typedef struct SOffsetOperatorInfo {
int64_t offset;
} SOffsetOperatorInfo;
typedef struct SFillOperatorInfo {
SFillInfo *pFillInfo;
SSDataBlock *pRes;
int64_t totalInputRows;
} SFillOperatorInfo;
typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo;
int32_t colIndex;
char *prevData; // previous group by value
} SGroupbyOperatorInfo;
void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
SColumnInfo* pTagCols);
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo,
SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr);
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql);
......@@ -318,13 +427,9 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
bool isQueryKilled(SQInfo *pQInfo);
int32_t checkForQueryBuf(size_t numOfTables);
bool doBuildResCheck(SQInfo* pQInfo);
void setQueryStatus(SQuery *pQuery, int8_t status);
void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status);
bool onlyQueryTags(SQuery* pQuery);
void buildTagQueryResult(SQInfo *pQInfo);
void stableQueryImpl(SQInfo *pQInfo);
void buildTableBlockDistResult(SQInfo *pQInfo);
void tableQueryImpl(SQInfo *pQInfo);
bool isValidQInfo(void *param);
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data);
......@@ -336,4 +441,4 @@ void freeQInfo(SQInfo *pQInfo);
int32_t getMaximumIdleDurationSec();
#endif // TDENGINE_QUERYEXECUTOR_H
#endif // TDENGINE_QEXECUTOR_H
......@@ -24,6 +24,8 @@ extern "C" {
#include "qExtbuffer.h"
#include "taosdef.h"
struct SSDataBlock;
typedef struct {
STColumn col; // column info
int16_t functionId; // sql function id
......@@ -78,7 +80,7 @@ void* taosDestroyFillInfo(SFillInfo *pFillInfo);
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosFillSetDataBlockFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput);
void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput);
......@@ -88,7 +90,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity);
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t capacity);
#ifdef __cplusplus
}
......
......@@ -55,7 +55,6 @@ typedef struct SResultBufStatis {
} SResultBufStatis;
typedef struct SDiskbasedResultBuf {
int32_t numOfRowsPerPage;
int32_t numOfPages;
int64_t totalBufSize;
int64_t fileSize; // disk file size
......@@ -77,7 +76,7 @@ typedef struct SDiskbasedResultBuf {
SResultBufStatis statis;
} SDiskbasedResultBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (256L) // in bytes
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
/**
......@@ -89,8 +88,7 @@ typedef struct SDiskbasedResultBuf {
* @param handle
* @return
*/
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t rowSize, int32_t pagesize,
int32_t inMemBufSize, const void* handle);
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, const void* handle);
/**
*
......@@ -101,13 +99,6 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t ro
*/
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId);
/**
*
* @param pResultBuf
* @return
*/
size_t getNumOfRowsPerPage(const SDiskbasedResultBuf* pResultBuf);
/**
*
* @param pResultBuf
......
......@@ -185,19 +185,32 @@ typedef struct SSqlInfo {
};
} SSqlInfo;
#define NON_ARITHMEIC_EXPR 0
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2
enum SQL_NODE_TYPE {
SQL_NODE_TABLE_COLUMN= 1,
SQL_NODE_SQLFUNCTION = 2,
SQL_NODE_VALUE = 3,
SQL_NODE_EXPR = 4,
};
typedef struct tSQLExpr {
uint32_t nSQLOptr; // TK_FUNCTION: sql function, TK_LE: less than(binary expr)
uint16_t type; // sql node type
uint32_t tokenId; // TK_FUNCTION: sql function, TK_LE: less than(binary expr)
// the full sql string of function(col, param), which is actually the raw
// field name, since the function name is kept in nSQLOptr already
// the whole string of the function(col, param), while the function name is kept in token
SStrToken operand;
SStrToken colInfo; // field id
tVariant val; // value only for string, float, int
uint32_t functionId; // function id
SStrToken colInfo; // table column info
tVariant value; // the use input value
SStrToken token; // original sql expr string
struct tSQLExpr *pLeft; // left child
struct tSQLExpr *pRight; // right child
struct tSQLExprList *pParam; // function parameters
struct tSQLExprList *pParam; // function parameters list
} tSQLExpr;
// used in select clause. select <tSQLExprList> from xxx
......@@ -294,16 +307,6 @@ void tSqlSetColumnType(TAOS_FIELD *pField, SStrToken *type);
void *ParseAlloc(void *(*mallocProc)(size_t));
enum {
TSQL_NODE_TYPE_EXPR = 0x1,
TSQL_NODE_TYPE_ID = 0x2,
TSQL_NODE_TYPE_VALUE = 0x4,
};
#define NON_ARITHMEIC_EXPR 0
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2
SSqlInfo qSQLParse(const char *str);
#ifdef __cplusplus
......
......@@ -113,12 +113,10 @@ STSBuf* tsBufClone(STSBuf* pTSBuf);
STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id);
void tsBufFlush(STSBuf* pTSBuf);
void tsBufResetPos(STSBuf* pTSBuf);
STSElem tsBufGetElem(STSBuf* pTSBuf);
bool tsBufNextPos(STSBuf* pTSBuf);
STSElem tsBufGetElem(STSBuf* pTSBuf);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, tVariant* tag);
STSCursor tsBufGetCursor(STSBuf* pTSBuf);
......
......@@ -27,7 +27,7 @@
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!(sq)))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
int32_t getOutputInterResultBufSize(SQuery* pQuery);
......@@ -44,22 +44,18 @@ void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot);
bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot);
void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type);
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index);
SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset);
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
return pResultRowInfo->pResult[slot];
}
static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult,
tFilePage* page) {
assert(pResult != NULL && pRuntimeEnv != NULL);
static FORCE_INLINE char *getPosInResultPage(SQuery *pQuery, tFilePage* page, int32_t rowOffset, int16_t offset) {
assert(rowOffset >= 0 && pQuery != NULL);
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t realRowId = (int32_t)(pResult->rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery));
return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage +
pQuery->pExpr1[columnIndex].bytes * realRowId;
int32_t numOfRows = (int32_t)GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery);
return ((char *)page->data) + rowOffset + offset * numOfRows;
}
bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
......@@ -74,8 +70,6 @@ void* destroyResultRowPool(SResultRowPool* p);
int32_t getNumOfAllocatedResultRows(SResultRowPool* p);
int32_t getNumOfUsedResultRows(SResultRowPool* p);
bool isPointInterpoQuery(SQuery *pQuery);
typedef struct {
SArray* pResult; // SArray<SResPair>
int32_t colId;
......@@ -85,12 +79,14 @@ void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen);
SArray* interResFromBinary(const char* data, int32_t len);
void freeInterResult(void* param);
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo, int32_t offset);
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo);
bool hasRemainData(SGroupResInfo* pGroupResInfo);
bool incNextGroup(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t* offset);
#endif // TDENGINE_QUERYUTIL_H
此差异已折叠。
此差异已折叠。
......@@ -23,18 +23,19 @@
#include "qFill.h"
#include "qExtbuffer.h"
#include "queryLog.h"
#include "qExecutor.h"
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
#define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1))))
static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t genRows) {
static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
for(int32_t j = 0; j < pFillInfo->numOfCols; ++j) {
SFillColInfo* pCol = &pFillInfo->pFillCol[j];
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
continue;
}
char* val1 = elePtrAt(data[j]->data, pCol->col.bytes, genRows);
char* val1 = elePtrAt(data[j], pCol->col.bytes, genRows);
assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags);
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
......@@ -44,17 +45,17 @@ static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t genRows
}
}
static void setNullValueForRow(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfCol, int32_t rowIndex) {
static void setNullValueForRow(SFillInfo* pFillInfo, void** data, int32_t numOfCol, int32_t rowIndex) {
// the first are always the timestamp column, so start from the second column.
for (int32_t i = 1; i < numOfCol; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* output = elePtrAt(data[i]->data, pCol->col.bytes, rowIndex);
char* output = elePtrAt(data[i], pCol->col.bytes, rowIndex);
setNull(output, pCol->col.type, pCol->col.bytes);
}
}
static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** srcData, int64_t ts, bool outOfBound) {
static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData, int64_t ts, bool outOfBound) {
char* prev = pFillInfo->prevValues;
char* next = pFillInfo->nextValues;
......@@ -63,7 +64,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
// set the primary timestamp column value
int32_t index = pFillInfo->numOfCurrent;
char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, index);
char* val = elePtrAt(data[0], TSDB_KEYSIZE, index);
*(TSKEY*) val = pFillInfo->currentKey;
// set the other values
......@@ -77,7 +78,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
continue;
}
char* output = elePtrAt(data[i]->data, pCol->col.bytes, index);
char* output = elePtrAt(data[i], pCol->col.bytes, index);
assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type);
}
} else { // no prev value yet, set the value for NULL
......@@ -93,7 +94,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
continue;
}
char* output = elePtrAt(data[i]->data, pCol->col.bytes, index);
char* output = elePtrAt(data[i], pCol->col.bytes, index);
assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type);
}
} else { // no prev value yet, set the value for NULL
......@@ -111,7 +112,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
int16_t type = pCol->col.type;
int16_t bytes = pCol->col.bytes;
char *val1 = elePtrAt(data[i]->data, pCol->col.bytes, index);
char *val1 = elePtrAt(data[i], pCol->col.bytes, index);
if (type == TSDB_DATA_TYPE_BINARY|| type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) {
setNull(val1, pCol->col.type, bytes);
continue;
......@@ -132,7 +133,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
continue;
}
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, index);
char* val1 = elePtrAt(data[i], pCol->col.bytes, index);
assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
}
}
......@@ -162,7 +163,7 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* bu
}
}
static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t outputRows) {
static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputRows) {
pFillInfo->numOfCurrent = 0;
char** srcData = pFillInfo->pData;
......@@ -213,7 +214,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t ou
continue;
}
char* output = elePtrAt(data[i]->data, pCol->col.bytes, pFillInfo->numOfCurrent);
char* output = elePtrAt(data[i], pCol->col.bytes, pFillInfo->numOfCurrent);
char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->index);
if (i == 0 || (pCol->functionId != TSDB_FUNC_COUNT && !isNull(src, pCol->col.type)) ||
......@@ -255,7 +256,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t ou
return pFillInfo->numOfCurrent;
}
static int64_t appendFilledResult(SFillInfo* pFillInfo, tFilePage** output, int64_t resultCapacity) {
static int64_t appendFilledResult(SFillInfo* pFillInfo, void** output, int64_t resultCapacity) {
/*
* These data are generated according to fill strategy, since the current timestamp is out of the time window of
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
......@@ -278,7 +279,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
int32_t k = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
SFillColInfo* pColInfo = &pFillInfo->pFillCol[i];
pFillInfo->pData[i] = calloc(1, pColInfo->col.bytes * capacity);
pFillInfo->pData[i] = NULL;
if (TSDB_COL_IS_TAG(pColInfo->flag)) {
bool exists = false;
......@@ -356,6 +357,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3
pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc);
assert(pFillInfo->rowSize > 0);
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
pFillInfo->pData[i] = malloc(pFillInfo->pFillCol[i].col.bytes * pFillInfo->alloc);
}
return pFillInfo;
}
......@@ -375,12 +380,17 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
tfree(pFillInfo->prevValues);
tfree(pFillInfo->nextValues);
tfree(pFillInfo->pTags);
for(int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
tfree(pFillInfo->pTags[i].tagVal);
}
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
tfree(pFillInfo->pData[i]);
}
tfree(pFillInfo->pTags);
tfree(pFillInfo->pData);
tfree(pFillInfo->pFillCol);
......@@ -413,10 +423,19 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
}
}
// copy the data into source data buffer
void taosFillSetDataBlockFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput) {
void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
memcpy(pFillInfo->pData[i], pInput[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes);
SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i);
// pFillInfo->pData[i] = pColData->pData;
if (pInput->info.rows > pFillInfo->alloc) {
char* t = realloc(pFillInfo->pData[i], pColData->info.bytes * pInput->info.rows);
assert(t != NULL);
pFillInfo->pData[i] = t;
pFillInfo->alloc = pInput->info.rows;
}
memcpy(pFillInfo->pData[i], pColData->pData, pColData->info.bytes * pInput->info.rows);
}
}
......@@ -427,12 +446,20 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage*
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
const char* data = pInput->data + pCol->col.offset * pInput->num;
memcpy(pFillInfo->pData[i], data, (size_t)(pInput->num * pCol->col.bytes));
if (pInput->num > pFillInfo->alloc) {
char* t = realloc(pFillInfo->pData[i], (size_t)(pCol->col.bytes * pInput->num));
assert(t != NULL);
pFillInfo->pData[i] = t;
pFillInfo->alloc = (int32_t)pInput->num;
}
memcpy(pFillInfo->pData[i], data, (size_t)(pCol->col.bytes * pInput->num));
if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
assert (pTag->col.colId == pCol->col.colId);
memcpy(pTag->tagVal, data, pCol->col.bytes);
memcpy(pTag->tagVal, data, pCol->col.bytes); // TODO not memcpy??
}
}
}
......@@ -490,7 +517,7 @@ int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint*
return TSDB_CODE_SUCCESS;
}
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) {
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t capacity) {
int32_t remain = taosNumOfRemainRows(pFillInfo);
int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);
......
......@@ -135,28 +135,33 @@ tSQLExpr *tSqlExprIdValueCreate(SStrToken *pToken, int32_t optrType) {
if (optrType == TK_INTEGER || optrType == TK_STRING || optrType == TK_FLOAT || optrType == TK_BOOL) {
toTSDBType(pToken->type);
tVariantCreate(&pSqlExpr->val, pToken);
pSqlExpr->nSQLOptr = optrType;
tVariantCreate(&pSqlExpr->value, pToken);
pSqlExpr->tokenId = optrType;
pSqlExpr->type = SQL_NODE_VALUE;
} else if (optrType == TK_NOW) {
// use microsecond by default
pSqlExpr->val.i64 = taosGetTimestamp(TSDB_TIME_PRECISION_MICRO);
pSqlExpr->val.nType = TSDB_DATA_TYPE_BIGINT;
pSqlExpr->nSQLOptr = TK_TIMESTAMP; // TK_TIMESTAMP used to denote the time value is in microsecond
pSqlExpr->value.i64 = taosGetTimestamp(TSDB_TIME_PRECISION_MICRO);
pSqlExpr->value.nType = TSDB_DATA_TYPE_BIGINT;
pSqlExpr->tokenId = TK_TIMESTAMP; // TK_TIMESTAMP used to denote the time value is in microsecond
pSqlExpr->type = SQL_NODE_VALUE;
} else if (optrType == TK_VARIABLE) {
int32_t ret = parseAbsoluteDuration(pToken->z, pToken->n, &pSqlExpr->val.i64);
int32_t ret = parseAbsoluteDuration(pToken->z, pToken->n, &pSqlExpr->value.i64);
if (ret != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
pSqlExpr->val.nType = TSDB_DATA_TYPE_BIGINT;
pSqlExpr->nSQLOptr = TK_TIMESTAMP;
} else { // it must be the column name (tk_id) if it is not the number
pSqlExpr->value.nType = TSDB_DATA_TYPE_BIGINT;
pSqlExpr->tokenId = TK_TIMESTAMP;
pSqlExpr->type = SQL_NODE_VALUE;
} else {
// Here it must be the column name (tk_id) if it is not a number or string.
assert(optrType == TK_ID || optrType == TK_ALL);
if (pToken != NULL) {
pSqlExpr->colInfo = *pToken;
}
pSqlExpr->nSQLOptr = optrType;
pSqlExpr->tokenId = optrType;
pSqlExpr->type = SQL_NODE_TABLE_COLUMN;
}
return pSqlExpr;
......@@ -167,19 +172,22 @@ tSQLExpr *tSqlExprIdValueCreate(SStrToken *pToken, int32_t optrType) {
* function name is denoted by pFunctionToken
*/
tSQLExpr *tSqlExprCreateFunction(tSQLExprList *pList, SStrToken *pFuncToken, SStrToken *endToken, int32_t optType) {
if (pFuncToken == NULL) return NULL;
if (pFuncToken == NULL) {
return NULL;
}
tSQLExpr *pExpr = calloc(1, sizeof(tSQLExpr));
pExpr->nSQLOptr = optType;
pExpr->tokenId = optType;
pExpr->type = SQL_NODE_SQLFUNCTION;
pExpr->pParam = pList;
int32_t len = (int32_t)((endToken->z + endToken->n) - pFuncToken->z);
pExpr->operand.z = pFuncToken->z;
pExpr->operand = (*pFuncToken);
pExpr->operand.n = len; // raw field name
pExpr->operand.type = pFuncToken->type;
pExpr->token.n = len;
pExpr->token.z = pFuncToken->z;
pExpr->token.type = pFuncToken->type;
pExpr->token = pExpr->operand;
return pExpr;
}
......@@ -190,6 +198,7 @@ tSQLExpr *tSqlExprCreateFunction(tSQLExprList *pList, SStrToken *pFuncToken, SSt
tSQLExpr *tSqlExprCreate(tSQLExpr *pLeft, tSQLExpr *pRight, int32_t optrType) {
tSQLExpr *pExpr = calloc(1, sizeof(tSQLExpr));
pExpr->type = SQL_NODE_EXPR;
if (pLeft != NULL && pRight != NULL && (optrType != TK_IN)) {
char* endPos = pRight->token.z + pRight->token.n;
pExpr->token.z = pLeft->token.z;
......@@ -203,32 +212,33 @@ tSQLExpr *tSqlExprCreate(tSQLExpr *pLeft, tSQLExpr *pRight, int32_t optrType) {
* if a token is noted as the TK_TIMESTAMP, the time precision is microsecond
* Otherwise, the time precision is adaptive, determined by the time precision from databases.
*/
if ((pLeft->nSQLOptr == TK_INTEGER && pRight->nSQLOptr == TK_INTEGER) ||
(pLeft->nSQLOptr == TK_TIMESTAMP && pRight->nSQLOptr == TK_TIMESTAMP)) {
pExpr->val.nType = TSDB_DATA_TYPE_BIGINT;
pExpr->nSQLOptr = pLeft->nSQLOptr;
if ((pLeft->tokenId == TK_INTEGER && pRight->tokenId == TK_INTEGER) ||
(pLeft->tokenId == TK_TIMESTAMP && pRight->tokenId == TK_TIMESTAMP)) {
pExpr->value.nType = TSDB_DATA_TYPE_BIGINT;
pExpr->tokenId = pLeft->tokenId;
pExpr->type = SQL_NODE_VALUE;
switch (optrType) {
case TK_PLUS: {
pExpr->val.i64 = pLeft->val.i64 + pRight->val.i64;
pExpr->value.i64 = pLeft->value.i64 + pRight->value.i64;
break;
}
case TK_MINUS: {
pExpr->val.i64 = pLeft->val.i64 - pRight->val.i64;
pExpr->value.i64 = pLeft->value.i64 - pRight->value.i64;
break;
}
case TK_STAR: {
pExpr->val.i64 = pLeft->val.i64 * pRight->val.i64;
pExpr->value.i64 = pLeft->value.i64 * pRight->value.i64;
break;
}
case TK_DIVIDE: {
pExpr->nSQLOptr = TK_FLOAT;
pExpr->val.nType = TSDB_DATA_TYPE_DOUBLE;
pExpr->val.dKey = (double)pLeft->val.i64 / pRight->val.i64;
pExpr->tokenId = TK_FLOAT;
pExpr->value.nType = TSDB_DATA_TYPE_DOUBLE;
pExpr->value.dKey = (double)pLeft->value.i64 / pRight->value.i64;
break;
}
case TK_REM: {
pExpr->val.i64 = pLeft->val.i64 % pRight->val.i64;
pExpr->value.i64 = pLeft->value.i64 % pRight->value.i64;
break;
}
}
......@@ -236,33 +246,35 @@ tSQLExpr *tSqlExprCreate(tSQLExpr *pLeft, tSQLExpr *pRight, int32_t optrType) {
tSqlExprDestroy(pLeft);
tSqlExprDestroy(pRight);
} else if ((pLeft->nSQLOptr == TK_FLOAT && pRight->nSQLOptr == TK_INTEGER) || (pLeft->nSQLOptr == TK_INTEGER && pRight->nSQLOptr == TK_FLOAT) ||
(pLeft->nSQLOptr == TK_FLOAT && pRight->nSQLOptr == TK_FLOAT)) {
pExpr->val.nType = TSDB_DATA_TYPE_DOUBLE;
pExpr->nSQLOptr = TK_FLOAT;
} else if ((pLeft->tokenId == TK_FLOAT && pRight->tokenId == TK_INTEGER) ||
(pLeft->tokenId == TK_INTEGER && pRight->tokenId == TK_FLOAT) ||
(pLeft->tokenId == TK_FLOAT && pRight->tokenId == TK_FLOAT)) {
pExpr->value.nType = TSDB_DATA_TYPE_DOUBLE;
pExpr->tokenId = TK_FLOAT;
pExpr->type = SQL_NODE_VALUE;
double left = (pLeft->val.nType == TSDB_DATA_TYPE_DOUBLE) ? pLeft->val.dKey : pLeft->val.i64;
double right = (pRight->val.nType == TSDB_DATA_TYPE_DOUBLE) ? pRight->val.dKey : pRight->val.i64;
double left = (pLeft->value.nType == TSDB_DATA_TYPE_DOUBLE) ? pLeft->value.dKey : pLeft->value.i64;
double right = (pRight->value.nType == TSDB_DATA_TYPE_DOUBLE) ? pRight->value.dKey : pRight->value.i64;
switch (optrType) {
case TK_PLUS: {
pExpr->val.dKey = left + right;
pExpr->value.dKey = left + right;
break;
}
case TK_MINUS: {
pExpr->val.dKey = left - right;
pExpr->value.dKey = left - right;
break;
}
case TK_STAR: {
pExpr->val.dKey = left * right;
pExpr->value.dKey = left * right;
break;
}
case TK_DIVIDE: {
pExpr->val.dKey = left / right;
pExpr->value.dKey = left / right;
break;
}
case TK_REM: {
pExpr->val.dKey = left - ((int64_t)(left / right)) * right;
pExpr->value.dKey = left - ((int64_t)(left / right)) * right;
break;
}
}
......@@ -271,21 +283,21 @@ tSQLExpr *tSqlExprCreate(tSQLExpr *pLeft, tSQLExpr *pRight, int32_t optrType) {
tSqlExprDestroy(pRight);
} else {
pExpr->nSQLOptr = optrType;
pExpr->tokenId = optrType;
pExpr->pLeft = pLeft;
pExpr->pRight = pRight;
}
} else if (optrType == TK_IN) {
pExpr->nSQLOptr = optrType;
pExpr->tokenId = optrType;
pExpr->pLeft = pLeft;
tSQLExpr *pRSub = calloc(1, sizeof(tSQLExpr));
pRSub->nSQLOptr = TK_SET; // TODO refactor .....
pRSub->tokenId = TK_SET; // TODO refactor .....
pRSub->pParam = (tSQLExprList *)pRight;
pExpr->pRight = pRSub;
} else {
pExpr->nSQLOptr = optrType;
pExpr->tokenId = optrType;
pExpr->pLeft = pLeft;
if (pLeft != NULL && pRight == NULL) {
......@@ -325,8 +337,8 @@ void tSqlExprNodeDestroy(tSQLExpr *pExpr) {
return;
}
if (pExpr->nSQLOptr == TK_STRING) {
tVariantDestroy(&pExpr->val);
if (pExpr->tokenId == TK_STRING) {
tVariantDestroy(&pExpr->value);
}
tSqlExprListDestroy(pExpr->pParam);
......
......@@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
resetSlotInfo(pBucket);
int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bytes, pBucket->bufPageSize, pBucket->bufPageSize * 512, NULL);
int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, NULL);
if (ret != TSDB_CODE_SUCCESS) {
tMemBucketDestroy(pBucket);
return NULL;
......
......@@ -9,8 +9,7 @@
#define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t rowSize, int32_t pagesize,
int32_t inMemBufSize, const void* handle) {
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, const void* handle) {
*pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf));
SDiskbasedResultBuf* pResBuf = *pResultBuf;
......@@ -31,7 +30,6 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t ro
// at least more than 2 pages must be in memory
assert(inMemBufSize >= pagesize * 2);
pResBuf->numOfRowsPerPage = (pagesize - sizeof(tFilePage)) / rowSize;
pResBuf->lruList = tdListNew(POINTER_BYTES);
// init id hash table
......@@ -387,8 +385,6 @@ void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi) {
pResultBuf->statis.releasePages += 1;
}
size_t getNumOfRowsPerPage(const SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->groupSet); }
size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf) { return (size_t)pResultBuf->totalBufSize; }
......
......@@ -200,25 +200,6 @@ static SKeyword keywordTable[] = {
{"TRIGGER", TK_TRIGGER},
{"VIEW", TK_VIEW},
{"ALL", TK_ALL},
{"COUNT", TK_COUNT},
{"SUM", TK_SUM},
{"AVG", TK_AVG},
{"MIN", TK_MIN},
{"MAX", TK_MAX},
{"FIRST", TK_FIRST},
{"LAST", TK_LAST},
{"TOP", TK_TOP},
{"BOTTOM", TK_BOTTOM},
{"STDDEV", TK_STDDEV},
{"PERCENTILE", TK_PERCENTILE},
{"APERCENTILE", TK_APERCENTILE},
{"LEASTSQUARES", TK_LEASTSQUARES},
{"HISTOGRAM", TK_HISTOGRAM},
{"DIFF", TK_DIFF},
{"SPREAD", TK_SPREAD},
{"TWA", TK_TWA},
{"INTERP", TK_INTERP},
{"LAST_ROW", TK_LAST_ROW},
{"SEMI", TK_SEMI},
{"NONE", TK_NONE},
{"PREV", TK_PREV},
......@@ -228,17 +209,10 @@ static SKeyword keywordTable[] = {
{"TBNAME", TK_TBNAME},
{"JOIN", TK_JOIN},
{"METRICS", TK_METRICS},
{"TBID", TK_TBID},
{"STABLE", TK_STABLE},
{"FILE", TK_FILE},
{"VNODES", TK_VNODES},
{"UNION", TK_UNION},
{"RATE", TK_RATE},
{"IRATE", TK_IRATE},
{"SUM_RATE", TK_SUM_RATE},
{"SUM_IRATE", TK_SUM_IRATE},
{"AVG_RATE", TK_AVG_RATE},
{"AVG_IRATE", TK_AVG_IRATE},
{"CACHELAST", TK_CACHELAST},
{"DISTINCT", TK_DISTINCT},
{"PARTITIONS", TK_PARTITIONS},
......@@ -297,7 +271,7 @@ int tSQLKeywordCode(const char* z, int n) {
* Return the length of the token that begins at z[0].
* Store the token type in *type before returning.
*/
uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
uint32_t tSQLGetToken(char* z, uint32_t* tokenId) {
uint32_t i;
switch (*z) {
case ' ':
......@@ -307,121 +281,121 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
case '\r': {
for (i = 1; isspace(z[i]); i++) {
}
*tokenType = TK_SPACE;
*tokenId = TK_SPACE;
return i;
}
case ':': {
*tokenType = TK_COLON;
*tokenId = TK_COLON;
return 1;
}
case '-': {
if (z[1] == '-') {
for (i = 2; z[i] && z[i] != '\n'; i++) {
}
*tokenType = TK_COMMENT;
*tokenId = TK_COMMENT;
return i;
}
*tokenType = TK_MINUS;
*tokenId = TK_MINUS;
return 1;
}
case '(': {
*tokenType = TK_LP;
*tokenId = TK_LP;
return 1;
}
case ')': {
*tokenType = TK_RP;
*tokenId = TK_RP;
return 1;
}
case ';': {
*tokenType = TK_SEMI;
*tokenId = TK_SEMI;
return 1;
}
case '+': {
*tokenType = TK_PLUS;
*tokenId = TK_PLUS;
return 1;
}
case '*': {
*tokenType = TK_STAR;
*tokenId = TK_STAR;
return 1;
}
case '/': {
if (z[1] != '*' || z[2] == 0) {
*tokenType = TK_SLASH;
*tokenId = TK_SLASH;
return 1;
}
for (i = 3; z[i] && (z[i] != '/' || z[i - 1] != '*'); i++) {
}
if (z[i]) i++;
*tokenType = TK_COMMENT;
*tokenId = TK_COMMENT;
return i;
}
case '%': {
*tokenType = TK_REM;
*tokenId = TK_REM;
return 1;
}
case '=': {
*tokenType = TK_EQ;
*tokenId = TK_EQ;
return 1 + (z[1] == '=');
}
case '<': {
if (z[1] == '=') {
*tokenType = TK_LE;
*tokenId = TK_LE;
return 2;
} else if (z[1] == '>') {
*tokenType = TK_NE;
*tokenId = TK_NE;
return 2;
} else if (z[1] == '<') {
*tokenType = TK_LSHIFT;
*tokenId = TK_LSHIFT;
return 2;
} else {
*tokenType = TK_LT;
*tokenId = TK_LT;
return 1;
}
}
case '>': {
if (z[1] == '=') {
*tokenType = TK_GE;
*tokenId = TK_GE;
return 2;
} else if (z[1] == '>') {
*tokenType = TK_RSHIFT;
*tokenId = TK_RSHIFT;
return 2;
} else {
*tokenType = TK_GT;
*tokenId = TK_GT;
return 1;
}
}
case '!': {
if (z[1] != '=') {
*tokenType = TK_ILLEGAL;
*tokenId = TK_ILLEGAL;
return 2;
} else {
*tokenType = TK_NE;
*tokenId = TK_NE;
return 2;
}
}
case '|': {
if (z[1] != '|') {
*tokenType = TK_BITOR;
*tokenId = TK_BITOR;
return 1;
} else {
*tokenType = TK_CONCAT;
*tokenId = TK_CONCAT;
return 2;
}
}
case ',': {
*tokenType = TK_COMMA;
*tokenId = TK_COMMA;
return 1;
}
case '&': {
*tokenType = TK_BITAND;
*tokenId = TK_BITAND;
return 1;
}
case '~': {
*tokenType = TK_BITNOT;
*tokenId = TK_BITNOT;
return 1;
}
case '?': {
*tokenType = TK_QUESTION;
*tokenId = TK_QUESTION;
return 1;
}
case '\'':
......@@ -447,7 +421,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
if (z[i]) i++;
if (strEnd) {
*tokenType = TK_STRING;
*tokenId = TK_STRING;
return i;
}
......@@ -471,10 +445,10 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
}
}
*tokenType = TK_FLOAT;
*tokenId = TK_FLOAT;
return i;
} else {
*tokenType = TK_DOT;
*tokenId = TK_DOT;
return 1;
}
}
......@@ -483,7 +457,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
char next = z[1];
if (next == 'b') { // bin number
*tokenType = TK_BIN;
*tokenId = TK_BIN;
for (i = 2; (z[i] == '0' || z[i] == '1'); ++i) {
}
......@@ -493,7 +467,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
return i;
} else if (next == 'x') { //hex number
*tokenType = TK_HEX;
*tokenId = TK_HEX;
for (i = 2; isdigit(z[i]) || (z[i] >= 'a' && z[i] <= 'f') || (z[i] >= 'A' && z[i] <= 'F'); ++i) {
}
......@@ -513,7 +487,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
case '7':
case '8':
case '9': {
*tokenType = TK_INTEGER;
*tokenId = TK_INTEGER;
for (i = 1; isdigit(z[i]); i++) {
}
......@@ -523,7 +497,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
z[i] == 'U' || z[i] == 'A' || z[i] == 'S' || z[i] == 'M' || z[i] == 'H' || z[i] == 'D' || z[i] == 'N' ||
z[i] == 'Y' || z[i] == 'W') &&
(isIdChar[(uint8_t)z[i + 1]] == 0)) {
*tokenType = TK_VARIABLE;
*tokenId = TK_VARIABLE;
i += 1;
return i;
}
......@@ -534,12 +508,12 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
while (isdigit(z[i])) {
i++;
}
*tokenType = TK_FLOAT;
*tokenId = TK_FLOAT;
seg++;
}
if (seg == 4) { // ip address
*tokenType = TK_IPTOKEN;
*tokenId = TK_IPTOKEN;
return i;
}
......@@ -549,14 +523,14 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
while (isdigit(z[i])) {
i++;
}
*tokenType = TK_FLOAT;
*tokenId = TK_FLOAT;
}
return i;
}
case '[': {
for (i = 1; z[i] && z[i - 1] != ']'; i++) {
}
*tokenType = TK_ID;
*tokenId = TK_ID;
return i;
}
case 'T':
......@@ -567,7 +541,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
}
if ((i == 4 && strncasecmp(z, "true", 4) == 0) || (i == 5 && strncasecmp(z, "false", 5) == 0)) {
*tokenType = TK_BOOL;
*tokenId = TK_BOOL;
return i;
}
}
......@@ -577,12 +551,12 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
}
for (i = 1; ((z[i] & 0x80) == 0) && isIdChar[(uint8_t) z[i]]; i++) {
}
*tokenType = tSQLKeywordCode(z, i);
*tokenId = tSQLKeywordCode(z, i);
return i;
}
}
*tokenType = TK_ILLEGAL;
*tokenId = TK_ILLEGAL;
return 0;
}
......
......@@ -22,6 +22,7 @@
#include "tbuffer.h"
#include "tlosertree.h"
#include "queryLog.h"
#include "tscompression.h"
typedef struct SCompSupporter {
STableQueryInfo **pTableQueryInfo;
......@@ -135,20 +136,22 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
if (pResultRow->pageId >= 0) {
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId);
int16_t offset = 0;
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) {
SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i];
char * s = getPosInResultPage(pRuntimeEnv, i, pResultRow, page);
size_t size = pRuntimeEnv->pQuery->pExpr1[i].bytes;
int16_t size = pRuntimeEnv->pQuery->pExpr1[i].bytes;
char * s = getPosInResultPage(pRuntimeEnv->pQuery, page, pResultRow->offset, offset);
memset(s, 0, size);
offset += size;
RESET_RESULT_INFO(pResultInfo);
}
}
pResultRow->numOfRows = 0;
pResultRow->pageId = -1;
pResultRow->rowId = -1;
pResultRow->offset = -1;
pResultRow->closed = false;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
......@@ -158,13 +161,15 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
}
}
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index) {
assert(index >= 0 && index < pRuntimeEnv->pQuery->numOfOutput);
return (SResultRowCellInfo*)((char*) pRow->pCellInfo + pRuntimeEnv->rowCellInfoOffset[index]);
// TODO refactor: use macro
SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset) {
assert(index >= 0 && offset != NULL);
return (SResultRowCellInfo*)((char*) pRow->pCellInfo + offset[index]);
}
size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv) {
return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pRuntimeEnv->interBufSize + sizeof(SResultRow);
SQuery* pQuery = pRuntimeEnv->pQuery;
return (pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pQuery->interBufSize + sizeof(SResultRow);
}
SResultRowPool* initResultRowPool(size_t size) {
......@@ -340,18 +345,18 @@ void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
pGroupResInfo->index = 0;
}
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo, int32_t offset) {
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo) {
if (pGroupResInfo->pRows != NULL) {
taosArrayDestroy(pGroupResInfo->pRows);
}
pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pResult, pResultInfo->size, POINTER_BYTES);
pGroupResInfo->index = offset;
pGroupResInfo->index = 0;
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
}
bool hasRemainData(SGroupResInfo* pGroupResInfo) {
bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo) {
if (pGroupResInfo->pRows == NULL) {
return false;
}
......@@ -359,6 +364,14 @@ bool hasRemainData(SGroupResInfo* pGroupResInfo) {
return pGroupResInfo->index < taosArrayGetSize(pGroupResInfo->pRows);
}
bool hasRemainData(SGroupResInfo* pGroupResInfo) {
if (hasRemainDataInCurrentGroup(pGroupResInfo)) {
return true;
}
return pGroupResInfo->currentGroup < pGroupResInfo->totalGroup;
}
bool incNextGroup(SGroupResInfo* pGroupResInfo) {
return (++pGroupResInfo->currentGroup) < pGroupResInfo->totalGroup;
}
......@@ -372,7 +385,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
return (int32_t) taosArrayGetSize(pGroupResInfo->pRows);
}
static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow) {
static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow, int32_t* rowCellInfoOffset) {
SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
......@@ -386,7 +399,7 @@ static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow
continue;
}
SResultRowCellInfo *pResultInfo = getResultCell(pRuntimeEnv, pResultRow, j);
SResultRowCellInfo *pResultInfo = getResultCell(pResultRow, j, rowCellInfoOffset);
assert(pResultInfo != NULL);
if (pResultInfo->numOfRes > 0) {
......@@ -437,7 +450,8 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *
}
}
static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, void* qinfo) {
static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList,
int32_t* rowCellInfoOffset) {
bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQuery);
int32_t code = TSDB_CODE_SUCCESS;
......@@ -455,7 +469,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes
pTableQueryInfoList = malloc(POINTER_BYTES * size);
if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) {
qError("QInfo:%p failed alloc memory", qinfo);
qError("QInfo:%p failed alloc memory", pRuntimeEnv->qinfo);
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
}
......@@ -491,7 +505,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes
SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo;
SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]);
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes);
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes, rowCellInfoOffset);
if (num <= 0) {
cs.rowIndex[tableIndex] += 1;
......@@ -527,7 +541,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes
int64_t endt = taosGetTimestampMs();
qDebug("QInfo:%p result merge completed for group:%d, elapsed time:%" PRId64 " ms", qinfo,
qDebug("QInfo:%p result merge completed for group:%d, elapsed time:%" PRId64 " ms", pRuntimeEnv->qinfo,
pGroupResInfo->currentGroup, endt - startt);
_end:
......@@ -538,13 +552,13 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes
return code;
}
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo) {
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t* offset) {
int64_t st = taosGetTimestampUs();
while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
SArray *group = GET_TABLEGROUP(pQInfo, pGroupResInfo->currentGroup);
SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup);
int32_t ret = mergeIntoGroupResultImpl(&pQInfo->runtimeEnv, pGroupResInfo, group, pQInfo);
int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, offset);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
......@@ -554,19 +568,83 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo) {
break;
}
qDebug("QInfo:%p no result in group %d, continue", pQInfo, pGroupResInfo->currentGroup);
qDebug("QInfo:%p no result in group %d, continue", pRuntimeEnv->qinfo, pGroupResInfo->currentGroup);
cleanupGroupResInfo(pGroupResInfo);
incNextGroup(pGroupResInfo);
}
if (pGroupResInfo->currentGroup >= pGroupResInfo->totalGroup && !hasRemainData(pGroupResInfo)) {
SET_STABLE_QUERY_OVER(pQInfo);
}
int64_t elapsedTime = taosGetTimestampUs() - st;
qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pQInfo,
qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pRuntimeEnv->qinfo,
pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
pQInfo->runtimeEnv.summary.firstStageMergeTime += elapsedTime;
// pQInfo->summary.firstStageMergeTime += elapsedTime;
return TSDB_CODE_SUCCESS;
}
void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw) {
tbufWriteUint32(bw, pDist->numOfTables);
tbufWriteUint16(bw, pDist->numOfFiles);
tbufWriteUint64(bw, pDist->totalSize);
tbufWriteUint32(bw, pDist->numOfRowsInMemTable);
tbufWriteUint64(bw, taosArrayGetSize(pDist->dataBlockInfos));
// compress the binary string
char* p = TARRAY_GET_START(pDist->dataBlockInfos);
// compress extra bytes
size_t x = taosArrayGetSize(pDist->dataBlockInfos) * pDist->dataBlockInfos->elemSize;
char* tmp = malloc(x + 2);
bool comp = false;
int32_t len = tsCompressString(p, (int32_t)x, 1, tmp, (int32_t)x, ONE_STAGE_COMP, NULL, 0);
if (len == -1 || len >= x) { // compress failed, do not compress this binary data
comp = false;
len = (int32_t)x;
} else {
comp = true;
}
tbufWriteUint8(bw, comp);
tbufWriteUint32(bw, len);
if (comp) {
tbufWriteBinary(bw, tmp, len);
} else {
tbufWriteBinary(bw, p, len);
}
tfree(tmp);
}
void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist) {
SBufferReader br = tbufInitReader(data, len, false);
pDist->numOfTables = tbufReadUint32(&br);
pDist->numOfFiles = tbufReadUint16(&br);
pDist->totalSize = tbufReadUint64(&br);
pDist->numOfRowsInMemTable = tbufReadUint32(&br);
int64_t numOfBlocks = tbufReadUint64(&br);
bool comp = tbufReadUint8(&br);
uint32_t compLen = tbufReadUint32(&br);
size_t originalLen = (size_t) (numOfBlocks*sizeof(SFileBlockInfo));
char* outputBuf = NULL;
if (comp) {
outputBuf = malloc(originalLen);
size_t actualLen = compLen;
const char* compStr = tbufReadBinary(&br, &actualLen);
int32_t orignalLen = tsDecompressString(compStr, compLen, 1, outputBuf,
(int32_t)originalLen , ONE_STAGE_COMP, NULL, 0);
assert(orignalLen == numOfBlocks*sizeof(SFileBlockInfo));
} else {
outputBuf = (char*) tbufReadBinary(&br, &originalLen);
}
pDist->dataBlockInfos = taosArrayFromList(outputBuf, (uint32_t) numOfBlocks, sizeof(SFileBlockInfo));
if (comp) {
tfree(outputBuf);
}
}
......@@ -96,7 +96,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
}
if (param.pSecExprMsg != NULL) {
if ((code = createQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, &param.pSecExprs, param.pSecExprMsg, param.pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
if ((code = createIndirectQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, &param.pSecExprs, param.pSecExprMsg, param.pExprs)) != TSDB_CODE_SUCCESS) {
goto _over;
}
}
......@@ -144,11 +144,11 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
goto _over;
}
qDebug("qmsg:%p query on %" PRIzu " tables in one group from client", pQueryMsg, tableGroupInfo.numOfTables);
qDebug("qmsg:%p query on %u tables in one group from client", pQueryMsg, tableGroupInfo.numOfTables);
}
int64_t el = taosGetTimestampUs() - st;
qDebug("qmsg:%p tag filter completed, numOfTables:%" PRIzu ", elapsed time:%"PRId64"us", pQueryMsg, tableGroupInfo.numOfTables, el);
qDebug("qmsg:%p tag filter completed, numOfTables:%u, elapsed time:%"PRId64"us", pQueryMsg, tableGroupInfo.numOfTables, el);
} else {
assert(0);
}
......@@ -209,6 +209,7 @@ bool qTableQuery(qinfo_t qinfo) {
return false;
}
pQInfo->startExecTs = taosGetTimestampSec();
if (isQueryKilled(pQInfo)) {
......@@ -216,9 +217,10 @@ bool qTableQuery(qinfo_t qinfo) {
return doBuildResCheck(pQInfo);
}
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table exists for query, abort", pQInfo);
setQueryStatus(pQInfo->runtimeEnv.pQuery, QUERY_COMPLETED);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
return doBuildResCheck(pQInfo);
}
......@@ -232,26 +234,16 @@ bool qTableQuery(qinfo_t qinfo) {
qDebug("QInfo:%p query task is launched", pQInfo);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
assert(pQInfo->runtimeEnv.pQueryHandle == NULL);
buildTagQueryResult(pQInfo);
} else if (pQInfo->runtimeEnv.stableQuery) {
stableQueryImpl(pQInfo);
} else if (pQInfo->runtimeEnv.queryBlockDist){
buildTableBlockDistResult(pQInfo);
} else {
tableQueryImpl(pQInfo);
}
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
SQuery* pQuery = pRuntimeEnv->pQuery;
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query is killed", pQInfo);
} else if (pQuery->rec.rows == 0) {
qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
} else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
qDebug("QInfo:%p over, %u tables queried, %"PRId64" rows are returned", pQInfo, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
pRuntimeEnv->resultInfo.total);
} else {
qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
qDebug("QInfo:%p query paused, %d rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total + GET_NUM_OF_RESULTS(pRuntimeEnv));
}
return doBuildResCheck(pQInfo);
......@@ -279,6 +271,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
*buildRes = true;
code = pQInfo->code;
} else {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
pthread_mutex_lock(&pQInfo->lock);
......@@ -286,8 +279,8 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
assert(pQInfo->rspContext == NULL);
if (pQInfo->dataReady == QUERY_RESULT_READY) {
*buildRes = true;
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%" PRId64 ", code:%s", pQInfo, pQuery->resultRowSize,
pQuery->rec.rows, tstrerror(pQInfo->code));
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo, pQuery->resultRowSize,
GET_NUM_OF_RESULTS(pRuntimeEnv), tstrerror(pQInfo->code));
} else {
*buildRes = false;
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
......@@ -309,12 +302,13 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
size_t size = getResultSize(pQInfo, &pQuery->rec.rows);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
int32_t s = GET_NUM_OF_RESULTS(pRuntimeEnv);
size_t size = pQuery->resultRowSize * s;
size += sizeof(int32_t);
size += sizeof(STableIdInfo) * taosHashGetSize(pQInfo->arrTableIdInfo);
size += sizeof(STableIdInfo) * taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap);
*contLen = (int32_t)(size + sizeof(SRetrieveTableRsp));
......@@ -324,27 +318,27 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
(*pRsp)->numOfRows = htonl((int32_t)pQuery->rec.rows);
(*pRsp)->numOfRows = htonl((int32_t)s);
if (pQInfo->code == TSDB_CODE_SUCCESS) {
(*pRsp)->offset = htobe64(pQuery->limit.offset);
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
(*pRsp)->offset = htobe64(pQInfo->runtimeEnv.currentOffset);
(*pRsp)->useconds = htobe64(pQInfo->summary.elapsedTime);
} else {
(*pRsp)->offset = 0;
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
(*pRsp)->useconds = htobe64(pQInfo->summary.elapsedTime);
}
(*pRsp)->precision = htons(pQuery->precision);
if (pQuery->rec.rows > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
doDumpQueryResult(pQInfo, (*pRsp)->data);
} else {
setQueryStatus(pQuery, QUERY_OVER);
setQueryStatus(pRuntimeEnv, QUERY_OVER);
}
pQInfo->rspContext = NULL;
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_OVER)) {
// here current thread hold the refcount, so it is safe to free tsdbQueryHandle.
*continueExec = false;
(*pRsp)->completed = 1; // notify no more result to client
......@@ -394,8 +388,7 @@ int32_t qQueryCompleted(qinfo_t qinfo) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER);
return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQInfo->runtimeEnv.status, QUERY_OVER);
}
void qDestroyQueryInfo(qinfo_t qHandle) {
......
此差异已折叠。
......@@ -10,7 +10,7 @@ namespace {
// simple test
void simpleTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 64, 1024, 4096, NULL);
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, NULL);
int32_t pageId = 0;
int32_t groupId = 0;
......@@ -52,7 +52,7 @@ void simpleTest() {
void writeDownTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 64, 1024, 4*1024, NULL);
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, NULL);
int32_t pageId = 0;
int32_t writePageId = 0;
......@@ -99,7 +99,7 @@ void writeDownTest() {
void recyclePageTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 64, 1024, 4*1024, NULL);
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, NULL);
int32_t pageId = 0;
int32_t writePageId = 0;
......
此差异已折叠。
......@@ -26,6 +26,7 @@ extern "C" {
#define TARRAY_MIN_SIZE 8
#define TARRAY_GET_ELEM(array, index) ((void*)((char*)((array)->pData) + (index) * (array)->elemSize))
#define TARRAY_ELEM_IDX(array, ele) (POINTER_DISTANCE(ele, (array)->pData) / (array)->elemSize)
#define TARRAY_GET_START(array) ((array)->pData)
typedef struct SArray {
size_t size;
......
......@@ -73,14 +73,14 @@ int main( int argc, char** argv ) {
}
*/
typedef struct {
typedef struct SBufferReader {
bool endian;
const char* data;
size_t pos;
size_t size;
} SBufferReader;
typedef struct {
typedef struct SBufferWriter {
bool endian;
char* data;
size_t pos;
......
......@@ -94,8 +94,9 @@ while $i < 10
$i = $i + 1
endw
print ==> sleep 8 seconds to renew cache
sleep 8000
print ==> sleep 1 seconds to renew cache
sql reset query cache
sleep 1000
print =============== step5
......
......@@ -55,7 +55,7 @@ system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/exec.sh -n dnode1 -s start
print =============== step3
print ==> sleep 8 seconds to renew cache
print ==> sleep 1 seconds to renew cache
sql reset query cache
sleep 1000
......
......@@ -39,10 +39,9 @@ system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/exec.sh -n dnode1 -s start
print =============== step3
print ==> sleep 8 seconds to renew cache
sleep 2000
print ==> sleep 1 seconds to renew cache
sql reset query cache
sleep 18000
sleep 1000
print =============== step4
sql create database $db
......
......@@ -93,6 +93,7 @@ $halfTbNum = $tbNum / 2
$nchar = 'nchar . $c
$nchar = $nchar . '
$ts = $ts + 1
sql insert into $tb5 values ( $ts , NULL , $c , NULL , $c , NULL , $c , NULL, NULL , $nchar ) $tb6 values ( $ts , NULL , $c , NULL , $c , NULL , $c , NULL, NULL , $nchar ) $tb7 values ( $ts , NULL , $c , NULL , $c , NULL , $c , NULL, NULL , $nchar ) $tb8 values ( $ts , NULL , $c , NULL , $c , NULL , $c , NULL, NULL , $nchar ) $tb9 values ( $ts , NULL , $c , NULL , $c , NULL , $c , NULL, NULL , $nchar )
$x = $x + 1
endw
......
......@@ -426,7 +426,7 @@ if $data02 != 9.000000020 then
endi
# all possible function in the arithmetic expression, add more
sql select min(c1) * max(c2) /4, sum(c1) * apercentile(c2, 20), apercentile(c4, 33) + 52/9, spread(c5)/min(c2), count(1)/sum(c1), avg(c2)*count(c2) from $stb where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-11-25 19:30:00.000';
sql select min(c1) * max(c2) /4, sum(c1) * apercentile(c2, 20), apercentile(c4, 33) + 52/9, spread(c5)/min(c2), count(1)/sum(c1), avg(c2)*count(c2) from $stb where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-11-25 19:30:01.000';
if $rows != 1 then
return -1
endi
......@@ -462,7 +462,7 @@ if $rows != 0 then
endi
# no result return [d.3]
sql select sum(c2) - avg(c2) from $stb where ts > '2018-11-25 19:30:00.000'
sql select sum(c2) - avg(c2) from $stb where ts > '2018-11-25 19:30:01.000'
if $rows != 0 then
return -1
endi
......@@ -520,35 +520,35 @@ if $data91 != 9 then
endi
# in group by column
sql select apercentile(c6, 50)-first(c6)+last(c5)*12, last(c5)*12 from ca_stb0 group by c2;
if $rows != 10 then
return -1
endi
if $data00 != 0.000000000 then
return -1
endi
if $data01 != 0.000000000 then
return -1
endi
if $data10 != 12.000000000 then
return -1
endi
if $data11 != 12.000000000 then
return -1
endi
if $data20 != 24.000000000 then
return -1
endi
if $data21 != 24.000000000 then
return -1
endi
#sql select apercentile(c6, 50)-first(c6)+last(c5)*12, last(c5)*12 from ca_stb0 group by c2;
#if $rows != 10 then
# return -1
#endi
#
#if $data00 != 0.000000000 then
# return -1
#endi
#
#if $data01 != 0.000000000 then
# return -1
#endi
#
#if $data10 != 12.000000000 then
# return -1
#endi
#
#if $data11 != 12.000000000 then
# return -1
#endi
#
#if $data20 != 24.000000000 then
# return -1
#endi
#
#if $data21 != 24.000000000 then
# return -1
#endi
#
sql_error select first(c6) - last(c6) *12 / count(*) from $stb group by c3;
sql select first(c6) - last(c6) *12 / count(*) from $stb group by c5;
......
......@@ -313,6 +313,7 @@ if $rows != 9 then
return -1
endi
if $data01 != 0 then
print expect 0, actual:$data01
return -1
endi
if $data11 != 6 then
......@@ -979,10 +980,6 @@ if $data00 != @20-01-01 01:01:00.000@ then
return -1
endi
if $data00 != @20-01-01 01:01:00.000@ then
return -1
endi
if $data1
if $data01 != 2.000000000 then
return -1
endi
......
......@@ -579,7 +579,7 @@ $tb = $tbPrefix . 0
## interp(*) from stb + group by + fill(none)
$t = $ts0 + 1000
sql select interp(*) from $stb where ts = $t fill(NULL) group by tbname
sql select interp(*) from $stb where ts = $t fill(NULL) group by tbname
if $rows != $tbNum then
return -1
endi
......
......@@ -347,6 +347,7 @@ $val = $rowNum + $rowNum
print $val
print $rows
if $rows != $val then
print expect $val , actual:$rows
return -1
endi
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册