From 87969990685676e804ec872d3a39f994eccf405e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Mar 2021 11:59:55 +0800 Subject: [PATCH] [td-225] --- src/client/inc/tscUtil.h | 2 + src/client/inc/tsclient.h | 18 +- src/client/src/tscAsync.c | 9 +- src/client/src/tscLocal.c | 4 +- src/client/src/tscLocalMerge.c | 93 +++++----- src/client/src/tscPrepare.c | 2 +- src/client/src/tscProfile.c | 2 +- src/client/src/tscSQLParser.c | 18 +- src/client/src/tscServer.c | 104 ++++++----- src/client/src/tscSql.c | 8 +- src/client/src/tscSub.c | 6 +- src/client/src/tscSubquery.c | 6 +- src/client/src/tscUtil.c | 51 +++++- src/common/inc/tname.h | 27 +++ src/inc/taosdef.h | 5 +- src/inc/taosmsg.h | 55 +++--- src/mnode/src/mnodeProfile.c | 4 +- src/query/inc/qExecutor.h | 17 +- src/query/inc/qUtil.h | 2 +- src/query/src/qExecutor.c | 323 ++++++++++++++++----------------- src/query/src/qUtil.c | 4 +- src/query/src/queryMain.c | 15 +- src/vnode/src/vnodeRead.c | 24 +-- 23 files changed, 431 insertions(+), 368 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 3ec44875fd..db6838e6c1 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -294,6 +294,8 @@ uint32_t tscGetTableMetaMaxSize(); int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name); STableMeta* tscTableMetaDup(STableMeta* pTableMeta); +void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx); + void* malloc_throw(size_t size); void* calloc_throw(size_t nmemb, size_t size); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index a4bf1d9005..b5bc1bd3cc 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -35,6 +35,7 @@ extern "C" { #include "qExecutor.h" #include "qSqlparser.h" #include "qTsbuf.h" +#include "qUtil.h" #include "tcmdtype.h" // forward declaration @@ -96,21 +97,6 @@ typedef struct STableMetaInfo { SArray *tagColList; // SArray, involved tag columns } STableMetaInfo; -/* the structure for sql function in select clause */ -typedef struct SSqlExpr { - char aliasName[TSDB_COL_NAME_LEN]; // as aliasName - SColIndex colInfo; - uint64_t uid; // refactor use the pointer - int16_t functionId; // function id in aAgg array - int16_t resType; // return value type - int16_t resBytes; // length of return value - int32_t interBytes; // inter result buffer size - int16_t numOfParams; // argument value of each function - tVariant param[3]; // parameters are not more than 3 - int32_t offset; // sub result column value of arithmetic expression. - int16_t resColId; // result column id -} SSqlExpr; - typedef struct SColumnIndex { int16_t tableIndex; int16_t columnIndex; @@ -290,7 +276,7 @@ typedef struct { char * pRsp; int32_t rspType; int32_t rspLen; - uint64_t qhandle; + uint64_t qid; int64_t useconds; int64_t offset; // offset value from vnode during projection query of stable int32_t row; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index e34675a61b..255b965d77 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -161,8 +161,8 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - if ((pRes->qhandle == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) { - if (pRes->qhandle == 0 && numOfRows != 0) { + if ((pRes->qid == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) { + if (pRes->qid == 0 && numOfRows != 0) { tscError("qhandle is NULL"); } else { pRes->code = numOfRows; @@ -210,10 +210,9 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { pSql->fp = tscAsyncFetchRowsProxy; pSql->param = param; - if (pRes->qhandle == 0) { - tscError("qhandle is NULL"); + if (pRes->qid == 0) { + tscError("qhandle is invalid"); pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; - tscAsyncResultOnError(pSql); return; } diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 5e96db69c8..97c44f1f5e 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -309,7 +309,7 @@ TAOS_ROW tscFetchRow(void *param) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - if (pRes->qhandle == 0 || + if (pRes->qid == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { return NULL; @@ -905,7 +905,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) { * set the qhandle to be 1 in order to pass the qhandle check, and to call partial release function to * free allocated resources and remove the SqlObj from sql query linked list */ - pRes->qhandle = 0x1; + pRes->qid = 0x1; pRes->numOfRows = 0; } else if (pCmd->command == TSDB_SQL_SHOW_CREATE_TABLE) { pRes->code = tscProcessShowCreateTable(pSql); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 66a1051a44..ede0390bfa 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -56,82 +56,83 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) { } } -static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescriptor *pDesc) { - /* - * the fields and offset attributes in pCmd and pModel may be different due to - * merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object. - */ - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); +// todo merge with vnode side function +void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx) { size_t size = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t i = 0; i < size; ++i) { - SQLFunctionCtx *pCtx = &pReducer->pCtx[i]; - SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, i); + SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); - pCtx->pOutput = pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity; - pCtx->order = pQueryInfo->order.order; - pCtx->functionId = pExpr->functionId; + pCtx[i].order = pQueryInfo->order.order; + pCtx[i].functionId = pExpr->functionId; // input buffer hold only one point data - int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i); - SSchema *pSchema = getColumnModelSchema(pDesc->pColumnModel, i); - - pCtx->pInput = pReducer->pTempBuffer->data + offset; + pCtx[i].inputType = pExpr->colType; + pCtx[i].inputBytes = pExpr->colBytes; - // input data format comes from pModel - pCtx->inputType = pSchema->type; - pCtx->inputBytes = pSchema->bytes; + pCtx[i].outputBytes = pExpr->resBytes; + pCtx[i].outputType = pExpr->resType; - // output data format yet comes from pCmd. - pCtx->outputBytes = pExpr->resBytes; - pCtx->outputType = pExpr->resType; - - pCtx->size = 1; - pCtx->hasNull = true; - pCtx->currentStage = MERGE_STAGE; + pCtx[i].size = 1; + pCtx[i].hasNull = true; + pCtx[i].currentStage = MERGE_STAGE; // for top/bottom function, the output of timestamp is the first column int32_t functionId = pExpr->functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { - pCtx->ptsOutputBuf = pReducer->pCtx[0].pOutput; - pCtx->param[2].i64 = pQueryInfo->order.order; - pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; - pCtx->param[1].i64 = pQueryInfo->order.orderColId; + pCtx[i].ptsOutputBuf = pCtx[0].pOutput; + pCtx[i].param[2].i64 = pQueryInfo->order.order; + pCtx[i].param[2].nType = TSDB_DATA_TYPE_BIGINT; + pCtx[i].param[1].i64 = pQueryInfo->order.orderColId; } else if (functionId == TSDB_FUNC_APERCT) { - pCtx->param[0].i64 = pExpr->param[0].i64; - pCtx->param[0].nType = pExpr->param[0].nType; + pCtx[i].param[0].i64 = pExpr->param[0].i64; + pCtx[i].param[0].nType = pExpr->param[0].nType; } else if (functionId == TSDB_FUNC_BLKINFO) { - pCtx->param[0].i64 = pExpr->param[0].i64; - pCtx->param[0].nType = pExpr->param[0].nType; - pCtx->numOfParams = 1; + pCtx[i].param[0].i64 = pExpr->param[0].i64; + pCtx[i].param[0].nType = pExpr->param[0].nType; + pCtx[i].numOfParams = 1; } - pCtx->interBufBytes = pExpr->interBytes; - pCtx->resultInfo = calloc(1, pCtx->interBufBytes + sizeof(SResultRowCellInfo)); - pCtx->stableQuery = true; + pCtx[i].interBufBytes = pExpr->interBytes; + pCtx[i].resultInfo = calloc(1, pCtx[i].interBufBytes + sizeof(SResultRowCellInfo)); + pCtx[i].stableQuery = true; } int16_t n = 0; int16_t tagLen = 0; SQLFunctionCtx **pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutput, POINTER_BYTES); - SQLFunctionCtx *pCtx = NULL; + SQLFunctionCtx *pCtx1 = NULL; for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId == TSDB_FUNC_TAG_DUMMY || pExpr->functionId == TSDB_FUNC_TS_DUMMY) { tagLen += pExpr->resBytes; - pTagCtx[n++] = &pReducer->pCtx[i]; + pTagCtx[n++] = &pCtx[i]; } else if ((aAggs[pExpr->functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) { - pCtx = &pReducer->pCtx[i]; + pCtx1 = &pCtx[i]; } } if (n == 0 || pCtx == NULL) { free(pTagCtx); } else { - pCtx->tagInfo.pTagCtxList = pTagCtx; - pCtx->tagInfo.numOfTagCols = n; - pCtx->tagInfo.tagsLen = tagLen; + pCtx1->tagInfo.pTagCtxList = pTagCtx; + pCtx1->tagInfo.numOfTagCols = n; + pCtx1->tagInfo.tagsLen = tagLen; + } +} + +static void setCtxInputOutputBuffer(SQueryInfo* pQueryInfo, SQLFunctionCtx *pCtx, SLocalMerger *pReducer, + tOrderDescriptor *pDesc) { + size_t size = tscSqlExprNumOfExprs(pQueryInfo); + + for (int32_t i = 0; i < size; ++i) { + SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); + pCtx[i].pOutput = pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity; + + // input buffer hold only one point data + int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i); + pCtx[i].pInput = pReducer->pTempBuffer->data + offset; } } @@ -362,8 +363,8 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde pReducer->pTempBuffer->num = 0; tscCreateResPointerInfo(pRes, pQueryInfo); - tscInitSqlContext(pCmd, pReducer, pDesc); - + tsCreateSQLFunctionCtx(pQueryInfo, pReducer->pCtx); + setCtxInputOutputBuffer(pQueryInfo, pReducer->pCtx, pReducer, pDesc); // we change the capacity of schema to denote that there is only one row in temp buffer pReducer->pDesc->pColumnModel->capacity = 1; @@ -1607,7 +1608,7 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) tscDestroyLocalMerger(pObj); } - pRes->qhandle = 1; // hack to pass the safety check in fetch_row function + pRes->qid = 1; // hack to pass the safety check in fetch_row function pRes->numOfRows = 0; pRes->row = 0; diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 296dc3926c..0914f3cbb5 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -903,7 +903,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - pRes->qhandle = 0; + pRes->qid = 0; pRes->numOfRows = 1; strtolower(pSql->sqlstr, sql); diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index 9203dcfbba..848f20c4ad 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -250,7 +250,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { pQdesc->queryId = htonl(pSql->queryId); //pQdesc->useconds = htobe64(pSql->res.useconds); pQdesc->useconds = htobe64(now - pSql->stime); - pQdesc->qHandle = htobe64(pSql->res.qhandle); + pQdesc->qid = htobe64(pSql->res.qid); pHeartbeat->numOfQueries++; pQdesc++; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index cfe89813f0..4f0bd93c16 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1490,9 +1490,9 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t SExprInfo* pArithExprInfo = calloc(1, sizeof(SExprInfo)); // arithmetic expression always return result in the format of double float - pArithExprInfo->bytes = sizeof(double); - pArithExprInfo->interBytes = sizeof(double); - pArithExprInfo->type = TSDB_DATA_TYPE_DOUBLE; + pArithExprInfo->base.resBytes = sizeof(double); + pArithExprInfo->base.interBytes = sizeof(double); + pArithExprInfo->base.resType = TSDB_DATA_TYPE_DOUBLE; pArithExprInfo->base.functionId = TSDB_FUNC_ARITHM; pArithExprInfo->base.numOfParams = 1; @@ -1517,10 +1517,10 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t // TODO: other error handling } END_TRY - SSqlFuncMsg* pFuncMsg = &pInfo->pArithExprInfo->base; - pFuncMsg->arg[0].argBytes = (int16_t) tbufTell(&bw); - pFuncMsg->arg[0].argValue.pz = tbufGetData(&bw, true); - pFuncMsg->arg[0].argType = TSDB_DATA_TYPE_BINARY; + SSqlExpr* pFuncMsg = &pInfo->pArithExprInfo->base; + pFuncMsg->param[0].nLen = (int16_t) tbufTell(&bw); + pFuncMsg->param[0].pz = tbufGetData(&bw, true); + pFuncMsg->param[0].nType = TSDB_DATA_TYPE_BINARY; // tbufCloseWriter(&bw); // TODO there is a memory leak } @@ -6631,6 +6631,10 @@ static STableMeta* extractTempTableMetaFromNestQuery(SQueryInfo* pUpstream) { return meta; } +//static SColumnInfo* getColumnInfoFromSchema(SQueryInfo* pUpstream) { +// +//} + int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index) { assert(pQuerySqlNode != NULL && (pQuerySqlNode->from == NULL || taosArrayGetSize(pQuerySqlNode->from->tableList) > 0)); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e870f98e7f..f2b169f489 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -510,8 +510,9 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload; SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(&pSql->cmd); - pRetrieveMsg->free = htons(pQueryInfo->type); - pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); + + pRetrieveMsg->free = htons(pQueryInfo->type); + pRetrieveMsg->qid = htobe64(pSql->res.qid); // todo valid the vgroupId at the client side STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -523,7 +524,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId); - tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qhandle); + tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qid:%" PRIu64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qid); } else { int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); assert(vgIndex >= 0 && vgIndex < numOfVgroups); @@ -531,12 +532,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex); pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId); - tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qhandle); + tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qid:%" PRIu64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qid); } } else { STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); - tscDebug("%p build fetch msg from only one vgroup, vgId:%d, qhandle:%" PRIX64, pSql, pTableMeta->vgId, pSql->res.qhandle); + tscDebug("%p build fetch msg from only one vgroup, vgId:%d, qid:%" PRIu64, pSql, pTableMeta->vgId, pSql->res.qid); } pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg); @@ -592,7 +593,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) { int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo)); size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); - int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs * 2); + int32_t exprSize = (int32_t)(sizeof(SSqlExpr) * numOfExprs * 2); int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0; int32_t sqlLen = (int32_t) strlen(pSql->sqlstr) + 1; @@ -818,7 +819,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } } - SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg; + SSqlExpr *pSqlFuncExpr = (SSqlExpr *)pMsg; + for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); @@ -840,43 +842,41 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex); pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag); - if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) { - pSqlFuncExpr->colType = htons(pExpr->resType); - pSqlFuncExpr->colBytes = htons(pExpr->resBytes); - } else if (pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { - SSchema *s = tGetTbnameColumnSchema(); + pSqlFuncExpr->colType = htons(pExpr->colType); + pSqlFuncExpr->colBytes = htons(pExpr->colBytes); - pSqlFuncExpr->colType = htons(s->type); - pSqlFuncExpr->colBytes = htons(s->bytes); + if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag) || pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { + pSqlFuncExpr->resType = htons(pExpr->resType); + pSqlFuncExpr->resBytes = htons(pExpr->resBytes); } else if (pExpr->colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) { SSchema s = tGetBlockDistColumnSchema(); - pSqlFuncExpr->colType = htons(s.type); - pSqlFuncExpr->colBytes = htons(s.bytes); + pSqlFuncExpr->resType = htons(s.type); + pSqlFuncExpr->resBytes = htons(s.bytes); } else { SSchema* s = tscGetColumnSchemaById(pTableMeta, pExpr->colInfo.colId); - pSqlFuncExpr->colType = htons(s->type); - pSqlFuncExpr->colBytes = htons(s->bytes); + pSqlFuncExpr->resType = htons(s->type); + pSqlFuncExpr->resBytes = htons(s->bytes); } pSqlFuncExpr->functionId = htons(pExpr->functionId); pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams); pSqlFuncExpr->resColId = htons(pExpr->resColId); - pMsg += sizeof(SSqlFuncMsg); + pMsg += sizeof(SSqlExpr); for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log - pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType); - pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen); + pSqlFuncExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType); + pSqlFuncExpr->param[j].nLen = htons(pExpr->param[j].nLen); if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) { memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen); pMsg += pExpr->param[j].nLen; } else { - pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64); + pSqlFuncExpr->param[j].i64 = htobe64(pExpr->param[j].i64); } } - pSqlFuncExpr = (SSqlFuncMsg *)pMsg; + pSqlFuncExpr = (SSqlExpr *)pMsg; } size_t output = tscNumOfFields(pQueryInfo); @@ -884,7 +884,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { if (tscIsSecondStageQuery(pQueryInfo)) { pQueryMsg->secondStageOutput = htonl((int32_t) output); - SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg; + SSqlExpr *pExpr1 = (SSqlExpr *)pMsg; for (int32_t i = 0; i < output; ++i) { SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); @@ -904,49 +904,56 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_TSC_INVALID_SQL; } - pSqlFuncExpr1->numOfParams = 0; // no params for projection query - pSqlFuncExpr1->functionId = htons(TSDB_FUNC_PRJ); - pSqlFuncExpr1->colInfo.colId = htons(pExpr->resColId); - pSqlFuncExpr1->colInfo.flag = htons(TSDB_COL_NORMAL); + pExpr1->numOfParams = 0; // no params for projection query + pExpr1->functionId = htons(TSDB_FUNC_PRJ); + pExpr1->colInfo.colId = htons(pExpr->resColId); + pExpr1->colInfo.flag = htons(TSDB_COL_NORMAL); + pExpr1->colType = htons(pExpr->resType); + pExpr1->colBytes = htons(pExpr->resBytes); bool assign = false; for (int32_t f = 0; f < tscSqlExprNumOfExprs(pQueryInfo); ++f) { SSqlExpr *pe = tscSqlExprGet(pQueryInfo, f); if (pe == pExpr) { - pSqlFuncExpr1->colInfo.colIndex = htons(f); - pSqlFuncExpr1->colType = htons(pe->resType); - pSqlFuncExpr1->colBytes = htons(pe->resBytes); + pExpr1->colInfo.colIndex = htons(f); + pExpr1->resType = htons(pe->resType); + pExpr1->resBytes = htons(pe->resBytes); assign = true; break; } } assert(assign); - pMsg += sizeof(SSqlFuncMsg); - pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg; + pMsg += sizeof(SSqlExpr); + pExpr1 = (SSqlExpr *)pMsg; } else { assert(pField->pArithExprInfo != NULL); SExprInfo* pExprInfo = pField->pArithExprInfo; - pSqlFuncExpr1->colInfo.colId = htons(pExprInfo->base.colInfo.colId); - pSqlFuncExpr1->functionId = htons(pExprInfo->base.functionId); - pSqlFuncExpr1->numOfParams = htons(pExprInfo->base.numOfParams); - pMsg += sizeof(SSqlFuncMsg); + pExpr1->colInfo.colId = htons(pExprInfo->base.colInfo.colId); + pExpr1->colType = htons(pExprInfo->base.colType); + pExpr1->colBytes = htons(pExprInfo->base.colBytes); + pExpr1->resBytes = htons(sizeof(double)); + pExpr1->resType = htons(TSDB_DATA_TYPE_DOUBLE); + + pExpr1->functionId = htons(pExprInfo->base.functionId); + pExpr1->numOfParams = htons(pExprInfo->base.numOfParams); + pMsg += sizeof(SSqlExpr); for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { // todo add log - pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExprInfo->base.arg[j].argType); - pSqlFuncExpr1->arg[j].argBytes = htons(pExprInfo->base.arg[j].argBytes); + pExpr1->param[j].nType = htons((uint16_t)pExprInfo->base.param[j].nType); + pExpr1->param[j].nLen = htons(pExprInfo->base.param[j].nLen); - if (pExprInfo->base.arg[j].argType == TSDB_DATA_TYPE_BINARY) { - memcpy(pMsg, pExprInfo->base.arg[j].argValue.pz, pExprInfo->base.arg[j].argBytes); - pMsg += pExprInfo->base.arg[j].argBytes; + if (pExprInfo->base.param[j].nType == TSDB_DATA_TYPE_BINARY) { + memcpy(pMsg, pExprInfo->base.param[j].pz, pExprInfo->base.param[j].nLen); + pMsg += pExprInfo->base.param[j].nLen; } else { - pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExprInfo->base.arg[j].argValue.i64); + pExpr1->param[j].i64 = htobe64(pExprInfo->base.param[j].i64); } } - pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg; + pExpr1 = (SSqlExpr *)pMsg; } } } else { @@ -1561,7 +1568,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0); SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload; - pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); + pRetrieveMsg->qid = htobe64(pSql->res.qid); pRetrieveMsg->free = htons(pQueryInfo->type); return TSDB_CODE_SUCCESS; @@ -2107,7 +2114,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { pShow = (SShowRsp *)pRes->pRsp; pShow->qhandle = htobe64(pShow->qhandle); - pRes->qhandle = pShow->qhandle; + pRes->qid = pShow->qhandle; tscResetForNextRetrieve(pRes); pMetaMsg = &(pShow->tableMeta); @@ -2289,10 +2296,11 @@ int tscProcessQueryRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp; - pQuery->qhandle = htobe64(pQuery->qhandle); - pRes->qhandle = pQuery->qhandle; + pQuery->qid = htobe64(pQuery->qid); + pRes->qid = pQuery->qid; pRes->data = NULL; + tscResetForNextRetrieve(pRes); return 0; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index d845bb1b1e..f11d419229 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -476,7 +476,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - if (pRes->qhandle == 0 || + if (pRes->qid == 0 || pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { @@ -508,7 +508,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - if (pRes->qhandle == 0 || + if (pRes->qid == 0 || pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { @@ -554,7 +554,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; - if (pRes == NULL || pRes->qhandle == 0) { + if (pRes == NULL || pRes->qid == 0) { return true; } @@ -1050,7 +1050,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { * If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscProcessSql() * to free connection, which may cause segment fault, when the parse phrase is not even successfully executed. */ - pRes->qhandle = 0; + pRes->qid = 0; free(str); if (code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 7f16aca260..ad850d2a9e 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -149,7 +149,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* } strtolower(pSql->sqlstr, pSql->sqlstr); - pRes->qhandle = 0; + pRes->qid = 0; pRes->numOfRows = 1; code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); @@ -448,7 +448,7 @@ SSqlObj* recreateSqlObj(SSub* pSub) { return NULL; } - pRes->qhandle = 0; + pRes->qid = 0; pRes->numOfRows = 1; int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); @@ -546,7 +546,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { uint32_t type = pQueryInfo->type; tscFreeSqlResult(pSql); pRes->numOfRows = 1; - pRes->qhandle = 0; + pRes->qid = 0; pSql->cmd.command = TSDB_SQL_SELECT; pQueryInfo->type = type; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 11d98ea6f9..ebd8c247f5 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1584,7 +1584,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter SSqlCmd * pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); - pSql->res.qhandle = 0x1; + pSql->res.qid = 0x1; assert(pSql->res.numOfRows == 0); if (pSql->pSubs == NULL) { @@ -2179,7 +2179,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SColumnModel *pModel = NULL; SColumnModel *pFinalModel = NULL; - pRes->qhandle = 0x1; // hack the qhandle check + pRes->qid = 0x1; // hack the qhandle check const uint32_t nBufferSize = (1u << 16u); // 64KB @@ -2727,7 +2727,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { tscDebug("%p sub:%p query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql, pVgroup->epAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); - if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode + if (pSql->res.qid == 0) { // qhandle is 0, code is TSDB_CODE_SUCCESS means no results generated from this vnode tscRetrieveFromDnodeCallBack(param, pSql, 0); } else { taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7bfe1754df..fc6c45334e 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -371,11 +371,39 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) { } } +static SColumnInfo* extractColumnInfoFromResult(STableMeta* pTableMeta, SArray* pTableCols) { + int32_t numOfCols = taosArrayGetSize(pTableCols); + SColumnInfo* pColInfo = calloc(numOfCols, sizeof(SColumnInfo)); + + SSchema *pSchema = pTableMeta->schema; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumn* pCol = taosArrayGetP(pTableCols, i); + int32_t index = pCol->colIndex.columnIndex; + + pColInfo[i].type = pSchema[index].type; + pColInfo[i].bytes = pSchema[index].bytes; + pColInfo[i].colId = pSchema[index].colId; + } + + return pColInfo; +} + void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { if (pQueryInfo->pDownstream != NULL && taosArrayGetSize(pQueryInfo->pDownstream) > 0) { // handle the following query process SQueryInfo* px = taosArrayGetP(pQueryInfo->pDownstream, 0); printf("%d\n", px->type); + + SColumnInfo* colInfo = extractColumnInfoFromResult(px->pTableMetaInfo[0]->pTableMeta, px->colList); + int32_t numOfOutput = tscSqlExprNumOfExprs(px); + + SExprInfo *exprInfo = NULL; + SQLFunctionCtx *pCtx = calloc(numOfOutput, sizeof(SQLFunctionCtx)); + + int32_t numOfCols = taosArrayGetSize(px->colList); + SQueriedTableInfo info = {.colList = colInfo, .numOfCols = numOfCols,}; + /*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL); + tsCreateSQLFunctionCtx(px, pCtx); } } @@ -1095,11 +1123,9 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { if (pInfo->pArithExprInfo != NULL) { tExprTreeDestroy(pInfo->pArithExprInfo->pExpr, NULL); - SSqlFuncMsg* pFuncMsg = &pInfo->pArithExprInfo->base; + SSqlExpr* pFuncMsg = &pInfo->pArithExprInfo->base; for(int32_t j = 0; j < pFuncMsg->numOfParams; ++j) { - if (pFuncMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) { - tfree(pFuncMsg->arg[j].argValue.pz); - } + tVariantDestroy(&pFuncMsg->param[j]); } tfree(pInfo->pArithExprInfo); @@ -1125,20 +1151,33 @@ static SSqlExpr* doCreateSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCo // set the correct columnIndex index if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) { + SSchema* s = tGetTbnameColumnSchema(); pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX; + pExpr->colBytes = s->bytes; + pExpr->colType = s->type; } else if (pColIndex->columnIndex == TSDB_BLOCK_DIST_COLUMN_INDEX) { + SSchema s = tGetBlockDistColumnSchema(); + pExpr->colInfo.colId = TSDB_BLOCK_DIST_COLUMN_INDEX; + pExpr->colBytes = s.bytes; + pExpr->colType = s.type; } else if (pColIndex->columnIndex <= TSDB_UD_COLUMN_INDEX) { pExpr->colInfo.colId = pColIndex->columnIndex; + pExpr->colBytes = size; + pExpr->colType = type; } else { if (TSDB_COL_IS_TAG(colType)) { SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); pExpr->colInfo.colId = pSchema[pColIndex->columnIndex].colId; + pExpr->colBytes = pSchema[pColIndex->columnIndex].bytes; + pExpr->colType = pSchema[pColIndex->columnIndex].type; tstrncpy(pExpr->colInfo.name, pSchema[pColIndex->columnIndex].name, sizeof(pExpr->colInfo.name)); } else if (pTableMetaInfo->pTableMeta != NULL) { // in handling select database/version/server_status(), the pTableMeta is NULL SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pColIndex->columnIndex); pExpr->colInfo.colId = pSchema->colId; + pExpr->colBytes = pSchema->bytes; + pExpr->colType = pSchema->type; tstrncpy(pExpr->colInfo.name, pSchema->name, sizeof(pExpr->colInfo.name)); } } @@ -1769,7 +1808,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX; - pQueryInfo->resColumnId = -1000; + pQueryInfo->resColumnId = TSDB_RES_COL_ID; pQueryInfo->limit.limit = -1; pQueryInfo->limit.offset = 0; @@ -2128,6 +2167,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); pNewQueryInfo->command = pQueryInfo->command; + pnCmd->active = pNewQueryInfo; + memcpy(&pNewQueryInfo->interval, &pQueryInfo->interval, sizeof(pNewQueryInfo->interval)); pNewQueryInfo->type = pQueryInfo->type; pNewQueryInfo->window = pQueryInfo->window; diff --git a/src/common/inc/tname.h b/src/common/inc/tname.h index 465b298973..fb57565f84 100644 --- a/src/common/inc/tname.h +++ b/src/common/inc/tname.h @@ -41,6 +41,33 @@ typedef struct SResPair { double avg; } SResPair; +/* the structure for sql function in select clause */ +typedef struct SSqlExpr { + char aliasName[TSDB_COL_NAME_LEN]; // as aliasName + SColIndex colInfo; + uint64_t uid; // refactor use the pointer + + int16_t functionId; // function id in aAgg array + + int16_t resType; // return value type + int16_t resBytes; // length of return value + int32_t interBytes; // inter result buffer size + + int16_t colType; // table column type + int16_t colBytes; // table column bytes + + int16_t numOfParams; // argument value of each function + tVariant param[3]; // parameters are not more than 3 + int32_t offset; // sub result column value of arithmetic expression. + int16_t resColId; // result column id +} SSqlExpr; + +typedef struct SExprInfo { + SSqlExpr base; + int64_t uid; + struct tExprNode* pExpr; +} SExprInfo; + #define TSDB_DB_NAME_T 1 #define TSDB_TABLE_NAME_T 2 diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index f574a02538..6bd4c967f5 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -243,8 +243,9 @@ do { \ #define TSDB_MAX_REPLICA 5 #define TSDB_TBNAME_COLUMN_INDEX (-1) -#define TSDB_BLOCK_DIST_COLUMN_INDEX (-2) -#define TSDB_UD_COLUMN_INDEX (-100) +#define TSDB_BLOCK_DIST_COLUMN_INDEX (-2) +#define TSDB_UD_COLUMN_INDEX (-1000) +#define TSDB_RES_COL_ID (-5000) #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 00f64eba77..54025d3223 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -398,35 +398,26 @@ typedef struct SColIndex { } SColIndex; /* sql function msg, to describe the message to vnode about sql function - * operations in select clause */ -typedef struct SSqlFuncMsg { - int16_t functionId; - int16_t numOfParams; - - int16_t resColId; // result column id, id of the current output column - int16_t colType; - int16_t colBytes; - - SColIndex colInfo; - struct ArgElem { - int16_t argType; - int16_t argBytes; - union { - double d; - int64_t i64; - char * pz; - } argValue; - } arg[3]; -} SSqlFuncMsg; - -typedef struct SExprInfo { - SSqlFuncMsg base; - struct tExprNode* pExpr; - int16_t bytes; - int16_t type; - int32_t interBytes; - int64_t uid; -} SExprInfo; + * operations in select */ +//typedef struct SSqlFuncMsg { +// int16_t functionId; +// int16_t numOfParams; +// +// int16_t resColId; // result column id, id of the current output column +// int16_t colType; +// int16_t colBytes; +// +// SColIndex colInfo; +// struct ArgElem { +// int16_t argType; +// int16_t argBytes; +// union { +// double d; +// int64_t i64; +// char * pz; +// } argValue; +// } arg[3]; +//} SSqlFuncMsg; typedef struct SColumnFilterInfo { int16_t lowerRelOptr; @@ -513,12 +504,12 @@ typedef struct { typedef struct { int32_t code; - uint64_t qhandle; // query handle + uint64_t qid; // dnode generated query id } SQueryTableRsp; typedef struct { SMsgHead header; - uint64_t qhandle; + union{uint64_t qid; uint64_t qhandle;}; uint16_t free; } SRetrieveTableMsg; @@ -818,7 +809,7 @@ typedef struct { uint32_t queryId; int64_t useconds; int64_t stime; - uint64_t qHandle; + uint64_t qid; } SQueryDesc; typedef struct { diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 17a4282d05..fe19d10963 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -344,7 +344,7 @@ static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pShow->bytes[cols] = 24; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "qhandle"); + strcpy(pSchema[cols].name, "qid"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -420,7 +420,7 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v cols++; char handleBuf[24] = {0}; - snprintf(handleBuf, tListLen(handleBuf), "%p", (void*)htobe64(pDesc->qHandle)); + snprintf(handleBuf, tListLen(handleBuf), "%p", (void*)htobe64(pDesc->qid)); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->bytes[cols]); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 5ff574ec67..31cc4da69e 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -306,6 +306,12 @@ enum { QUERY_RESULT_READY = 2, }; +typedef struct { + int32_t numOfTags; + int32_t numOfCols; + SColumnInfo *colList; +} SQueriedTableInfo; + typedef struct SQInfo { void* signature; uint64_t qId; @@ -331,8 +337,8 @@ typedef struct SQueryParam { char *tbnameCond; char *prevResult; SArray *pTableIdList; - SSqlFuncMsg **pExprMsg; - SSqlFuncMsg **pSecExprMsg; + SSqlExpr **pExpr; + SSqlExpr **pSecExpr; SExprInfo *pExprs; SExprInfo *pSecExprs; @@ -423,10 +429,11 @@ typedef struct SSWindowOperatorInfo { void freeParam(SQueryParam *param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); -int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, - SColumnInfo* pTagCols); +int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, + SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg); + int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, - SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr); + SSqlExpr **pExpr, SExprInfo *prevExpr); SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code); SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 31dfc350a3..3227535a89 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -27,7 +27,7 @@ #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) #define curTimeWindowIndex(_winres) ((_winres)->curIndex) -#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!(sq)))? (_q)->pExpr1[1].base.arg->argValue.i64:1) +#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!(sq)))? (_q)->pExpr1[1].base.param[0].i64:1) int32_t getOutputInterResultBufSize(SQuery* pQuery); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d1d184aa75..2378db9691 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -214,8 +214,8 @@ static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32 res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData idata = {{0}}; - idata.info.type = pExpr[i].type; - idata.info.bytes = pExpr[i].bytes; + idata.info.type = pExpr[i].base.resType; + idata.info.bytes = pExpr[i].base.resBytes; idata.info.colId = pExpr[i].base.resColId; idata.pData = calloc(1, MAX(idata.info.bytes * numOfRows, minSize)); // at least to hold a pointer on x64 platform @@ -1607,10 +1607,10 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr } for (int32_t i = 0; i < numOfOutput; ++i) { - SSqlFuncMsg *pSqlFuncMsg = &pExpr[i].base; + SSqlExpr *pSqlExpr = &pExpr[i].base; SQLFunctionCtx* pCtx = &pFuncCtx[i]; - SColIndex *pIndex = &pSqlFuncMsg->colInfo; + SColIndex *pIndex = &pSqlExpr->colInfo; if (TSDB_COL_REQ_NULL(pIndex->flag)) { pCtx->requireNull = true; @@ -1619,33 +1619,33 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr pCtx->requireNull = false; } - pCtx->inputBytes = pSqlFuncMsg->colBytes; - pCtx->inputType = pSqlFuncMsg->colType; + pCtx->inputBytes = pSqlExpr->colBytes; + pCtx->inputType = pSqlExpr->colType; pCtx->ptsOutputBuf = NULL; - pCtx->outputBytes = pExpr[i].bytes; - pCtx->outputType = pExpr[i].type; + pCtx->outputBytes = pSqlExpr->resBytes; + pCtx->outputType = pSqlExpr->resType; pCtx->order = pQuery->order.order; - pCtx->functionId = pSqlFuncMsg->functionId; + pCtx->functionId = pSqlExpr->functionId; pCtx->stableQuery = pQuery->stableQuery; - pCtx->interBufBytes = pExpr[i].interBytes; + pCtx->interBufBytes = pSqlExpr->interBytes; pCtx->start.key = INT64_MIN; pCtx->end.key = INT64_MIN; - pCtx->numOfParams = pSqlFuncMsg->numOfParams; + pCtx->numOfParams = pSqlExpr->numOfParams; for (int32_t j = 0; j < pCtx->numOfParams; ++j) { - int16_t type = pSqlFuncMsg->arg[j].argType; - int16_t bytes = pSqlFuncMsg->arg[j].argBytes; - if (pSqlFuncMsg->functionId == TSDB_FUNC_STDDEV_DST) { + int16_t type = pSqlExpr->param[j].nType; + int16_t bytes = pSqlExpr->param[j].nLen; + if (pSqlExpr->functionId == TSDB_FUNC_STDDEV_DST) { continue; } if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - tVariantCreateFromBinary(&pCtx->param[j], pSqlFuncMsg->arg[j].argValue.pz, bytes, type); + tVariantCreateFromBinary(&pCtx->param[j], pSqlExpr->param[j].pz, bytes, type); } else { - tVariantCreateFromBinary(&pCtx->param[j], (char *)&pSqlFuncMsg->arg[j].argValue.i64, bytes, type); + tVariantCreateFromBinary(&pCtx->param[j], (char *)&pSqlExpr->param[j].i64, bytes, type); } } @@ -1688,7 +1688,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr for(int32_t i = 1; i < numOfOutput; ++i) { (*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowCellInfo) + - pExpr[i - 1].interBytes * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery)); + pExpr[i - 1].base.interBytes * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery)); } setCtxTagColumnInfo(pFuncCtx, numOfOutput); @@ -1801,7 +1801,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); - if (pQuery->pExpr2 != NULL) { + if (pQuery->pExpr2 != NULL && !pQuery->stableQuery) { pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2); } } else { // diff/add/multiply/subtract/division @@ -1916,19 +1916,19 @@ static bool isFixedOutputQuery(SQuery* pQuery) { } for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SSqlFuncMsg *pExprMsg = &pQuery->pExpr1[i].base; + SSqlExpr *pExpr = &pQuery->pExpr1[i].base; // ignore the ts_comp function - if (i == 0 && pExprMsg->functionId == TSDB_FUNC_PRJ && pExprMsg->numOfParams == 1 && - pExprMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + if (i == 0 && pExpr->functionId == TSDB_FUNC_PRJ && pExpr->numOfParams == 1 && + pExpr->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { continue; } - if (pExprMsg->functionId == TSDB_FUNC_TS || pExprMsg->functionId == TSDB_FUNC_TS_DUMMY) { + if (pExpr->functionId == TSDB_FUNC_TS || pExpr->functionId == TSDB_FUNC_TS_DUMMY) { continue; } - if (!IS_MULTIOUTPUT(aAggs[pExprMsg->functionId].status)) { + if (!IS_MULTIOUTPUT(aAggs[pExpr->functionId].status)) { return true; } } @@ -1989,7 +1989,7 @@ static bool needReverseScan(SQuery *pQuery) { if (functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_LAST_DST) { // the scan order to acquire the last result of the specified column - int32_t order = (int32_t)pQuery->pExpr1[i].base.arg->argValue.i64; + int32_t order = (int32_t)pQuery->pExpr1[i].base.param[0].i64; if (order != pQuery->order.order) { return true; } @@ -2048,12 +2048,12 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { } else { bool hasMultioutput = false; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SSqlFuncMsg *pExprMsg = &pQuery->pExpr1[i].base; - if (pExprMsg->functionId == TSDB_FUNC_TS || pExprMsg->functionId == TSDB_FUNC_TS_DUMMY) { + SSqlExpr *pExpr = &pQuery->pExpr1[i].base; + if (pExpr->functionId == TSDB_FUNC_TS || pExpr->functionId == TSDB_FUNC_TS_DUMMY) { continue; } - hasMultioutput = IS_MULTIOUTPUT(aAggs[pExprMsg->functionId].status); + hasMultioutput = IS_MULTIOUTPUT(aAggs[pExpr->functionId].status); if (!hasMultioutput) { break; } @@ -2554,7 +2554,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa if (pQuery->stableQuery) { // todo refactor SExprInfo* pExprInfo = &pTableScanInfo->pExpr[0]; - int16_t tagId = (int16_t)pExprInfo->base.arg->argValue.i64; + int16_t tagId = (int16_t)pExprInfo->base.param[0].i64; SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagId); // compare tag first @@ -2781,7 +2781,7 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP && pQuery->stableQuery) { assert(pExprInfo->base.numOfParams == 1); - int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64; + int16_t tagColId = (int16_t)pExprInfo->base.param[0].i64; SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId); doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes); @@ -2800,16 +2800,16 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt } // todo use tag column index to optimize performance - doSetTagValueInParam(pTable, pLocalExprInfo->base.colInfo.colId, &pCtx[idx].tag, pLocalExprInfo->type, - pLocalExprInfo->bytes); + doSetTagValueInParam(pTable, pLocalExprInfo->base.colInfo.colId, &pCtx[idx].tag, pLocalExprInfo->base.resType, + pLocalExprInfo->base.resBytes); - if (IS_NUMERIC_TYPE(pLocalExprInfo->type) || pLocalExprInfo->type == TSDB_DATA_TYPE_BOOL) { - memcpy(pRuntimeEnv->tagVal + offset, &pCtx[idx].tag.i64, pLocalExprInfo->bytes); + if (IS_NUMERIC_TYPE(pLocalExprInfo->base.resType) || pLocalExprInfo->base.resType == TSDB_DATA_TYPE_BOOL) { + memcpy(pRuntimeEnv->tagVal + offset, &pCtx[idx].tag.i64, pLocalExprInfo->base.resBytes); } else { memcpy(pRuntimeEnv->tagVal + offset, pCtx[idx].tag.pz, pCtx[idx].tag.nLen); } - offset += pLocalExprInfo->bytes; + offset += pLocalExprInfo->base.resBytes; } //todo : use index to avoid iterator all possible output columns @@ -2901,25 +2901,25 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim for (int32_t j = 0; j < numOfRows; ++j) { for (int32_t i = 0; i < numOfCols; ++i) { - switch (pQuery->pExpr1[i].type) { + switch (pQuery->pExpr1[i].base.resType) { case TSDB_DATA_TYPE_BINARY: { - int32_t type = pQuery->pExpr1[i].type; - printBinaryData(pQuery->pExpr1[i].base.functionId, pdata[i]->data + pQuery->pExpr1[i].bytes * j, + int32_t type = pQuery->pExpr1[i].base.resType; + printBinaryData(pQuery->pExpr1[i].base.functionId, pdata[i]->data + pQuery->pExpr1[i].base.resBytes * j, type); break; } case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_BIGINT: - printf("%" PRId64 "\t", *(int64_t *)(pdata[i]->data + pQuery->pExpr1[i].bytes * j)); + printf("%" PRId64 "\t", *(int64_t *)(pdata[i]->data + pQuery->pExpr1[i].base.resBytes * j)); break; case TSDB_DATA_TYPE_INT: - printf("%d\t", *(int32_t *)(pdata[i]->data + pQuery->pExpr1[i].bytes * j)); + printf("%d\t", *(int32_t *)(pdata[i]->data + pQuery->pExpr1[i].base.resBytes * j)); break; case TSDB_DATA_TYPE_FLOAT: - printf("%f\t", *(float *)(pdata[i]->data + pQuery->pExpr1[i].bytes * j)); + printf("%f\t", *(float *)(pdata[i]->data + pQuery->pExpr1[i].base.resBytes * j)); break; case TSDB_DATA_TYPE_DOUBLE: - printf("%lf\t", *(double *)(pdata[i]->data + pQuery->pExpr1[i].bytes * j)); + printf("%lf\t", *(double *)(pdata[i]->data + pQuery->pExpr1[i].base.resBytes * j)); break; } } @@ -3298,13 +3298,13 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) { SQuery* pQuery = pRuntimeEnv->pQuery; - SSqlFuncMsg* pFuncMsg = &pExprInfo->base; + SSqlExpr* pExpr = &pExprInfo->base; if (pQuery->stableQuery && (pRuntimeEnv->pTsBuf != NULL) && - (pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && - (pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX)) { - assert(pFuncMsg->numOfParams == 1); + (pExpr->functionId == TSDB_FUNC_TS || pExpr->functionId == TSDB_FUNC_PRJ) && + (pExpr->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX)) { + assert(pExpr->numOfParams == 1); - int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64; + int16_t tagColId = (int16_t)pExprInfo->base.param[0].i64; SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId); doSetTagValueInParam(pTable, tagColId, &pCtx->tag, pColInfo->type, pColInfo->bytes); @@ -3312,10 +3312,10 @@ void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExpr int16_t tagType = pCtx[0].tag.nType; if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) { qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%s", pRuntimeEnv->qinfo, - pExprInfo->base.arg->argValue.i64, pCtx[0].tag.pz); + pExprInfo->base.param[0].i64, pCtx[0].tag.pz); } else { qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, pRuntimeEnv->qinfo, - pExprInfo->base.arg->argValue.i64, pCtx[0].tag.i64); + pExprInfo->base.param[0].i64, pCtx[0].tag.i64); } } } @@ -3362,17 +3362,17 @@ int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag, return 0; } -void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr) { +void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExprInfo) { SQuery* pQuery = pRuntimeEnv->pQuery; int32_t numOfExprs = pQuery->numOfOutput; for(int32_t i = 0; i < numOfExprs; ++i) { - SExprInfo* pExprInfo = &(pExpr[i]); + SExprInfo* pExprInfo1 = &(pExprInfo[i]); if (pExprInfo->base.functionId != TSDB_FUNC_STDDEV_DST) { continue; } - SSqlFuncMsg* pFuncMsg = &pExprInfo->base; + SSqlExpr* pExpr = &pExprInfo1->base; pCtx[i].param[0].arr = NULL; pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int @@ -3385,7 +3385,7 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult); for (int32_t k = 0; k < numOfCols; ++k) { SStddevInterResult* pres = taosArrayGet(p->pResult, k); - if (pres->colId == pFuncMsg->colInfo.colId) { + if (pres->colId == pExpr->colInfo.colId) { pCtx[i].param[0].arr = pres->pResult; break; } @@ -3943,15 +3943,15 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in for(int32_t i = 0; i < numOfOutput; ++i) { SExprInfo* pExprInfo = &pExpr[i]; - pFillCol[i].col.bytes = pExprInfo->bytes; - pFillCol[i].col.type = (int8_t)pExprInfo->type; + pFillCol[i].col.bytes = pExprInfo->base.resBytes; + pFillCol[i].col.type = (int8_t)pExprInfo->base.resBytes; pFillCol[i].col.offset = offset; pFillCol[i].tagIndex = -2; pFillCol[i].flag = TSDB_COL_NORMAL; // always be ta normal column for table query pFillCol[i].functionId = pExprInfo->base.functionId; pFillCol[i].fillVal.i = fillVal[i]; - offset += pExprInfo->bytes; + offset += pExprInfo->base.resBytes; } return pFillCol; @@ -5211,12 +5211,12 @@ static SSDataBlock* doTagScan(void* param) { assert(pQuery->numOfOutput == 1); SExprInfo* pExprInfo = &pOperator->pExpr[0]; - int32_t rsize = pExprInfo->bytes; + int32_t rsize = pExprInfo->base.resBytes; count = 0; - int16_t bytes = pExprInfo->bytes; - int16_t type = pExprInfo->type; + int16_t bytes = pExprInfo->base.resBytes; + int16_t type = pExprInfo->base.resType; for(int32_t i = 0; i < pQuery->numOfTags; ++i) { if (pQuery->tagColList[i].colId == pExprInfo->base.colInfo.colId) { @@ -5287,8 +5287,8 @@ static SSDataBlock* doTagScan(void* param) { } SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, j); - type = pExprInfo[j].type; - bytes = pExprInfo[j].bytes; + type = pExprInfo[j].base.resType; + bytes = pExprInfo[j].base.resBytes; if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { data = tsdbGetTableName(item->pTable); @@ -5296,7 +5296,7 @@ static SSDataBlock* doTagScan(void* param) { data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type, bytes); } - dst = pColInfo->pData + count * pExprInfo[j].bytes; + dst = pColInfo->pData + count * pExprInfo[j].base.resBytes; doSetTagValueToResultBuf(dst, data, type, bytes); } @@ -5339,30 +5339,30 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf return pOperator; } -static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) { +static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *pExpr, SColumnInfo* pTagCols) { int32_t j = 0; - if (TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) { - if (pExprMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { + if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { + if (pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { return TSDB_TBNAME_COLUMN_INDEX; - } else if (pExprMsg->colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) { + } else if (pExpr->colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) { return TSDB_BLOCK_DIST_COLUMN_INDEX; } - while(j < pQueryMsg->numOfTags) { - if (pExprMsg->colInfo.colId == pTagCols[j].colId) { + while(j < pTableInfo->numOfTags) { + if (pExpr->colInfo.colId == pTagCols[j].colId) { return j; } j += 1; } - } else if (TSDB_COL_IS_UD_COL(pExprMsg->colInfo.flag)) { // user specified column data + } else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) { // user specified column data return TSDB_UD_COLUMN_INDEX; } else { - while (j < pQueryMsg->numOfCols) { - if (pExprMsg->colInfo.colId == pQueryMsg->colList[j].colId) { + while (j < pTableInfo->numOfCols) { + if (pExpr->colInfo.colId == pTableInfo->colList[j].colId) { return j; } @@ -5373,8 +5373,8 @@ static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pE return INT32_MIN; // return a less than TSDB_TBNAME_COLUMN_INDEX value } -bool validateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) { - int32_t j = getColumnIndexInSource(pQueryMsg, pExprMsg, pTagCols); +bool validateExprColumnInfo(SQueriedTableInfo *pTableInfo, SSqlExpr *pExpr, SColumnInfo* pTagCols) { + int32_t j = getColumnIndexInSource(pTableInfo, pExpr, pTagCols); return j != INT32_MIN; } @@ -5413,21 +5413,21 @@ static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) { return true; } -static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols) { - int32_t numOfTotal = pQueryMsg->numOfCols + pQueryMsg->numOfTags; - if (pQueryMsg->numOfCols < 0 || pQueryMsg->numOfTags < 0 || numOfTotal > TSDB_MAX_COLUMNS) { - qError("qmsg:%p illegal value of numOfCols %d numOfTags:%d", pQueryMsg, pQueryMsg->numOfCols, pQueryMsg->numOfTags); +static UNUSED_FUNC bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SSqlExpr** pExpr, int32_t numOfOutput, + SColumnInfo* pTagCols, void* pMsg) { + int32_t numOfTotal = pTableInfo->numOfCols + pTableInfo->numOfTags; + if (pTableInfo->numOfCols < 0 || pTableInfo->numOfTags < 0 || numOfTotal > TSDB_MAX_COLUMNS) { + qError("qmsg:%p illegal value of numOfCols %d numOfTags:%d", pMsg, pTableInfo->numOfCols, pTableInfo->numOfTags); return false; } - if (numOfTotal == 0) { - for(int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { - SSqlFuncMsg* pFuncMsg = pExprMsg[i]; - - if ((pFuncMsg->functionId == TSDB_FUNC_TAGPRJ) || - (pFuncMsg->functionId == TSDB_FUNC_TID_TAG && pFuncMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) || - (pFuncMsg->functionId == TSDB_FUNC_COUNT && pFuncMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) || - (pFuncMsg->functionId == TSDB_FUNC_BLKINFO)) { + if (numOfTotal == 0) { // table total columns are not required. + for(int32_t i = 0; i < numOfOutput; ++i) { + SSqlExpr* p = pExpr[i]; + if ((p->functionId == TSDB_FUNC_TAGPRJ) || + (p->functionId == TSDB_FUNC_TID_TAG && p->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) || + (p->functionId == TSDB_FUNC_COUNT && p->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) || + (p->functionId == TSDB_FUNC_BLKINFO)) { continue; } @@ -5435,8 +5435,8 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx } } - for(int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { - if (!validateExprColumnInfo(pQueryMsg, pExprMsg[i], pTagCols)) { + for(int32_t i = 0; i < numOfOutput; ++i) { + if (!validateExprColumnInfo(pTableInfo, pExpr[i], pTagCols)) { return TSDB_CODE_QRY_INVALID_MSG; } } @@ -5569,38 +5569,41 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { } } - param->pExprMsg = calloc(pQueryMsg->numOfOutput, POINTER_BYTES); - if (param->pExprMsg == NULL) { + param->pExpr = calloc(pQueryMsg->numOfOutput, POINTER_BYTES); + if (param->pExpr == NULL) { code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _cleanup; } - SSqlFuncMsg *pExprMsg = (SSqlFuncMsg *)pMsg; + SSqlExpr *pExprMsg = (SSqlExpr *)pMsg; for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { - param->pExprMsg[i] = pExprMsg; + param->pExpr[i] = pExprMsg; pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); - pExprMsg->colType = htons(pExprMsg->colType); pExprMsg->colBytes = htons(pExprMsg->colBytes); + pExprMsg->colType = htons(pExprMsg->colType); + + pExprMsg->resType = htons(pExprMsg->resType); + pExprMsg->resBytes = htons(pExprMsg->resBytes); pExprMsg->functionId = htons(pExprMsg->functionId); pExprMsg->numOfParams = htons(pExprMsg->numOfParams); pExprMsg->resColId = htons(pExprMsg->resColId); - pMsg += sizeof(SSqlFuncMsg); + pMsg += sizeof(SSqlExpr); for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { - pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType); - pExprMsg->arg[j].argBytes = htons(pExprMsg->arg[j].argBytes); + pExprMsg->param[j].nType = htons(pExprMsg->param[j].nType); + pExprMsg->param[j].nLen = htons(pExprMsg->param[j].nLen); - if (pExprMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) { - pExprMsg->arg[j].argValue.pz = pMsg; - pMsg += pExprMsg->arg[j].argBytes; // one more for the string terminated char. + if (pExprMsg->param[j].nType == TSDB_DATA_TYPE_BINARY) { + pExprMsg->param[j].pz = pMsg; + pMsg += pExprMsg->param[j].nLen; // one more for the string terminated char. } else { - pExprMsg->arg[j].argValue.i64 = htobe64(pExprMsg->arg[j].argValue.i64); + pExprMsg->param[j].i64 = htobe64(pExprMsg->param[j].i64); } } @@ -5612,36 +5615,38 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { } } - pExprMsg = (SSqlFuncMsg *)pMsg; + pExprMsg = (SSqlExpr *)pMsg; } if (pQueryMsg->secondStageOutput) { - pExprMsg = (SSqlFuncMsg *)pMsg; - param->pSecExprMsg = calloc(pQueryMsg->secondStageOutput, POINTER_BYTES); + pExprMsg = (SSqlExpr *)pMsg; + param->pSecExpr = calloc(pQueryMsg->secondStageOutput, POINTER_BYTES); for (int32_t i = 0; i < pQueryMsg->secondStageOutput; ++i) { - param->pSecExprMsg[i] = pExprMsg; + param->pSecExpr[i] = pExprMsg; pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); - pExprMsg->colType = htons(pExprMsg->colType); + pExprMsg->resType = htons(pExprMsg->resType); + pExprMsg->resBytes = htons(pExprMsg->resBytes); pExprMsg->colBytes = htons(pExprMsg->colBytes); + pExprMsg->colType = htons(pExprMsg->colType); pExprMsg->functionId = htons(pExprMsg->functionId); pExprMsg->numOfParams = htons(pExprMsg->numOfParams); - pMsg += sizeof(SSqlFuncMsg); + pMsg += sizeof(SSqlExpr); for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { - pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType); - pExprMsg->arg[j].argBytes = htons(pExprMsg->arg[j].argBytes); + pExprMsg->param[j].nType = htons(pExprMsg->param[j].nType); + pExprMsg->param[j].nLen = htons(pExprMsg->param[j].nLen); - if (pExprMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) { - pExprMsg->arg[j].argValue.pz = pMsg; - pMsg += pExprMsg->arg[j].argBytes; // one more for the string terminated char. + if (pExprMsg->param[j].nType == TSDB_DATA_TYPE_BINARY) { + pExprMsg->param[j].pz = pMsg; + pMsg += pExprMsg->param[j].nLen; // one more for the string terminated char. } else { - pExprMsg->arg[j].argValue.i64 = htobe64(pExprMsg->arg[j].argValue.i64); + pExprMsg->param[j].i64 = htobe64(pExprMsg->param[j].i64); } } @@ -5653,7 +5658,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { } } - pExprMsg = (SSqlFuncMsg *)pMsg; + pExprMsg = (SSqlExpr *)pMsg; } } @@ -5757,7 +5762,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { param->sql = strndup(pMsg, pQueryMsg->sqlstrLen); - if (!validateQuerySourceCols(pQueryMsg, param->pExprMsg, param->pTagColumnInfo)) { + SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->colList}; + if (!validateQueryTableCols(&info, param->pExpr, pQueryMsg->numOfOutput, param->pTagColumnInfo, pQueryMsg)) { code = TSDB_CODE_QRY_INVALID_MSG; goto _cleanup; } @@ -5781,15 +5787,15 @@ static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQuer tExprNode* pExprNode = NULL; TRY(TSDB_MAX_TAG_CONDITIONS) { - pExprNode = exprTreeFromBinary(pArithExprInfo->base.arg[0].argValue.pz, pArithExprInfo->base.arg[0].argBytes); + pExprNode = exprTreeFromBinary(pArithExprInfo->base.param[0].pz, pArithExprInfo->base.param[0].nLen); } CATCH( code ) { CLEANUP_EXECUTE(); - qError("qmsg:%p failed to create arithmetic expression string from:%s, reason: %s", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz, tstrerror(code)); + qError("qmsg:%p failed to create arithmetic expression string from:%s, reason: %s", pQueryMsg, pArithExprInfo->base.param[0].pz, tstrerror(code)); return code; } END_TRY if (pExprNode == NULL) { - qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz); + qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pArithExprInfo->base.param[0].pz); return TSDB_CODE_QRY_APP_ERROR; } @@ -5797,18 +5803,19 @@ static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQuer return TSDB_CODE_SUCCESS; } -static int32_t updateOutputBufForTopBotQuery(SQueryTableMsg* pQueryMsg, SColumnInfo* pTagCols, SExprInfo* pExprs, int32_t numOfOutput, int32_t tagLen, bool superTable) { + +static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SColumnInfo* pTagCols, SExprInfo* pExprs, int32_t numOfOutput, int32_t tagLen, bool superTable) { for (int32_t i = 0; i < numOfOutput; ++i) { int16_t functId = pExprs[i].base.functionId; if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { - int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); - if (j < 0 || j >= pQueryMsg->numOfCols) { + int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols); + if (j < 0 || j >= pTableInfo->numOfCols) { return TSDB_CODE_QRY_INVALID_MSG; } else { - SColumnInfo* pCol = &pQueryMsg->colList[j]; - int32_t ret = getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.arg[0].argValue.i64, - &pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interBytes, tagLen, superTable); + SColumnInfo* pCol = &pTableInfo->colList[j]; + int32_t ret = getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.param[0].i64, + &pExprs[i].base.resType, &pExprs[i].base.resBytes, &pExprs[i].base.interBytes, tagLen, superTable); assert(ret == TSDB_CODE_SUCCESS); } } @@ -5818,16 +5825,8 @@ static int32_t updateOutputBufForTopBotQuery(SQueryTableMsg* pQueryMsg, SColumnI } // TODO tag length should be passed from client -typedef struct { - int32_t numOfOutput; - int32_t numOfTags; - int32_t numOfCols; - SColumnInfo* colList; - int32_t queryTest; -} SQueriedTableMeta; - -int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo, - SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols, bool isSuperTable) { +int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, + SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg) { *pExprInfo = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -5836,19 +5835,18 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutpu return TSDB_CODE_QRY_OUT_OF_MEMORY; } -// bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType); + bool isSuperTable = QUERY_IS_STABLE_QUERY(queryType); int16_t tagLen = 0; for (int32_t i = 0; i < numOfOutput; ++i) { pExprs[i].base = *pExprMsg[i]; - pExprs[i].bytes = 0; int16_t type = 0; int16_t bytes = 0; // parse the arithmetic expression if (pExprs[i].base.functionId == TSDB_FUNC_ARITHM) { - code = buildArithmeticExprFromMsg(&pExprs[i], pQueryMsg); + code = buildArithmeticExprFromMsg(&pExprs[i], pMsg); if (code != TSDB_CODE_SUCCESS) { tfree(pExprs); @@ -5865,30 +5863,29 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutpu SSchema s = tGetBlockDistColumnSchema(); type = s.type; bytes = s.bytes; - } else if (pExprs[i].base.colInfo.colId <= TSDB_UD_COLUMN_INDEX) { + } else if (pExprs[i].base.colInfo.colId <= TSDB_UD_COLUMN_INDEX && pExprs[i].base.colInfo.colId > TSDB_RES_COL_ID) { // it is a user-defined constant value column assert(pExprs[i].base.functionId == TSDB_FUNC_PRJ); - type = pExprs[i].base.arg[1].argType; - bytes = pExprs[i].base.arg[1].argBytes; - + type = pExprs[i].base.param[1].nType; + bytes = pExprs[i].base.param[1].nLen; if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { bytes += VARSTR_HEADER_SIZE; } } else { - int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); + int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols); if (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag)) { - if (j < TSDB_BLOCK_DIST_COLUMN_INDEX || j >= pQueryMsg->numOfTags) { + if (j < TSDB_BLOCK_DIST_COLUMN_INDEX || j >= pTableInfo->numOfTags) { return TSDB_CODE_QRY_INVALID_MSG; } } else { - if (j < PRIMARYKEY_TIMESTAMP_COL_INDEX || j >= pQueryMsg->numOfCols) { + if (j < PRIMARYKEY_TIMESTAMP_COL_INDEX || j >= pTableInfo->numOfCols) { return TSDB_CODE_QRY_INVALID_MSG; } } if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX && j >= 0) { - SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j]; + SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pTableInfo->colList[j]; type = pCol->type; bytes = pCol->bytes; } else { @@ -5899,35 +5896,35 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutpu } } - int32_t param = (int32_t)pExprs[i].base.arg[0].argValue.i64; + int32_t param = (int32_t)pExprs[i].base.param[0].i64; if (pExprs[i].base.functionId != TSDB_FUNC_ARITHM && - (type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) { + (type != pExprs[i].base.resType || bytes != pExprs[i].base.resBytes)) { tfree(pExprs); return TSDB_CODE_QRY_INVALID_MSG; } - if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes, - &pExprs[i].interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) { + if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].base.resType, &pExprs[i].base.resBytes, + &pExprs[i].base.interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) { tfree(pExprs); return TSDB_CODE_QRY_INVALID_MSG; } if (pExprs[i].base.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].base.functionId == TSDB_FUNC_TS_DUMMY) { - tagLen += pExprs[i].bytes; + tagLen += pExprs[i].base.resBytes; } - assert(isValidDataType(pExprs[i].type)); + assert(isValidDataType(pExprs[i].base.resType)); } // the tag length is affected by other tag columns, so this should be update. - updateOutputBufForTopBotQuery(pQueryMsg, pTagCols, pExprs, numOfOutput, tagLen, isSuperTable); + updateOutputBufForTopBotQuery(pTableInfo, pTagCols, pExprs, numOfOutput, tagLen, isSuperTable); *pExprInfo = pExprs; return TSDB_CODE_SUCCESS; } -int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, - SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr) { +int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo, + SSqlExpr** pExpr, SExprInfo* prevExpr) { *pExprInfo = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -5939,8 +5936,8 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType); for (int32_t i = 0; i < numOfOutput; ++i) { - pExprs[i].base = *pExprMsg[i]; - pExprs[i].bytes = 0; + pExprs[i].base = *pExpr[i]; + pExprs[i].base.resType = 0; int16_t type = 0; int16_t bytes = 0; @@ -5960,18 +5957,18 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu int32_t index = pExprs[i].base.colInfo.colIndex; assert(prevExpr[index].base.resColId == pExprs[i].base.colInfo.colId); - type = prevExpr[index].type; - bytes = prevExpr[index].bytes; + type = prevExpr[index].base.resType; + bytes = prevExpr[index].base.resBytes; } - int32_t param = (int32_t)pExprs[i].base.arg[0].argValue.i64; - if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes, - &pExprs[i].interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) { + int32_t param = (int32_t)pExprs[i].base.param[0].i64; + if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].base.resType, &pExprs[i].base.resBytes, + &pExprs[i].base.interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) { tfree(pExprs); return TSDB_CODE_QRY_INVALID_MSG; } - assert(isValidDataType(pExprs[i].type)); + assert(isValidDataType(pExprs[i].base.resType)); } *pExprInfo = pExprs; @@ -6062,7 +6059,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { assert(pQuery->pExpr1 != NULL && pQuery != NULL); for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - SSqlFuncMsg *pSqlExprMsg = &pQuery->pExpr1[k].base; + SSqlExpr *pSqlExprMsg = &pQuery->pExpr1[k].base; if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM) { continue; } @@ -6179,12 +6176,12 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr // calculate the result row size for (int16_t col = 0; col < numOfOutput; ++col) { - assert(pExprs[col].bytes > 0); - pQuery->resultRowSize += pExprs[col].bytes; + assert(pExprs[col].base.resBytes > 0); + pQuery->resultRowSize += pExprs[col].base.resBytes; // keep the tag length if (TSDB_COL_IS_TAG(pExprs[col].base.colInfo.flag)) { - pQuery->tagLen += pExprs[col].bytes; + pQuery->tagLen += pExprs[col].base.resBytes; } } diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 1e7d3c9b58..87051ef1ec 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -34,7 +34,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) { int32_t size = 0; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - size += pQuery->pExpr1[i].interBytes; + size += pQuery->pExpr1[i].base.interBytes; } assert(size >= 0); @@ -139,7 +139,7 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16 for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) { SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i]; - int16_t size = pRuntimeEnv->pQuery->pExpr1[i].bytes; + int16_t size = pRuntimeEnv->pQuery->pExpr1[i].base.resType; char * s = getPosInResultPage(pRuntimeEnv->pQuery, page, pResultRow->offset, offset); memset(s, 0, size); diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index bba6a10df3..29a6512c40 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -14,7 +14,6 @@ */ #include "os.h" -#include "qFill.h" #include "taosmsg.h" #include "tcache.h" #include "tglobal.h" @@ -23,13 +22,11 @@ #include "hash.h" #include "texpr.h" #include "qExecutor.h" -#include "qResultbuf.h" #include "qUtil.h" #include "query.h" #include "queryLog.h" #include "tlosertree.h" #include "ttype.h" -#include "tcompare.h" typedef struct SQueryMgmt { pthread_mutex_t lock; @@ -58,8 +55,8 @@ void freeParam(SQueryParam *param) { tfree(param->tagCond); tfree(param->tbnameCond); tfree(param->pTableIdList); - tfree(param->pExprMsg); - tfree(param->pSecExprMsg); +// tfree(param->pExprMsg); +// tfree(param->pSecExprMsg); tfree(param->pExprs); tfree(param->pSecExprs); tfree(param->pGroupColIndex); @@ -91,12 +88,14 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi goto _over; } - if ((code = createQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->numOfOutput, ¶m.pExprs, param.pExprMsg, param.pTagColumnInfo)) != TSDB_CODE_SUCCESS) { + SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->colList}; + if ((code = createQueryFunc(&info, pQueryMsg->numOfOutput, ¶m.pExprs, param.pExpr, param.pTagColumnInfo, + pQueryMsg->queryType, pQueryMsg)) != TSDB_CODE_SUCCESS) { goto _over; } - if (param.pSecExprMsg != NULL) { - if ((code = createIndirectQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, ¶m.pSecExprs, param.pSecExprMsg, param.pExprs)) != TSDB_CODE_SUCCESS) { + if (param.pSecExpr != NULL) { + if ((code = createIndirectQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, ¶m.pSecExprs, param.pSecExpr, param.pExprs)) != TSDB_CODE_SUCCESS) { goto _over; } } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index ef68499b88..3618d86a05 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -252,7 +252,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->code = code; - pRsp->qhandle = 0; + pRsp->qid = 0; pRet->len = sizeof(SQueryTableRsp); pRet->rsp = pRsp; @@ -270,7 +270,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { return pRsp->code; } else { assert(*handle == pQInfo); - pRsp->qhandle = htobe64(qId); + pRsp->qid = htobe64(qId); } if (handle != NULL && @@ -343,37 +343,37 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { } static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { - void * pCont = pRead->pCont; - SRspRet *pRet = &pRead->rspRet; + void *pCont = pRead->pCont; + SRspRet *pRet = &pRead->rspRet; SRetrieveTableMsg *pRetrieve = pCont; pRetrieve->free = htons(pRetrieve->free); - pRetrieve->qhandle = htobe64(pRetrieve->qhandle); + pRetrieve->qid = htobe64(pRetrieve->qid); - vTrace("vgId:%d, QInfo:%" PRIu64 ", retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, pRetrieve->qhandle, + vTrace("vgId:%d, qid:%" PRIu64 ", retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, pRetrieve->qid, pRetrieve->free, pRead->rpcHandle); memset(pRet, 0, sizeof(SRspRet)); terrno = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS; - void ** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle); + void **handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qid); if (handle == NULL) { code = terrno; terrno = TSDB_CODE_SUCCESS; - } else if (!checkQIdEqual(*handle, pRetrieve->qhandle)) { + } else if (!checkQIdEqual(*handle, pRetrieve->qid)) { code = TSDB_CODE_QRY_INVALID_QHANDLE; } if (code != TSDB_CODE_SUCCESS) { - vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%" PRIu64, pVnode->vgId, tstrerror(code), pRetrieve->qhandle); + vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%" PRIu64, pVnode->vgId, tstrerror(code), pRetrieve->qid); vnodeBuildNoResultQueryRsp(pRet); return code; } // kill current query and free corresponding resources. if (pRetrieve->free == 1) { - vWarn("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qhandle, *handle); + vWarn("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qid, *handle); qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); @@ -384,7 +384,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { // register the qhandle to connect to quit query immediate if connection is broken if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { - vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, %p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle); + vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, %p", pVnode->vgId, pRetrieve->qid, *handle, pRead->rpcHandle); code = TSDB_CODE_RPC_NETWORK_UNAVAIL; qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); @@ -402,7 +402,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); freeHandle = true; } else { // result is not ready, return immediately - // Only effects in the non-blocking model + // Only affects the non-blocking model if (!tsRetrieveBlockingModel) { if (!buildRes) { assert(pRead->rpcHandle != NULL); -- GitLab