diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index 0af8c8b5768c844954a072521e67a3cb6e914ac3..ce67344b03a015419e485aa28e54575c2cf60045 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -62,6 +62,7 @@ typedef struct SLocalReducer { bool hasUnprocessedRow; tOrderDescriptor * pDesc; SColumnModel * resColModel; + SColumnModel* finalModel; tExtMemBuffer ** pExtMemBuffer; // disk-based buffer SFillInfo* pFillInfo; // interpolation support structure char* pFinalRes; // result data after interpo @@ -74,7 +75,8 @@ typedef struct SLocalReducer { typedef struct SRetrieveSupport { tExtMemBuffer ** pExtMemBuffer; // for build loser tree tOrderDescriptor *pOrderDescriptor; - SColumnModel * pFinalColModel; // colModel for final result + SColumnModel* pFinalColModel; // colModel for final result + SColumnModel* pFFColModel; int32_t subqueryIndex; // index of current vnode in vnode list SSqlObj * pParentSql; tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to @@ -96,7 +98,7 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF * create local reducer to launch the second-stage reduce process at client site */ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, - SColumnModel *finalModel, SSqlObj* pSql); + SColumnModel *finalModel, SColumnModel *pFFModel, SSqlObj* pSql); void tscDestroyLocalReducer(SSqlObj *pSql); diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index 3406dcd8589e149f2a62040d1271fbfd40caaac7..3226f705283fc4b5af5f1d8c6e3a736300bd2fa6 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -41,6 +41,8 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql); void tscBuildResFromSubqueries(SSqlObj *pSql); TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult); +char *getArithemicInputSrc(void *param, const char *name, int32_t colId); + #ifdef __cplusplus } #endif diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 6628e3087456c52b2daf1b9ef84efa9577ad4f36..223fb5d226690f480f6d8196180ec61b3f2fbd92 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -125,6 +125,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t */ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo); +bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo); bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex); bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); @@ -158,7 +159,7 @@ SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t ind TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index); void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo); -void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo); +void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo); int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index); void tscFieldInfoClear(SFieldInfo* pFieldInfo); @@ -167,15 +168,15 @@ static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQue int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2); -void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex); +void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes); int32_t tscGetResRowLength(SArray* pExprList); SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, - int16_t size, int16_t interSize, bool isTagCol); + int16_t size, int16_t resColId, int16_t interSize, bool isTagCol); SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, - int16_t size, int16_t interSize, bool isTagCol); + int16_t size, int16_t resColId, int16_t interSize, bool isTagCol); SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, int16_t size); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4c85af4919c74de5f41272272bd1bdbf4f6c6da3..6b3d97d6f9e5ffb6168938d3bc3e0a7a26dcbf33 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -136,6 +136,7 @@ typedef struct SSqlExpr { int16_t numOfParams; // argument value of each function tVariant param[3]; // parameters are not more than 3 int32_t offset; // sub result column value of arithmetic expression. + int16_t resColId; // result column id } SSqlExpr; typedef struct SColumnIndex { @@ -251,6 +252,7 @@ typedef struct SQueryInfo { int64_t clauseLimit; // limit for current sub clause int64_t prjOffset; // offset value in the original sql expression, only applied at client side int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX + int16_t resColumnId; // result column id } SQueryInfo; typedef struct { @@ -462,17 +464,16 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); -static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) { +static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex, int32_t offset) { SInternalField* pInfo = (SInternalField*) TARRAY_GET_ELEM(pFieldInfo->internalField, columnIndex); - assert(pInfo->pSqlExpr != NULL); - int32_t type = pInfo->pSqlExpr->resType; - int32_t bytes = pInfo->pSqlExpr->resBytes; + int32_t type = pInfo->field.type; + int32_t bytes = pInfo->field.bytes; - char* pData = pRes->data + (int32_t)(pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row); + char* pData = pRes->data + (int32_t)(offset * pRes->numOfRows + bytes * pRes->row); // user defined constant value output columns - if (TSDB_COL_IS_UD_COL(pInfo->pSqlExpr->colInfo.flag)) { + if (pInfo->pSqlExpr != NULL && TSDB_COL_IS_UD_COL(pInfo->pSqlExpr->colInfo.flag)) { if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) { pData = pInfo->pSqlExpr->param[1].pz; pRes->length[columnIndex] = pInfo->pSqlExpr->param[1].nLen; @@ -517,6 +518,7 @@ extern SRpcCorEpSet tscMgmtEpSet; extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables); +int16_t getNewResColId(SQueryInfo* pQueryInfo); #ifdef __cplusplus } diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 99c03c65801a64eb6b913373603cfa9c48d36911..e9e8214c4c15588b126b0768745db084e6e7dc3b 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -351,7 +351,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { SInternalField* pSup = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i); if (pSup->pSqlExpr != NULL) { - tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i); + tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, 0); } else { // todo add } diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index fa03440126fd9d1ae692028cafbd505ae09eba90..310c4c665761212f6fc46b889a8a2c887b7213eb 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -162,7 +162,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, - (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, (TSDB_COL_NAME_LEN - 1), false); + (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, -1000, (TSDB_COL_NAME_LEN - 1), false); rowLen += ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE); @@ -172,7 +172,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE), - typeColLength, false); + -1000, typeColLength, false); rowLen += typeColLength + VARSTR_HEADER_SIZE; @@ -182,7 +182,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t), - sizeof(int32_t), false); + -1000, sizeof(int32_t), false); rowLen += sizeof(int32_t); @@ -192,7 +192,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE), - noteColLength, false); + -1000, noteColLength, false); rowLen += noteColLength + VARSTR_HEADER_SIZE; return rowLen; @@ -407,8 +407,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const } SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); - pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, - f.bytes, f.bytes - VARSTR_HEADER_SIZE, false); + pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, f.bytes, -1000, f.bytes - VARSTR_HEADER_SIZE, false); rowLen += f.bytes; @@ -422,7 +421,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, - (int16_t)(ddlLen + VARSTR_HEADER_SIZE), ddlLen, false); + (int16_t)(ddlLen + VARSTR_HEADER_SIZE), -1000, ddlLen, false); rowLen += ddlLen + VARSTR_HEADER_SIZE; diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 84a4ef9a169775db29b573f4b9ed2daefb4f63dd..b07c7ca66dde0a9b568436f997572829b81cb9fa 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -13,14 +13,15 @@ * along with this program. If not, see . */ +#include "tscLocalMerge.h" +#include "tscSubquery.h" #include "os.h" +#include "qAst.h" #include "tlosertree.h" +#include "tscLog.h" #include "tscUtil.h" #include "tschemautil.h" #include "tsclient.h" -#include "tutil.h" -#include "tscLog.h" -#include "tscLocalMerge.h" typedef struct SCompareParam { SLocalDataSource **pLocalData; @@ -29,6 +30,8 @@ typedef struct SCompareParam { int32_t groupOrderType; } SCompareParam; +static void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize); + int32_t treeComparator(const void *pLeft, const void *pRight, void *param) { int32_t pLeftIdx = *(int32_t *)pLeft; int32_t pRightIdx = *(int32_t *)pRight; @@ -132,28 +135,41 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc } static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) { - int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); + int32_t numOfCols = (int32_t)tscNumOfFields(pQueryInfo); int32_t offset = 0; SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo)); for(int32_t i = 0; i < numOfCols; ++i) { - SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); - - pFillCol[i].col.bytes = pExpr->resBytes; - pFillCol[i].col.type = (int8_t)pExpr->resType; - pFillCol[i].col.colId = pExpr->colInfo.colId; - pFillCol[i].flag = pExpr->colInfo.flag; - pFillCol[i].col.offset = offset; - pFillCol[i].functionId = pExpr->functionId; - pFillCol[i].fillVal.i = pQueryInfo->fillVal[i]; - offset += pExpr->resBytes; + SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i); + + if (pIField->pArithExprInfo == NULL) { + SSqlExpr* pExpr = pIField->pSqlExpr; + + pFillCol[i].col.bytes = pExpr->resBytes; + pFillCol[i].col.type = (int8_t)pExpr->resType; + pFillCol[i].col.colId = pExpr->colInfo.colId; + pFillCol[i].flag = pExpr->colInfo.flag; + pFillCol[i].col.offset = offset; + pFillCol[i].functionId = pExpr->functionId; + pFillCol[i].fillVal.i = pQueryInfo->fillVal[i]; + } else { + pFillCol[i].col.bytes = pIField->field.bytes; + pFillCol[i].col.type = (int8_t)pIField->field.type; + pFillCol[i].col.colId = -100; + pFillCol[i].flag = TSDB_COL_NORMAL; + pFillCol[i].col.offset = offset; + pFillCol[i].functionId = -1; + pFillCol[i].fillVal.i = pQueryInfo->fillVal[i]; + } + + offset += pFillCol[i].col.bytes; } return pFillCol; } void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, - SColumnModel *finalmodel, SSqlObj* pSql) { + SColumnModel *finalmodel, SColumnModel *pFFModel, SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; @@ -342,8 +358,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd return; } - size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo); - pReducer->pTempBuffer->num = 0; tscCreateResPointerInfo(pRes, pQueryInfo); @@ -372,7 +386,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd if (pQueryInfo->fillType != TSDB_FILL_NONE) { SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, - 4096, (int32_t)numOfCols, pQueryInfo->interval.sliding, pQueryInfo->interval.slidingUnit, + 4096, (int32_t)pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->interval.sliding, pQueryInfo->interval.slidingUnit, tinfo.precision, pQueryInfo->fillType, pFillCol, pSql); } } @@ -491,7 +505,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo); if (pLocalReducer->pCtx != NULL) { - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { + int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); + for (int32_t i = 0; i < numOfExprs; ++i) { SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i]; tVariantDestroy(&pCtx->tag); @@ -555,7 +570,8 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm if (numOfGroupByCols > 0) { if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { - int32_t startCols = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols; + int32_t numOfInternalOutput = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); + int32_t startCols = numOfInternalOutput - pQueryInfo->groupbyExpr.numOfGroupCols; // the last "pQueryInfo->groupbyExpr.numOfGroupCols" columns are order-by columns for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { @@ -674,6 +690,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr pSchema[i].bytes = pExpr->resBytes; pSchema[i].type = (int8_t)pExpr->resType; + tstrncpy(pSchema[i].name, pExpr->aliasName, tListLen(pSchema[i].name)); + rlen += pExpr->resBytes; } @@ -736,8 +754,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr } *pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity); - tfree(pSchema); + tfree(pSchema); return TSDB_CODE_SUCCESS; } @@ -966,10 +984,11 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pFillInfo); } + int32_t offset = 0; for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - int16_t offset = getColumnModelOffset(pLocalReducer->resColModel, i); memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i]->data, (size_t)(pField->bytes * pRes->numOfRows)); + offset += pField->bytes; } pRes->numOfRowsGroup += pRes->numOfRows; @@ -1222,6 +1241,10 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur tColModelCompact(pModel, pResBuf, pModel->capacity); + if (tscIsSecondStageQuery(pQueryInfo)) { + doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize); + } + #ifdef _DEBUG_VIEW printf("final result before interpo:\n"); // tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num); @@ -1588,3 +1611,44 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) pRes->pLocalReducer->pResultBuf->num = numOfRes; pRes->data = pRes->pLocalReducer->pResultBuf->data; } + +void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) { + char* pbuf = calloc(1, pOutput->num * rowSize); + + size_t size = tscNumOfFields(pQueryInfo); + SArithmeticSupport arithSup = {0}; + + // todo refactor + arithSup.offset = 0; + arithSup.numOfCols = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); + arithSup.exprList = pQueryInfo->exprList; + arithSup.data = calloc(arithSup.numOfCols, POINTER_BYTES); + + for(int32_t k = 0; k < arithSup.numOfCols; ++k) { + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k); + arithSup.data[k] = (pOutput->data + pOutput->num* pExpr->offset); + } + + int32_t offset = 0; + + for (int i = 0; i < size; ++i) { + SInternalField* pSup = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); + + // calculate the result from several other columns + if (pSup->pArithExprInfo != NULL) { + arithSup.pArithExpr = pSup->pArithExprInfo; + tExprTreeCalcTraverse(arithSup.pArithExpr->pExpr, (int32_t) pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithemicInputSrc); + } else { + SSqlExpr* pExpr = pSup->pSqlExpr; + memcpy(pbuf + pOutput->num * offset, pExpr->offset * pOutput->num + pOutput->data, pExpr->resBytes * pOutput->num); + } + + offset += pSup->field.bytes; + } + + assert(finalRowSize <= rowSize); + memcpy(pOutput->data, pbuf, pOutput->num * finalRowSize); + + tfree(pbuf); + tfree(arithSup.data); +} \ No newline at end of file diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index c13de00136414a4e4fe3d3a7cf1e93ea7e53b48a..b55326bbd37afb1255dfe71d253b084826983f72 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -52,7 +52,8 @@ typedef struct SConvertFunc { int32_t originFuncId; int32_t execFuncId; } SConvertFunc; -static SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t colIndex, int32_t tableIndex); + +static SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tableIndex); static int32_t setShowInfo(SSqlObj* pSql, SSqlInfo* pInfo); static char* getAccountId(SSqlObj* pSql); @@ -127,6 +128,10 @@ static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index); static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pSqlExpr, SQueryInfo* pQueryInfo, SArray* pCols, int64_t *uid); +int16_t getNewResColId(SQueryInfo* pQueryInfo) { + return pQueryInfo->resColumnId--; +} + static uint8_t convertOptr(SStrToken *pToken) { switch (pToken->type) { case TK_LT: @@ -1274,6 +1279,7 @@ static void tscInsertPrimaryTSSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex* SColumnIndex tsCol = {.tableIndex = pIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; tscColumnListInsert(pQueryInfo->colList, &tsCol); } + static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t exprIndex, tSQLExprItem* pItem) { const char* msg1 = "invalid column name, illegal column type, or columns in arithmetic expression from two tables"; const char* msg2 = "invalid arithmetic expression in select clause"; @@ -1305,7 +1311,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t SColumnIndex index = {.tableIndex = tableIndex}; SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, sizeof(double), - sizeof(double), false); + -1000, sizeof(double), false); char* name = (pItem->aliasName != NULL)? pItem->aliasName:pItem->pNode->token.z; size_t len = MIN(sizeof(pExpr->aliasName), pItem->pNode->token.n + 1); @@ -1321,6 +1327,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } + // check for if there is a tag in the arithmetic express size_t numOfNode = taosArrayGetSize(colList); for(int32_t k = 0; k < numOfNode; ++k) { SColIndex* pIndex = taosArrayGet(colList, k); @@ -1346,9 +1353,9 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t char* c = tbufGetData(&bw, false); // set the serialized binary string as the parameter of arithmetic expression - addExprParams(pExpr, c, TSDB_DATA_TYPE_BINARY, (int32_t)len, index.tableIndex); - + addExprParams(pExpr, c, TSDB_DATA_TYPE_BINARY, (int32_t)len); insertResultField(pQueryInfo, exprIndex, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName, pExpr); + // add ts column tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); @@ -1380,6 +1387,10 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t pArithExprInfo->interBytes = sizeof(double); pArithExprInfo->type = TSDB_DATA_TYPE_DOUBLE; + pArithExprInfo->base.functionId = TSDB_FUNC_ARITHM; + pArithExprInfo->base.numOfParams = 1; + pArithExprInfo->base.resColId = getNewResColId(pQueryInfo); + int32_t ret = exprTreeFromSqlExpr(pCmd, &pArithExprInfo->pExpr, pItem->pNode, pQueryInfo, NULL, &pArithExprInfo->uid); if (ret != TSDB_CODE_SUCCESS) { tExprTreeDestroy(&pArithExprInfo->pExpr, NULL); @@ -1388,14 +1399,30 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t pInfo->pArithExprInfo = pArithExprInfo; } + + SBufferWriter bw = tbufInitWriter(NULL, false); + + TRY(0) { + exprTreeToBinary(&bw, pInfo->pArithExprInfo->pExpr); + } CATCH(code) { + tbufCloseWriter(&bw); + UNUSED(code); + // TODO: other error handling + } END_TRY + + SSqlFuncMsg* pFuncMsg = &pInfo->pArithExprInfo->base; + pFuncMsg->arg[0].argBytes = (int16_t) tbufTell(&bw); + pFuncMsg->arg[0].argValue.pz = tbufGetData(&bw, true); + pFuncMsg->arg[0].argType = TSDB_DATA_TYPE_BINARY; + +// tbufCloseWriter(&bw); // TODO there is a memory leak } return TSDB_CODE_SUCCESS; } - static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumnIndex* pIndex, tSQLExprItem* pItem) { - SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, startPos, pIndex->columnIndex, pIndex->tableIndex); + SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, pIndex->columnIndex, pIndex->tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pIndex->tableIndex); STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; @@ -1540,7 +1567,7 @@ int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnLi return TSDB_CODE_SUCCESS; } -SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t colIndex, int32_t tableIndex) { +SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tableIndex) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; int32_t numOfCols = tscGetNumOfColumns(pTableMeta); @@ -1552,20 +1579,22 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c if (functionId == TSDB_FUNC_TAGPRJ) { index.columnIndex = colIndex - tscGetNumOfColumns(pTableMeta); - tscColumnListInsert(pTableMetaInfo->tagColList, &index); } else { index.columnIndex = colIndex; } - - return tscSqlExprAppend(pQueryInfo, functionId, &index, pSchema->type, pSchema->bytes, - pSchema->bytes, functionId == TSDB_FUNC_TAGPRJ); + + int16_t colId = getNewResColId(pQueryInfo); + return tscSqlExprAppend(pQueryInfo, functionId, &index, pSchema->type, pSchema->bytes, colId, pSchema->bytes, + (functionId == TSDB_FUNC_TAGPRJ)); } SSqlExpr* tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex, SSchema* pColSchema, int16_t flag) { + int16_t colId = getNewResColId(pQueryInfo); + SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputColIndex, functionId, pIndex, pColSchema->type, - pColSchema->bytes, pColSchema->bytes, TSDB_COL_IS_TAG(flag)); + pColSchema->bytes, colId, pColSchema->bytes, TSDB_COL_IS_TAG(flag)); tstrncpy(pExpr->aliasName, pColSchema->name, sizeof(pExpr->aliasName)); SColumnList ids = getColumnList(1, pIndex->tableIndex, pIndex->columnIndex); @@ -1601,7 +1630,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum } for (int32_t j = 0; j < numOfTotalColumns; ++j) { - SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, startPos + j, j, pIndex->tableIndex); + SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, j, pIndex->tableIndex); tstrncpy(pExpr->aliasName, pSchema[j].name, sizeof(pExpr->aliasName)); pIndex->columnIndex = j; @@ -1710,7 +1739,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS bytes = pSchema->bytes; } - SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, bytes, false); + SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, getNewResColId(pQueryInfo), bytes, false); tstrncpy(pExpr->aliasName, name, tListLen(pExpr->aliasName)); if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != functionID) { @@ -1804,7 +1833,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; - pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false); + pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pQueryInfo), size, false); } else if (sqlOptr == TK_INTEGER) { // select count(1) from table1 char buf[8] = {0}; int64_t val = -1; @@ -1816,7 +1845,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col if (val == 1) { index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; - pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false); + pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pQueryInfo), size, false); } else { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } @@ -1836,12 +1865,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; - pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, isTag); + pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pQueryInfo), size, isTag); } } else { // count(*) is equalled to count(primary_timestamp_key) index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; - pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false); + pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pQueryInfo), size, false); } pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); @@ -1928,7 +1957,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col colIndex += 1; SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0}; SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, - TSDB_KEYSIZE, false); + getNewResColId(pQueryInfo), TSDB_KEYSIZE, false); SColumnList ids = getColumnList(1, 0, 0); insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].aName, pExpr); @@ -1939,7 +1968,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); } - SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, resultType, resultSize, resultSize, false); + SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, resultType, resultSize, getNewResColId(pQueryInfo), resultSize, false); if (optr == TK_LEASTSQUARES) { /* set the leastsquares parameters */ @@ -1948,14 +1977,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return TSDB_CODE_TSC_INVALID_SQL; } - addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, DOUBLE_BYTES, 0); + addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, DOUBLE_BYTES); memset(val, 0, tListLen(val)); if (tVariantDump(&pParamElem[2].pNode->val, val, TSDB_DATA_TYPE_DOUBLE, true) < 0) { return TSDB_CODE_TSC_INVALID_SQL; } - addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0); + addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); } SColumnList ids = {0}; @@ -2180,8 +2209,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); colIndex += 1; // the first column is ts - pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, resultSize, false); - addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0); + pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pQueryInfo), resultSize, false); + addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); } else { tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true); @@ -2198,8 +2227,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col // todo REFACTOR // set the first column ts for top/bottom query SColumnIndex index1 = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, - TSDB_KEYSIZE, false); + pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, getNewResColId(pQueryInfo), + TSDB_KEYSIZE, false); tstrncpy(pExpr->aliasName, aAggs[TSDB_FUNC_TS].aName, sizeof(pExpr->aliasName)); const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX; @@ -2209,8 +2238,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col colIndex += 1; // the first column is ts - pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, resultSize, false); - addExprParams(pExpr, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0); + pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pQueryInfo), resultSize, false); + addExprParams(pExpr, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t)); } memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName)); @@ -2694,7 +2723,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { } } - tscFieldInfoUpdateOffsetForInterResult(pQueryInfo); + tscFieldInfoUpdateOffset(pQueryInfo); return TSDB_CODE_SUCCESS; } @@ -2922,7 +2951,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd) void setColumnOffsetValueInResultset(SQueryInfo* pQueryInfo) { if (QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { - tscFieldInfoUpdateOffsetForInterResult(pQueryInfo); + tscFieldInfoUpdateOffset(pQueryInfo); } else { tscFieldInfoUpdateOffset(pQueryInfo); } @@ -4437,7 +4466,7 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } - size_t size = tscSqlExprNumOfExprs(pQueryInfo); + size_t size = tscNumOfFields(pQueryInfo); if (pQueryInfo->fillVal == NULL) { pQueryInfo->fillVal = calloc(size, sizeof(int64_t)); @@ -4451,12 +4480,8 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery } else if (strncasecmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4) { pQueryInfo->fillType = TSDB_FILL_NULL; for (int32_t i = START_INTERPO_COL_IDX; i < size; ++i) { - TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) { - setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type); - } else { - setNull((char*)&pQueryInfo->fillVal[i], pFields->type, pFields->bytes); - }; + TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); + setNull((char*)&pQueryInfo->fillVal[i], pField->type, pField->bytes); } } else if (strncasecmp(pItem->pVar.pz, "prev", 4) == 0 && pItem->pVar.nLen == 4) { pQueryInfo->fillType = TSDB_FILL_PREV; @@ -4487,15 +4512,15 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery int32_t j = 1; for (int32_t i = startPos; i < numOfFillVal; ++i, ++j) { - TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); + TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) { - setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type); + if (pField->type == TSDB_DATA_TYPE_BINARY || pField->type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull((char*) &pQueryInfo->fillVal[i], pField->type); continue; } tVariant* p = taosArrayGet(pFillToken, j); - int32_t ret = tVariantDump(p, (char*)&pQueryInfo->fillVal[i], pFields->type, true); + int32_t ret = tVariantDump(p, (char*)&pQueryInfo->fillVal[i], pField->type, true); if (ret != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); } @@ -4505,12 +4530,12 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery tVariantListItem* lastItem = taosArrayGetLast(pFillToken); for (int32_t i = numOfFillVal; i < size; ++i) { - TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); + TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) { - setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type); + if (pField->type == TSDB_DATA_TYPE_BINARY || pField->type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull((char*) &pQueryInfo->fillVal[i], pField->type); } else { - tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->fillVal[i], pFields->type, true); + tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->fillVal[i], pField->type, true); } } } @@ -5447,7 +5472,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau int16_t type = pTagSchema->type; int16_t bytes = pTagSchema->bytes; - pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, bytes, true); + pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, getNewResColId(pQueryInfo), bytes, true); pExpr->colInfo.flag = TSDB_COL_TAG; // NOTE: tag column does not add to source column list @@ -5750,7 +5775,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo if (TSDB_COL_IS_TAG(pColIndex->flag)) { SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex}; - SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, bytes, true); + SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, getNewResColId(pQueryInfo), bytes, true); memset(pExpr->aliasName, 0, sizeof(pExpr->aliasName)); tstrncpy(pExpr->aliasName, name, sizeof(pExpr->aliasName)); @@ -5913,7 +5938,7 @@ int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ SColumnIndex ind = {0}; SSqlExpr* pExpr1 = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG_DUMMY, &ind, TSDB_DATA_TYPE_INT, - tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, false); + tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, getNewResColId(pQueryInfo), tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, false); const char* name = (pExprList->a[0].aliasName != NULL)? pExprList->a[0].aliasName:functionsInfo[index].name; tstrncpy(pExpr1->aliasName, name, tListLen(pExpr1->aliasName)); @@ -6585,6 +6610,7 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS if (strcmp((*pExpr)->pSchema->name, p1->aliasName) == 0) { (*pExpr)->pSchema->type = (uint8_t)p1->resType; (*pExpr)->pSchema->bytes = p1->resBytes; + (*pExpr)->pSchema->colId = p1->resColId; if (uid != NULL) { *uid = p1->uid; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index dc55e3f15995ec6c432a07b41056df91629caa2c..e8b6cb284ee40a3b2200e8d54913dca4ca158309 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -698,7 +698,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->queryType = htonl(pQueryInfo->type); size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); - pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); + pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number // set column list ids size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); @@ -760,12 +760,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_TSC_INVALID_SQL; } + assert(pExpr->resColId < 0); + pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId); pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex); pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag); pSqlFuncExpr->functionId = htons(pExpr->functionId); pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams); + pSqlFuncExpr->resColId = htons(pExpr->resColId); pMsg += sizeof(SSqlFuncMsg); for (int32_t j = 0; j < pExpr->numOfParams; ++j) { @@ -783,7 +786,73 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSqlFuncExpr = (SSqlFuncMsg *)pMsg; } - + + if(tscIsSecondStageQuery(pQueryInfo)) { + size_t output = tscNumOfFields(pQueryInfo); + pQueryMsg->secondStageOutput = htonl((int32_t) output); + + SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg; + + for (int32_t i = 0; i < output; ++i) { + SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); + SSqlExpr *pExpr = pField->pSqlExpr; + if (pExpr != NULL) { + if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) { + tscError("%p table schema is not matched with parsed sql", pSql); + return TSDB_CODE_TSC_INVALID_SQL; + } + + pSqlFuncExpr1->colInfo.colId = htons(pExpr->colInfo.colId); + pSqlFuncExpr1->colInfo.colIndex = htons(pExpr->colInfo.colIndex); + pSqlFuncExpr1->colInfo.flag = htons(pExpr->colInfo.flag); + + pSqlFuncExpr1->functionId = htons(pExpr->functionId); + pSqlFuncExpr1->numOfParams = htons(pExpr->numOfParams); + pMsg += sizeof(SSqlFuncMsg); + + for (int32_t j = 0; j < pExpr->numOfParams; ++j) { + // todo add log + pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExpr->param[j].nType); + pSqlFuncExpr1->arg[j].argBytes = htons(pExpr->param[j].nLen); + + if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) { + memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen); + pMsg += pExpr->param[j].nLen; + } else { + pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key); + } + } + + pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg; + } else { + assert(pField->pArithExprInfo != NULL); + SExprInfo* pExprInfo = pField->pArithExprInfo; + + pSqlFuncExpr1->colInfo.colId = htons(pExprInfo->base.colInfo.colId); + pSqlFuncExpr1->functionId = htons(pExprInfo->base.functionId); + pSqlFuncExpr1->numOfParams = htons(pExprInfo->base.numOfParams); + pMsg += sizeof(SSqlFuncMsg); + + for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { + // todo add log + pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExprInfo->base.arg[j].argType); + pSqlFuncExpr1->arg[j].argBytes = htons(pExprInfo->base.arg[j].argBytes); + + if (pExprInfo->base.arg[j].argType == TSDB_DATA_TYPE_BINARY) { + memcpy(pMsg, pExprInfo->base.arg[j].argValue.pz, pExprInfo->base.arg[j].argBytes); + pMsg += pExprInfo->base.arg[j].argBytes; + } else { + pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExprInfo->base.arg[j].argValue.i64); + } + } + + pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg; + } + } + } else { + pQueryMsg->secondStageOutput = 0; + } + // serialize the table info (sid, uid, tags) pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg); @@ -810,7 +879,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } if (pQueryInfo->fillType != TSDB_FILL_NONE) { - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { + for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) { *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]); pMsg += sizeof(pQueryInfo->fillVal[0]); } @@ -1946,7 +2015,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, - pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false); + pTableSchema[i].type, pTableSchema[i].bytes, getNewResColId(pQueryInfo), pTableSchema[i].bytes, false); } pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 5f8a2eb6b764983a529ff58ffb84dc47447c281b..a7b859b294da689406911cb027d75b3c91149ccc 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -28,7 +28,6 @@ #include "tutil.h" #include "ttimer.h" #include "tscProfile.h" -#include "ttimer.h" static bool validImpl(const char* str, size_t maxsize) { if (str == NULL) { @@ -482,7 +481,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) { assert(0); for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { - tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i); + tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, 0); } *rows = pRes->tsrow; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index eb32e2490a3aa1a363be632a31e051ad2ca7a6a8..bc522d4007e24a01ed1bf61846f924c912cf3387 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1642,9 +1642,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { } tExtMemBuffer ** pMemoryBuf = NULL; - tOrderDescriptor *pDesc = NULL; - SColumnModel * pModel = NULL; - + tOrderDescriptor *pDesc = NULL; + SColumnModel *pModel = NULL; + pRes->qhandle = 0x1; // hack the qhandle check const uint32_t nBufferSize = (1u << 16); // 64KB @@ -1707,7 +1707,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { trs->subqueryIndex = i; trs->pParentSql = pSql; trs->pFinalColModel = pModel; - + SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL); if (pNew == NULL) { tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); @@ -1762,10 +1762,6 @@ static void tscFreeRetrieveSup(SSqlObj *pSql) { } tscDebug("%p start to free subquery supp obj:%p", pSql, trsupport); -// int32_t index = trsupport->subqueryIndex; -// SSqlObj *pParentSql = trsupport->pParentSql; - -// assert(pSql == pParentSql->pSubs[index]); tfree(trsupport->localBuffer); tfree(trsupport); } @@ -1956,7 +1952,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); tscClearInterpInfo(pPQueryInfo); - tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, pParentSql); + tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, trsupport->pFFColModel, pParentSql); tscDebug("%p build loser tree completed", pParentSql); pParentSql->res.precision = pSql->res.precision; @@ -2418,7 +2414,7 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF } } -static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) { +char *getArithemicInputSrc(void *param, const char *name, int32_t colId) { SArithmeticSupport *pSupport = (SArithmeticSupport *) param; int32_t index = -1; @@ -2449,48 +2445,22 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); size_t size = tscNumOfFields(pQueryInfo); + int32_t offset = 0; + for (int i = 0; i < size; ++i) { - SInternalField* pSup = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); - if (pSup->pSqlExpr != NULL) { - tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i); - } + tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, offset); + TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); + + offset += pField->bytes; // primary key column cannot be null in interval query, no need to check if (i == 0 && pQueryInfo->interval.interval > 0) { continue; } - TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) { transferNcharData(pSql, i, pField); } - - // calculate the result from several other columns - if (pSup->pArithExprInfo != NULL) { - if (pRes->pArithSup == NULL) { - pRes->pArithSup = (SArithmeticSupport*)calloc(1, sizeof(SArithmeticSupport)); - } - - pRes->pArithSup->offset = 0; - pRes->pArithSup->pArithExpr = pSup->pArithExprInfo; - pRes->pArithSup->numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); - pRes->pArithSup->exprList = pQueryInfo->exprList; - pRes->pArithSup->data = calloc(pRes->pArithSup->numOfCols, POINTER_BYTES); - - if (pRes->buffer[i] == NULL) { - TAOS_FIELD* field = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - pRes->buffer[i] = malloc(field->bytes); - } - - for(int32_t k = 0; k < pRes->pArithSup->numOfCols; ++k) { - SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k); - pRes->pArithSup->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes; - } - - tExprTreeCalcTraverse(pRes->pArithSup->pArithExpr->pExpr, 1, pRes->buffer[i], pRes->pArithSup, - TSDB_ORDER_ASC, getArithemicInputSrc); - pRes->tsrow[i] = (unsigned char*)pRes->buffer[i]; - } } pRes->row++; // index increase one-step diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 080ef9f2d20ac8c82df6d4e93c3b2091ec2c1f12..27824fc1ffe01615a55b78056817733ac4c400c8 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -219,6 +219,24 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) { return true; } +bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo) { + size_t numOfOutput = tscNumOfFields(pQueryInfo); + size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); + + if (numOfOutput == numOfExprs) { + return false; + } + + for(int32_t i = 0; i < numOfOutput; ++i) { + SExprInfo* pExprInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i)->pArithExprInfo; + if (pExprInfo != NULL) { + return true; + } + } + + return false; +} + bool tscIsTWAQuery(SQueryInfo* pQueryInfo) { size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t i = 0; i < numOfExprs; ++i) { @@ -855,28 +873,11 @@ void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) { SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); pExpr->offset = 0; - - for (int32_t i = 1; i < numOfExprs; ++i) { - SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprList, i - 1); - SSqlExpr* p = taosArrayGetP(pQueryInfo->exprList, i); - - p->offset = prev->offset + prev->resBytes; - } -} -void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) { - if (tscSqlExprNumOfExprs(pQueryInfo) == 0) { - return; - } - - SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); - pExpr->offset = 0; - - size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t i = 1; i < numOfExprs; ++i) { SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprList, i - 1); SSqlExpr* p = taosArrayGetP(pQueryInfo->exprList, i); - + p->offset = prev->offset + prev->resBytes; } } @@ -944,6 +945,14 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { if (pInfo->pArithExprInfo != NULL) { tExprTreeDestroy(&pInfo->pArithExprInfo->pExpr, NULL); + + SSqlFuncMsg* pFuncMsg = &pInfo->pArithExprInfo->base; + for(int32_t j = 0; j < pFuncMsg->numOfParams; ++j) { + if (pFuncMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) { + tfree(pFuncMsg->arg[j].argValue.pz); + } + } + tfree(pInfo->pArithExprInfo); } } @@ -955,7 +964,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { } static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, - int16_t size, int16_t interSize, int32_t colType) { + int16_t size, int16_t resColId, int16_t interSize, int32_t colType) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex); SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr)); @@ -988,8 +997,9 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol pExpr->resType = type; pExpr->resBytes = size; + pExpr->resColId = resColId; pExpr->interBytes = interSize; - + if (pTableMetaInfo->pTableMeta) { pExpr->uid = pTableMetaInfo->pTableMeta->id.uid; } @@ -998,20 +1008,20 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol } SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, - int16_t size, int16_t interSize, bool isTagCol) { + int16_t size, int16_t resColId, int16_t interSize, bool isTagCol) { int32_t num = (int32_t)taosArrayGetSize(pQueryInfo->exprList); if (index == num) { - return tscSqlExprAppend(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol); + return tscSqlExprAppend(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol); } - SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol); + SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol); taosArrayInsert(pQueryInfo->exprList, index, &pExpr); return pExpr; } SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, - int16_t size, int16_t interSize, bool isTagCol) { - SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol); + int16_t size, int16_t resColId, int16_t interSize, bool isTagCol) { + SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol); taosArrayPush(pQueryInfo->exprList, &pExpr); return pExpr; } @@ -1039,16 +1049,14 @@ size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) { return taosArrayGetSize(pQueryInfo->exprList); } -void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex) { - if (pExpr == NULL || argument == NULL || bytes == 0) { - return; - } +void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes) { + assert (pExpr != NULL || argument != NULL || bytes != 0); // set parameter value // transfer to tVariant from byte data/no ascii data tVariantCreateFromBinary(&pExpr->param[pExpr->numOfParams], argument, bytes, type); - pExpr->numOfParams += 1; + assert(pExpr->numOfParams <= 3); } @@ -1601,6 +1609,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX; + pQueryInfo->resColumnId= -1000; } int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) { diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 25b77d68456d8962f61afcd17f7f5dc25c695cf5..681fa4492959347d5af5591615c0b39dbacbef7e 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -392,6 +392,7 @@ typedef struct SColIndex { typedef struct SSqlFuncMsg { int16_t functionId; int16_t numOfParams; + int16_t resColId; // result column id, id of the current output column SColIndex colInfo; struct ArgElem { @@ -461,11 +462,6 @@ typedef struct STimeWindow { TSKEY ekey; } STimeWindow; -/* - * the outputCols is equalled to or larger than numOfCols - * e.g., select min(colName), max(colName), avg(colName) from table - * the outputCols will be 3 while the numOfCols is 1. - */ typedef struct { SMsgHead head; STimeWindow window; @@ -483,13 +479,14 @@ typedef struct { uint32_t queryType; // denote another query process int16_t numOfOutput; // final output columns numbers int16_t tagNameRelType; // relation of tag criteria and tbname criteria - int16_t fillType; // interpolate type - uint64_t fillVal; // default value array list - int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed - int32_t tsLen; // total length of ts comp block - int32_t tsNumOfBlocks; // ts comp block numbers - int32_t tsOrder; // ts comp block order - int32_t numOfTags; // number of tags columns involved + int16_t fillType; // interpolate type + uint64_t fillVal; // default value array list + int32_t secondStageOutput; + int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed + int32_t tsLen; // total length of ts comp block + int32_t tsNumOfBlocks; // ts comp block numbers + int32_t tsOrder; // ts comp block order + int32_t numOfTags; // number of tags columns involved SColumnInfo colList[]; } SQueryTableMsg; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 22397d0314c3081e8ed23d9671f4ccfcb7abc244..6b4b188c5ede2fc10baa894a55ad5d2225283d0c 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -152,7 +152,10 @@ typedef struct SQuery { SLimitVal limit; int32_t rowSize; SSqlGroupbyExpr* pGroupbyExpr; - SExprInfo* pSelectExpr; + SExprInfo* pExpr1; + SExprInfo* pExpr2; + int32_t numOfExpr2; + SColumnInfo* colList; SColumnInfo* tagColList; int32_t numOfFilterCols; diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 5d649261a6e9fbf8b9eab44aab699ea792f86138..93dca42fdcfd85b8e11fde205392a15caaa59416 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -48,7 +48,7 @@ static FORCE_INLINE SResultRow *getResultRow(SWindowResInfo *pWindowResInfo, int } #define curTimeWindowIndex(_winres) ((_winres)->curIndex) -#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1) +#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1) bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot); @@ -62,7 +62,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3 int32_t realRowId = (int32_t)(pResult->rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery)); return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage + - pQuery->pSelectExpr[columnIndex].bytes * realRowId; + pQuery->pExpr1[columnIndex].bytes * realRowId; } bool isNull_filter(SColumnFilterElem *pFilter, char* minval, char* maxval); diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index 84ca78d82249b88d68176b0a350a83fb56b8fda9..32cbb56c62514001ea5314e7e5dce8549daddb97 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -128,7 +128,7 @@ typedef struct SArithmeticSupport { SExprInfo *pArithExpr; int32_t numOfCols; SColumnInfo *colList; - SArray* exprList; // client side used + void *exprList; // client side used int32_t offset; char** data; } SArithmeticSupport; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 59622d9213caf0de14c7d1f097124495fa729bad..869f57f117db9b349b6d28cd819c84aa36f82baf 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -242,7 +242,7 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) { int64_t maxOutput = 0; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - int32_t functionId = pQuery->pSelectExpr[j].base.functionId; + int32_t functionId = pQuery->pExpr1[j].base.functionId; /* * ts, tag, tagprj function can not decide the output number of current query @@ -337,7 +337,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) { int32_t numOfSelectivity = 0; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functId = pQuery->pSelectExpr[i].base.functionId; + int32_t functId = pQuery->pExpr1[i].base.functionId; if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY) { hasTags = true; continue; @@ -357,7 +357,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) { bool isProjQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functId = pQuery->pSelectExpr[i].base.functionId; + int32_t functId = pQuery->pExpr1[i].base.functionId; if (functId != TSDB_FUNC_PRJ && functId != TSDB_FUNC_TAGPRJ) { return false; } @@ -366,7 +366,7 @@ bool isProjQuery(SQuery *pQuery) { return true; } -bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.functionId == TSDB_FUNC_TS_COMP; } +bool isTSCompQuery(SQuery *pQuery) { return pQuery->pExpr1[0].base.functionId == TSDB_FUNC_TS_COMP; } static bool limitResults(SQueryRuntimeEnv* pRuntimeEnv) { SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); @@ -387,7 +387,7 @@ static bool limitResults(SQueryRuntimeEnv* pRuntimeEnv) { static bool isTopBottomQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + int32_t functionId = pQuery->pExpr1[i].base.functionId; if (functionId == TSDB_FUNC_TS) { continue; } @@ -401,12 +401,12 @@ static bool isTopBottomQuery(SQuery *pQuery) { } static bool hasTagValOutput(SQuery* pQuery) { - SExprInfo *pExprInfo = &pQuery->pSelectExpr[0]; + SExprInfo *pExprInfo = &pQuery->pExpr1[0]; if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) { return true; } else { // set tag value, by which the results are aggregated. for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) { - SExprInfo *pLocalExprInfo = &pQuery->pSelectExpr[idx]; + SExprInfo *pLocalExprInfo = &pQuery->pExpr1[idx]; // ts_comp column required the tag value for join filter if (TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) { @@ -784,7 +784,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed pCtx[k].size = forwardStep; pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1); - int32_t functionId = pQuery->pSelectExpr[k].base.functionId; + int32_t functionId = pQuery->pExpr1[k].base.functionId; if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { pCtx[k].ptsList = &tsCol[pCtx[k].startOffset]; } @@ -813,7 +813,7 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed, for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { pCtx[k].nStartQueryTimestamp = pWin->skey; - int32_t functionId = pQuery->pSelectExpr[k].base.functionId; + int32_t functionId = pQuery->pExpr1[k].base.functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunctionF(&pCtx[k], offset); } @@ -922,9 +922,9 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas char *dataBlock = NULL; SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t functionId = pQuery->pSelectExpr[col].base.functionId; + int32_t functionId = pQuery->pExpr1[col].base.functionId; if (functionId == TSDB_FUNC_ARITHM) { - sas->pArithExpr = &pQuery->pSelectExpr[col]; + sas->pArithExpr = &pQuery->pExpr1[col]; sas->offset = 0; sas->colList = pQuery->colList; @@ -954,9 +954,9 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas } } else { // other type of query function - SColIndex *pCol = &pQuery->pSelectExpr[col].base.colInfo; + SColIndex *pCol = &pQuery->pExpr1[col].base.colInfo; if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { - SColIndex* pColIndex = &pQuery->pSelectExpr[col].base.colInfo; + SColIndex* pColIndex = &pQuery->pExpr1[col].base.colInfo; SColumnInfoData *p = taosArrayGet(pDataBlock, pColIndex->colIndex); assert(p->info.colId == pColIndex->colId); @@ -1067,7 +1067,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * * tag_prj function are changed to be TSDB_FUNC_TAG_DUMMY */ for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - int32_t functionId = pQuery->pSelectExpr[k].base.functionId; + int32_t functionId = pQuery->pExpr1[k].base.functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunction(&pCtx[k]); } @@ -1075,7 +1075,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { - if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) { + if (pQuery->pExpr1[i].base.functionId != TSDB_FUNC_ARITHM) { continue; } @@ -1375,7 +1375,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - int32_t functionId = pQuery->pSelectExpr[k].base.functionId; + int32_t functionId = pQuery->pExpr1[k].base.functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunctionF(&pCtx[k], offset); } @@ -1404,7 +1404,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS // todo refactor: extract method for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { - if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) { + if (pQuery->pExpr1[i].base.functionId != TSDB_FUNC_ARITHM) { continue; } @@ -1464,11 +1464,11 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, SDataStatis *pStatis, void *param, int32_t colIndex, int32_t vgId) { - int32_t functionId = pQuery->pSelectExpr[colIndex].base.functionId; - int32_t colId = pQuery->pSelectExpr[colIndex].base.colInfo.colId; + int32_t functionId = pQuery->pExpr1[colIndex].base.functionId; + int32_t colId = pQuery->pExpr1[colIndex].base.colInfo.colId; SDataStatis *tpField = NULL; - pCtx->hasNull = hasNullValue(&pQuery->pSelectExpr[colIndex].base.colInfo, pStatis, &tpField); + pCtx->hasNull = hasNullValue(&pQuery->pExpr1[colIndex].base.colInfo, pStatis, &tpField); pCtx->aInputElemBuf = inputData; if (tpField != NULL) { @@ -1501,7 +1501,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY functionId == TSDB_FUNC_DIFF || (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) { /* * least squares function needs two columns of input, currently, the x value of linear equation is set to - * timestamp column, and the y-value is the column specified in pQuery->pSelectExpr[i].colIdxInBuffer + * timestamp column, and the y-value is the column specified in pQuery->pExpr1[i].colIdxInBuffer * * top/bottom function needs timestamp to indicate when the * top/bottom values emerge, so does diff function @@ -1574,7 +1574,7 @@ static int32_t setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx } for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base; + SSqlFuncMsg *pSqlFuncMsg = &pQuery->pExpr1[i].base; if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) { tagLen += pCtx[i].outputBytes; @@ -1615,7 +1615,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order pRuntimeEnv->offset[0] = 0; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base; + SSqlFuncMsg *pSqlFuncMsg = &pQuery->pExpr1[i].base; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SColIndex* pIndex = &pSqlFuncMsg->colInfo; @@ -1649,13 +1649,13 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order assert(isValidDataType(pCtx->inputType)); pCtx->ptsOutputBuf = NULL; - pCtx->outputBytes = pQuery->pSelectExpr[i].bytes; - pCtx->outputType = pQuery->pSelectExpr[i].type; + pCtx->outputBytes = pQuery->pExpr1[i].bytes; + pCtx->outputType = pQuery->pExpr1[i].type; pCtx->order = pQuery->order.order; pCtx->functionId = pSqlFuncMsg->functionId; pCtx->stableQuery = pRuntimeEnv->stableQuery; - pCtx->interBufBytes = pQuery->pSelectExpr[i].interBytes; + pCtx->interBufBytes = pQuery->pExpr1[i].interBytes; pCtx->numOfParams = pSqlFuncMsg->numOfParams; for (int32_t j = 0; j < pCtx->numOfParams; ++j) { @@ -1672,7 +1672,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order int32_t functionId = pCtx->functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { - int32_t f = pQuery->pSelectExpr[0].base.functionId; + int32_t f = pQuery->pExpr1[0].base.functionId; assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY); pCtx->param[2].i64Key = order; @@ -1685,7 +1685,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order if (i > 0) { pRuntimeEnv->offset[i] = pRuntimeEnv->offset[i - 1] + pRuntimeEnv->pCtx[i - 1].outputBytes; - pRuntimeEnv->rowCellInfoOffset[i] = pRuntimeEnv->rowCellInfoOffset[i - 1] + sizeof(SResultRowCellInfo) + pQuery->pSelectExpr[i - 1].interBytes; + pRuntimeEnv->rowCellInfoOffset[i] = pRuntimeEnv->rowCellInfoOffset[i - 1] + sizeof(SResultRowCellInfo) + pQuery->pExpr1[i - 1].interBytes; } } @@ -1779,7 +1779,7 @@ static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) { } for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].base; + SSqlFuncMsg *pExprMsg = &pQuery->pExpr1[i].base; // ignore the ts_comp function if (i == 0 && pExprMsg->functionId == TSDB_FUNC_PRJ && pExprMsg->numOfParams == 1 && @@ -1802,7 +1802,7 @@ static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) { // todo refactor with isLastRowQuery static bool isPointInterpoQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionID = pQuery->pSelectExpr[i].base.functionId; + int32_t functionID = pQuery->pExpr1[i].base.functionId; if (functionID == TSDB_FUNC_INTERP) { return true; } @@ -1814,7 +1814,7 @@ static bool isPointInterpoQuery(SQuery *pQuery) { // TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION static bool isSumAvgRateQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + int32_t functionId = pQuery->pExpr1[i].base.functionId; if (functionId == TSDB_FUNC_TS) { continue; } @@ -1830,7 +1830,7 @@ static bool isSumAvgRateQuery(SQuery *pQuery) { static bool isFirstLastRowQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionID = pQuery->pSelectExpr[i].base.functionId; + int32_t functionID = pQuery->pExpr1[i].base.functionId; if (functionID == TSDB_FUNC_LAST_ROW) { return true; } @@ -1841,7 +1841,7 @@ static bool isFirstLastRowQuery(SQuery *pQuery) { static bool needReverseScan(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + int32_t functionId = pQuery->pExpr1[i].base.functionId; if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG) { continue; } @@ -1852,7 +1852,7 @@ static bool needReverseScan(SQuery *pQuery) { if (functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_LAST_DST) { // the scan order to acquire the last result of the specified column - int32_t order = (int32_t)pQuery->pSelectExpr[i].base.arg->argValue.i64; + int32_t order = (int32_t)pQuery->pExpr1[i].base.arg->argValue.i64; if (order != pQuery->order.order) { return true; } @@ -1868,7 +1868,7 @@ static bool needReverseScan(SQuery *pQuery) { */ static bool onlyQueryTags(SQuery* pQuery) { for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SExprInfo* pExprInfo = &pQuery->pSelectExpr[i]; + SExprInfo* pExprInfo = &pQuery->pExpr1[i]; int32_t functionId = pExprInfo->base.functionId; @@ -1911,7 +1911,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { } else { bool hasMultioutput = false; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].base; + SSqlFuncMsg *pExprMsg = &pQuery->pExpr1[i].base; if (pExprMsg->functionId == TSDB_FUNC_TS || pExprMsg->functionId == TSDB_FUNC_TS_DUMMY) { continue; } @@ -1945,7 +1945,7 @@ bool colIdCheck(SQuery *pQuery) { // the scan order is not matter static bool onlyOneQueryType(SQuery *pQuery, int32_t functId, int32_t functIdDst) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + int32_t functionId = pQuery->pExpr1[i].base.functionId; if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAG_DUMMY) { @@ -2175,7 +2175,7 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat if (pRuntimeEnv->topBotQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + int32_t functionId = pQuery->pExpr1[i].base.functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { return topbot_datablock_filter(&pCtx[i], functionId, (char *)&pDataStatis[i].min, (char *)&pDataStatis[i].max); } @@ -2266,7 +2266,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pW } for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base; + SSqlFuncMsg* pSqlFunc = &pQuery->pExpr1[i].base; int32_t functionId = pSqlFunc->functionId; int32_t colId = pSqlFunc->colInfo.colId; @@ -2390,7 +2390,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa } for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t bytes = pQuery->pSelectExpr[i].bytes; + int32_t bytes = pQuery->pExpr1[i].bytes; assert(bytes > 0 && capacity > 0); char *tmp = realloc(pQuery->sdata[i], bytes * capacity + sizeof(tFilePage)); @@ -2421,7 +2421,7 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB int32_t newSize = (int32_t)(pRec->capacity + (pBlockInfo->rows - remain)); for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t bytes = pQuery->pSelectExpr[i].bytes; + int32_t bytes = pQuery->pExpr1[i].bytes; assert(bytes > 0 && newSize > 0); char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage)); @@ -2435,7 +2435,7 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB // set the pCtx output buffer position pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pRec->rows * bytes; - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + int32_t functionId = pQuery->pExpr1[i].base.functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; } @@ -2599,7 +2599,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) { SQuery *pQuery = pRuntimeEnv->pQuery; SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); - SExprInfo *pExprInfo = &pQuery->pSelectExpr[0]; + SExprInfo *pExprInfo = &pQuery->pExpr1[0]; if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) { assert(pExprInfo->base.numOfParams == 1); @@ -2610,7 +2610,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) { } else { // set tag value, by which the results are aggregated. for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) { - SExprInfo* pLocalExprInfo = &pQuery->pSelectExpr[idx]; + SExprInfo* pLocalExprInfo = &pQuery->pExpr1[idx]; // ts_comp column required the tag value for join filter if (!TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) { @@ -2652,7 +2652,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SResultRow tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + int32_t functionId = pQuery->pExpr1[i].base.functionId; if (!mergeFlag) { pCtx[i].aOutputBuf = pCtx[i].aOutputBuf + pCtx[i].outputBytes; pCtx[i].currentStage = FIRST_STAGE_MERGE; @@ -2680,7 +2680,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SResultRow } for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + int32_t functionId = pQuery->pExpr1[i].base.functionId; if (functionId == TSDB_FUNC_TAG_DUMMY) { continue; } @@ -2766,25 +2766,25 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim for (int32_t j = 0; j < numOfRows; ++j) { for (int32_t i = 0; i < numOfCols; ++i) { - switch (pQuery->pSelectExpr[i].type) { + switch (pQuery->pExpr1[i].type) { case TSDB_DATA_TYPE_BINARY: { - int32_t type = pQuery->pSelectExpr[i].type; - printBinaryData(pQuery->pSelectExpr[i].base.functionId, pdata[i]->data + pQuery->pSelectExpr[i].bytes * j, + int32_t type = pQuery->pExpr1[i].type; + printBinaryData(pQuery->pExpr1[i].base.functionId, pdata[i]->data + pQuery->pExpr1[i].bytes * j, type); break; } case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_BIGINT: - printf("%" PRId64 "\t", *(int64_t *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j)); + printf("%" PRId64 "\t", *(int64_t *)(pdata[i]->data + pQuery->pExpr1[i].bytes * j)); break; case TSDB_DATA_TYPE_INT: - printf("%d\t", *(int32_t *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j)); + printf("%d\t", *(int32_t *)(pdata[i]->data + pQuery->pExpr1[i].bytes * j)); break; case TSDB_DATA_TYPE_FLOAT: - printf("%f\t", *(float *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j)); + printf("%f\t", *(float *)(pdata[i]->data + pQuery->pExpr1[i].bytes * j)); break; case TSDB_DATA_TYPE_DOUBLE: - printf("%lf\t", *(double *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j)); + printf("%lf\t", *(double *)(pdata[i]->data + pQuery->pExpr1[i].bytes * j)); break; } } @@ -2951,7 +2951,7 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResu SQuery* pQuery = pRuntimeEnv->pQuery; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - int32_t functionId = pQuery->pSelectExpr[j].base.functionId; + int32_t functionId = pQuery->pExpr1[j].base.functionId; /* * ts, tag, tagprj function can not decide the output number of current query @@ -3236,7 +3236,7 @@ static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SWindowR // open/close the specified query for each group result for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - int32_t functId = pQuery->pSelectExpr[j].base.functionId; + int32_t functId = pQuery->pExpr1[j].base.functionId; SResultRowCellInfo* pInfo = getResultCell(pRuntimeEnv, pRow, j); if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || @@ -3260,7 +3260,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) { disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order); } else { // for simple result of table query, for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor - int32_t functId = pQuery->pSelectExpr[j].base.functionId; + int32_t functId = pQuery->pExpr1[j].base.functionId; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j]; if (pCtx->resultInfo == NULL) { @@ -3331,12 +3331,12 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { pCtx->resultInfo = pCellInfo; // set the timestamp output buffer for top/bottom/diff query - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + int32_t functionId = pQuery->pExpr1[i].base.functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; } - memset(pQuery->sdata[i]->data, 0, (size_t)(pQuery->pSelectExpr[i].bytes * pQuery->rec.capacity)); + memset(pQuery->sdata[i]->data, 0, (size_t)(pQuery->pExpr1[i].bytes * pQuery->rec.capacity)); } initCtxOutputBuf(pRuntimeEnv); @@ -3347,7 +3347,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { // reset the execution contexts for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - int32_t functionId = pQuery->pSelectExpr[j].base.functionId; + int32_t functionId = pQuery->pExpr1[j].base.functionId; assert(functionId != TSDB_FUNC_DIFF); // set next output position @@ -3374,7 +3374,7 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - int32_t functionId = pQuery->pSelectExpr[j].base.functionId; + int32_t functionId = pQuery->pExpr1[j].base.functionId; pRuntimeEnv->pCtx[j].currentStage = 0; SResultRowCellInfo* pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); @@ -3412,7 +3412,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) { 0, pQuery->rec.rows); for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + int32_t functionId = pQuery->pExpr1[i].base.functionId; int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; memmove(pQuery->sdata[i]->data, (char*)pQuery->sdata[i]->data + bytes * numOfSkip, (size_t)(pQuery->rec.rows * bytes)); @@ -3454,7 +3454,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { setResultOutputBuf(pRuntimeEnv, pResult); for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - int16_t functId = pQuery->pSelectExpr[j].base.functionId; + int16_t functId = pQuery->pExpr1[j].base.functionId; if (functId == TSDB_FUNC_TS) { continue; } @@ -3467,7 +3467,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { } } else { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - int16_t functId = pQuery->pSelectExpr[j].base.functionId; + int16_t functId = pQuery->pExpr1[j].base.functionId; if (functId == TSDB_FUNC_TS) { continue; } @@ -3680,7 +3680,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { setResultOutputBuf(pRuntimeEnv, buf); for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - aAggs[pQuery->pSelectExpr[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]); + aAggs[pQuery->pExpr1[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]); } /* @@ -3692,14 +3692,14 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { } else { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - aAggs[pQuery->pSelectExpr[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]); + aAggs[pQuery->pExpr1[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]); } } } static bool hasMainOutput(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + int32_t functionId = pQuery->pExpr1[i].base.functionId; if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TAGPRJ) { return true; @@ -3798,7 +3798,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult, page); - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + int32_t functionId = pQuery->pExpr1[i].base.functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; } @@ -3941,7 +3941,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { bool requireTimestamp(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; i++) { - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + int32_t functionId = pQuery->pExpr1[i].base.functionId; if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_NEED_TS) != 0) { return true; } @@ -4130,14 +4130,27 @@ bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) { return false; } +static int16_t getNumOfFinalResCol(SQuery* pQuery) { + return pQuery->pExpr2 == NULL? pQuery->numOfOutput:pQuery->numOfExpr2; +} + static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { - int32_t bytes = pQuery->pSelectExpr[col].bytes; + if (pQuery->pExpr2 == NULL) { + for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { + int32_t bytes = pQuery->pExpr1[col].bytes; + + memmove(data, pQuery->sdata[col]->data, bytes * numOfRows); + data += bytes * numOfRows; + } + } else { + for (int32_t col = 0; col < pQuery->numOfExpr2; ++col) { + int32_t bytes = pQuery->pExpr2[col].bytes; - memmove(data, pQuery->sdata[col]->data, bytes * numOfRows); - data += bytes * numOfRows; + memmove(data, pQuery->sdata[col]->data, bytes * numOfRows); + data += bytes * numOfRows; + } } int32_t numOfTables = (int32_t)taosArrayGetSize(pQInfo->arrTableIdInfo); @@ -4187,10 +4200,9 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int ret -= (int32_t)pQuery->limit.offset; // todo !!!!there exactly number of interpo is not valid. - // todo refactor move to the beginning of buffer for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - memmove(pDst[i]->data, pDst[i]->data + pQuery->pSelectExpr[i].bytes * pQuery->limit.offset, - ret * pQuery->pSelectExpr[i].bytes); + memmove(pDst[i]->data, pDst[i]->data + pQuery->pExpr1[i].bytes * pQuery->limit.offset, + ret * pQuery->pExpr1[i].bytes); } pQuery->limit.offset = 0; @@ -4310,6 +4322,56 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } } +static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo, STableQueryInfo* pTableQueryInfo) { + SQuery *pQuery = pRuntimeEnv->pQuery; + SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + + assert(pQuery->limit.offset == 0); + STimeWindow tw = *win; + getNextTimeWindow(pQuery, &tw); + + if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQuery))) { + + // load the data block and check data remaining in current data block + // TODO optimize performance + SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); + SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); + + tw = *win; + int32_t startPos = + getNextQualifiedWindow(pRuntimeEnv, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1); + assert(startPos >= 0); + + // set the abort info + pQuery->pos = startPos; + + // reset the query start timestamp + pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos]; + pQuery->window.skey = pTableQueryInfo->win.skey; + TSKEY key = pTableQueryInfo->win.skey; + + pWindowResInfo->prevSKey = tw.skey; + int32_t index = pRuntimeEnv->windowResInfo.curIndex; + + int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock); + pRuntimeEnv->windowResInfo.curIndex = index; // restore the window index + + qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64, + GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, + pQuery->current->lastKey); + + return key; + } else { // do nothing + pQuery->window.skey = tw.skey; + pWindowResInfo->prevSKey = tw.skey; + + return tw.skey; + } + + return true; +} + static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { SQuery *pQuery = pRuntimeEnv->pQuery; *start = pQuery->current->lastKey; @@ -4358,49 +4420,13 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { (win.ekey >= blockInfo.window.skey && !QUERY_IS_ASC_QUERY(pQuery))) { pQuery->limit.offset -= 1; pWindowResInfo->prevSKey = win.skey; - - getNextTimeWindow(pQuery, &tw); - } else { // current window does not ended in current data block, try next data block - getNextTimeWindow(pQuery, &tw); } + // current window does not ended in current data block, try next data block + getNextTimeWindow(pQuery, &tw); if (pQuery->limit.offset == 0) { - if ((tw.skey <= blockInfo.window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (tw.ekey >= blockInfo.window.skey && !QUERY_IS_ASC_QUERY(pQuery))) { - // load the data block and check data remaining in current data block - // TODO optimize performance - SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); - SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); - - tw = win; - int32_t startPos = - getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1); - assert(startPos >= 0); - - // set the abort info - pQuery->pos = startPos; - - // reset the query start timestamp - pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos]; - pQuery->window.skey = pTableQueryInfo->win.skey; - *start = pTableQueryInfo->win.skey; - - pWindowResInfo->prevSKey = tw.skey; - int32_t index = pRuntimeEnv->windowResInfo.curIndex; - - int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey, pDataBlock); - pRuntimeEnv->windowResInfo.curIndex = index; // restore the window index - - qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, - GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, pQuery->current->lastKey); - - return true; - } else { // do nothing - *start = tw.skey; - pQuery->window.skey = tw.skey; - pWindowResInfo->prevSKey = tw.skey; - return true; - } + *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo); + return true; } /* @@ -4421,42 +4447,8 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { } if (pQuery->limit.offset == 0) { - if ((tw.skey <= blockInfo.window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (tw.ekey >= blockInfo.window.skey && !QUERY_IS_ASC_QUERY(pQuery))) { - // load the data block and check data remaining in current data block - // TODO optimize performance - SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); - SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); - - tw = win; - int32_t startPos = - getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1); - assert(startPos >= 0); - - // set the abort info - pQuery->pos = startPos; - - // reset the query start timestamp - pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos]; - pQuery->window.skey = pTableQueryInfo->win.skey; - *start = pTableQueryInfo->win.skey; - - pWindowResInfo->prevSKey = tw.skey; - int32_t index = pRuntimeEnv->windowResInfo.curIndex; - - int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey, pDataBlock); - pRuntimeEnv->windowResInfo.curIndex = index; // restore the window index - - qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, - GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, pQuery->current->lastKey); - - return true; - } else { // do nothing - *start = tw.skey; - pQuery->window.skey = tw.skey; - pWindowResInfo->prevSKey = tw.skey; - return true; - } + *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo); + return true; } else { tw = win; int32_t startPos = @@ -4549,8 +4541,8 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) return terrno; } -static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) { - int32_t numOfCols = pQuery->numOfOutput; +static SFillColInfo* createFillColInfo(SQuery* pQuery) { + int32_t numOfCols = getNumOfFinalResCol(pQuery); int32_t offset = 0; SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo)); @@ -4558,8 +4550,9 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) { return NULL; } + // TODO refactor for(int32_t i = 0; i < numOfCols; ++i) { - SExprInfo* pExprInfo = &pQuery->pSelectExpr[i]; + SExprInfo* pExprInfo = (pQuery->pExpr2 == NULL)? &pQuery->pExpr1[i]:&pQuery->pExpr2[i]; pFillCol[i].col.bytes = pExprInfo->bytes; pFillCol[i].col.type = (int8_t)pExprInfo->type; @@ -4664,14 +4657,15 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo } if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { - SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery); + SFillColInfo* pColInfo = createFillColInfo(pQuery); STimeWindow w = TSWINDOW_INITIALIZER; TSKEY sk = MIN(pQuery->window.skey, pQuery->window.ekey); TSKEY ek = MAX(pQuery->window.skey, pQuery->window.ekey); getAlignQueryTimeWindow(pQuery, pQuery->window.skey, sk, ek, &w); - pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pQuery->rec.capacity, pQuery->numOfOutput, + int32_t numOfCols = getNumOfFinalResCol(pQuery); + pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pQuery->rec.capacity, numOfCols, pQuery->interval.sliding, pQuery->interval.slidingUnit, (int8_t)pQuery->precision, pQuery->fillType, pColInfo, pQInfo); } @@ -5324,6 +5318,74 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { qDebug("QInfo:%p points returned:%" PRId64 ", total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); } + +static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) { + SArithmeticSupport *pSupport = (SArithmeticSupport *) param; + SExprInfo* pExprInfo = (SExprInfo*) pSupport->exprList; + + int32_t index = -1; + for (int32_t i = 0; i < pSupport->numOfCols; ++i) { + if (colId == pExprInfo[i].base.resColId) { + index = i; + break; + } + } + + assert(index >= 0 && index < pSupport->numOfCols); + return pSupport->data[index] + pSupport->offset * pExprInfo[index].bytes; +} + +static void doSecondaryArithmeticProcess(SQuery* pQuery) { + SArithmeticSupport arithSup = {0}; + + tFilePage **data = calloc(pQuery->numOfExpr2, POINTER_BYTES); + for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { + int32_t bytes = pQuery->pExpr2[i].bytes; + data[i] = (tFilePage *)malloc(bytes * pQuery->rec.rows + sizeof(tFilePage)); + } + + arithSup.offset = 0; + arithSup.numOfCols = (int32_t)pQuery->numOfOutput; + arithSup.exprList = pQuery->pExpr1; + arithSup.data = calloc(arithSup.numOfCols, POINTER_BYTES); + + for (int32_t k = 0; k < arithSup.numOfCols; ++k) { + arithSup.data[k] = pQuery->sdata[k]->data; + } + + for (int i = 0; i < pQuery->numOfExpr2; ++i) { + SExprInfo *pExpr = &pQuery->pExpr2[i]; + + // calculate the result from several other columns + SSqlFuncMsg* pSqlFunc = &pExpr->base; + if (pSqlFunc->functionId != TSDB_FUNC_ARITHM) { + + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { + if (pSqlFunc->functionId == pQuery->pExpr1[j].base.functionId && + pSqlFunc->colInfo.colId == pQuery->pExpr1[j].base.colInfo.colId) { + memcpy(data[i]->data, pQuery->sdata[j]->data, pQuery->pExpr1[j].bytes * pQuery->rec.rows); + break; + } + } + } else { + arithSup.pArithExpr = pExpr; + tExprTreeCalcTraverse(arithSup.pArithExpr->pExpr, (int32_t)pQuery->rec.rows, data[i]->data, &arithSup, TSDB_ORDER_ASC, + getArithemicInputSrc); + } + } + + for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { + memcpy(pQuery->sdata[i]->data, data[i]->data, pQuery->pExpr2[i].bytes * pQuery->rec.rows); + } + + for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { + tfree(data[i]); + } + + tfree(data); + tfree(arithSup.data); +} + /* * in each query, this function will be called only once, no retry for further result. * @@ -5343,13 +5405,14 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey); finalizeQueryResult(pRuntimeEnv); + // since the numOfRows must be identical for all sql functions that are allowed to be executed simutaneously. + pQuery->rec.rows = getNumOfResult(pRuntimeEnv); + doSecondaryArithmeticProcess(pQuery); + if (IS_QUERY_KILLED(pQInfo)) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } - // since the numOfRows must be identical for all sql functions that are allowed to be executed simutaneously. - pQuery->rec.rows = getNumOfResult(pRuntimeEnv); - skipResults(pRuntimeEnv); limitResults(pRuntimeEnv); } @@ -5469,8 +5532,15 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex); } + // no result generated, abort + if (pQuery->rec.rows == 0) { + break; + } + + doSecondaryArithmeticProcess(pQuery); + // the offset is handled at prepare stage if no interpolation involved - if (pQuery->fillType == TSDB_FILL_NONE || pQuery->rec.rows == 0) { + if (pQuery->fillType == TSDB_FILL_NONE) { limitResults(pRuntimeEnv); break; } else { @@ -5693,7 +5763,7 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p * @param pExpr * @return */ -static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr, +static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr, SSqlFuncMsg ***pSecStageExpr, char **tagCond, char** tbnameCond, SColIndex **groupbyCols, SColumnInfo** tagCols) { int32_t code = TSDB_CODE_SUCCESS; @@ -5724,6 +5794,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks); pQueryMsg->tsOrder = htonl(pQueryMsg->tsOrder); pQueryMsg->numOfTags = htonl(pQueryMsg->numOfTags); + pQueryMsg->secondStageOutput = htonl(pQueryMsg->secondStageOutput); // query msg safety check if (!validateQueryMsg(pQueryMsg)) { @@ -5793,9 +5864,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); - pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); - pExprMsg->functionId = htons(pExprMsg->functionId); - pExprMsg->numOfParams = htons(pExprMsg->numOfParams); + pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); + pExprMsg->functionId = htons(pExprMsg->functionId); + pExprMsg->numOfParams = htons(pExprMsg->numOfParams); + pExprMsg->resColId = htons(pExprMsg->resColId); pMsg += sizeof(SSqlFuncMsg); @@ -5831,6 +5903,49 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, goto _cleanup; } + if (pQueryMsg->secondStageOutput) { + pExprMsg = (SSqlFuncMsg *)pMsg; + *pSecStageExpr = calloc(pQueryMsg->secondStageOutput, POINTER_BYTES); + + for (int32_t i = 0; i < pQueryMsg->secondStageOutput; ++i) { + (*pSecStageExpr)[i] = pExprMsg; + + pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); + pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); + pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); + pExprMsg->functionId = htons(pExprMsg->functionId); + pExprMsg->numOfParams = htons(pExprMsg->numOfParams); + + pMsg += sizeof(SSqlFuncMsg); + + for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { + pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType); + pExprMsg->arg[j].argBytes = htons(pExprMsg->arg[j].argBytes); + + if (pExprMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) { + pExprMsg->arg[j].argValue.pz = pMsg; + pMsg += pExprMsg->arg[j].argBytes; // one more for the string terminated char. + } else { + pExprMsg->arg[j].argValue.i64 = htobe64(pExprMsg->arg[j].argValue.i64); + } + } + + int16_t functionId = pExprMsg->functionId; + if (functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG_DUMMY) { + if (!TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) { // ignore the column index check for arithmetic expression. + code = TSDB_CODE_QRY_INVALID_MSG; + goto _cleanup; + } + } else { +// if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) { +// return TSDB_CODE_QRY_INVALID_MSG; +// } + } + + pExprMsg = (SSqlFuncMsg *)pMsg; + } + } + pMsg = createTableIdList(pQueryMsg, pMsg, pTableIdList); if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns @@ -5936,8 +6051,8 @@ _cleanup: return code; } -static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) { - qDebug("qmsg:%p create arithmetic expr from binary string: %s", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz); +static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) { + qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg); tExprNode* pExprNode = NULL; TRY(TSDB_MAX_TAG_CONDITIONS) { @@ -5957,7 +6072,7 @@ static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable return TSDB_CODE_SUCCESS; } -static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, +static int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, SColumnInfo* pTagCols) { *pExprInfo = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -5970,7 +6085,7 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo * bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType); int16_t tagLen = 0; - for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { + for (int32_t i = 0; i < numOfOutput; ++i) { pExprs[i].base = *pExprMsg[i]; pExprs[i].bytes = 0; @@ -5979,7 +6094,7 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo * // parse the arithmetic expression if (pExprs[i].base.functionId == TSDB_FUNC_ARITHM) { - code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg); + code = buildArithmeticExprFromMsg(&pExprs[i], pQueryMsg); if (code != TSDB_CODE_SUCCESS) { tfree(pExprs); @@ -6032,7 +6147,7 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo * } // TODO refactor - for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { + for (int32_t i = 0; i < numOfOutput; ++i) { pExprs[i].base = *pExprMsg[i]; int16_t functId = pExprs[i].base.functionId; @@ -6172,10 +6287,10 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) { } static void doUpdateExprColumnIndex(SQuery *pQuery) { - assert(pQuery->pSelectExpr != NULL && pQuery != NULL); + assert(pQuery->pExpr1 != NULL && pQuery != NULL); for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - SSqlFuncMsg *pSqlExprMsg = &pQuery->pSelectExpr[k].base; + SSqlFuncMsg *pSqlExprMsg = &pQuery->pExpr1[k].base; if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM) { continue; } @@ -6230,7 +6345,7 @@ static void calResultBufSize(SQuery* pQuery) { } static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, - STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery) { + SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery) { int16_t numOfCols = pQueryMsg->numOfCols; int16_t numOfOutput = pQueryMsg->numOfOutput; @@ -6256,7 +6371,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQuery->limit.offset = pQueryMsg->offset; pQuery->order.order = pQueryMsg->order; pQuery->order.orderColId = pQueryMsg->orderColId; - pQuery->pSelectExpr = pExprs; + pQuery->pExpr1 = pExprs; + pQuery->pExpr2 = pSecExprs; + pQuery->numOfExpr2 = pQueryMsg->secondStageOutput; pQuery->pGroupbyExpr = pGroupbyExpr; memcpy(&pQuery->interval, &pQueryMsg->interval, sizeof(pQuery->interval)); pQuery->fillType = pQueryMsg->fillType; @@ -6296,7 +6413,15 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { // allocate additional memory for interResults that are usually larger then final results - size_t size = (size_t)((pQuery->rec.capacity + 1) * pExprs[col].bytes + pExprs[col].interBytes + sizeof(tFilePage)); + // TODO refactor + int16_t bytes = 0; + if (pQuery->pExpr2 == NULL || col > pQuery->numOfExpr2) { + bytes = pExprs[col].bytes; + } else { + bytes = MAX(pQuery->pExpr2[col].bytes, pExprs[col].bytes); + } + + size_t size = (size_t)((pQuery->rec.capacity + 1) * bytes + pExprs[col].interBytes + sizeof(tFilePage)); pQuery->sdata[col] = (tFilePage *)calloc(1, size); if (pQuery->sdata[col] == NULL) { goto _cleanup; @@ -6509,6 +6634,22 @@ static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo) { pTableqinfoGroupInfo->numOfTables = 0; } +static void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr) { + if (pExprInfo == NULL) { + assert(numOfExpr == 0); + return NULL; + } + + for (int32_t i = 0; i < numOfExpr; ++i) { + if (pExprInfo[i].pExpr != NULL) { + tExprNodeDestroy(pExprInfo[i].pExpr, NULL); + } + } + + tfree(pExprInfo); + return NULL; +} + static void freeQInfo(SQInfo *pQInfo) { if (!isValidQInfo(pQInfo)) { return; @@ -6540,22 +6681,8 @@ static void freeQInfo(SQInfo *pQInfo) { } } - if (pQuery->pSelectExpr != NULL) { - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SExprInfo *pExprInfo = &pQuery->pSelectExpr[i]; - - if (pExprInfo->pExpr != NULL) { - tExprTreeDestroy(&pExprInfo->pExpr, NULL); - } - } - - tfree(pQuery->pSelectExpr); - } - - if (pQuery->pGroupbyExpr != NULL) { - taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo); - tfree(pQuery->pGroupbyExpr); - } + pQuery->pExpr1 = destroyQueryFuncExpr(pQuery->pExpr1, pQuery->numOfOutput); + pQuery->pExpr2 = destroyQueryFuncExpr(pQuery->pExpr2, pQuery->numOfExpr2); tfree(pQuery->tagColList); tfree(pQuery->pFilterInfo); @@ -6568,6 +6695,11 @@ static void freeQInfo(SQInfo *pQInfo) { tfree(pQuery->colList); } + if (pQuery->pGroupbyExpr != NULL) { + taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo); + tfree(pQuery->pGroupbyExpr); + } + tfree(pQuery); } @@ -6670,16 +6802,19 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi int32_t code = TSDB_CODE_SUCCESS; - char *tagCond = NULL; - char *tbnameCond = NULL; + char *tagCond = NULL; + char *tbnameCond = NULL; SArray *pTableIdList = NULL; - SSqlFuncMsg **pExprMsg = NULL; - SExprInfo *pExprs = NULL; + SSqlFuncMsg **pExprMsg = NULL; + SSqlFuncMsg **pSecExprMsg = NULL; + SExprInfo *pExprs = NULL; + SExprInfo *pSecExprs = NULL; + SColIndex *pGroupColIndex = NULL; SColumnInfo *pTagColumnInfo = NULL; SSqlGroupbyExpr *pGroupbyExpr = NULL; - code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo); + code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &pSecExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo); if (code != TSDB_CODE_SUCCESS) { goto _over; } @@ -6696,10 +6831,16 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi goto _over; } - if ((code = createQFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) { + if ((code = createQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->numOfOutput, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) { goto _over; } + if (pSecExprMsg != NULL) { + if ((code = createQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, &pSecExprs, pSecExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) { + goto _over; + } + } + pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, pGroupColIndex, &code); if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) { goto _over; @@ -6757,8 +6898,10 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi goto _over; } - (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &tableGroupInfo, pTagColumnInfo, isSTableQuery); + (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, pSecExprs, &tableGroupInfo, pTagColumnInfo, isSTableQuery); + pExprs = NULL; + pSecExprs = NULL; pGroupbyExpr = NULL; pTagColumnInfo = NULL; @@ -6773,13 +6916,19 @@ _over: free(tagCond); free(tbnameCond); free(pGroupColIndex); + if (pGroupbyExpr != NULL) { taosArrayDestroy(pGroupbyExpr->columnInfo); free(pGroupbyExpr); } + free(pTagColumnInfo); free(pExprs); + free(pSecExprs); + free(pExprMsg); + free(pSecExprMsg); + taosArrayDestroy(pTableIdList); for (int32_t i = 0; i < pQueryMsg->numOfCols; i++) { @@ -7043,11 +7192,11 @@ static void buildTagQueryResult(SQInfo* pQInfo) { assert(num == pQInfo->tableqinfoGroupInfo.numOfTables); int32_t count = 0; - int32_t functionId = pQuery->pSelectExpr[0].base.functionId; + int32_t functionId = pQuery->pExpr1[0].base.functionId; if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id assert(pQuery->numOfOutput == 1); - SExprInfo* pExprInfo = &pQuery->pSelectExpr[0]; + SExprInfo* pExprInfo = &pQuery->pExpr1[0]; int32_t rsize = pExprInfo->bytes; count = 0; @@ -7121,7 +7270,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) { continue; } - SExprInfo* pExprInfo = pQuery->pSelectExpr; + SExprInfo* pExprInfo = pQuery->pExpr1; STableQueryInfo* item = taosArrayGetP(pa, i); char *data = NULL, *dst = NULL; diff --git a/src/query/src/qParserImpl.c b/src/query/src/qParserImpl.c index 9edcdb008308711ce59f3d2642ab9d550165adcc..4481ff5e8fbbf0244ec664e4c699e4747ebd7e92 100644 --- a/src/query/src/qParserImpl.c +++ b/src/query/src/qParserImpl.c @@ -225,7 +225,7 @@ tSQLExpr *tSQLExprCreate(tSQLExpr *pLeft, tSQLExpr *pRight, int32_t optrType) { tSQLExprDestroy(pLeft); tSQLExprDestroy(pRight); - } else if (pLeft->nSQLOptr == TK_FLOAT || pRight->nSQLOptr == TK_FLOAT) { + } else if ((pLeft->nSQLOptr == TK_FLOAT && pRight->nSQLOptr == TK_INTEGER) || (pLeft->nSQLOptr == TK_INTEGER && pRight->nSQLOptr == TK_FLOAT)) { pExpr->val.nType = TSDB_DATA_TYPE_DOUBLE; pExpr->nSQLOptr = TK_FLOAT; diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index f057dc1a497f63b6a91aa69c83a531230c2ae8d4..f4f89a8709a16d0d84ff8333cc6ef3d4463b6c64 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -24,7 +24,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) { int32_t size = 0; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - size += pQuery->pSelectExpr[i].interBytes; + size += pQuery->pExpr1[i].interBytes; } assert(size >= 0); @@ -237,7 +237,7 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes) { SResultRowCellInfo *pResultInfo = &pWindowRes->pCellInfo[i]; char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page); - size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; + size_t size = pRuntimeEnv->pQuery->pExpr1[i].bytes; memset(s, 0, size); RESET_RESULT_INFO(pResultInfo); @@ -280,7 +280,7 @@ void copyResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *dst, const SResult tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pageId); char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SResultRow *)src, srcpage); - size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; + size_t s = pRuntimeEnv->pQuery->pExpr1[i].bytes; memcpy(dstBuf, srcBuf, s); } diff --git a/tests/script/general/parser/fill.sim b/tests/script/general/parser/fill.sim index 92aa6a922cc901c88ad68f87f61e8befbed5e87c..0547db77498cf1d61baf44deb286d9045a469dc3 100644 --- a/tests/script/general/parser/fill.sim +++ b/tests/script/general/parser/fill.sim @@ -851,11 +851,205 @@ endi print =====================>td-1442 sql_error select count(*) from m_fl_tb0 interval(1s) fill(prev); -print =============== clear -sql drop database $db -sql show databases -if $rows != 0 then +print =====================> aggregation + arithmetic + fill +#sql select avg(cpu_taosd) - first(cpu_taosd) from dn1 where ts<'2020-11-13 11:00:00' and ts>'2020-11-13 10:50:00' interval(10s) fill(value, 99) +#sql select count(*), first(k), avg(k), avg(k)-first(k) from tm0 where ts>'2020-1-1 1:1:1' and ts<'2020-1-1 1:02:59' interval(10s) fill(value, 99); +#sql select count(*), first(k), avg(k), avg(k)-first(k) from tm0 where ts>'2020-1-1 1:1:1' and ts<'2020-1-1 1:02:59' interval(10s) fill(NULL); + +print =====================> td-2060 +sql create table m1 (ts timestamp, k int ) tags(a int); +sql create table if not exists tm0 using m1 tags(1); +sql insert into tm0 values('2020-1-1 1:1:1', 1); +sql insert into tm0 values('2020-1-1 1:1:2', 2); +sql insert into tm0 values('2020-1-1 1:1:3', 3); +sql insert into tm0 values('2020-1-1 1:2:4', 4); +sql insert into tm0 values('2020-1-1 1:2:5', 5); +sql insert into tm0 values('2020-1-1 1:2:6', 6); +sql insert into tm0 values('2020-1-1 1:3:7', 7); +sql insert into tm0 values('2020-1-1 1:3:8', 8); +sql insert into tm0 values('2020-1-1 1:3:9', 9); +sql insert into tm0 values('2020-1-1 1:4:10', 10); + +sql select max(k)-min(k),last(k)-first(k),0-spread(k) from tm0 where ts>='2020-1-1 1:1:1' and ts<='2020-1-1 1:2:15' interval(10s) fill(value, 99,91,90,89,88,87,86,85); +if $rows != 8 then + return -1 +endi + +if $data00 != @20-01-01 01:01:00.000@ then + return -1 +endi + +if $data01 != 2.000000000 then + return -1 +endi + +if $data02 != 2.000000000 then + return -1 +endi + +if $data03 != -2.000000000 then + return -1 +endi + +if $data10 != @20-01-01 01:01:10.000@ then + return -1 +endi + +if $data11 != 99.000000000 then + return -1 +endi + +if $data12 != 91.000000000 then + return -1 +endi + +if $data13 != 90.000000000 then + return -1 +endi + +if $data60 != @20-01-01 01:02:00.000@ then + return -1 +endi + +if $data61 != 2.000000000 then + return -1 +endi + +if $data62 != 2.000000000 then + return -1 +endi + +if $data63 != -2.000000000 then + return -1 +endi + +if $data70 != @20-01-01 01:02:10.000@ then + return -1 +endi + +if $data71 != 99.000000000 then + return -1 +endi + +if $data72 != 91.000000000 then + return -1 +endi + +if $data73 != 90.000000000 then + return -1 +endi + +sql select first(k)-avg(k),0-spread(k) from tm0 where ts>='2020-1-1 1:1:1' and ts<='2020-1-1 1:2:15' interval(10s) fill(NULL); +if $rows != 8 then + return -1 +endi + +if $data00 != @20-01-01 01:01:00.000@ then + return -1 +endi + +if $data01 != -1.000000000 then + return -1 +endi + +if $data02 != -2.000000000 then + return -1 +endi + +if $data10 != @20-01-01 01:01:10.000@ then + return -1 +endi + +if $data11 != NULL then + return -1 +endi + +if $data12 != NULL then + return -1 +endi + +sql select max(k)-min(k),last(k)-first(k),0-spread(k) from tm0 where ts>='2020-1-1 1:1:1' and ts<='2020-1-1 4:2:15' interval(500a) fill(value, 99,91,90,89,88,87,86,85) order by ts asc; +if $rows != 21749 then + return -1 +endi + +sql select max(k)-min(k),last(k)-first(k),0-spread(k),count(1) from m1 where ts>='2020-1-1 1:1:1' and ts<='2020-1-1 1:2:15' interval(10s) fill(value, 99,91,90,89,88,87,86,85) order by ts asc; +if $rows != 8 then + return -1 +endi + +if $data00 != @20-01-01 01:01:00.000@ then + return -1 +endi + +if $data00 != @20-01-01 01:01:00.000@ then + return -1 +endi +if $data1 +if $data01 != 2.000000000 then + return -1 +endi + +if $data02 != 2.000000000 then + return -1 +endi + +if $data03 != -2.000000000 then + return -1 +endi + +if $data04 != 3 then + return -1 +endi + +if $data10 != @20-01-01 01:01:10.000@ then + return -1 +endi + +if $data11 != 99.000000000 then + return -1 +endi + +if $data12 != 91.000000000 then + return -1 +endi + +if $data13 != 90.000000000 then + return -1 +endi + +if $data14 != 89 then + return -1 +endi + +print ==================> td-2115 +sql select count(*), min(c3)-max(c3) from m_fl_mt0 group by tgcol +if $rows != 10 then + return -1 +endi + +if $data00 != 5 then + return -1 +endi + +if $data01 != -4.000000000 then + return -1 +endi + +if $data02 != 0 then + return -1 +endi + +if $data12 != 1 then return -1 endi + +print =============== clear +#sql drop database $db +#sql show databases +#if $rows != 0 then +# return -1 +#endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/general/parser/join_multivnode.sim b/tests/script/general/parser/join_multivnode.sim index 76230f79f0136975006e81cee4236a2fa9521214..a5e71260b487ddd87a6df795c2f5bdfb9d72eca3 100644 --- a/tests/script/general/parser/join_multivnode.sim +++ b/tests/script/general/parser/join_multivnode.sim @@ -212,4 +212,157 @@ sql select count(join_mt0.c1), first(join_mt0.c1)/count(*), first(join_mt1.c9) f sql select count(join_mt0.c1), first(join_mt0.c1)-last(join_mt0.c1), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL); sql select last(join_mt0.c1) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL); +sql create database disorder_db; +sql use disorder_db; +sql create table m1(ts timestamp, k int) tags(a int); +sql create table tm0 using m1 tags(0); +sql create table tm1 using m1 tags(1); +sql create table tm2 using m1 tags(2); +sql create table tm3 using m1 tags(3); +sql create table tm4 using m1 tags(4); +sql create table tm5 using m1 tags(5); +sql create table tm6 using m1 tags(6); +sql create table tm7 using m1 tags(7); + +sql show vgroups +if $rows != 2 then + print maxTablesPerVnode set to 4 is not active. + return -1 +endi + +sql insert into tm0 values('2020-1-1 1:1:1', 0); +sql insert into tm1 values('2020-1-1 1:1:1', 1); +sql insert into tm2 values('2020-1-1 1:1:1', 2); +sql insert into tm3 values('2020-1-1 1:1:1', 3); +sql insert into tm4 values('2020-1-1 1:1:1', 4); +sql insert into tm5 values('2020-1-1 1:1:1', 5); +sql insert into tm6 values('2020-1-1 1:1:1', 6); +sql insert into tm7 values('2020-1-1 1:1:1', 7); + +sql create table m2(ts timestamp, k int) tags(b int); +sql create table t0 using m2 tags(0); +sql create table t1 using m2 tags(4); +sql create table t2 using m2 tags(92); +sql create table t3 using m2 tags(93); +sql create table t4 using m2 tags(1); +sql create table t5 using m2 tags(5); +sql create table t6 using m2 tags(96); +sql create table t7 using m2 tags(97); + +sql show vgroups +if $rows != 4 then + return -1 +endi + +sql insert into t0 values('2020-1-1 1:1:1', 10); +sql insert into t1 values('2020-1-1 1:1:1', 11); +sql insert into t2 values('2020-1-1 1:1:1', 12); +sql insert into t3 values('2020-1-1 1:1:1', 13); +sql insert into t4 values('2020-1-1 1:1:1', 14); +sql insert into t5 values('2020-1-1 1:1:1', 15); +sql insert into t6 values('2020-1-1 1:1:1', 16); +sql insert into t7 values('2020-1-1 1:1:1', 17); + +sql select m1.ts,m1.tbname,m1.a, m2.ts,m2.tbname,m2.b from m1,m2 where m1.a=m2.b and m1.ts=m2.ts; +if $rows != 4 then + return -1 +endi + +if $data00 != @20-01-01 01:01:01.000@ then + print expect 20-01-01 01:01:01.000, actual:$data00 + return -1 +endi + +if $data01 != @tm0@ then + return -1 +endi + +if $data02 != 0 then + return -1 +endi + +if $data03 != @20-01-01 01:01:01.000@ then + return -1 +endi + +if $data04 != @t0@ then + return -1 +endi + +if $data05 != 0 then + return -1 +endi + +if $data10 != @20-01-01 01:01:01.000@ then + return -1 +endi + +if $data11 != @tm1@ then + return -1 +endi + +if $data12 != 1 then + return -1 +endi + +if $data13 != @20-01-01 01:01:01.000@ then + return -1 +endi + +if $data14 != @t4@ then + return -1 +endi + +if $data15 != 1 then + return -1 +endi + +if $data20 != @20-01-01 01:01:01.000@ then + return -1 +endi + +if $data21 != @tm4@ then + return -1 +endi + +if $data22 != 4 then + return -1 +endi + +if $data23 != @20-01-01 01:01:01.000@ then + return -1 +endi + +if $data24 != @t1@ then + return -1 +endi + +if $data25 != 4 then + return -1 +endi + +if $data30 != @20-01-01 01:01:01.000@ then + return -1 +endi + +if $data31 != @tm5@ then + return -1 +endi + +if $data32 != 5 then + return -1 +endi + +if $data33 != @20-01-01 01:01:01.000@ then + return -1 +endi + +if $data34 != @t5@ then + return -1 +endi + +if $data35 != 5 then + return -1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file