diff --git a/src/client/inc/tscSQLParser.h b/src/client/inc/tscSQLParser.h index f0bf5b59530b359852c76f5fecc866f6b9d73c8f..0e4ad279dc0beaf0c56109583d3d02f5ae60b4f3 100644 --- a/src/client/inc/tscSQLParser.h +++ b/src/client/inc/tscSQLParser.h @@ -86,15 +86,16 @@ enum _sql_cmd { TSDB_SQL_MAX //48 }; -#define MAX_TOKEN_LEN 30 - -// token type enum { TSQL_NODE_TYPE_EXPR = 0x1, TSQL_NODE_TYPE_ID = 0x2, TSQL_NODE_TYPE_VALUE = 0x4, }; +#define NON_ARITHMEIC_EXPR 0 +#define NORMAL_ARITHMETIC 1 +#define AGG_ARIGHTMEIC 2 + extern char tTokenTypeSwitcher[13]; #define toTSDBType(x) \ diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 9868f703c37f10c1c15df576899d7f2724bdb196..daab5d1b64ef927d0bf81e3b635b55cbc808e09a 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -128,6 +128,8 @@ void tscFieldInfoSetValFromSchema(SFieldInfo* pFieldInfo, int32_t index, SSchema void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* pField); void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, const char* name, int16_t bytes); void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visible); +void tscFieldInfoSetExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlExpr* pExpr); +void tscFieldInfoSetBinExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlFunctionExpr* pExpr); void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo); void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo); @@ -149,6 +151,7 @@ SSqlExpr* tscSqlExprInsertEmpty(SQueryInfo* pQueryInfo, int32_t index, int16_t f SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, int16_t size); +int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo); SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index); void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 710bc329e18a67f6068c3722d57d0b1db480b0d1..4893835d56c2094e0faa0052ca1bef18abec4edf 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -31,9 +31,8 @@ extern "C" { #include "tsqlfunction.h" #include "tutil.h" -#define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \ - (res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows) - +#define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows) + // forward declaration struct SSqlInfo; @@ -70,13 +69,19 @@ typedef struct SSqlExpr { int16_t interResBytes; // inter result buffer size int16_t numOfParams; // argument value of each function tVariant param[3]; // parameters are not more than 3 + int32_t offset; // sub result column value of arithmetic expression. } SSqlExpr; +typedef struct SColumnIndex { + int16_t tableIndex; + int16_t columnIndex; +} SColumnIndex; + typedef struct SFieldInfo { int16_t numOfOutputCols; // number of column in result int16_t numOfAlloc; // allocated size TAOS_FIELD *pFields; - short * pOffset; +// short * pOffset; /* * define if this column is belong to the queried result, it may be add by parser to faciliate @@ -85,7 +90,9 @@ typedef struct SFieldInfo { * NOTE: these hidden columns always locate at the end of the output columns */ bool * pVisibleCols; - int32_t numOfHiddenCols; // the number of column not belongs to the queried result columns + int32_t numOfHiddenCols; // the number of column not belongs to the queried result columns + SSqlFunctionExpr** pExpr; // used for aggregation arithmetic express,such as count(*)+count(*) + SSqlExpr** pSqlExpr; } SFieldInfo; typedef struct SSqlExprInfo { @@ -94,11 +101,6 @@ typedef struct SSqlExprInfo { SSqlExpr *pExprs; } SSqlExprInfo; -typedef struct SColumnIndex { - int16_t tableIndex; - int16_t columnIndex; -} SColumnIndex; - typedef struct SColumnBase { SColumnIndex colIndex; int32_t numOfFilters; @@ -163,7 +165,7 @@ typedef struct STableDataBlocks { int32_t rowSize; // row size for current table uint32_t nAllocSize; - uint32_t headerSize; // header for metadata (submit metadata) + uint32_t headerSize; // header for metadata (submit metadata) uint32_t size; /* @@ -199,8 +201,8 @@ typedef struct SQueryInfo { int64_t etime, stime; int64_t intervalTime; // aggregation time interval - int64_t nSlidingTime; // sliding window in mseconds - SSqlGroupbyExpr groupbyExpr; // group by tags info + int64_t slidingTime; // sliding window in mseconds + SSqlGroupbyExpr groupbyExpr; // group by tags info SColumnBaseInfo colList; SFieldInfo fieldsInfo; @@ -216,9 +218,9 @@ typedef struct SQueryInfo { int64_t * defaultVal; // default value for interpolation char * msg; // pointer to the pCmd->payload to keep error message temporarily int64_t clauseLimit; // limit for current sub clause - + // offset value in the original sql expression, NOT sent to virtual node, only applied at client side - int64_t prjOffset; + int64_t prjOffset; } SQueryInfo; // data source from sql string or from file @@ -269,29 +271,29 @@ typedef struct SResRec { struct STSBuf; typedef struct { - uint8_t code; - int64_t numOfRows; // num of results in current retrieved - int64_t numOfTotal; // num of total results - int64_t numOfTotalInCurrentClause; // num of total result in current subclause - - char * pRsp; - int rspType; - int rspLen; - uint64_t qhandle; - int64_t uid; - int64_t useconds; - int64_t offset; // offset value from vnode during projection query of stable - int row; - int16_t numOfnchar; - int16_t precision; - int32_t numOfGroups; - SResRec * pGroupRec; - char * data; - short * bytes; - void ** tsrow; - char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t) + uint8_t code; + int64_t numOfRows; // num of results in current retrieved + int64_t numOfTotal; // num of total results + int64_t numOfTotalInCurrentClause; // num of total result in current subclause + char * pRsp; + int rspType; + int rspLen; + uint64_t qhandle; + int64_t uid; + int64_t useconds; + int64_t offset; // offset value from vnode during projection query of stable + int row; + int16_t numOfnchar; + int16_t precision; + int32_t numOfGroups; + SResRec * pGroupRec; + char * data; + short * bytes; + void ** tsrow; + char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t) + SColumnIndex *pColumnIndex; + struct SLocalReducer *pLocalReducer; - SColumnIndex * pColumnIndex; } SSqlRes; typedef struct _tsc_obj { @@ -410,13 +412,13 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); void tscDestroyResPointerInfo(SSqlRes *pRes); void tscFreeSqlCmdData(SSqlCmd *pCmd); -void tscFreeResData(SSqlObj* pSql); +void tscFreeResData(SSqlObj *pSql); /** * free query result of the sql object * @param pObj */ -void tscFreeSqlResult(SSqlObj* pSql); +void tscFreeSqlResult(SSqlObj *pSql); /** * only free part of resources allocated during query. diff --git a/src/client/src/tscAst.c b/src/client/src/tscAst.c index cf0873b5b620ddc118863e3c2cae86080f1d481f..110399a7e7221f68e66bfa0219ecd2a0617a774b 100644 --- a/src/client/src/tscAst.c +++ b/src/client/src/tscAst.c @@ -158,7 +158,7 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols, return pNode; } -static uint8_t getBinaryExprOptr(SSQLToken *pToken) { +uint8_t getBinaryExprOptr(SSQLToken *pToken) { switch (pToken->type) { case TK_LT: return TSDB_RELATION_LESS; @@ -183,6 +183,7 @@ static uint8_t getBinaryExprOptr(SSQLToken *pToken) { case TK_STAR: return TSDB_BINARY_OP_MULTIPLY; case TK_SLASH: + case TK_DIVIDE: return TSDB_BINARY_OP_DIVIDE; case TK_REM: return TSDB_BINARY_OP_REMAINDER; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 94ebaefd369975c6873e6dce7ff36c3b513ad91e..f4e0788f5634a5d0dcd8b101901005f5fbf31ddd 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -284,7 +284,7 @@ void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) { } for (int i = 0; i < pCmd->numOfCols; ++i) - pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row; + pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pRes->bytes[i] * pRes->row; pRes->row++; (*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow); @@ -298,7 +298,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); for (int i = 0; i < pCmd->numOfCols; ++i) { - pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row; + pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pRes->bytes[i] * pRes->row; } pRes->row++; diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 851102b8e383d35b7dcc914450d934f433a8f4a9..a0ff10220d2960ea2ce957942c8926c039d921ac 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -3297,7 +3297,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) { tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order, arithmetic_callback_function); - pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/; + pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size; pCtx->param[1].pz = NULL; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index be3662ff0e320f85d9f69f8f30c7ae832a1afc6d..b35019f3f3a72950874ed86d2ca6364aaac41f5c 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -16,6 +16,7 @@ #define _XOPEN_SOURCE #define _DEFAULT_SOURCE +#include #include "os.h" #include "taos.h" #include "taosmsg.h" @@ -58,9 +59,9 @@ static int32_t setObjFullName(char* fullName, const char* account, SSQLToken* pD static void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t nameLength); static void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName); -static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem); +static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem, bool isResultColumn); static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, - int8_t type, char* fieldName); + int8_t type, char* fieldName, SSqlExpr* pSqlExpr); static int32_t changeFunctionID(int32_t optr, int16_t* functionId); static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable); @@ -85,7 +86,7 @@ static int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); static int32_t validateSqlFunctionInStreamSql(SQueryInfo* pQueryInfo); static int32_t buildArithmeticExprString(tSQLExpr* pExpr, char** exprString); static int32_t validateFunctionsInIntervalOrGroupbyQuery(SQueryInfo* pQueryInfo); -static int32_t validateArithmeticSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnList* pList); +static int32_t validateArithmeticSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnList* pList, int32_t* type); static int32_t validateDNodeConfig(tDCLSQL* pOptions); static int32_t validateLocalConfig(tDCLSQL* pOptions); static int32_t validateColumnName(char* name); @@ -93,7 +94,6 @@ static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo); -static bool hasDefaultQueryTimeRange(SQueryInfo *pQueryInfo); static void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex); @@ -115,6 +115,9 @@ static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index); +static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr* pAst, int32_t* num, + SColIndexEx** pColIndex, SSqlExprInfo* pExprInfo); + /* * Used during parsing query sql. Since the query sql usually small in length, error position * is not needed in the final error message. @@ -203,7 +206,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { int32_t code = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); assert(pQueryInfo->numOfTables == 0); - + SMeterMetaInfo* pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo); pCmd->command = pInfo->type; @@ -413,7 +416,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { const char* msg3 = "name too long"; pCmd->command = pInfo->type; - //tDCLSQL* pDCL = pInfo->pDCLInfo; + // tDCLSQL* pDCL = pInfo->pDCLInfo; SUserInfo* pUser = &pInfo->pDCLInfo->user; SSQLToken* pName = &pUser->user; @@ -501,7 +504,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { case TSDB_SQL_SELECT: { assert(pCmd->numOfClause == 1); const char* msg1 = "columns in select clause not identical"; - + for (int32_t i = pCmd->numOfClause; i < pInfo->subclauseInfo.numOfClause; ++i) { SQueryInfo* pqi = NULL; if ((code = tscGetQueryInfoDetailSafely(pCmd, i, &pqi)) != TSDB_CODE_SUCCESS) { @@ -516,18 +519,18 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { if ((code = doCheckForQuery(pSql, pQuerySql, i)) != TSDB_CODE_SUCCESS) { return code; } - + tscPrintSelectClause(pSql, i); } - + // set the command/global limit parameters from the first subclause to the sqlcmd object SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pCmd, 0); pCmd->command = pQueryInfo1->command; - + // if there is only one element, the limit of clause is the limit of global result. - for(int32_t i = 1; i < pCmd->numOfClause; ++i) { + for (int32_t i = 1; i < pCmd->numOfClause; ++i) { SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pCmd, i); - + int32_t ret = tscFieldInfoCompare(&pQueryInfo1->fieldsInfo, &pQueryInfo2->fieldsInfo); if (ret != 0) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); @@ -612,7 +615,7 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { if (parseSlidingClause(pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } - + return TSDB_CODE_SUCCESS; } @@ -626,7 +629,7 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } } - + /* * check invalid SQL: * select tbname, tags_fields from super_table_name interval(1s) @@ -652,19 +655,20 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { } SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS, &index, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE); + SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS, &index, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE); SColumnList ids = getColumnList(1, 0, PRIMARYKEY_TIMESTAMP_COL_INDEX); - int32_t ret = insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS].aName); + int32_t ret = + insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS].aName, pExpr); if (ret != TSDB_CODE_SUCCESS) { return ret; } - + if (parseSlidingClause(pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } - + return TSDB_CODE_SUCCESS; } @@ -676,20 +680,20 @@ int32_t parseSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { SSQLToken* pSliding = &pQuerySql->sliding; if (pSliding->n != 0) { - getTimestampInUsFromStr(pSliding->z, pSliding->n, &pQueryInfo->nSlidingTime); + getTimestampInUsFromStr(pSliding->z, pSliding->n, &pQueryInfo->slidingTime); if (pMeterMetaInfo->pMeterMeta->precision == TSDB_TIME_PRECISION_MILLI) { - pQueryInfo->nSlidingTime /= 1000; + pQueryInfo->slidingTime /= 1000; } - if (pQueryInfo->nSlidingTime < tsMinSlidingTime) { + if (pQueryInfo->slidingTime < tsMinSlidingTime) { return invalidSqlErrMsg(pQueryInfo->msg, msg0); } - if (pQueryInfo->nSlidingTime > pQueryInfo->intervalTime) { + if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } } else { - pQueryInfo->nSlidingTime = pQueryInfo->intervalTime; + pQueryInfo->slidingTime = pQueryInfo->intervalTime; } return TSDB_CODE_SUCCESS; @@ -703,11 +707,11 @@ int32_t setMeterID(SMeterMetaInfo* pMeterMetaInfo, SSQLToken* pzTableName, SSqlO // backup the old name in pMeterMetaInfo size_t size = strlen(pMeterMetaInfo->name); - char* oldName = NULL; + char* oldName = NULL; if (size > 0) { oldName = strdup(pMeterMetaInfo->name); } - + if (hasSpecifyDB(pzTableName)) { // db has been specified in sql string so we ignore current db path code = setObjFullName(pMeterMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL); @@ -726,7 +730,7 @@ int32_t setMeterID(SMeterMetaInfo* pMeterMetaInfo, SSQLToken* pzTableName, SSqlO free(oldName); return code; } - + /* * the old name exists and is not equalled to the new name. Release the metermeta/metricmeta * that are corresponding to the old name for the new table name. @@ -738,7 +742,7 @@ int32_t setMeterID(SMeterMetaInfo* pMeterMetaInfo, SSQLToken* pzTableName, SSqlO } else { assert(pMeterMetaInfo->pMeterMeta == NULL && pMeterMetaInfo->pMetricMeta == NULL); } - + tfree(oldName); return TSDB_CODE_SUCCESS; } @@ -885,7 +889,7 @@ bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField) { const char* msg6 = "invalid data type in tags"; assert(pCmd->numOfClause == 1); - + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMeta* pMeterMeta = pMeterMetaInfo->pMeterMeta; @@ -1103,11 +1107,11 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel const char* msg3 = "not support query expression"; const char* msg4 = "columns from different table mixed up in arithmetic expression"; const char* msg5 = "invalid function name"; - + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); for (int32_t i = 0; i < pSelection->nExpr; ++i) { - int32_t outputIndex = pQueryInfo->fieldsInfo.numOfOutputCols; + int32_t outputIndex = pQueryInfo->exprsInfo.numOfExprs; tSQLExprItem* pItem = &pSelection->a[i]; // project on all fields @@ -1119,54 +1123,102 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel // if the name of column is quoted, remove it and set the right information for later process extractColumnNameFromString(pItem); - pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY; // select table_name1.field_name1, table_name2.field_name2 from table_name1, table_name2 if (addProjectionExprAndResultField(pQueryInfo, pItem) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } - } else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_LAST_ROW) { + } else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_AVG_IRATE) { // sql function in selection clause, append sql function info in pSqlCmd structure sequentially - if (addExprAndResultField(pQueryInfo, outputIndex, pItem) != TSDB_CODE_SUCCESS) { + if (addExprAndResultField(pQueryInfo, outputIndex, pItem, true) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } } else if (pItem->pNode->nSQLOptr >= TK_PLUS && pItem->pNode->nSQLOptr <= TK_REM) { - // arithmetic function in select + // arithmetic function in select clause SColumnList columnList = {0}; - if (validateArithmeticSQLExpr(pItem->pNode, pQueryInfo, &columnList) != TSDB_CODE_SUCCESS) { + int32_t arithmeticType = NON_ARITHMEIC_EXPR; + + if (validateArithmeticSQLExpr(pItem->pNode, pQueryInfo, &columnList, &arithmeticType) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } int32_t tableIndex = columnList.ids[0].tableIndex; - for(int32_t f = 1; f < columnList.num; ++f) { - if (columnList.ids[f].tableIndex != tableIndex) { - return invalidSqlErrMsg(pQueryInfo->msg, msg4); - } - } - char arithmeticExprStr[1024] = {0}; char* p = arithmeticExprStr; - - if (buildArithmeticExprString(pItem->pNode, &p) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_INVALID_SQL; - } - - // expr string is set as the parameter of function - SColumnIndex index = {.tableIndex = tableIndex}; - SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputIndex, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, - sizeof(double), sizeof(double)); - addExprParams(pExpr, arithmeticExprStr, TSDB_DATA_TYPE_BINARY, strlen(arithmeticExprStr), index.tableIndex); - - /* todo alias name should use the original sql string */ - if (pItem->aliasName != NULL) { - strncpy(pExpr->aliasName, pItem->aliasName, TSDB_COL_NAME_LEN); + + if (arithmeticType == NORMAL_ARITHMETIC) { + pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY; + + // all columns in arithmetic expression must belong to the same table + for (int32_t f = 1; f < columnList.num; ++f) { + if (columnList.ids[f].tableIndex != tableIndex) { + return invalidSqlErrMsg(pQueryInfo->msg, msg4); + } + } + + if (buildArithmeticExprString(pItem->pNode, &p) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_INVALID_SQL; + } + + // expr string is set as the parameter of function + SColumnIndex index = {.tableIndex = tableIndex}; + SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputIndex, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, + sizeof(double), sizeof(double)); + addExprParams(pExpr, arithmeticExprStr, TSDB_DATA_TYPE_BINARY, strlen(arithmeticExprStr), index.tableIndex); + + /* todo alias name should use the original sql string */ + char* name = (pItem->aliasName != NULL)? pItem->aliasName:arithmeticExprStr; + strncpy(pExpr->aliasName, name, TSDB_COL_NAME_LEN); + + insertResultField(pQueryInfo, i, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName, pExpr); } else { - strncpy(pExpr->aliasName, arithmeticExprStr, TSDB_COL_NAME_LEN); + columnList.num = 0; + columnList.ids[0] = (SColumnIndex) {0, 0}; + + insertResultField(pQueryInfo, i, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, "abc", NULL); + + int32_t slot = tscNumOfFields(pQueryInfo) - 1; + + if (pQueryInfo->fieldsInfo.pExpr[slot] == NULL) { + SSqlFunctionExpr* pFuncExpr = calloc(1, sizeof(SSqlFunctionExpr)); + tscFieldInfoSetBinExpr(&pQueryInfo->fieldsInfo, slot, pFuncExpr); + + // arithmetic expression always return result in the format of double float + pFuncExpr->resBytes = sizeof(double); + pFuncExpr->interResBytes = sizeof(double); + pFuncExpr->resType = TSDB_DATA_TYPE_DOUBLE; + + SSqlBinaryExprInfo* pBinExprInfo = &pFuncExpr->pBinExprInfo; + + tSQLSyntaxNode* pNode = NULL; + SColIndexEx* pColIndex = NULL; + + int32_t ret = tSQLBinaryExprCreateFromSqlExpr(&pNode, pItem->pNode, &pBinExprInfo->numOfCols, &pColIndex, &pQueryInfo->exprsInfo); + if (ret != TSDB_CODE_SUCCESS) { + tSQLBinaryExprDestroy(&pNode->pExpr, NULL); + return invalidSqlErrMsg(pQueryInfo->msg, "invalid expression in select clause"); + } + + pBinExprInfo->pBinExpr = pNode->pExpr; + pBinExprInfo->pReqColumns = pColIndex; + + for(int32_t k = 0; k < pBinExprInfo->numOfCols; ++k) { + SColIndexEx* pCol = &pBinExprInfo->pReqColumns[k]; + for(int32_t f = 0; f < pQueryInfo->exprsInfo.numOfExprs; ++f) { + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, f); + if (strcmp(pExpr->aliasName, pCol->name) == 0) { + pCol->colIdxInBuf = f; + break; + } + } + + assert(pCol->colIdxInBuf >= 0 && pCol->colIdxInBuf < pQueryInfo->exprsInfo.numOfExprs); + tfree(pNode); + } + } } - - insertResultField(pQueryInfo, i, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName); } else { /* * not support such expression @@ -1188,7 +1240,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_QUERY; SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - if (tscQueryMetricTags(pQueryInfo)) { // local handle the metric tag query + if (tscQueryMetricTags(pQueryInfo)) { // local handle the metric tag query pCmd->count = pMeterMetaInfo->pMeterMeta->numOfColumns; // the number of meter schema, tricky. pQueryInfo->command = TSDB_SQL_RETRIEVE_TAGS; } @@ -1207,13 +1259,15 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel return TSDB_CODE_SUCCESS; } -int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, int8_t type, - char* fieldName) { +int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, + int8_t type, char* fieldName, SSqlExpr* pSqlExpr) { for (int32_t i = 0; i < pIdList->num; ++i) { tscColumnBaseInfoInsert(pQueryInfo, &(pIdList->ids[i])); } tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, outputIndex, type, fieldName, bytes); + tscFieldInfoSetExpr(&pQueryInfo->fieldsInfo, outputIndex, pSqlExpr); + return TSDB_CODE_SUCCESS; } @@ -1285,7 +1339,7 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn ids.num = 0; } - insertResultField(pQueryInfo, startPos, &ids, pExpr->resBytes, pExpr->resType, colName); + insertResultField(pQueryInfo, startPos, &ids, pExpr->resBytes, pExpr->resType, colName, pExpr); } void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, @@ -1298,7 +1352,7 @@ void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex ids.num = 0; } - insertResultField(pQueryInfo, outputColIndex, &ids, pColSchema->bytes, pColSchema->type, pColSchema->name); + insertResultField(pQueryInfo, outputColIndex, &ids, pColSchema->bytes, pColSchema->type, pColSchema->name, pExpr); pExpr->colInfo.flag = flag; if (TSDB_COL_IS_TAG(flag)) { @@ -1320,7 +1374,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum } for (int32_t j = 0; j < numOfTotalColumns; ++j) { - doAddProjectCol(pQueryInfo, startPos + j, j, pIndex->tableIndex); + SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, startPos + j, j, pIndex->tableIndex); pIndex->columnIndex = j; SColumnList ids = {0}; @@ -1329,7 +1383,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum // tag columns do not add to source list ids.num = (j >= pMeterMeta->numOfColumns) ? 0 : 1; - insertResultField(pQueryInfo, startPos + j, &ids, pSchema[j].bytes, pSchema[j].type, pSchema[j].name); + insertResultField(pQueryInfo, startPos + j, &ids, pSchema[j].bytes, pSchema[j].type, pSchema[j].name, pExpr); } return numOfTotalColumns; @@ -1415,8 +1469,8 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema, } else { getRevisedName(columnName, functionID, TSDB_COL_NAME_LEN, pSchema[pColIndex->columnIndex].name); } - - tscSqlExprInsert(pQueryInfo, resColIdx, functionID, pColIndex, type, bytes, bytes); + + SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, resColIdx, functionID, pColIndex, type, bytes, bytes); // for point interpolation/last_row query, we need the timestamp column to be loaded SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; @@ -1425,12 +1479,12 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema, } SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex); - insertResultField(pQueryInfo, resColIdx, &ids, bytes, type, columnName); + insertResultField(pQueryInfo, resColIdx, &ids, bytes, type, columnName, pExpr); return TSDB_CODE_SUCCESS; } -int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem) { +int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem, bool finalResult) { SMeterMetaInfo* pMeterMetaInfo = NULL; int32_t optr = pItem->pNode->nSQLOptr; @@ -1453,6 +1507,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt return TSDB_CODE_INVALID_SQL; } + SSqlExpr* pExpr = NULL; SColumnIndex index = COLUMN_INDEX_INITIALIZER; if (pItem->pNode->pParam != NULL) { @@ -1473,7 +1528,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; - tscSqlExprInsert(pQueryInfo, colIdx, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size); + pExpr = tscSqlExprInsert(pQueryInfo, colIdx, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size); } else { // count the number of meters created according to the metric if (getColumnIndexByName(pToken, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { @@ -1488,22 +1543,28 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt } int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; - tscSqlExprInsert(pQueryInfo, colIdx, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size); + pExpr = tscSqlExprInsert(pQueryInfo, colIdx, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size); } } else { // count(*) is equalled to count(primary_timestamp_key) index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; - tscSqlExprInsert(pQueryInfo, colIdx, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size); + pExpr = tscSqlExprInsert(pQueryInfo, colIdx, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size); } - - char columnName[TSDB_COL_NAME_LEN] = {0}; - getColumnName(pItem, columnName, TSDB_COL_NAME_LEN); - - // count always use the primary timestamp key column, which is 0. + + memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName)); + getColumnName(pItem, pExpr->aliasName, TSDB_COL_NAME_LEN); + SColumnList ids = getColumnList(1, index.tableIndex, index.columnIndex); - - insertResultField(pQueryInfo, colIdx, &ids, sizeof(int64_t), TSDB_DATA_TYPE_BIGINT, columnName); + if (finalResult) { + int32_t numOfOutput = tscNumOfFields(pQueryInfo); + insertResultField(pQueryInfo, numOfOutput, &ids, sizeof(int64_t), TSDB_DATA_TYPE_BIGINT, pExpr->aliasName, pExpr); + } else { + for (int32_t i = 0; i < ids.num; ++i) { + tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i])); + } + } + return TSDB_CODE_SUCCESS; } case TK_SUM: @@ -1541,9 +1602,6 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt return invalidSqlErrMsg(pQueryInfo->msg, msg1); } - char columnName[TSDB_COL_NAME_LEN] = {0}; - getColumnName(pItem, columnName, TSDB_COL_NAME_LEN); - int16_t resultType = 0; int16_t resultSize = 0; int16_t intermediateResSize = 0; @@ -1562,11 +1620,11 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt if (optr == TK_DIFF) { colIdx += 1; SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0}; - tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, + SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE); SColumnList ids = getColumnList(1, 0, 0); - insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].aName); + insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].aName, pExpr); } // functions can not be applied to tags @@ -1596,8 +1654,18 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt SColumnList ids = {0}; ids.num = 1; ids.ids[0] = index; - - insertResultField(pQueryInfo, colIdx, &ids, pExpr->resBytes, pExpr->resType, columnName); + + memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName)); + getColumnName(pItem, pExpr->aliasName, TSDB_COL_NAME_LEN); + + if (finalResult) { + int32_t numOfOutput = tscNumOfFields(pQueryInfo); + insertResultField(pQueryInfo, numOfOutput, &ids, pExpr->resBytes, pExpr->resType, pExpr->aliasName, pExpr); + } else { + for (int32_t i = 0; i < ids.num; ++i) { + tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i])); + } + } return TSDB_CODE_SUCCESS; } @@ -1699,10 +1767,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt if (pParamElem->pNode->nSQLOptr != TK_ID) { return invalidSqlErrMsg(pQueryInfo->msg, msg2); } - - char columnName[TSDB_COL_NAME_LEN] = {0}; - getColumnName(pItem, columnName, TSDB_COL_NAME_LEN); - + SColumnIndex index = COLUMN_INDEX_INITIALIZER; if (getColumnIndexByName(&pParamElem->pNode->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg3); @@ -1733,7 +1798,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt int16_t resultSize = pSchema[index.columnIndex].bytes; char val[8] = {0}; - int32_t numOfAddedColumn = 1; + SSqlExpr* pExpr = NULL; + if (optr == TK_PERCENTILE || optr == TK_APERCENTILE) { tVariantDump(pVariant, val, TSDB_DATA_TYPE_DOUBLE); @@ -1755,7 +1821,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt return TSDB_CODE_INVALID_SQL; } - SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, colIdx, functionId, &index, resultType, resultSize, resultSize); + pExpr = tscSqlExprInsert(pQueryInfo, colIdx, functionId, &index, resultType, resultSize, resultSize); addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0); } else { tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT); @@ -1772,22 +1838,30 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt // set the first column ts for top/bottom query SColumnIndex index1 = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE); + pExpr = tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE); const int32_t TS_COLUMN_INDEX = 0; SColumnList ids = getColumnList(1, 0, TS_COLUMN_INDEX); insertResultField(pQueryInfo, TS_COLUMN_INDEX, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, - aAggs[TSDB_FUNC_TS].aName); + aAggs[TSDB_FUNC_TS].aName, pExpr); colIdx += 1; // the first column is ts - numOfAddedColumn += 1; - SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, colIdx, functionId, &index, resultType, resultSize, resultSize); + pExpr = tscSqlExprInsert(pQueryInfo, colIdx, functionId, &index, resultType, resultSize, resultSize); addExprParams(pExpr, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0); } - + + memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName)); + getColumnName(pItem, pExpr->aliasName, TSDB_COL_NAME_LEN); + SColumnList ids = getColumnList(1, 0, index.columnIndex); - insertResultField(pQueryInfo, colIdx, &ids, resultSize, resultType, columnName); + if (finalResult) { + insertResultField(pQueryInfo, colIdx, &ids, resultSize, resultType, pExpr->aliasName, pExpr); + } else { + for (int32_t i = 0; i < ids.num; ++i) { + tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i])); + } + } return TSDB_CODE_SUCCESS; } @@ -2016,7 +2090,7 @@ int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { SSqlCmd* pCmd = &pSql->cmd; SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); assert(pCmd->numOfClause == 1); - + pCmd->command = TSDB_SQL_SHOW; const char* msg1 = "invalid name"; @@ -2146,15 +2220,18 @@ int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) { int16_t type = 0; int16_t intermediateBytes = 0; - for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) { + for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k); - TAOS_FIELD* pField = tscFieldInfoGetField(pQueryInfo, k); +// TAOS_FIELD* pField = tscFieldInfoGetField(pQueryInfo, k); int16_t functionId = aAggs[pExpr->functionId].stableFuncId; + int32_t colIndex = pExpr->colInfo.colIdx; + SSchema* pSrcSchema = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, colIndex); + if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) || (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST)) { - if (getResultDataInfo(pField->type, pField->bytes, functionId, pExpr->param[0].i64Key, &type, &bytes, + if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, pExpr->param[0].i64Key, &type, &bytes, &intermediateBytes, 0, true) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } @@ -2176,14 +2253,17 @@ void tscRestoreSQLFunctionForMetricQuery(SQueryInfo* pQueryInfo) { return; } - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); - TAOS_FIELD* pField = tscFieldInfoGetField(pQueryInfo, i); - + SSchema* pSchema = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, pExpr->colInfo.colIdx); + if ((pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) || (pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX)) { - pExpr->resBytes = pField->bytes; - pExpr->resType = pField->type; + // the final result size and type in the same as query on single table. + // so here, set the flag to be false; + int16_t inter = 0; + getResultDataInfo(pSchema->type, pSchema->bytes, pExpr->functionId, 0, &pExpr->resType, &pExpr->resBytes, + &inter, 0, false); } } } @@ -2192,7 +2272,7 @@ bool hasUnsupportFunctionsForSTableQuery(SQueryInfo* pQueryInfo) { const char* msg1 = "TWA not allowed to apply to super table directly"; const char* msg2 = "TWA only support group by tbname for super table query"; const char* msg3 = "function not support for super table query"; - + // filter sql function not supported by metric query yet. for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; @@ -2533,7 +2613,13 @@ static int32_t tSQLExprNodeToString(tSQLExpr* pExpr, char** str) { } else if (pExpr->nSQLOptr >= TK_BOOL && pExpr->nSQLOptr <= TK_STRING) { // value *str += tVariantToString(&pExpr->val, *str); - } else { + } else if (pExpr->nSQLOptr >= TK_COUNT && pExpr->nSQLOptr <= TK_AVG_IRATE) { + /* + * arithmetic expression of aggregation, such as count(ts) + count(ts) *2 + */ + strncpy(*str, pExpr->operand.z, pExpr->operand.n); + *str += pExpr->operand.n; + } else { // not supported operation assert(false); } @@ -2869,8 +2955,7 @@ int32_t buildArithmeticExprString(tSQLExpr* pExpr, char** exprString) { tSQLExpr* pLeft = pExpr->pLeft; tSQLExpr* pRight = pExpr->pRight; - *(*exprString) = '('; - *exprString += 1; + *(*exprString)++ = '('; if (pLeft->nSQLOptr >= TK_PLUS && pLeft->nSQLOptr <= TK_REM) { buildArithmeticExprString(pLeft, exprString); @@ -2892,50 +2977,67 @@ int32_t buildArithmeticExprString(tSQLExpr* pExpr, char** exprString) { } } - *(*exprString) = ')'; - *exprString += 1; + *(*exprString)++ = ')'; return TSDB_CODE_SUCCESS; } -static int32_t validateSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnList* pList) { +static int32_t validateSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnList* pList, int32_t* type) { if (pExpr->nSQLOptr == TK_ID) { - SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByName(&pExpr->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_INVALID_SQL; - } + if (*type == NON_ARITHMEIC_EXPR) { + *type = NORMAL_ARITHMETIC; + } else if (*type == AGG_ARIGHTMEIC) { + return TSDB_CODE_INVALID_SQL; + } - // if column is timestamp, bool, binary, nchar, not support arithmetic, so return invalid sql - SMeterMeta* pMeterMeta = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, index.tableIndex)->pMeterMeta; - SSchema* pSchema = tsGetSchema(pMeterMeta) + index.columnIndex; - if ((pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) || (pSchema->type == TSDB_DATA_TYPE_BOOL) - || (pSchema->type == TSDB_DATA_TYPE_BINARY) || (pSchema->type == TSDB_DATA_TYPE_NCHAR)){ - return TSDB_CODE_INVALID_SQL; - } - - pList->ids[pList->num++] = index; + SColumnIndex index = COLUMN_INDEX_INITIALIZER; + if (getColumnIndexByName(&pExpr->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_INVALID_SQL; + } + + // if column is timestamp, bool, binary, nchar, not support arithmetic, so return invalid sql + SMeterMeta* pMeterMeta = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, index.tableIndex)->pMeterMeta; + SSchema* pSchema = tsGetSchema(pMeterMeta) + index.columnIndex; + if ((pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) || (pSchema->type == TSDB_DATA_TYPE_BOOL) || + (pSchema->type == TSDB_DATA_TYPE_BINARY) || (pSchema->type == TSDB_DATA_TYPE_NCHAR)) { + return TSDB_CODE_INVALID_SQL; + } + + pList->ids[pList->num++] = index; } else if (pExpr->nSQLOptr == TK_FLOAT && (isnan(pExpr->val.dKey) || isinf(pExpr->val.dKey))) { return TSDB_CODE_INVALID_SQL; - } else if (pExpr->nSQLOptr >= TK_MIN && pExpr->nSQLOptr <= TK_LAST_ROW) { - return TSDB_CODE_INVALID_SQL; + } else if (pExpr->nSQLOptr >= TK_COUNT && pExpr->nSQLOptr <= TK_AVG_IRATE) { + if (*type == NON_ARITHMEIC_EXPR) { + *type = AGG_ARIGHTMEIC; + } else if (*type == NORMAL_ARITHMETIC) { + return TSDB_CODE_INVALID_SQL; + } + + int32_t outputIndex = pQueryInfo->exprsInfo.numOfExprs; + tSQLExprItem item = {.pNode = pExpr, .aliasName = NULL}; + + // sql function in selection clause, append sql function info in pSqlCmd structure sequentially + if (addExprAndResultField(pQueryInfo, outputIndex, &item, false) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_INVALID_SQL; + } } return TSDB_CODE_SUCCESS; } -static int32_t validateArithmeticSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnList* pList) { +static int32_t validateArithmeticSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnList* pList, int32_t* type) { if (pExpr == NULL) { return TSDB_CODE_SUCCESS; } tSQLExpr* pLeft = pExpr->pLeft; if (pLeft->nSQLOptr >= TK_PLUS && pLeft->nSQLOptr <= TK_REM) { - int32_t ret = validateArithmeticSQLExpr(pLeft, pQueryInfo, pList); + int32_t ret = validateArithmeticSQLExpr(pLeft, pQueryInfo, pList, type); if (ret != TSDB_CODE_SUCCESS) { return ret; } } else { - int32_t ret = validateSQLExpr(pLeft, pQueryInfo, pList); + int32_t ret = validateSQLExpr(pLeft, pQueryInfo, pList, type); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -2943,12 +3045,12 @@ static int32_t validateArithmeticSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo tSQLExpr* pRight = pExpr->pRight; if (pRight->nSQLOptr >= TK_PLUS && pRight->nSQLOptr <= TK_REM) { - int32_t ret = validateArithmeticSQLExpr(pRight, pQueryInfo, pList); + int32_t ret = validateArithmeticSQLExpr(pRight, pQueryInfo, pList, type); if (ret != TSDB_CODE_SUCCESS) { return ret; } } else { - int32_t ret = validateSQLExpr(pRight, pQueryInfo, pList); + int32_t ret = validateSQLExpr(pRight, pQueryInfo, pList, type); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -3841,12 +3943,12 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { for (int32_t i = startPos; i < numOfFillVal; ++i, ++j) { TAOS_FIELD* pFields = tscFieldInfoGetField(pQueryInfo, i); - + if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) { setNull((char*)(&pQueryInfo->defaultVal[i]), pFields->type, pFields->bytes); continue; } - + int32_t ret = tVariantDump(&pFillToken->a[j].pVar, (char*)&pQueryInfo->defaultVal[i], pFields->type); if (ret != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg); @@ -4450,7 +4552,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* const char* msg1 = "slimit/soffset only available for STable query"; const char* msg2 = "function not supported on table"; const char* msg3 = "slimit/soffset can not apply to projection query"; - + // handle the limit offset value, validate the limit pQueryInfo->limit = pQuerySql->limit; pQueryInfo->clauseLimit = pQueryInfo->limit.limit; @@ -4480,18 +4582,19 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) { return invalidSqlErrMsg(pQueryInfo->msg, msg3); } - + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { -// if (pQueryInfo->order.orderColId >= 0) { -// if (pQueryInfo->limit.limit == -1) { -// return invalidSqlErrMsg(pQueryInfo->msg, msg4); -// } else if (pQueryInfo->limit.limit > 10000) { // the result set can not be larger than 10000 -// //todo use global config parameter -// return invalidSqlErrMsg(pQueryInfo->msg, msg5); -// } -// } - - pQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; // for projection query on super table, all queries are subqueries + // if (pQueryInfo->order.orderColId >= 0) { + // if (pQueryInfo->limit.limit == -1) { + // return invalidSqlErrMsg(pQueryInfo->msg, msg4); + // } else if (pQueryInfo->limit.limit > 10000) { // the result set can not be larger than 10000 + // //todo use global config parameter + // return invalidSqlErrMsg(pQueryInfo->msg, msg5); + // } + // } + + pQueryInfo->type |= + TSDB_QUERY_TYPE_SUBQUERY; // for projection query on super table, all queries are subqueries } } } @@ -4523,7 +4626,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* // keep original limitation value in globalLimit pQueryInfo->clauseLimit = pQueryInfo->limit.limit; pQueryInfo->prjOffset = pQueryInfo->limit.offset; - + if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { /* * the limitation/offset value should be removed during retrieve data from virtual node, @@ -4533,7 +4636,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* if (pQueryInfo->limit.limit > 0) { pQueryInfo->limit.limit = -1; } - + pQueryInfo->limit.offset = 0; } } else { @@ -4657,7 +4760,7 @@ void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t t // NOTE: tag column does not add to source column list SColumnList ids = getColumnList(1, tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX); - insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, "ts"); + insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, "ts", pExpr); } } @@ -4686,7 +4789,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau // NOTE: tag column does not add to source column list SColumnList ids = {0}; - insertResultField(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, &ids, bytes, type, name); + insertResultField(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, &ids, bytes, type, name, pExpr); int32_t relIndex = index.columnIndex; @@ -4701,7 +4804,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau // limit the output to be 1 for each state value static void doLimitOutputNormalColOfGroupby(SSqlExpr* pExpr) { int32_t outputRow = 1; - tVariantCreateFromBinary(&pExpr->param[0], (char*) &outputRow, sizeof(int32_t), TSDB_DATA_TYPE_INT); + tVariantCreateFromBinary(&pExpr->param[0], (char*)&outputRow, sizeof(int32_t), TSDB_DATA_TYPE_INT); pExpr->numOfParams = 1; } @@ -4718,14 +4821,14 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) { pExpr->colInfo.flag = TSDB_COL_NORMAL; doLimitOutputNormalColOfGroupby(pExpr); - + // NOTE: tag column does not add to source column list SColumnList list = {0}; list.num = 1; list.ids[0] = colIndex; insertResultField(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, &list, pSchema->bytes, pSchema->type, - pSchema->name); + pSchema->name, pExpr); tscFieldInfoUpdateVisible(&pQueryInfo->fieldsInfo, pQueryInfo->fieldsInfo.numOfOutputCols - 1, false); } @@ -4952,7 +5055,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) { // NOTE: tag column does not add to source column list SColumnList ids = {0}; - insertResultField(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, &ids, bytes, type, name); + insertResultField(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, &ids, bytes, type, name, pExpr); } else { // if this query is "group by" normal column, interval is not allowed if (pQueryInfo->intervalTime > 0) { @@ -5200,8 +5303,8 @@ void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex) { } int32_t totalBufSize = 1024; - - char str[1024] = {0}; + + char str[1024] = {0}; int32_t offset = 0; offset += sprintf(str, "num:%d [", pQueryInfo->exprsInfo.numOfExprs); @@ -5209,12 +5312,13 @@ void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); char tmpBuf[1024] = {0}; - int32_t tmpLen = 0; - tmpLen = sprintf(tmpBuf, "%s(uid:%" PRId64 ", %d)", aAggs[pExpr->functionId].aName, pExpr->uid, pExpr->colInfo.colId); + int32_t tmpLen = 0; + tmpLen = + sprintf(tmpBuf, "%s(uid:%" PRId64 ", %d)", aAggs[pExpr->functionId].aName, pExpr->uid, pExpr->colInfo.colId); if (tmpLen + offset > totalBufSize) break; offset += sprintf(str + offset, "%s", tmpBuf); - + if (i < pQueryInfo->exprsInfo.numOfExprs - 1) { str[offset++] = ','; } @@ -5473,7 +5577,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { if (pMeterMetaInfo == NULL) { pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo); } - + // too many result columns not support order by in query if (pQuerySql->pSelection->nExpr > TSDB_MAX_COLUMNS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg8); @@ -5497,7 +5601,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { } pQueryInfo->command = TSDB_SQL_SELECT; - + // set all query tables, which are maybe more than one. for (int32_t i = 0; i < pQuerySql->from->nExpr; ++i) { tVariant* pTableItem = &pQuerySql->from->a[i].pVar; @@ -5613,7 +5717,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { updateTagColumnIndex(pQueryInfo, i); } - + /* * fill options are set at the end position, when all columns are set properly * the columns may be increased due to group by operation @@ -5622,7 +5726,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { if (pQueryInfo->intervalTime == 0 && (!tscIsPointInterpQuery(pQueryInfo))) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); } - + if (pQueryInfo->intervalTime > 0) { int64_t timeRange = labs(pQueryInfo->stime - pQueryInfo->etime); // number of result is not greater than 10,000,000 @@ -5630,7 +5734,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { return invalidSqlErrMsg(pQueryInfo->msg, msg6); } } - + int32_t ret = parseFillClause(pQueryInfo, pQuerySql); if (ret != TSDB_CODE_SUCCESS) { return ret; @@ -5640,7 +5744,80 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { return TSDB_CODE_SUCCESS; // Does not build query message here } -bool hasDefaultQueryTimeRange(SQueryInfo *pQueryInfo) { - return (pQueryInfo->stime == 0 && pQueryInfo->etime == INT64_MAX) || - (pQueryInfo->stime == INT64_MAX && pQueryInfo->etime == 0); +static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr* pAst, int32_t* num, + SColIndexEx** pColIndex, SSqlExprInfo* pExprInfo) { + tSQLSyntaxNode* pLeft = NULL; + tSQLSyntaxNode* pRight= NULL; + + if (pAst->pLeft != NULL) { + int32_t ret = tSQLBinaryExprCreateFromSqlExpr(&pLeft, pAst->pLeft, num, pColIndex, pExprInfo); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + } + + if (pAst->pRight != NULL) { + int32_t ret = tSQLBinaryExprCreateFromSqlExpr(&pRight, pAst->pRight, num, pColIndex, pExprInfo); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + } + + if (pAst->pLeft == NULL) { + if (pAst->nSQLOptr >= TK_TINYINT && pAst->nSQLOptr <= TK_DOUBLE) { + *pExpr = calloc(1, sizeof(tSQLSyntaxNode) + sizeof(tVariant)); + (*pExpr)->nodeType = TSQL_NODE_VALUE; + (*pExpr)->pVal = (tVariant*) ((char*)(*pExpr) + sizeof(tSQLSyntaxNode)); + tVariantAssign((*pExpr)->pVal, &pAst->val); + } else if (pAst->nSQLOptr >= TK_COUNT && pAst->nSQLOptr <= TK_AVG_IRATE) { + *pExpr = calloc(1, sizeof(tSQLSyntaxNode) + sizeof(SSchemaEx)); + (*pExpr)->nodeType = TSQL_NODE_COL; + (*pExpr)->pSchema = (SSchema*)((char*)(*pExpr) + sizeof(tSQLSyntaxNode)); + strncpy((*pExpr)->pSchema->name, pAst->operand.z, pAst->operand.n); + + // set the input column data byte and type. + for (int32_t i = 0; i < pExprInfo->numOfExprs; ++i) { + if (strcmp((*pExpr)->pSchema->name, pExprInfo->pExprs[i].aliasName) == 0) { + (*pExpr)->pSchema->type = pExprInfo->pExprs[i].resType; + (*pExpr)->pSchema->bytes = pExprInfo->pExprs[i].resBytes; + break; + } + } + } else { //todo return error + return TSDB_CODE_SUCCESS; + } + + (*pExpr)->colId = -1; + + *pColIndex = realloc(*pColIndex, (++(*num)) * sizeof(SColIndexEx)); + memset(&(*pColIndex)[(*num) - 1], 0, sizeof(SColIndexEx)); + + strncpy((*pColIndex)[(*num) - 1].name, pAst->operand.z, pAst->operand.n); + } else { + tSQLBinaryExpr *pBinExpr = (tSQLBinaryExpr *)calloc(1, sizeof(tSQLBinaryExpr)); + pBinExpr->filterOnPrimaryKey = false; + pBinExpr->pLeft = pLeft; + pBinExpr->pRight = pRight; + SSQLToken t = {.type = pAst->nSQLOptr}; + pBinExpr->nSQLBinaryOptr = getBinaryExprOptr(&t); + + assert(pBinExpr->nSQLBinaryOptr != 0); + + (*pExpr) = malloc(sizeof(tSQLSyntaxNode)); + (*pExpr)->nodeType = TSQL_NODE_EXPR; + (*pExpr)->pExpr = pBinExpr; + (*pExpr)->colId = -1; + + if (pBinExpr->nSQLBinaryOptr == TSDB_BINARY_OP_DIVIDE) { + if (pRight->nodeType == TSQL_NODE_VALUE) { + if (pRight->pVal->nType == TSDB_DATA_TYPE_INT && pRight->pVal->i64Key == 0) { + return TSDB_CODE_INVALID_SQL; + } else if (pRight->pVal->nType == TSDB_DATA_TYPE_FLOAT && pRight->pVal->dKey == 0) { + return TSDB_CODE_INVALID_SQL; + } + } + } + } + + return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 6b2c25ece0f56f835440365d38b9c2a109b959f3..a156522cdfcda33f7e6e941a71b27a6ba6e38cf6 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -19,6 +19,7 @@ #include "tscUtil.h" #include "tsclient.h" #include "tutil.h" +#include "tschemautil.h" typedef struct SCompareParam { SLocalDataSource **pLocalData; @@ -59,12 +60,13 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu * merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object. */ SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SQLFunctionCtx *pCtx = &pReducer->pCtx[i]; - + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); + pCtx->aOutputBuf = pReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pReducer->resColModel->capacity; pCtx->order = pQueryInfo->order.order; - pCtx->functionId = pQueryInfo->exprsInfo.pExprs[i].functionId; + pCtx->functionId = pExpr->functionId; // input buffer hold only one point data int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i); @@ -76,19 +78,16 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu pCtx->inputType = pSchema->type; pCtx->inputBytes = pSchema->bytes; - TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i); // output data format yet comes from pCmd. - pCtx->outputBytes = pField->bytes; - pCtx->outputType = pField->type; + pCtx->outputBytes = pExpr->resBytes; + pCtx->outputType = pExpr->resType; pCtx->startOffset = 0; pCtx->size = 1; pCtx->hasNull = true; pCtx->currentStage = SECONDARY_STAGE_MERGE; - pRes->bytes[i] = pField->bytes; - - SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); + pRes->bytes[i] = pExpr->resBytes; // for top/bottom function, the output of timestamp is the first column int32_t functionId = pExpr->functionId; @@ -255,7 +254,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd // the input data format follows the old format, but output in a new format. // so, all the input must be parsed as old format - pReducer->pCtx = (SQLFunctionCtx *)calloc(pQueryInfo->fieldsInfo.numOfOutputCols, sizeof(SQLFunctionCtx)); + pReducer->pCtx = (SQLFunctionCtx *)calloc(pQueryInfo->exprsInfo.numOfExprs, sizeof(SQLFunctionCtx)); pReducer->rowSize = pMemBuffer[0]->nElemSize; @@ -302,7 +301,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd } pReducer->pTempBuffer->numOfElems = 0; - pReducer->pResInfo = calloc((size_t)pQueryInfo->fieldsInfo.numOfOutputCols, sizeof(SResultInfo)); + pReducer->pResInfo = calloc((size_t)pQueryInfo->exprsInfo.numOfExprs, sizeof(SResultInfo)); tscCreateResPointerInfo(pRes, pQueryInfo); tscInitSqlContext(pCmd, pRes, pReducer, pDesc); @@ -613,7 +612,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr return pRes->code; } - pSchema = (SSchema *)calloc(1, sizeof(SSchema) * pQueryInfo->fieldsInfo.numOfOutputCols); + pSchema = (SSchema *)calloc(1, sizeof(SSchema) * pQueryInfo->exprsInfo.numOfExprs); if (pSchema == NULL) { tscError("%p failed to allocate memory", pSql); pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; @@ -621,7 +620,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr } int32_t rlen = 0; - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); pSchema[i].bytes = pExpr->resBytes; @@ -635,7 +634,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr capacity = nBufferSizes / rlen; } - pModel = createColumnModel(pSchema, pQueryInfo->fieldsInfo.numOfOutputCols, capacity); + pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity); for (int32_t i = 0; i < pMeterMetaInfo->pMetricMeta->numOfVnodes; ++i) { (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel); @@ -647,16 +646,33 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr return pRes->code; } - memset(pSchema, 0, sizeof(SSchema) * pQueryInfo->fieldsInfo.numOfOutputCols); - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { - TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i); + // final result depends on the fields number + memset(pSchema, 0, sizeof(SSchema) * pQueryInfo->exprsInfo.numOfExprs); + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); + + SSchema* p1 = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, pExpr->colInfo.colIdx); + + int16_t inter = 0; + int16_t type = -1; + int16_t bytes = 0; + + if ((pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) || + (pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX)) { + // the final result size and type in the same as query on single table. + // so here, set the flag to be false; + getResultDataInfo(p1->type, p1->bytes, pExpr->functionId, 0, &type, &bytes, &inter, 0, false); + } else { + type = pModel->pFields[i].field.type; + bytes = pModel->pFields[i].field.bytes; + } - pSchema[i].type = pField->type; - pSchema[i].bytes = pField->bytes; - strcpy(pSchema[i].name, pField->name); + pSchema[i].type = type; + pSchema[i].bytes = bytes; + strcpy(pSchema[i].name, pModel->pFields[i].field.name); } - *pFinalModel = createColumnModel(pSchema, pQueryInfo->fieldsInfo.numOfOutputCols, capacity); + *pFinalModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity); tfree(pSchema); return TSDB_CODE_SUCCESS; @@ -862,12 +878,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo } int32_t rowSize = tscGetResRowLength(pQueryInfo); - // handle the descend order output -// if (pQueryInfo->order.order == TSQL_SO_ASC) { - memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize); -// } else { -// reversedCopyResultToDstBuf(pQueryInfo, pRes, pFinalDataPage); -// } + memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize); pFinalDataPage->numOfElems = 0; return; @@ -999,15 +1010,15 @@ static void doExecuteSecondaryMerge(SSqlCmd* pCmd, SLocalReducer *pLocalReducer, // the tag columns need to be set before all functions execution SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - for(int32_t j = 0; j < pQueryInfo->fieldsInfo.numOfOutputCols; ++j) { + for(int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) { SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, j); SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[j]; tVariantAssign(&pCtx->param[0], &pExpr->param[0]); // tags/tags_dummy function, the tag field of SQLFunctionCtx is from the input buffer - if (pExpr->functionId == TSDB_FUNC_TAG_DUMMY || pExpr->functionId == TSDB_FUNC_TAG || - pExpr->functionId == TSDB_FUNC_TS_DUMMY) { + int32_t functionId = pExpr->functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS_DUMMY) { tVariantDestroy(&pCtx->tag); tVariantCreateFromBinary(&pCtx->tag, pCtx->aInputElemBuf, pCtx->inputBytes, pCtx->inputType); } @@ -1019,7 +1030,7 @@ static void doExecuteSecondaryMerge(SSqlCmd* pCmd, SLocalReducer *pLocalReducer, } } - for (int32_t j = 0; j < pQueryInfo->fieldsInfo.numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) { int32_t functionId = tscSqlExprGet(pQueryInfo, j)->functionId; if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { continue; @@ -1041,20 +1052,29 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) int64_t maxOutput = 0; for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) { - int32_t functionId = tscSqlExprGet(pQueryInfo, j)->functionId; +// SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[j]; +// if (pExpr == NULL) { +// assert(pQueryInfo->fieldsInfo.pExpr[j] != NULL); +// +// maxOutput = 1; +// continue; +// } /* * ts, tag, tagprj function can not decide the output number of current query * the number of output result is decided by main output */ + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, j); + int32_t functionId = pExpr->functionId; if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) { continue; } - + if (maxOutput < GET_RES_INFO(&pCtx[j])->numOfRes) { maxOutput = GET_RES_INFO(&pCtx[j])->numOfRes; } } + return maxOutput; } @@ -1066,7 +1086,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) */ static void fillMultiRowsOfTagsVal(SQueryInfo* pQueryInfo, int32_t numOfRes, SLocalReducer *pLocalReducer) { int32_t maxBufSize = 0; // find the max tags column length to prepare the buffer - for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) { + for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k); if (maxBufSize < pExpr->resBytes && pExpr->functionId == TSDB_FUNC_TAG) { maxBufSize = pExpr->resBytes; @@ -1076,7 +1096,7 @@ static void fillMultiRowsOfTagsVal(SQueryInfo* pQueryInfo, int32_t numOfRes, SLo assert(maxBufSize >= 0); char *buf = malloc((size_t) maxBufSize); - for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) { + for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k); if (pExpr->functionId != TSDB_FUNC_TAG) { continue; @@ -1098,12 +1118,9 @@ static void fillMultiRowsOfTagsVal(SQueryInfo* pQueryInfo, int32_t numOfRes, SLo } int32_t finalizeRes(SQueryInfo* pQueryInfo, SLocalReducer *pLocalReducer) { - for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) { + for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k); aAggs[pExpr->functionId].xFinalize(&pLocalReducer->pCtx[k]); - - // allow to re-initialize for the next round - pLocalReducer->pCtx[k].resultInfo->initialized = false; } pLocalReducer->hasPrevRow = false; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1b26268978b0c43f5fbd78c4cbfb3e29729c5907..121ff0c0e988d685acd6f1f19e02caca22c01a5b 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -901,7 +901,7 @@ int tscLaunchSTableSubqueries(SSqlObj *pSql) { tExtMemBuffer ** pMemoryBuf = NULL; tOrderDescriptor *pDesc = NULL; - SColumnModel * pModel = NULL; + SColumnModel * pModel = NULL; pRes->qhandle = 1; // hack the qhandle check @@ -1685,7 +1685,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime); pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit; - pQueryMsg->slidingTime = htobe64(pQueryInfo->nSlidingTime); + pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime); if (pQueryInfo->intervalTime < 0) { tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime); @@ -1768,7 +1768,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg; - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId == TSDB_FUNC_ARITHM) { @@ -1787,7 +1787,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSqlFuncExpr->functionId = htons(pExpr->functionId); pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams); - pMsg += sizeof(SSqlFuncExprMsg); + pMsg += (sizeof(SSqlFuncExprMsg) - TSDB_COL_NAME_LEN); for (int32_t j = 0; j < pExpr->numOfParams; ++j) { pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 8a5bacd0bfa96d900ec4112e1ba68c8c47dec96c..e9e74413c66cea697f7bc5d610c172b2d62280f0 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -13,8 +13,9 @@ * along with this program. If not, see . */ -#include "os.h" +#include #include "hash.h" +#include "os.h" #include "tcache.h" #include "tlog.h" #include "tnote.h" @@ -200,7 +201,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { taosCleanUpHashTable(pSql->pTableHashList); pSql->pTableHashList = NULL; } - + tscDump("%p pObj:%p, SQL: %s", pSql, pObj, pSql->sqlstr); pRes->code = (uint8_t)tsParseSql(pSql, false); @@ -367,9 +368,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { -// pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + -// pRes->bytes[i] * (1 - pQueryInfo->order.order) * (pRes->numOfRows - 1); - pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order); + pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i); } *rows = pRes->tsrow; @@ -377,22 +376,73 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) { return (pQueryInfo->order.order == TSQL_SO_DESC) ? pRes->numOfRows : -pRes->numOfRows; } +static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) { + SSqlRes *pRes = &pSql->res; + + if (isNull(pRes->tsrow[columnIndex], pField->type)) { + pRes->tsrow[columnIndex] = NULL; + } else if (pField->type == TSDB_DATA_TYPE_NCHAR) { + // convert unicode to native code in a temporary buffer extra one byte for terminated symbol + if (pRes->buffer[columnIndex] == NULL) { + pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE); + } + + /* string terminated char for binary data*/ + memset(pRes->buffer[columnIndex], 0, pField->bytes + TSDB_NCHAR_SIZE); + + if (taosUcs4ToMbs(pRes->tsrow[columnIndex], pField->bytes, pRes->buffer[columnIndex])) { + pRes->tsrow[columnIndex] = pRes->buffer[columnIndex]; + } else { + tscError("%p charset:%s to %s. val:%ls convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, pRes->tsrow); + pRes->tsrow[columnIndex] = NULL; + } + } +} + +static char *getArithemicInputSrc(void *param, char *name, int32_t colId) { + SArithmeticSupport *pSupport = (SArithmeticSupport *)param; + SSqlFunctionExpr * pExpr = pSupport->pExpr; + + int32_t index = -1; + for (int32_t i = 0; i < pExpr->pBinExprInfo.numOfCols; ++i) { + if (strcmp(name, pExpr->pBinExprInfo.pReqColumns[i].name) == 0) { + index = i; + break; + } + } + + assert(index >= 0 && index < pExpr->pBinExprInfo.numOfCols); + return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index]; +} + static void **doSetResultRowData(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - + assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows); - + if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker tfree(pRes->tsrow); return pRes->tsrow; } - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + //todo refactor move away + for(int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) { + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k); + + if (k > 0) { + SSqlExpr* pPrev = tscSqlExprGet(pQueryInfo, k - 1); + pExpr->offset = pPrev->offset + pPrev->resBytes; + } + } + int32_t num = 0; - for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { - pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row; + for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) { + if (pQueryInfo->fieldsInfo.pSqlExpr[i] != NULL) { + pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pRes->bytes[i] * pRes->row; + } // primary key column cannot be null in interval query, no need to check if (i == 0 && pQueryInfo->intervalTime > 0) { @@ -400,31 +450,38 @@ static void **doSetResultRowData(SSqlObj *pSql) { } TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i); - if (isNull(pRes->tsrow[i], pField->type)) { - pRes->tsrow[i] = NULL; - } else if (pField->type == TSDB_DATA_TYPE_NCHAR) { - // convert unicode to native code in a temporary buffer extra one byte for terminated symbol - if (pRes->buffer[num] == NULL) { - pRes->buffer[num] = malloc(pField->bytes + TSDB_NCHAR_SIZE); - } + transferNcharData(pSql, i, pField); - /* string terminated char for binary data*/ - memset(pRes->buffer[num], 0, pField->bytes + TSDB_NCHAR_SIZE); - - if (taosUcs4ToMbs(pRes->tsrow[i], pField->bytes, pRes->buffer[num])) { - pRes->tsrow[i] = pRes->buffer[num]; - } else { - tscError("%p charset:%s to %s. val:%ls convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, pRes->tsrow); - pRes->tsrow[i] = NULL; + // calculate the result from serveral other columns + if (pQueryInfo->fieldsInfo.pExpr != NULL && pQueryInfo->fieldsInfo.pExpr[i] != NULL) { + SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport)); + sas->offset = 0; + sas->pExpr = pQueryInfo->fieldsInfo.pExpr[i]; + + sas->numOfCols = sas->pExpr->pBinExprInfo.numOfCols; + + if (pRes->buffer[i] == NULL) { + pRes->buffer[i] = malloc(tscFieldInfoGetField(pQueryInfo, i)->bytes); } - num++; + for(int32_t k = 0; k < sas->numOfCols; ++k) { + int32_t columnIndex = sas->pExpr->pBinExprInfo.pReqColumns[k].colIdxInBuf; + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex); + + sas->elemSize[k] = pExpr->resBytes; + sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes; + } + + tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSQL_SO_ASC, getArithemicInputSrc); + pRes->tsrow[i] = pRes->buffer[i]; + + free(sas); //todo optimization } } assert(num <= pQueryInfo->fieldsInfo.numOfOutputCols); - - pRes->row++; // index increase one-step + + pRes->row++; // index increase one-step return pRes->tsrow; } @@ -466,7 +523,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { if (pSql->pSubs[i] == 0) { continue; } - + SSqlRes * pRes1 = &pSql->pSubs[i]->res; SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); @@ -502,11 +559,10 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { if (numOfTableHasRes >= 2) { // do merge result - success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && - (doSetResultRowData(pSql->pSubs[1]) != NULL); - // TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; - // TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; - // printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2); + success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL); + // TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; + // TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; + // printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2); } else { // only one subquery SSqlObj *pSub = pSql->pSubs[0]; if (pSub == NULL) { @@ -596,7 +652,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { tscProcessSql(pSql); // retrieve data from virtual node - //if failed to retrieve data from current virtual node, try next one if exists + // if failed to retrieve data from current virtual node, try next one if exists if (hasMoreVnodesToTry(pSql)) { tscTryQueryNextVnode(pSql, NULL); } @@ -638,7 +694,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { // current subclause is completed, try the next subclause while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) { tscTryQueryNextClause(pSql, NULL); - + // if the rows is not NULL, return immediately rows = taos_fetch_row_impl(res); } @@ -701,7 +757,7 @@ int taos_select_db(TAOS *taos, const char *db) { return taos_query(taos, sql); } -void taos_free_result_imp(TAOS_RES* res, int keepCmd) { +void taos_free_result_imp(TAOS_RES *res, int keepCmd) { if (res == NULL) return; SSqlObj *pSql = (SSqlObj *)res; @@ -755,7 +811,7 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; tscTrace("%p code:%d, numOfRows:%d, command:%d", pSql, pRes->code, pRes->numOfRows, pCmd->command); - + void *fp = pSql->fp; if (fp != NULL) { pSql->freed = 1; @@ -804,9 +860,7 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) { } } -void taos_free_result(TAOS_RES *res) { - taos_free_result_imp(res, 0); -} +void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); } int taos_errno(TAOS *taos) { STscObj *pObj = (STscObj *)taos; @@ -822,26 +876,24 @@ int taos_errno(TAOS *taos) { return code; } -static bool validErrorCode(int32_t code) { - return code >= TSDB_CODE_SUCCESS && code < TSDB_CODE_MAX_ERROR_CODE; -} +static bool validErrorCode(int32_t code) { return code >= TSDB_CODE_SUCCESS && code < TSDB_CODE_MAX_ERROR_CODE; } /* * In case of invalid sql error, additional information is attached to explain * why the sql is invalid */ -static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd* pCmd) { +static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) { if (code != TSDB_CODE_INVALID_SQL) { return false; } size_t len = strlen(pCmd->payload); - - char* z = NULL; + + char *z = NULL; if (len > 0) { - z = strstr (pCmd->payload, "invalid SQL"); + z = strstr(pCmd->payload, "invalid SQL"); } - + return z != NULL; } @@ -851,12 +903,12 @@ char *taos_errstr(TAOS *taos) { if (pObj == NULL || pObj->signature != pObj) return tsError[globalCode]; - SSqlObj* pSql = pObj->pSql; - + SSqlObj *pSql = pObj->pSql; + if (validErrorCode(pSql->res.code)) { code = pSql->res.code; } else { - code = TSDB_CODE_OTHERS; //unknown error + code = TSDB_CODE_OTHERS; // unknown error } if (hasAdditionalErrorInfo(code, &pSql->cmd)) { @@ -954,14 +1006,14 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: { - size_t xlen = 0; - for (xlen = 0; xlen <= fields[i].bytes; xlen++) { - char c = ((char*)row[i])[xlen]; - if (c == 0) break; - str[len++] = c; - } - str[len] = 0; - } break; + size_t xlen = 0; + for (xlen = 0; xlen <= fields[i].bytes; xlen++) { + char c = ((char *)row[i])[xlen]; + if (c == 0) break; + str[len++] = c; + } + str[len] = 0; + } break; case TSDB_DATA_TYPE_TIMESTAMP: len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i])); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 88620c5b12fb314cafe24ce8c63a33f05f77c397..bff58bd61af585fe58a100b37baf234f86df827a 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -389,33 +389,33 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { pStream->interval = pQueryInfo->intervalTime; // it shall be derived from sql string - if (pQueryInfo->nSlidingTime == 0) { - pQueryInfo->nSlidingTime = pQueryInfo->intervalTime; + if (pQueryInfo->slidingTime == 0) { + pQueryInfo->slidingTime = pQueryInfo->intervalTime; } int64_t minSlidingTime = (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime; - if (pQueryInfo->nSlidingTime == -1) { - pQueryInfo->nSlidingTime = pQueryInfo->intervalTime; - } else if (pQueryInfo->nSlidingTime < minSlidingTime) { + if (pQueryInfo->slidingTime == -1) { + pQueryInfo->slidingTime = pQueryInfo->intervalTime; + } else if (pQueryInfo->slidingTime < minSlidingTime) { tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64 "", pSql, pStream, - pQueryInfo->nSlidingTime, minSlidingTime); + pQueryInfo->slidingTime, minSlidingTime); - pQueryInfo->nSlidingTime = minSlidingTime; + pQueryInfo->slidingTime = minSlidingTime; } - if (pQueryInfo->nSlidingTime > pQueryInfo->intervalTime) { + if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) { tscWarn("%p stream:%p, sliding value:%" PRId64 " can not be larger than interval range, reset to:%" PRId64 "", pSql, pStream, - pQueryInfo->nSlidingTime, pQueryInfo->intervalTime); + pQueryInfo->slidingTime, pQueryInfo->intervalTime); - pQueryInfo->nSlidingTime = pQueryInfo->intervalTime; + pQueryInfo->slidingTime = pQueryInfo->intervalTime; } - pStream->slidingTime = pQueryInfo->nSlidingTime; + pStream->slidingTime = pQueryInfo->slidingTime; pQueryInfo->intervalTime = 0; // clear the interval value to avoid the force time window split by query processor - pQueryInfo->nSlidingTime = 0; + pQueryInfo->slidingTime = 0; } static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 72f78ae810eb27c80af5396a442b5a7e052cadb1..65d41fe51621663910d349b0befa2973e582dab6 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -27,6 +27,7 @@ #include "tsclient.h" #include "tsqldef.h" #include "ttimer.h" +#include "tast.h" /* * the detailed information regarding metric meta key is: @@ -343,22 +344,22 @@ void tscClearSqlMetaInfoForce(SSqlCmd* pCmd) { int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { if (pRes->tsrow == NULL) { - pRes->numOfnchar = 0; - +// int32_t numOfOutputCols = pQueryInfo->fieldsInfo.numOfOutputCols; - for (int32_t i = 0; i < numOfOutputCols; ++i) { - TAOS_FIELD* pField = tscFieldInfoGetField(pQueryInfo, i); - if (pField->type == TSDB_DATA_TYPE_NCHAR) { - pRes->numOfnchar++; - } - } + pRes->numOfnchar = numOfOutputCols; +// for (int32_t i = 0; i < numOfOutputCols; ++i) { +// TAOS_FIELD* pField = tscFieldInfoGetField(pQueryInfo, i); +// if (pField->type == TSDB_DATA_TYPE_NCHAR) { +// pRes->numOfnchar++; +// } +// } pRes->tsrow = calloc(1, (POINTER_BYTES + sizeof(short)) * numOfOutputCols + POINTER_BYTES * pRes->numOfnchar); pRes->bytes = calloc(numOfOutputCols, sizeof(short)); - if (pRes->numOfnchar > 0) { - pRes->buffer = calloc(POINTER_BYTES, pRes->numOfnchar); - } +// if (pRes->numOfnchar > 0) { + pRes->buffer = calloc(POINTER_BYTES, numOfOutputCols); +// } // not enough memory if (pRes->tsrow == NULL || pRes->bytes == NULL || (pRes->buffer == NULL && pRes->numOfnchar > 0)) { @@ -376,7 +377,7 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { void tscDestroyResPointerInfo(SSqlRes* pRes) { if (pRes->buffer != NULL) { - assert(pRes->numOfnchar > 0); +// assert(pRes->numOfnchar > 0); // free all buffers containing the multibyte string for (int i = 0; i < pRes->numOfnchar; i++) { tfree(pRes->buffer[i]); @@ -831,11 +832,18 @@ static void ensureSpace(SFieldInfo* pFieldInfo, int32_t size) { pFieldInfo->pFields = realloc(pFieldInfo->pFields, newSize * sizeof(TAOS_FIELD)); memset(&pFieldInfo->pFields[oldSize], 0, inc * sizeof(TAOS_FIELD)); - pFieldInfo->pOffset = realloc(pFieldInfo->pOffset, newSize * sizeof(int16_t)); - memset(&pFieldInfo->pOffset[oldSize], 0, inc * sizeof(int16_t)); +// pFieldInfo->pOffset = realloc(pFieldInfo->pOffset, newSize * sizeof(int16_t)); +// memset(&pFieldInfo->pOffset[oldSize], 0, inc * sizeof(int16_t)); pFieldInfo->pVisibleCols = realloc(pFieldInfo->pVisibleCols, newSize * sizeof(bool)); + memset(&pFieldInfo->pVisibleCols[oldSize], 0, inc * sizeof(bool)); + pFieldInfo->pSqlExpr = realloc(pFieldInfo->pSqlExpr, POINTER_BYTES*newSize); + pFieldInfo->pExpr = realloc(pFieldInfo->pExpr, POINTER_BYTES*newSize); + + memset(&pFieldInfo->pSqlExpr[oldSize], 0, inc * POINTER_BYTES); + memset(&pFieldInfo->pExpr[oldSize], 0, inc * POINTER_BYTES); + pFieldInfo->numOfAlloc = newSize; } } @@ -844,6 +852,15 @@ static void evic(SFieldInfo* pFieldInfo, int32_t index) { if (index < pFieldInfo->numOfOutputCols) { memmove(&pFieldInfo->pFields[index + 1], &pFieldInfo->pFields[index], sizeof(pFieldInfo->pFields[0]) * (pFieldInfo->numOfOutputCols - index)); + + memmove(&pFieldInfo->pVisibleCols[index + 1], &pFieldInfo->pVisibleCols[index], + sizeof(pFieldInfo->pVisibleCols[0]) * (pFieldInfo->numOfOutputCols - index)); + + memmove(&pFieldInfo->pSqlExpr[index + 1], &pFieldInfo->pSqlExpr[index], + sizeof(pFieldInfo->pSqlExpr[0]) * (pFieldInfo->numOfOutputCols - index)); + + memmove(&pFieldInfo->pExpr[index + 1], &pFieldInfo->pExpr[index], + sizeof(pFieldInfo->pExpr[0]) * (pFieldInfo->numOfOutputCols - index)); } } @@ -868,7 +885,6 @@ void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIE memcpy(&pFieldInfo->pFields[index], pField, sizeof(TAOS_FIELD)); pFieldInfo->pVisibleCols[index] = true; - pFieldInfo->numOfOutputCols++; } @@ -902,29 +918,49 @@ void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, co pFieldInfo->numOfOutputCols++; } -void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo) { - SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo; - pFieldInfo->pOffset[0] = 0; +void tscFieldInfoSetExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlExpr* pExpr) { + assert(index >= 0 && index < pFieldInfo->numOfOutputCols); + pFieldInfo->pSqlExpr[index] = pExpr; +} + +void tscFieldInfoSetBinExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlFunctionExpr* pExpr) { + assert(index >= 0 && index < pFieldInfo->numOfOutputCols); + pFieldInfo->pExpr[index] = pExpr; +} - for (int32_t i = 1; i < pFieldInfo->numOfOutputCols; ++i) { - pFieldInfo->pOffset[i] = pFieldInfo->pOffset[i - 1] + pFieldInfo->pFields[i - 1].bytes; +void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo) { + SSqlExprInfo* pExprInfo = &pQueryInfo->exprsInfo; + pExprInfo->pExprs[0].offset = 0; + + for (int32_t i = 1; i < pExprInfo->numOfExprs; ++i) { + pExprInfo->pExprs[i].offset = pExprInfo->pExprs[i - 1].offset + pExprInfo->pExprs[i - 1].resBytes; } } void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) { - SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo; - if (pFieldInfo->numOfOutputCols == 0) { +// SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo; +// if (pFieldInfo->numOfOutputCols == 0) { +// return; +// } +// +// pFieldInfo->pOffset[0] = 0; +// +// /* +// * the retTypeLen is used to store the intermediate result length +// * for potential secondary merge exists +// */ +// for (int32_t i = 1; i < pFieldInfo->numOfOutputCols; ++i) { +// pFieldInfo->pOffset[i] = pFieldInfo->pOffset[i - 1] + tscSqlExprGet(pQueryInfo, i - 1)->resBytes; +// } + SSqlExprInfo* pExprInfo = &pQueryInfo->exprsInfo; + if (pExprInfo->numOfExprs == 0) { return; } - - pFieldInfo->pOffset[0] = 0; - - /* - * the retTypeLen is used to store the intermediate result length - * for potential secondary merge exists - */ - for (int32_t i = 1; i < pFieldInfo->numOfOutputCols; ++i) { - pFieldInfo->pOffset[i] = pFieldInfo->pOffset[i - 1] + tscSqlExprGet(pQueryInfo, i - 1)->resBytes; + + pExprInfo->pExprs[0].offset = 0; + + for (int32_t i = 1; i < pExprInfo->numOfExprs; ++i) { + pExprInfo->pExprs[i].offset = pExprInfo->pExprs[i - 1].offset + pExprInfo->pExprs[i - 1].resBytes; } } @@ -948,12 +984,14 @@ void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src) { *dst = *src; dst->pFields = malloc(sizeof(TAOS_FIELD) * dst->numOfAlloc); - dst->pOffset = malloc(sizeof(short) * dst->numOfAlloc); +// dst->pOffset = malloc(sizeof(short) * dst->numOfAlloc); dst->pVisibleCols = malloc(sizeof(bool) * dst->numOfAlloc); + dst->pSqlExpr = malloc(POINTER_BYTES * dst->numOfAlloc); memcpy(dst->pFields, src->pFields, sizeof(TAOS_FIELD) * dst->numOfOutputCols); - memcpy(dst->pOffset, src->pOffset, sizeof(short) * dst->numOfOutputCols); +// memcpy(dst->pOffset, src->pOffset, sizeof(short) * dst->numOfOutputCols); memcpy(dst->pVisibleCols, src->pVisibleCols, sizeof(bool) * dst->numOfOutputCols); + memcpy(dst->pSqlExpr, src->pSqlExpr, POINTER_BYTES * dst->numOfOutputCols); } TAOS_FIELD* tscFieldInfoGetField(SQueryInfo* pQueryInfo, int32_t index) { @@ -967,11 +1005,11 @@ TAOS_FIELD* tscFieldInfoGetField(SQueryInfo* pQueryInfo, int32_t index) { int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutputCols; } int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) { - if (index >= pQueryInfo->fieldsInfo.numOfOutputCols) { + if (index >= pQueryInfo->exprsInfo.numOfExprs) { return 0; } - return pQueryInfo->fieldsInfo.pOffset[index]; + return pQueryInfo->exprsInfo.pExprs[index].offset; } int32_t tscFieldInfoCompare(SFieldInfo* pFieldInfo1, SFieldInfo* pFieldInfo2) { @@ -995,13 +1033,16 @@ int32_t tscFieldInfoCompare(SFieldInfo* pFieldInfo1, SFieldInfo* pFieldInfo2) { } int32_t tscGetResRowLength(SQueryInfo* pQueryInfo) { - SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo; - if (pFieldInfo->numOfOutputCols <= 0) { + if (pQueryInfo->exprsInfo.numOfExprs <= 0) { return 0; } - - return pFieldInfo->pOffset[pFieldInfo->numOfOutputCols - 1] + - pFieldInfo->pFields[pFieldInfo->numOfOutputCols - 1].bytes; + + int32_t size = 0; + for(int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { + size += pQueryInfo->exprsInfo.pExprs[i].resBytes; + } + + return size; } void tscClearFieldInfo(SFieldInfo* pFieldInfo) { @@ -1009,10 +1050,20 @@ void tscClearFieldInfo(SFieldInfo* pFieldInfo) { return; } - tfree(pFieldInfo->pOffset); +// tfree(pFieldInfo->pOffset); tfree(pFieldInfo->pFields); tfree(pFieldInfo->pVisibleCols); - + tfree(pFieldInfo->pSqlExpr); + + for(int32_t i = 0; i < pFieldInfo->numOfOutputCols; ++i) { + if (pFieldInfo->pExpr[i] != NULL) { + tSQLBinaryExprDestroy(&pFieldInfo->pExpr[i]->pBinExprInfo.pBinExpr, NULL); + tfree(pFieldInfo->pExpr[i]->pBinExprInfo.pReqColumns); + tfree(pFieldInfo->pExpr[i]); + } + } + + tfree(pFieldInfo->pExpr); memset(pFieldInfo, 0, sizeof(SFieldInfo)); } @@ -1123,6 +1174,10 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi return pExpr; } +int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) { + return pQueryInfo->exprsInfo.numOfExprs; +} + void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex) { if (pExpr == NULL || argument == NULL || bytes == 0) { return; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 0398a82fc36b5081729bba14e6e56a4826243351..ab5b2911b629f0279a6a9691b8173d98ec2081b7 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -417,6 +417,7 @@ typedef struct SColIndexEx { int16_t colIdx; int16_t colIdxInBuf; uint16_t flag; // denote if it is a tag or not + char name[TSDB_COL_NAME_LEN]; } SColIndexEx; /* sql function msg, to describe the message to vnode about sql function diff --git a/src/inc/tast.h b/src/inc/tast.h index d7950b54f6536258d9c5c64ed54a209e04305be7..d0fbcba071a2affc0388b6402266f8e59cd23f58 100644 --- a/src/inc/tast.h +++ b/src/inc/tast.h @@ -103,6 +103,8 @@ void tSQLBinaryExprCalcTraverse(tSQLBinaryExpr *pExprs, int32_t numOfRows, char void tSQLBinaryExprTrv(tSQLBinaryExpr *pExprs, int32_t *val, int16_t *ids); void tQueryResultClean(tQueryResultset *pRes); +uint8_t getBinaryExprOptr(SSQLToken *pToken); + #ifdef __cplusplus } #endif diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index e82078685a3781fd40e8a476a4eb4dd2bdeb7aee..e07ee6d13ad733ee439b8b90ccb84974768ce80b 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -1056,7 +1056,7 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) { pExprMsg->functionId = htons(pExprMsg->functionId); pExprMsg->numOfParams = htons(pExprMsg->numOfParams); - pMsg += sizeof(SSqlFuncExprMsg); + pMsg += (sizeof(SSqlFuncExprMsg) - TSDB_COL_NAME_LEN); for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType); diff --git a/src/system/detail/src/vnodeUtil.c b/src/system/detail/src/vnodeUtil.c index 6f25d3a8b12c5cf730b8cbfadfaaa9e2837c5a63..e8f30f5a1c63608563f11785e3512b5c92ab1041 100644 --- a/src/system/detail/src/vnodeUtil.c +++ b/src/system/detail/src/vnodeUtil.c @@ -372,18 +372,22 @@ void vnodeUpdateFilterColumnIndex(SQuery* pQuery) { } // set the column index in buffer for arithmetic operation - if (pQuery->pSelectExpr != NULL) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - SSqlBinaryExprInfo* pBinExprInfo = &pQuery->pSelectExpr[i].pBinExprInfo; - if (pBinExprInfo->pBinExpr != NULL) { - for (int16_t j = 0; j < pBinExprInfo->numOfCols; ++j) { - for (int32_t k = 0; k < pQuery->numOfCols; ++k) { - if (pBinExprInfo->pReqColumns[j].colId == pQuery->colList[k].data.colId) { - pBinExprInfo->pReqColumns[j].colIdxInBuf = pQuery->colList[k].colIdxInBuf; - assert(pQuery->colList[k].colIdxInBuf == k); - break; - } - } + if (pQuery->pSelectExpr == NULL) { + return; + } + + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + SSqlBinaryExprInfo* pBinExprInfo = &pQuery->pSelectExpr[i].pBinExprInfo; + if (pBinExprInfo->pBinExpr == NULL) { + continue; + } + + for (int16_t j = 0; j < pBinExprInfo->numOfCols; ++j) { + for (int32_t k = 0; k < pQuery->numOfCols; ++k) { + if (pBinExprInfo->pReqColumns[j].colId == pQuery->colList[k].data.colId) { + pBinExprInfo->pReqColumns[j].colIdxInBuf = pQuery->colList[k].colIdxInBuf; + assert(pQuery->colList[k].colIdxInBuf == k); + break; } } }