diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 3918b15ad421d3bcc367301093c674033cfd79a1..2f9a18b9fc7c18c7f467e8b550f8e72dc96a55e9 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -251,7 +251,13 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()); +typedef struct SColumnList { + int32_t num; + SColumnIndex ids[TSDB_MAX_COLUMNS]; +} SColumnList; +int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, + int8_t type, char* fieldName, SSqlExpr* pSqlExpr); #ifdef __cplusplus } #endif diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index afd8e98edac4522089e0aaafa02237da67bdd1cc..3d2f7949f251330372480b9dfb74eded9bd5967c 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -685,9 +685,9 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { } SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pQueryInfo->fieldsInfo.numOfOutputCols); + pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pQueryInfo->exprsInfo.numOfExprs); - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); int32_t tableIndexOfSub = -1; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 3f9b668d707a93fc3b843c4b936ef4e4dca44dd5..3b4fc0240cf854eb72e5683db311c58a1e220d96 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -38,11 +38,6 @@ #define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_TBNAME_COLUMN_INDEX)) #define TBNAME_LIST_SEP "," -typedef struct SColumnList { - int32_t num; - SColumnIndex ids[TSDB_MAX_COLUMNS]; -} SColumnList; - static SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t colIdx, int32_t tableIndex); static int32_t setShowInfo(SSqlObj* pSql, SSqlInfo* pInfo); @@ -60,8 +55,6 @@ static void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t na static void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName); static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem, bool isResultColumn); -static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, - int8_t type, char* fieldName, SSqlExpr* pSqlExpr); static int32_t changeFunctionID(int32_t optr, int16_t* functionId); static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index de03375246cec89bcf1ee76eb653dcb66a09c39b..c4a8440ef60d14bb335a94d5dd8ac7830016ac42 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -694,13 +694,6 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name); tscPrintSelectClause(pNew, 0); - - tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " - "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, tableIndex, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type, - pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, - pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name); - tscPrintSelectClause(pNew, 0); } else { SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 5023534f659de76ec1b3d4edcdc64e2206f547ef..99bc9632deeff8bf98c8090e0fac7604104f84c9 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -74,7 +74,10 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const tscMgmtIpList.ip[2] = inet_addr(tsMasterIp); strcpy(tscMgmtIpList.ipstr[3], tsSecondIp); tscMgmtIpList.ip[3] = inet_addr(tsSecondIp); - strcpy(tsMasterIp, ip); + + if (tsMasterIp != ip) { + strcpy(tsMasterIp, ip); + } } pObj = (STscObj *)malloc(sizeof(STscObj)); @@ -416,7 +419,7 @@ static char *getArithemicInputSrc(void *param, char *name, int32_t colId) { return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index]; } -static void **doSetResultRowData(SSqlObj *pSql) { +static void **doSetResultRowData(SSqlObj *pSql, bool finalResult) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -429,7 +432,6 @@ static void **doSetResultRowData(SSqlObj *pSql) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - int32_t num = 0; for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) { if (pQueryInfo->fieldsInfo.pSqlExpr[i] != NULL) { SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[i]; @@ -444,7 +446,7 @@ static void **doSetResultRowData(SSqlObj *pSql) { TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i); transferNcharData(pSql, i, pField); - // calculate the result from serveral other columns + // calculate the result from several other columns if (pQueryInfo->fieldsInfo.pExpr != NULL && pQueryInfo->fieldsInfo.pExpr[i] != NULL) { SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport)); sas->offset = 0; @@ -471,8 +473,6 @@ static void **doSetResultRowData(SSqlObj *pSql) { } } - assert(num <= pQueryInfo->fieldsInfo.numOfOutputCols); - pRes->row++; // index increase one-step return pRes->tsrow; } @@ -536,9 +536,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { while (1) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); - if (pRes->tsrow == NULL) { - pRes->tsrow = calloc(pQueryInfo->exprsInfo.numOfExprs, POINTER_BYTES); - } + tscCreateResPointerInfo(pRes, pQueryInfo); bool success = false; @@ -550,10 +548,8 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { } if (numOfTableHasRes >= 2) { // do merge result - - success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL); - // TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; - // TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; + success = (doSetResultRowData(pSql->pSubs[0], false) != NULL) && + (doSetResultRowData(pSql->pSubs[1], false) != NULL); // printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2); } else { // only one subquery SSqlObj *pSub = pSql->pSubs[0]; @@ -561,7 +557,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { pSub = pSql->pSubs[1]; } - success = (doSetResultRowData(pSub) != NULL); + success = (doSetResultRowData(pSub, false) != NULL); } if (success) { // current row of final output has been built, return to app @@ -572,6 +568,41 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { SSqlRes *pRes1 = &pSql->pSubs[tableIndex]->res; pRes->tsrow[i] = pRes1->tsrow[columnIndex]; } + + int32_t numOfOutputCols = tscNumOfFields(pQueryInfo); + assert(pRes->numOfCols >= numOfOutputCols); + + for(int32_t i = 0; i < numOfOutputCols; ++i) { + if (pQueryInfo->fieldsInfo.pSqlExpr[i] != NULL) { + continue; // no arithmetic expression exists, continue + } + + assert(pQueryInfo->fieldsInfo.pExpr[i] != NULL); + SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport)); + sas->offset = 0; + sas->pExpr = pQueryInfo->fieldsInfo.pExpr[i]; + + sas->numOfCols = sas->pExpr->binExprInfo.numOfCols; + + if (pRes->buffer[i] == NULL) { + pRes->buffer[i] = malloc(tscFieldInfoGetField(pQueryInfo, i)->bytes); + } + + for (int32_t k = 0; k < sas->numOfCols; ++k) { + int32_t columnIndex = sas->pExpr->binExprInfo.pReqColumns[k].colIdxInBuf; + assert(columnIndex < pQueryInfo->exprsInfo.numOfExprs); + + SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, columnIndex); + + sas->elemSize[k] = pExpr->resBytes; + sas->data[k] = pRes->tsrow[columnIndex]; + } + + tSQLBinaryExprCalcTraverse(sas->pExpr->binExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSQL_SO_ASC, + getArithemicInputSrc); + pRes->tsrow[i] = pRes->buffer[i]; + free(sas); // todo optimization + } pRes->numOfTotalInCurrentClause++; @@ -662,7 +693,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { } } - return doSetResultRowData(pSql); + return doSetResultRowData(pSql, true); } TAOS_ROW taos_fetch_row(TAOS_RES *res) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 52505f9d24acba837ab5cfd7127ef613193617fd..1073eec6ea7de122d311620f058abfcf97f09bdb 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -350,11 +350,13 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) { int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { if (pRes->tsrow == NULL) { - int32_t numOfOutputCols = pQueryInfo->fieldsInfo.numOfOutputCols; - pRes->numOfCols = numOfOutputCols; + int32_t numOfColumns = pQueryInfo->exprsInfo.numOfExprs; + assert(numOfColumns >= pQueryInfo->fieldsInfo.numOfOutputCols); + + pRes->numOfCols = numOfColumns; - pRes->tsrow = calloc(POINTER_BYTES, numOfOutputCols); - pRes->buffer = calloc(POINTER_BYTES, numOfOutputCols); + pRes->tsrow = calloc(POINTER_BYTES, numOfColumns); + pRes->buffer = calloc(POINTER_BYTES, numOfColumns); // not enough memory if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) { @@ -370,8 +372,8 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { } void tscDestroyResPointerInfo(SSqlRes* pRes) { + // free all buffers containing the multibyte string if (pRes->buffer != NULL) { - // free all buffers containing the multibyte string for (int i = 0; i < pRes->numOfCols; i++) { tfree(pRes->buffer[i]); } @@ -946,6 +948,7 @@ void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList tscFieldInfoSetValFromField(dst, i, &src->pFields[indexList[i]]); dst->pVisibleCols[i] = src->pVisibleCols[indexList[i]]; dst->pSqlExpr[i] = src->pSqlExpr[indexList[i]]; + dst->pExpr[i] = src->pExpr[indexList[i]]; } } } @@ -2004,8 +2007,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void indexList[j++] = i; } } + + // create the fields info from the sql functions + SColumnList columnList = {.num = 1}; + + for(int32_t k = 0; k < numOfOutputCols; ++k) { + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, indexList[k]); + columnList.ids[0] = (SColumnIndex){.tableIndex = tableIndex, .columnIndex = pExpr->colInfo.colIdx}; + insertResultField(pNewQueryInfo, k, &columnList, pExpr->resBytes, pExpr->resType, pExpr->aliasName, pExpr); + } - tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pNewQueryInfo->fieldsInfo, indexList, numOfOutputCols); free(indexList); // make sure the the sqlExpr for each fields is correct diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 77814f6b8a7a5c7c6535004be896bc4fac197134..4addadd72be27d0d45f8ed3477dd9306708f8458 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -1776,6 +1776,11 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow SQuery *pQuery = pRuntimeEnv->pQuery; while (1) { + if ((pNextWin->ekey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (pNextWin->skey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + return -1; + } + getNextTimeWindow(pQuery, pNextWin); if (pWindowResInfo->startTime > pNextWin->skey || (pNextWin->skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||