diff --git a/src/query/inc/qast.h b/src/query/inc/qast.h index 918604f8c9b6122f255ace647119040e288956a4..00049b486d765f341a97ce45250ec764f67502db 100644 --- a/src/query/inc/qast.h +++ b/src/query/inc/qast.h @@ -45,7 +45,6 @@ typedef void (*__do_filter_suppl_fn_t)(void *, void *); * */ typedef struct tQueryInfo { - int32_t colIndex; // index of column in schema uint8_t optr; // expression operator SSchema sch; // schema of tags char* q; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6dba5cbd2a2f0501a8144ea14da818e88ab1985a..42fc90f47810963b21ac9b54b99a45976a94894c 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -50,11 +50,6 @@ #define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0} -/* get the qinfo struct address from the query struct address */ -#define GET_COLUMN_BYTES(query, colidx) \ - ((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].bytes) -#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].type) - enum { // when query starts to execute, this status will set QUERY_NOT_COMPLETED = 0x1u, @@ -394,15 +389,15 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin if (pWindowResInfo->size >= pWindowResInfo->capacity) { int64_t newCap = pWindowResInfo->capacity * 1.5; char *t = realloc(pWindowResInfo->pResult, newCap * sizeof(SWindowResult)); - if (t != NULL) { - pWindowResInfo->pResult = (SWindowResult *)t; - - int32_t inc = newCap - pWindowResInfo->capacity; - memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, sizeof(SWindowResult) * inc); - } else { - // todo + if (t == NULL) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } + pWindowResInfo->pResult = (SWindowResult *)t; + + int32_t inc = newCap - pWindowResInfo->capacity; + memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, sizeof(SWindowResult) * inc); + for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) { createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize); } @@ -1053,9 +1048,9 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, *type = pQuery->colList[colIndex].type; *bytes = pQuery->colList[colIndex].bytes; /* - * the colIndex is acquired from the first meter of all qualified meters in this vnode during query prepare - * stage, the remain meter may not have the required column in cache actually. So, the validation of required - * column in cache with the corresponding meter schema is reinforced. + * the colIndex is acquired from the first tables of all qualified tables in this vnode during query prepare + * stage, the remain tables may not have the required column in cache actually. So, the validation of required + * column in cache with the corresponding schema is reinforced. */ int32_t numOfCols = taosArrayGetSize(pDataBlock); @@ -2364,6 +2359,18 @@ static void doSetTagValueInParam(void *tsdb, void* pTable, int32_t tagColId, tVa } } +static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId) { + assert(pTagColList != NULL && numOfTags > 0); + + for(int32_t i = 0; i < numOfTags; ++i) { + if (pTagColList[i].colId == colId) { + return &pTagColList[i]; + } + } + + return NULL; +} + void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) { SQuery *pQuery = pRuntimeEnv->pQuery; SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); @@ -2372,16 +2379,10 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) { if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) { assert(pExprInfo->base.numOfParams == 1); - // todo refactor extract function. - int16_t type = -1, bytes = -1; - for(int32_t i = 0; i < pQuery->numOfTags; ++i) { - if (pQuery->tagColList[i].colId == pExprInfo->base.arg->argValue.i64) { - type = pQuery->tagColList[i].type; - bytes = pQuery->tagColList[i].bytes; - } - } + int16_t tagColId = pExprInfo->base.arg->argValue.i64; + SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId); - doSetTagValueInParam(tsdb, pTable, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, type, bytes); + doSetTagValueInParam(tsdb, pTable, tagColId, &pRuntimeEnv->pCtx[0].tag, pColInfo->type, pColInfo->bytes); } else { // set tag value, by which the results are aggregated. for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) { @@ -2399,20 +2400,14 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) { // set the join tag for first column SSqlFuncMsg *pFuncMsg = &pExprInfo->base; - if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && - pRuntimeEnv->pTSBuf != NULL) { + if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && pRuntimeEnv->pTSBuf != NULL && + pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { assert(pFuncMsg->numOfParams == 1); - // todo refactor - int16_t type = -1, bytes = -1; - for(int32_t i = 0; i < pQuery->numOfTags; ++i) { - if (pQuery->tagColList[i].colId == pExprInfo->base.arg->argValue.i64) { - type = pQuery->tagColList[i].type; - bytes = pQuery->tagColList[i].bytes; - } - } + int16_t tagColId = pExprInfo->base.arg->argValue.i64; + SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId); - doSetTagValueInParam(tsdb, pTable, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, type, bytes); + doSetTagValueInParam(tsdb, pTable, tagColId, &pRuntimeEnv->pCtx[0].tag, pColInfo->type, pColInfo->bytes); qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%"PRId64, pQInfo, pExprInfo->base.arg->argValue.i64, pRuntimeEnv->pCtx[0].tag.i64Key) } @@ -4149,6 +4144,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) } else { pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); } + return terrno; } @@ -4174,10 +4170,10 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) { } int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery) { - int32_t code = TSDB_CODE_SUCCESS; - + qDebug("QInfo:%p start to init qhandle", pQInfo); SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + int32_t code = TSDB_CODE_SUCCESS; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; pQuery->precision = tsdbGetCfg(tsdb)->precision; @@ -4186,6 +4182,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo setScanLimitationByResultBuffer(pQuery); changeExecuteScanOrder(pQInfo, false); + code = setupQueryHandle(tsdb, pQInfo, isSTableQuery); if (code != TSDB_CODE_SUCCESS) { return code; @@ -5693,7 +5690,6 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { } } - static int compareTableIdInfo(const void* a, const void* b) { const STableIdInfo* x = (const STableIdInfo*)a; const STableIdInfo* y = (const STableIdInfo*)b; @@ -5926,6 +5922,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ pQuery->window.ekey, pQuery->order.order); setQueryStatus(pQuery, QUERY_COMPLETED); pQInfo->tableqinfoGroupInfo.numOfTables = 0; + sem_post(&pQInfo->dataReady); return TSDB_CODE_SUCCESS; } @@ -6136,16 +6133,17 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo int32_t code = TSDB_CODE_SUCCESS; - char * tagCond = NULL, *tbnameCond = NULL; - SArray * pTableIdList = NULL; - SSqlFuncMsg **pExprMsg = NULL; - SColIndex * pGroupColIndex = NULL; - SColumnInfo* pTagColumnInfo = NULL; - SExprInfo *pExprs = NULL; - SSqlGroupbyExpr *pGroupbyExpr = NULL; + char *tagCond = NULL; + char *tbnameCond = NULL; + SArray *pTableIdList = NULL; + SSqlFuncMsg **pExprMsg = NULL; + SExprInfo *pExprs = NULL; + SColIndex *pGroupColIndex = NULL; + SColumnInfo *pTagColumnInfo = NULL; + SSqlGroupbyExpr *pGroupbyExpr = NULL; - if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo)) != - TSDB_CODE_SUCCESS) { + code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo); + if (code != TSDB_CODE_SUCCESS) { goto _over; } @@ -6172,7 +6170,8 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo bool isSTableQuery = false; STableGroupInfo tableGroupInfo = {0}; - + int64_t st = taosGetTimestampUs(); + if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) { STableIdInfo *id = taosArrayGet(pTableIdList, 0); @@ -6182,7 +6181,6 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo } } else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) { isSTableQuery = true; - // TODO: need a macro from TSDB to check if table is super table // also note there's possibility that only one table in the super table if (!TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY)) { @@ -6193,11 +6191,12 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) { numOfGroupByCols = 0; } - + + qDebug("qmsg:%p query stable, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid); code = tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &tableGroupInfo, pGroupColIndex, numOfGroupByCols); if (code != TSDB_CODE_SUCCESS) { - qError("qmsg:%p failed to QueryStable, reason: %s", pQueryMsg, tstrerror(code)); + qError("qmsg:%p failed to query stable, reason: %s", pQueryMsg, tstrerror(code)); goto _over; } } else { @@ -6208,6 +6207,9 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo qDebug("qmsg:%p query on %zu tables in one group from client", pQueryMsg, tableGroupInfo.numOfTables); } + + int64_t el = taosGetTimestampUs() - st; + qDebug("qmsg:%p tag filter completed, elapsed time:%"PRId64"us", pQueryMsg, el); } else { assert(0); } @@ -6247,7 +6249,7 @@ _over: *pQInfo = NULL; } - // if failed to add ref for all meters in this query, abort current query + // if failed to add ref for all tables in this query, abort current query return code; } diff --git a/src/query/src/qast.c b/src/query/src/qast.c index da4eb8f3baf67639615ec86b1f0b7b44269219e6..42f9f214f5785a76e26904f844f4501c0e4aff0c 100644 --- a/src/query/src/qast.c +++ b/src/query/src/qast.c @@ -678,7 +678,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, tstr *name = (tstr*) tsdbGetTableName(*(void**) pData); // todo speed up by using hash - if (pQueryInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) { + if (pQueryInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) { if (pQueryInfo->optr == TSDB_RELATION_IN) { addToResult = pQueryInfo->compare(name, pQueryInfo->q); } else if (pQueryInfo->optr == TSDB_RELATION_LIKE) { @@ -716,7 +716,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S } tQueryInfo *pQueryInfo = pExpr->_node.info; - if (pQueryInfo->colIndex == 0 && pQueryInfo->optr != TSDB_RELATION_LIKE) { + if (pQueryInfo->sch.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX && pQueryInfo->optr != TSDB_RELATION_LIKE) { tQueryIndexColumn(pSkipList, pQueryInfo, result); } else { tQueryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index eb34805de47d713d78830d7ac8c2a47e45c378d0..8463e791db9a865012dabe420c31d2bf6c573a47 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -240,7 +240,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true); - tsdbDebug("%p total numOfTable:%zu in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo)); + tsdbDebug("%p total numOfTable:%zu in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo); tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); @@ -1869,7 +1869,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows; } - // todo opt perf SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i); if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) { pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst; @@ -1961,43 +1960,20 @@ static void destroyHelper(void* param) { free(param); } -#define TAG_INVALID_COLUMN_INDEX -2 -static int32_t getTagColumnIndex(STSchema* pTSchema, SSchema* pSchema) { - // filter on table name(TBNAME) - if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) { - return TSDB_TBNAME_COLUMN_INDEX; - } - - for(int32_t i = 0; i < schemaNCols(pTSchema); ++i) { - STColumn* pColumn = &pTSchema->columns[i]; - if (pColumn->bytes == pSchema->bytes && pColumn->type == pSchema->type && pColumn->colId == pSchema->colId) { - return i; - } - } - - return -2; -} - void filterPrepare(void* expr, void* param) { tExprNode* pExpr = (tExprNode*)expr; if (pExpr->_node.info != NULL) { return; } - int32_t i = 0; pExpr->_node.info = calloc(1, sizeof(tQueryInfo)); - STSchema* pTSSchema = (STSchema*) param; - + STSchema* pTSSchema = (STSchema*) param; tQueryInfo* pInfo = pExpr->_node.info; tVariant* pCond = pExpr->_node.pRight->pVal; SSchema* pSchema = pExpr->_node.pLeft->pSchema; - int32_t index = getTagColumnIndex(pTSSchema, pSchema); - assert((index >= 0 && i < TSDB_MAX_TAGS) || (index == TSDB_TBNAME_COLUMN_INDEX) || index == TAG_INVALID_COLUMN_INDEX); - pInfo->sch = *pSchema; - pInfo->colIndex = index; pInfo->optr = pExpr->_node.optr; pInfo->compare = getComparFunc(pSchema->type, pInfo->optr); pInfo->param = pTSSchema; @@ -2143,7 +2119,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) { char* val = NULL; - if (pInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) { + if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) { val = (char*) TABLE_NAME(pTable); } else { val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);