提交 df3c4573 编写于 作者: H hjxilinx

fix bugs in regression tests.

上级 ae453f80
......@@ -154,7 +154,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid);
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid, bool deepcopy);
void* tscSqlExprDestroy(SSqlExpr* pExpr);
void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo);
......
......@@ -96,9 +96,9 @@ typedef struct SFieldInfo {
} SFieldInfo;
typedef struct SSqlExprInfo {
int16_t numOfAlloc;
int16_t numOfExprs;
SSqlExpr *pExprs;
int16_t numOfAlloc;
int16_t numOfExprs;
SSqlExpr** pExprs;
} SSqlExprInfo;
typedef struct SColumnBase {
......@@ -283,12 +283,11 @@ typedef struct {
int64_t useconds;
int64_t offset; // offset value from vnode during projection query of stable
int row;
int16_t numOfnchar;
int16_t numOfCols;
int16_t precision;
int32_t numOfGroups;
SResRec * pGroupRec;
char * data;
short * bytes;
void ** tsrow;
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex *pColumnIndex;
......@@ -406,8 +405,6 @@ int taos_retrieve(TAOS_RES *res);
int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo);
void tscClearSqlMetaInfoForce(SSqlCmd *pCmd);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscDestroyResPointerInfo(SSqlRes *pRes);
......
......@@ -300,6 +300,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
}
bool stableQueryFunctChanged(int32_t funcId) {
return (aAggs[funcId].stableFuncId != funcId);
}
/**
* the numOfRes should be kept, since it may be used later
* and allow the ResultInfo to be re initialized
......@@ -3558,6 +3562,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
}
GET_RES_INFO(pCtx)->numOfRes = 1; // todo add test case
doFinalizer(pCtx);
}
/*
......
......@@ -275,6 +275,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pSubQueryInfo->tsBuf = NULL;
// free result for async object will also free sqlObj
assert(pSubQueryInfo->exprsInfo.numOfExprs == 1); // ts_comp query only requires one resutl columns
taos_free_result(pPrevSub);
SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, NULL);
......@@ -299,18 +300,20 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
tscColumnBaseInfoCopy(&pQueryInfo->colList, &pSupporter->colList, 0);
tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid);
tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid, false);
tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo);
pSupporter->exprsInfo.numOfExprs = 0;
pSupporter->fieldsInfo.numOfOutputCols = 0;
/*
* if the first column of the secondary query is not ts function, add this function.
* Because this column is required to filter with timestamp after intersecting.
*/
if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS) {
if (pSupporter->exprsInfo.pExprs[0]->functionId != TSDB_FUNC_TS) {
tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0);
}
// todo refactor function name
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
......
......@@ -251,6 +251,13 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 3, TSDB_DATA_TYPE_BINARY, "Note", noteColLength);
rowLen += noteColLength;
//set the sqlexpr part
SColumnIndex index = {0};
pQueryInfo->fieldsInfo.pSqlExpr[0] = tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, TSDB_COL_NAME_LEN, TSDB_COL_NAME_LEN);
pQueryInfo->fieldsInfo.pSqlExpr[1] = tscSqlExprInsert(pQueryInfo, 1, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, typeColLength, typeColLength);
pQueryInfo->fieldsInfo.pSqlExpr[2] = tscSqlExprInsert(pQueryInfo, 2, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t), sizeof(int32_t));
pQueryInfo->fieldsInfo.pSqlExpr[3] = tscSqlExprInsert(pQueryInfo, 3, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, noteColLength, noteColLength);
return rowLen;
}
......@@ -455,6 +462,8 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
tscInitResObjForLocalQuery(pSql, 1, valueLength);
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, 0);
pQueryInfo->fieldsInfo.pSqlExpr[0] = pQueryInfo->exprsInfo.pExprs[0];
strncpy(pRes->data, val, pField->bytes);
}
......
......@@ -16,7 +16,6 @@
#define _XOPEN_SOURCE
#define _DEFAULT_SOURCE
#include <tast.h>
#include "os.h"
#include "taos.h"
#include "taosmsg.h"
......@@ -28,6 +27,7 @@
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tast.h"
#define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0"
......@@ -130,7 +130,7 @@ static int32_t tscQueryOnlyMetricTags(SQueryInfo* pQueryInfo, bool* queryOnMetri
assert(QUERY_IS_STABLE_QUERY(pQueryInfo->type));
*queryOnMetricTags = true;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId != TSDB_FUNC_TAGPRJ &&
......@@ -570,7 +570,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
* are available.
*/
static bool isTopBottomQuery(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
......@@ -623,7 +623,7 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
* check invalid SQL:
* select count(tbname)/count(tag1)/count(tag2) from super_table_name interval(1d);
*/
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_COUNT && TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
......@@ -1330,7 +1330,8 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn
SSchema* pSchema = tsGetColumnSchema(pMeterMeta, pIndex->columnIndex);
char* colName = (pItem->aliasName == NULL) ? pSchema->name : pItem->aliasName;
strncpy(pExpr->aliasName, colName, tListLen(pExpr->aliasName));
SColumnList ids = {0};
ids.num = 1;
ids.ids[0] = *pIndex;
......@@ -1339,7 +1340,7 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn
ids.num = 0;
}
insertResultField(pQueryInfo, startPos, &ids, pExpr->resBytes, pExpr->resType, colName, pExpr);
insertResultField(pQueryInfo, startPos, &ids, pExpr->resBytes, pExpr->resType, pExpr->aliasName, pExpr);
}
void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
......@@ -1375,6 +1376,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum
for (int32_t j = 0; j < numOfTotalColumns; ++j) {
SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, startPos + j, j, pIndex->tableIndex);
strncpy(pExpr->aliasName, pSchema[j].name, tListLen(pExpr->aliasName));
pIndex->columnIndex = j;
SColumnList ids = {0};
......@@ -1393,7 +1395,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI
const char* msg0 = "invalid column name";
const char* msg1 = "tag for table query is not allowed";
int32_t startPos = pQueryInfo->fieldsInfo.numOfOutputCols;
int32_t startPos = pQueryInfo->exprsInfo.numOfExprs;
if (pItem->pNode->nSQLOptr == TK_ALL) { // project on all fields
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
......@@ -1471,7 +1473,8 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema,
}
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, resColIdx, functionID, pColIndex, type, bytes, bytes);
strncpy(pExpr->aliasName, columnName, tListLen(pExpr->aliasName));
// for point interpolation/last_row query, we need the timestamp column to be loaded
SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) {
......@@ -2222,8 +2225,6 @@ int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) {
for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
// TAOS_FIELD* pField = tscFieldInfoGetField(pQueryInfo, k);
int16_t functionId = aAggs[pExpr->functionId].stableFuncId;
int32_t colIndex = pExpr->colInfo.colIdx;
......@@ -2257,14 +2258,27 @@ void tscRestoreSQLFunctionForMetricQuery(SQueryInfo* pQueryInfo) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
SSchema* pSchema = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, pExpr->colInfo.colIdx);
if ((pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) ||
(pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX)) {
// if (/*(pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) ||
// (pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX) ||
// pExpr->functionId == TSDB_FUNC_LAST_ROW*/) {
// the final result size and type in the same as query on single table.
// so here, set the flag to be false;
int16_t inter = 0;
getResultDataInfo(pSchema->type, pSchema->bytes, pExpr->functionId, 0, &pExpr->resType, &pExpr->resBytes,
int32_t functionId = pExpr->functionId;
if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) {
continue;
}
if (functionId == TSDB_FUNC_FIRST_DST) {
functionId = TSDB_FUNC_FIRST;
} else if (functionId == TSDB_FUNC_LAST_DST) {
functionId = TSDB_FUNC_LAST;
}
getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &pExpr->resType, &pExpr->resBytes,
&inter, 0, false);
}
// }
}
}
......@@ -2274,7 +2288,7 @@ bool hasUnsupportFunctionsForSTableQuery(SQueryInfo* pQueryInfo) {
const char* msg3 = "function not support for super table query";
// filter sql function not supported by metric query yet.
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_METRIC) == 0) {
invalidSqlErrMsg(pQueryInfo->msg, msg3);
......@@ -2311,7 +2325,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo) {
// diff function cannot be executed with other function
// arithmetic function can be executed with other arithmetic functions
for (int32_t i = startIdx + 1; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = startIdx + 1; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
int16_t functionId = pExpr->functionId;
......@@ -3852,6 +3866,7 @@ int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t o
return TSDB_CODE_SUCCESS;
}
// todo error !!!!
int32_t tsRewriteFieldNameIfNecessary(SQueryInfo* pQueryInfo) {
const char rep[] = {'(', ')', '*', ',', '.', '/', '\\', '+', '-', '%', ' '};
......@@ -3898,7 +3913,7 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
}
if (pQueryInfo->defaultVal == NULL) {
pQueryInfo->defaultVal = calloc(pQueryInfo->fieldsInfo.numOfOutputCols, sizeof(int64_t));
pQueryInfo->defaultVal = calloc(pQueryInfo->exprsInfo.numOfExprs, sizeof(int64_t));
if (pQueryInfo->defaultVal == NULL) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
......@@ -3908,7 +3923,7 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
pQueryInfo->interpoType = TSDB_INTERPO_NONE;
} else if (strncasecmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->interpoType = TSDB_INTERPO_NULL;
for (int32_t i = START_INTERPO_COL_IDX; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = START_INTERPO_COL_IDX; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
TAOS_FIELD* pFields = tscFieldInfoGetField(pQueryInfo, i);
setNull((char*)&pQueryInfo->defaultVal[i], pFields->type, pFields->bytes);
}
......@@ -3930,12 +3945,12 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
if (tscIsPointInterpQuery(pQueryInfo)) {
startPos = 0;
if (numOfFillVal > pQueryInfo->fieldsInfo.numOfOutputCols) {
numOfFillVal = pQueryInfo->fieldsInfo.numOfOutputCols;
if (numOfFillVal > pQueryInfo->exprsInfo.numOfExprs) {
numOfFillVal = pQueryInfo->exprsInfo.numOfExprs;
}
} else {
numOfFillVal = (pFillToken->nExpr > pQueryInfo->fieldsInfo.numOfOutputCols)
? pQueryInfo->fieldsInfo.numOfOutputCols
numOfFillVal = (pFillToken->nExpr > pQueryInfo->exprsInfo.numOfExprs)
? pQueryInfo->exprsInfo.numOfExprs
: pFillToken->nExpr;
}
......@@ -3955,11 +3970,11 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
}
}
if ((pFillToken->nExpr < pQueryInfo->fieldsInfo.numOfOutputCols) ||
((pFillToken->nExpr - 1 < pQueryInfo->fieldsInfo.numOfOutputCols) && (tscIsPointInterpQuery(pQueryInfo)))) {
if ((pFillToken->nExpr < pQueryInfo->exprsInfo.numOfExprs) ||
((pFillToken->nExpr - 1 < pQueryInfo->exprsInfo.numOfExprs) && (tscIsPointInterpQuery(pQueryInfo)))) {
tVariantListItem* lastItem = &pFillToken->a[pFillToken->nExpr - 1];
for (int32_t i = numOfFillVal; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = numOfFillVal; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
TAOS_FIELD* pFields = tscFieldInfoGetField(pQueryInfo, i);
if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) {
......@@ -4363,7 +4378,7 @@ int32_t validateSqlFunctionInStreamSql(SQueryInfo* pQueryInfo) {
return invalidSqlErrMsg(pQueryInfo->msg, msg0);
}
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
int32_t functId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (!IS_STREAM_QUERY_VALID(aAggs[functId].nStatus)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
......@@ -4378,13 +4393,13 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SQueryInfo* pQueryInfo) {
const char* msg1 = "column projection is not compatible with interval";
// multi-output set/ todo refactor
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) {
for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
// projection query on primary timestamp, the selectivity function needs to be present.
if (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
bool hasSelectivity = false;
for (int32_t j = 0; j < pQueryInfo->fieldsInfo.numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) {
SSqlExpr* pEx = tscSqlExprGet(pQueryInfo, j);
if ((aAggs[pEx->functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) == TSDB_FUNCSTATE_SELECTIVITY) {
hasSelectivity = true;
......@@ -4645,7 +4660,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
}
// filter the query functions operating on "tbname" column that are not supported by normal columns.
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->colInfo.colIdx == TSDB_TBNAME_COLUMN_INDEX) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
......@@ -4783,13 +4798,13 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau
int16_t bytes = pSchema[index.columnIndex].bytes;
char* name = pSchema[index.columnIndex].name;
pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, TSDB_FUNC_TAG, &index, type, bytes,
pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs, TSDB_FUNC_TAG, &index, type, bytes,
bytes);
pExpr->colInfo.flag = TSDB_COL_TAG;
// NOTE: tag column does not add to source column list
SColumnList ids = {0};
insertResultField(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, &ids, bytes, type, name, pExpr);
insertResultField(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs, &ids, bytes, type, name, pExpr);
int32_t relIndex = index.columnIndex;
......@@ -4816,7 +4831,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) {
SSchema* pSchema = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, index);
SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = index};
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, TSDB_FUNC_PRJ, &colIndex,
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs, TSDB_FUNC_PRJ, &colIndex,
pSchema->type, pSchema->bytes, pSchema->bytes);
pExpr->colInfo.flag = TSDB_COL_NORMAL;
......@@ -4827,14 +4842,14 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) {
list.num = 1;
list.ids[0] = colIndex;
insertResultField(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, &list, pSchema->bytes, pSchema->type,
insertResultField(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs - 1, &list, pSchema->bytes, pSchema->type,
pSchema->name, pExpr);
tscFieldInfoUpdateVisible(&pQueryInfo->fieldsInfo, pQueryInfo->fieldsInfo.numOfOutputCols - 1, false);
tscFieldInfoUpdateVisible(&pQueryInfo->fieldsInfo, pQueryInfo->exprsInfo.numOfExprs - 1, false);
}
static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
int32_t tagLength = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_TAGPRJ || pExpr->functionId == TSDB_FUNC_TAG) {
pExpr->functionId = TSDB_FUNC_TAG_DUMMY;
......@@ -4848,7 +4863,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
SSchema* pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId != TSDB_FUNC_TAG_DUMMY && pExpr->functionId != TSDB_FUNC_TS_DUMMY) {
SSchema* pColSchema = &pSchema[pExpr->colInfo.colIdx];
......@@ -4859,7 +4874,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
}
static void doUpdateSqlFunctionForColPrj(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_PRJ) {
bool qualifiedCol = false;
......@@ -4891,7 +4906,7 @@ static bool onlyTagPrjFunction(SQueryInfo* pQueryInfo) {
bool hasTagPrj = false;
bool hasColumnPrj = false;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_PRJ) {
hasColumnPrj = true;
......@@ -4907,7 +4922,7 @@ static bool onlyTagPrjFunction(SQueryInfo* pQueryInfo) {
static bool allTagPrjInGroupby(SQueryInfo* pQueryInfo) {
bool allInGroupby = true;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId != TSDB_FUNC_TAGPRJ) {
continue;
......@@ -4924,7 +4939,7 @@ static bool allTagPrjInGroupby(SQueryInfo* pQueryInfo) {
}
static void updateTagPrjFunction(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_TAGPRJ) {
pExpr->functionId = TSDB_FUNC_TAG;
......@@ -4946,7 +4961,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo) {
int16_t numOfSelectivity = 0;
int16_t numOfAggregation = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_TAGPRJ ||
(pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)) {
......@@ -4955,7 +4970,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo) {
}
}
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
int16_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS ||
functionId == TSDB_FUNC_ARITHM) {
......@@ -4987,7 +5002,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo) {
* If more than one selectivity functions exist, all the selectivity functions must be last_row.
* Otherwise, return with error code.
*/
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
int16_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functionId == TSDB_FUNC_TAGPRJ) {
continue;
......@@ -5048,14 +5063,14 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) {
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex};
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, TSDB_FUNC_TAG, &index,
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs, TSDB_FUNC_TAG, &index,
type, bytes, bytes);
pExpr->colInfo.flag = TSDB_COL_TAG;
// NOTE: tag column does not add to source column list
SColumnList ids = {0};
insertResultField(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, &ids, bytes, type, name, pExpr);
insertResultField(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs-1, &ids, bytes, type, name, pExpr);
} else {
// if this query is "group by" normal column, interval is not allowed
if (pQueryInfo->intervalTime > 0) {
......@@ -5063,7 +5078,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) {
}
bool hasGroupColumn = false;
for (int32_t j = 0; j < pQueryInfo->fieldsInfo.numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, j);
if (pExpr->colInfo.colId == pColIndex->colId) {
break;
......@@ -5106,7 +5121,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
}
// check all query functions in selection clause, multi-output functions are not allowed
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
int32_t functId = pExpr->functionId;
......@@ -5194,11 +5209,8 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
}
SSqlExpr* pExpr1 = tscSqlExprInsertEmpty(pQueryInfo, 0, TSDB_FUNC_TAG_DUMMY);
if (pExprList->a[0].aliasName != NULL) {
strncpy(pExpr1->aliasName, pExprList->a[0].aliasName, tListLen(pExpr1->aliasName));
} else {
strncpy(pExpr1->aliasName, functionsInfo[index].name, tListLen(pExpr1->aliasName));
}
const char* name = (pExprList->a[0].aliasName != NULL)? pExprList->a[0].aliasName:functionsInfo[index].name;
strncpy(pExpr1->aliasName, name, tListLen(pExpr1->aliasName));
switch (index) {
case 0:
......@@ -5777,9 +5789,9 @@ static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr*
// set the input column data byte and type.
for (int32_t i = 0; i < pExprInfo->numOfExprs; ++i) {
if (strcmp((*pExpr)->pSchema->name, pExprInfo->pExprs[i].aliasName) == 0) {
(*pExpr)->pSchema->type = pExprInfo->pExprs[i].resType;
(*pExpr)->pSchema->bytes = pExprInfo->pExprs[i].resBytes;
if (strcmp((*pExpr)->pSchema->name, pExprInfo->pExprs[i]->aliasName) == 0) {
(*pExpr)->pSchema->type = pExprInfo->pExprs[i]->resType;
(*pExpr)->pSchema->bytes = pExprInfo->pExprs[i]->resBytes;
break;
}
}
......
......@@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tscSecondaryMerge.h"
#include "os.h"
#include "tlosertree.h"
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tutil.h"
#include "tschemautil.h"
typedef struct SCompareParam {
SLocalDataSource **pLocalData;
......@@ -59,19 +59,20 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
* the fields and offset attributes in pCmd and pModel may be different due to
* merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object.
*/
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SQLFunctionCtx *pCtx = &pReducer->pCtx[i];
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
pCtx->aOutputBuf = pReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pReducer->resColModel->capacity;
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, i);
pCtx->aOutputBuf =
pReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pReducer->resColModel->capacity;
pCtx->order = pQueryInfo->order.order;
pCtx->functionId = pExpr->functionId;
// input buffer hold only one point data
int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i);
SSchema* pSchema = getColumnModelSchema(pDesc->pColumnModel, i);
int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i);
SSchema *pSchema = getColumnModelSchema(pDesc->pColumnModel, i);
pCtx->aInputElemBuf = pReducer->pTempBuffer->data + offset;
// input data format comes from pModel
......@@ -87,10 +88,8 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
pCtx->hasNull = true;
pCtx->currentStage = SECONDARY_STAGE_MERGE;
pRes->bytes[i] = pExpr->resBytes;
// for top/bottom function, the output of timestamp is the first column
int32_t functionId = pExpr->functionId;
int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
pCtx->ptsOutputBuf = pReducer->pCtx[0].aOutputBuf;
pCtx->param[2].i64Key = pQueryInfo->order.order;
......@@ -106,12 +105,12 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
pCtx->resultInfo->superTableQ = true;
}
int16_t n = 0;
int16_t tagLen = 0;
SQLFunctionCtx** pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutputCols, POINTER_BYTES);
int16_t n = 0;
int16_t tagLen = 0;
SQLFunctionCtx **pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutputCols, POINTER_BYTES);
SQLFunctionCtx* pCtx = NULL;
for(int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
SQLFunctionCtx *pCtx = NULL;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_TAG_DUMMY || pExpr->functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pExpr->resBytes;
......@@ -217,12 +216,12 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
#ifdef _DEBUG_VIEW
printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", pDS->filePage.numOfElems);
SSrcColumnInfo colInfo[256] = {0};
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pColumnModel, pDS->filePage.data, pDS->filePage.numOfElems, pMemBuffer[0]->numOfElemsPerPage,
colInfo);
tColModelDisplayEx(pDesc->pColumnModel, pDS->filePage.data, pDS->filePage.numOfElems,
pMemBuffer[0]->numOfElemsPerPage, colInfo);
#endif
if (pDS->filePage.numOfElems == 0) { // no data in this flush
tscTrace("%p flush data is empty, ignore %d flush record", pSqlObjAddr, idx);
......@@ -243,8 +242,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
param->pLocalData = pReducer->pLocalDataSrc;
param->pDesc = pReducer->pDesc;
param->numOfElems = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
param->groupOrderType = pQueryInfo->groupbyExpr.orderType;
pRes->code = tLoserTreeCreate(&pReducer->pLoserTree, pReducer->numOfBuffer, param, treeComparator);
......@@ -278,7 +277,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16;
pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage));
int32_t finalRowLength = tscGetResRowLength(pQueryInfo);
pReducer->resColModel = finalmodel;
pReducer->resColModel->capacity = pReducer->nResultBufSize / finalRowLength;
......@@ -287,12 +286,12 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity);
pReducer->pBufForInterpo = calloc(1, pReducer->nResultBufSize);
if (pReducer->pTempBuffer == NULL|| pReducer->discardData == NULL || pReducer->pResultBuf == NULL ||
if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL ||
pReducer->pBufForInterpo == NULL || pReducer->pFinalRes == NULL || pReducer->prevRowOfInput == NULL) {
tfree(pReducer->pTempBuffer);
tfree(pReducer->discardData);
tfree(pReducer->pResultBuf);
tfree(pReducer->pFinalRes);
tfree(pReducer->pFinalRes);
tfree(pReducer->pBufForInterpo);
tfree(pReducer->prevRowOfInput);
......@@ -308,23 +307,24 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
// we change the capacity of schema to denote that there is only one row in temp buffer
pReducer->pDesc->pColumnModel->capacity = 1;
//restore the limitation value at the last stage
// restore the limitation value at the last stage
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
pQueryInfo->limit.limit = pQueryInfo->clauseLimit;
pQueryInfo->limit.offset = pQueryInfo->prjOffset;
}
pReducer->offset = pQueryInfo->limit.offset;
pRes->pLocalReducer = pReducer;
pRes->numOfGroups = 0;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
int16_t prec = pMeterMetaInfo->pMeterMeta->precision;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
int16_t prec = pMeterMetaInfo->pMeterMeta->precision;
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec);
int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec);
SInterpolationInfo *pInterpoInfo = &pReducer->interpolationInfo;
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
......@@ -335,7 +335,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
pInterpoInfo->pTags[0] = (char *)pInterpoInfo->pTags + POINTER_BYTES * pQueryInfo->groupbyExpr.numOfGroupCols;
for (int32_t i = 1; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pReducer->resColModel, startIndex + i - 1);
SSchema *pSchema = getColumnModelSchema(pReducer->resColModel, startIndex + i - 1);
pInterpoInfo->pTags[i] = pSchema->bytes + pInterpoInfo->pTags[i - 1];
}
} else {
......@@ -387,7 +387,7 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF
int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,
int32_t numOfRows, int32_t orderType) {
SColumnModel *pModel = pDesc->pColumnModel;
if (pPage->numOfElems + numOfRows <= pModel->capacity) {
tColModelAppend(pModel, pPage, data, 0, numOfRows, numOfRows);
return 0;
......@@ -444,11 +444,11 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
return;
}
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// there is no more result, so we release all allocated resource
SLocalReducer *pLocalReducer = (SLocalReducer*)atomic_exchange_ptr(&pRes->pLocalReducer, NULL);
SLocalReducer *pLocalReducer = (SLocalReducer *)atomic_exchange_ptr(&pRes->pLocalReducer, NULL);
if (pLocalReducer != NULL) {
int32_t status = 0;
while ((status = atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY,
......@@ -460,19 +460,18 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
taosDestoryInterpoInfo(&pLocalReducer->interpolationInfo);
if (pLocalReducer->pCtx != NULL) {
for(int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i];
tVariantDestroy(&pCtx->tag);
if (pCtx->tagInfo.pTagCtxList != NULL) {
tfree(pCtx->tagInfo.pTagCtxList);
}
}
tfree(pLocalReducer->pCtx);
}
tfree(pLocalReducer->prevRowOfInput);
tfree(pLocalReducer->pTempBuffer);
......@@ -513,9 +512,9 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
}
static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCmd, SColumnModel *pModel) {
int32_t numOfGroupByCols = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
int32_t numOfGroupByCols = 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
numOfGroupByCols = pQueryInfo->groupbyExpr.numOfGroupCols;
}
......@@ -555,17 +554,17 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
}
bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage *tmpBuffer) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
int16_t functionId = tscSqlExprGet(pQueryInfo, 0)->functionId;
// disable merge procedure for column projection query
assert(functionId != TSDB_FUNC_ARITHM);
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
return true;
}
if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) {
return false;
}
......@@ -598,11 +597,11 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SSchema * pSchema = NULL;
SSchema * pSchema = NULL;
SColumnModel *pModel = NULL;
*pFinalModel = NULL;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pMeterMetaInfo->pMetricMeta->numOfVnodes);
......@@ -633,7 +632,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
if (rlen != 0) {
capacity = nBufferSizes / rlen;
}
pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity);
for (int32_t i = 0; i < pMeterMetaInfo->pMetricMeta->numOfVnodes; ++i) {
......@@ -649,22 +648,32 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
// final result depends on the fields number
memset(pSchema, 0, sizeof(SSchema) * pQueryInfo->exprsInfo.numOfExprs);
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
SSchema* p1 = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, pExpr->colInfo.colIdx);
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
SSchema *p1 = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, pExpr->colInfo.colIdx);
int16_t inter = 0;
int16_t type = -1;
int16_t bytes = 0;
if ((pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) ||
(pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX)) {
// the final result size and type in the same as query on single table.
// so here, set the flag to be false;
getResultDataInfo(p1->type, p1->bytes, pExpr->functionId, 0, &type, &bytes, &inter, 0, false);
} else {
// if ((pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) ||
// (pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX) ||
// pExpr->functionId == TSDB_FUNC_LAST_ROW) {
// the final result size and type in the same as query on single table.
// so here, set the flag to be false;
int32_t functionId = pExpr->functionId;
if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) {
type = pModel->pFields[i].field.type;
bytes = pModel->pFields[i].field.bytes;
} else {
if (functionId == TSDB_FUNC_FIRST_DST) {
functionId = TSDB_FUNC_FIRST;
} else if (functionId == TSDB_FUNC_LAST_DST) {
functionId = TSDB_FUNC_LAST;
}
getResultDataInfo(p1->type, p1->bytes, functionId, 0, &type, &bytes, &inter, 0, false);
}
pSchema[i].type = type;
......@@ -762,13 +771,15 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *
}
}
void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo* pQueryInfo, SInterpolationInfo *pInterpoInfo) {
void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo,
SInterpolationInfo *pInterpoInfo) {
// discard following dataset in the same group and reset the interpolation information
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int16_t prec = pMeterMetaInfo->pMeterMeta->precision;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int16_t prec = pMeterMetaInfo->pMeterMeta->precision;
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec);
int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec);
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
pLocalReducer->rowSize);
......@@ -781,7 +792,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo*
}
// todo merge with following function
//static void reversedCopyResultToDstBuf(SQueryInfo* pQueryInfo, SSqlRes *pRes, tFilePage *pFinalDataPage) {
// static void reversedCopyResultToDstBuf(SQueryInfo* pQueryInfo, SSqlRes *pRes, tFilePage *pFinalDataPage) {
//
// for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
// TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
......@@ -798,7 +809,8 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo*
// }
//}
static void reversedCopyFromInterpolationToDstBuf(SQueryInfo* pQueryInfo, SSqlRes *pRes, tFilePage **pResPages, SLocalReducer *pLocalReducer) {
static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRes *pRes, tFilePage **pResPages,
SLocalReducer *pLocalReducer) {
assert(0);
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
......@@ -822,11 +834,11 @@ static void reversedCopyFromInterpolationToDstBuf(SQueryInfo* pQueryInfo, SSqlRe
* by "interuptHandler" function in shell
*/
static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
SSqlCmd * pCmd = &pSql->cmd;
SSqlRes * pRes = &pSql->res;
tFilePage *pFinalDataPage = pLocalReducer->pResultBuf;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SSqlCmd * pCmd = &pSql->cmd;
SSqlRes * pRes = &pSql->res;
tFilePage * pFinalDataPage = pLocalReducer->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pRes->pLocalReducer != pLocalReducer) {
/*
* Release the SSqlObj is called, and it is int destroying function invoked by other thread.
......@@ -885,7 +897,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
}
int64_t *pPrimaryKeys = (int64_t *)pLocalReducer->pBufForInterpo;
SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo;
int64_t actualETime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime;
......@@ -900,17 +912,18 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
int32_t *functions = (int32_t *)((char *)srcData + pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(void *));
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
srcData[i] = pLocalReducer->pBufForInterpo + tscFieldInfoGetOffset(pQueryInfo, i) * pInterpoInfo->numOfRawDataInRows;
srcData[i] =
pLocalReducer->pBufForInterpo + tscFieldInfoGetOffset(pQueryInfo, i) * pInterpoInfo->numOfRawDataInRows;
functions[i] = tscSqlExprGet(pQueryInfo, i)->functionId;
}
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
int8_t precision = pMeterMetaInfo->pMeterMeta->precision;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
int8_t precision = pMeterMetaInfo->pMeterMeta->precision;
while (1) {
int32_t remains = taosNumOfRemainPoints(pInterpoInfo);
TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit,
precision);
TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->intervalTime,
pQueryInfo->intervalTimeUnit, precision);
int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pQueryInfo->intervalTime, etime,
pLocalReducer->resColModel->capacity);
......@@ -925,7 +938,8 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
if (pQueryInfo->limit.offset > 0) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
memmove(pResPages[i]->data, pResPages[i]->data + pField->bytes * pQueryInfo->limit.offset, newRows * pField->bytes);
memmove(pResPages[i]->data, pResPages[i]->data + pField->bytes * pQueryInfo->limit.offset,
newRows * pField->bytes);
}
}
......@@ -973,10 +987,10 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
if (pQueryInfo->order.order == TSQL_SO_ASC) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
int16_t offset = getColumnModelOffset(pLocalReducer->resColModel, i);
int16_t offset = getColumnModelOffset(pLocalReducer->resColModel, i);
memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i]->data, pField->bytes * pRes->numOfRows);
}
} else {//todo bug??
} else { // todo bug??
reversedCopyFromInterpolationToDstBuf(pQueryInfo, pRes, pResPages, pLocalReducer);
}
}
......@@ -996,9 +1010,9 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer)
// copy to previous temp buffer
for (int32_t i = 0; i < pColumnModel->numOfCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pColumnModel, i);
int16_t offset = getColumnModelOffset(pColumnModel, i);
SSchema *pSchema = getColumnModelSchema(pColumnModel, i);
int16_t offset = getColumnModelOffset(pColumnModel, i);
memcpy(pLocalReducer->prevRowOfInput + offset, tmpBuffer->data + offset, pSchema->bytes);
}
......@@ -1006,11 +1020,11 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer)
pLocalReducer->hasPrevRow = true;
}
static void doExecuteSecondaryMerge(SSqlCmd* pCmd, SLocalReducer *pLocalReducer, bool needInit) {
static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, bool needInit) {
// the tag columns need to be set before all functions execution
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for(int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) {
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, j);
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[j];
......@@ -1040,7 +1054,7 @@ static void doExecuteSecondaryMerge(SSqlCmd* pCmd, SLocalReducer *pLocalReducer,
}
}
static void handleUnprocessedRow(SSqlCmd* pCmd, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
if (pLocalReducer->hasUnprocessedRow) {
pLocalReducer->hasUnprocessedRow = false;
doExecuteSecondaryMerge(pCmd, pLocalReducer, true);
......@@ -1050,31 +1064,31 @@ static void handleUnprocessedRow(SSqlCmd* pCmd, SLocalReducer *pLocalReducer, tF
static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) {
int64_t maxOutput = 0;
for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) {
// SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[j];
// if (pExpr == NULL) {
// assert(pQueryInfo->fieldsInfo.pExpr[j] != NULL);
//
// maxOutput = 1;
// continue;
// }
// SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[j];
// if (pExpr == NULL) {
// assert(pQueryInfo->fieldsInfo.pExpr[j] != NULL);
//
// maxOutput = 1;
// continue;
// }
/*
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
*/
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, j);
int32_t functionId = pExpr->functionId;
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, j);
int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) {
continue;
}
if (maxOutput < GET_RES_INFO(&pCtx[j])->numOfRes) {
maxOutput = GET_RES_INFO(&pCtx[j])->numOfRes;
}
}
return maxOutput;
}
......@@ -1084,7 +1098,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
* filled with the same result, which is the tags, specified in group by clause
*
*/
static void fillMultiRowsOfTagsVal(SQueryInfo* pQueryInfo, int32_t numOfRes, SLocalReducer *pLocalReducer) {
static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLocalReducer *pLocalReducer) {
int32_t maxBufSize = 0; // find the max tags column length to prepare the buffer
for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
......@@ -1095,7 +1109,7 @@ static void fillMultiRowsOfTagsVal(SQueryInfo* pQueryInfo, int32_t numOfRes, SLo
assert(maxBufSize >= 0);
char *buf = malloc((size_t) maxBufSize);
char *buf = malloc((size_t)maxBufSize);
for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
if (pExpr->functionId != TSDB_FUNC_TAG) {
......@@ -1117,7 +1131,7 @@ static void fillMultiRowsOfTagsVal(SQueryInfo* pQueryInfo, int32_t numOfRes, SLo
free(buf);
}
int32_t finalizeRes(SQueryInfo* pQueryInfo, SLocalReducer *pLocalReducer) {
int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {
for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
aAggs[pExpr->functionId].xFinalize(&pLocalReducer->pCtx[k]);
......@@ -1139,7 +1153,7 @@ int32_t finalizeRes(SQueryInfo* pQueryInfo, SLocalReducer *pLocalReducer) {
* results generated by simple aggregation function, we merge them all into one points
* *Exception*: column projection query, required no merge procedure
*/
bool needToMerge(SQueryInfo* pQueryInfo, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
bool needToMerge(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
int32_t ret = 0; // merge all result by default
int16_t functionId = tscSqlExprGet(pQueryInfo, 0)->functionId;
......@@ -1161,7 +1175,7 @@ bool needToMerge(SQueryInfo* pQueryInfo, SLocalReducer *pLocalReducer, tFilePage
return (ret == 0);
}
static bool reachGroupResultLimit(SQueryInfo* pQueryInfo, SSqlRes *pRes) {
static bool reachGroupResultLimit(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
return (pRes->numOfGroups >= pQueryInfo->slimit.limit && pQueryInfo->slimit.limit >= 0);
}
......@@ -1169,7 +1183,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pRes->numOfGroups += 1;
// the output group is limited by the slimit clause
......@@ -1192,11 +1206,11 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
* @return if current group is skipped, return false, and do NOT record it into pRes->numOfGroups
*/
bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) {
SSqlCmd * pCmd = &pSql->cmd;
SSqlRes * pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tFilePage *pResBuf = pLocalReducer->pResultBuf;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tFilePage * pResBuf = pLocalReducer->pResultBuf;
SColumnModel *pModel = pLocalReducer->resColModel;
pRes->code = TSDB_CODE_SUCCESS;
......@@ -1224,11 +1238,10 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutputCols - pQueryInfo->groupbyExpr.numOfGroupCols;
for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
int16_t offset = getColumnModelOffset(pModel, startIndex + i);
SSchema* pSchema = getColumnModelSchema(pModel, startIndex + i);
memcpy(pInterpoInfo->pTags[i],
pLocalReducer->pBufForInterpo + offset * pResBuf->numOfElems, pSchema->bytes);
int16_t offset = getColumnModelOffset(pModel, startIndex + i);
SSchema *pSchema = getColumnModelSchema(pModel, startIndex + i);
memcpy(pInterpoInfo->pTags[i], pLocalReducer->pBufForInterpo + offset * pResBuf->numOfElems, pSchema->bytes);
}
taosInterpoSetStartInfo(&pLocalReducer->interpolationInfo, pResBuf->numOfElems, pQueryInfo->interpoType);
......@@ -1237,7 +1250,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
return true;
}
void resetOutputBuf(SQueryInfo* pQueryInfo, SLocalReducer *pLocalReducer) { // reset output buffer to the beginning
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { // reset output buffer to the beginning
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
pLocalReducer->pCtx[i].aOutputBuf =
pLocalReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pLocalReducer->resColModel->capacity;
......@@ -1250,21 +1263,22 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
// In handling data in other groups, we need to reset the interpolation information for a new group data
pRes->numOfRows = 0;
pRes->numOfTotalInCurrentClause = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pQueryInfo->limit.offset = pLocalReducer->offset;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
int16_t precision = pMeterMetaInfo->pMeterMeta->precision;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
int16_t precision = pMeterMetaInfo->pMeterMeta->precision;
// for group result interpolation, do not return if not data is generated
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t newTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, precision);
int64_t newTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, precision);
taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pQueryInfo->order.order, newTime, pQueryInfo->groupbyExpr.numOfGroupCols,
pLocalReducer->rowSize);
taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pQueryInfo->order.order, newTime,
pQueryInfo->groupbyExpr.numOfGroupCols, pLocalReducer->rowSize);
}
}
......@@ -1276,12 +1290,12 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SLocalReducer * pLocalReducer = pRes->pLocalReducer;
SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int8_t p = pMeterMetaInfo->pMeterMeta->precision;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int8_t p = pMeterMetaInfo->pMeterMeta->precision;
if (taosHasRemainsDataForInterpolation(pInterpoInfo)) {
assert(pQueryInfo->interpoType != TSDB_INTERPO_NONE);
......@@ -1290,7 +1304,8 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) {
int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pInterpoInfo->numOfRawDataInRows - 1));
int32_t remain = taosNumOfRemainPoints(pInterpoInfo);
TSKEY ekey = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, p);
TSKEY ekey =
taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, p);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain,
pQueryInfo->intervalTime, ekey, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
......@@ -1312,9 +1327,9 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int8_t precision = pMeterMetaInfo->pMeterMeta->precision;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int8_t precision = pMeterMetaInfo->pMeterMeta->precision;
if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL ||
prevGroupCompleted) {
......@@ -1322,7 +1337,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
int64_t etime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime;
etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, precision);
etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime,
pQueryInfo->intervalTimeUnit, precision);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pQueryInfo->intervalTime, etime,
pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
......@@ -1351,15 +1367,15 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
return false;
}
static void doMergeWithPrevRows(SSqlObj *pSql, int32_t numOfRes) {
SSqlCmd * pCmd = &pSql->cmd;
SSqlRes * pRes = &pSql->res;
static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SLocalReducer *pLocalReducer = pRes->pLocalReducer;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, k);
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
pCtx->aOutputBuf += pCtx->outputBytes * numOfRes;
......@@ -1376,24 +1392,24 @@ static void doMergeWithPrevRows(SSqlObj *pSql, int32_t numOfRes) {
int32_t tscDoLocalreduce(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
tscResetForNextRetrieve(pRes);
if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed
tscTrace("%s call the drop local reducer", __FUNCTION__);
tscDestroyLocalReducer(pSql);
return 0;
}
SLocalReducer *pLocalReducer = pRes->pLocalReducer;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// set the data merge in progress
int32_t prevStatus =
atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_IN_PROGRESS);
if (prevStatus != TSC_LOCALREDUCE_READY || pLocalReducer == NULL) {
assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED); // it is in tscDestroyLocalReducer function already
assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED); // it is in tscDestroyLocalReducer function already
return TSDB_CODE_SUCCESS;
}
......@@ -1491,8 +1507,7 @@ int32_t tscDoLocalreduce(SSqlObj *pSql) {
* if the previous group does NOT generate any result (pResBuf->numOfElems == 0),
* continue to process results instead of return results.
*/
if ((!sameGroup && pResBuf->numOfElems > 0) ||
(pResBuf->numOfElems == pLocalReducer->resColModel->capacity)) {
if ((!sameGroup && pResBuf->numOfElems > 0) || (pResBuf->numOfElems == pLocalReducer->resColModel->capacity)) {
// does not belong to the same group
bool notSkipped = doGenerateFinalResults(pSql, pLocalReducer, !sameGroup);
......@@ -1545,7 +1560,7 @@ int32_t tscDoLocalreduce(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
}
} else { // result buffer is not full
doMergeWithPrevRows(pSql, numOfRes);
doProcessResultInNextWindow(pSql, numOfRes);
savePreviousRow(pLocalReducer, tmpBuffer);
}
}
......
......@@ -643,9 +643,9 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0);
tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0);
tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid);
tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid, false);
tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo);
tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);
pNew->cmd.numOfCols = 0;
......@@ -656,6 +656,10 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
// this data needs to be transfer to support struct
pNewQueryInfo->fieldsInfo.numOfOutputCols = 0;
pNewQueryInfo->exprsInfo.numOfExprs = 0;
// set the ts,tags that involved in join, as the output column of intermediate result
tscClearSubqueryInfo(&pNew->cmd);
......@@ -1537,7 +1541,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
int32_t exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->fieldsInfo.numOfOutputCols;
int32_t exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->exprsInfo.numOfExprs;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
// meter query without tags values
......@@ -1546,11 +1550,10 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
}
SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(SMeterSidExtInfo)) * pVnodeSidList->numOfSids;
int32_t outputColumnSize = pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(SSqlFuncExprMsg);
int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg);
int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE;
if (pQueryInfo->tsBuf != NULL) {
......@@ -1787,7 +1790,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr->functionId = htons(pExpr->functionId);
pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
pMsg += (sizeof(SSqlFuncExprMsg) - TSDB_COL_NAME_LEN);
pMsg += sizeof(SSqlFuncExprMsg);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
......@@ -1862,6 +1865,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
*((int16_t *)pMsg) += pCol->flag;
pMsg += sizeof(pCol->flag);
memcpy(pMsg, pCol->name, tListLen(pCol->name));
pMsg += tListLen(pCol->name);
}
}
......@@ -2491,16 +2497,8 @@ static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
}
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
pRes->bytes[i] = pField->bytes;
// if (pQueryInfo->order.order == TSQL_SO_DESC) {
// pRes->bytes[i] = -pRes->bytes[i];
// pRes->tsrow[i] = ((pRes->data + offset * pRes->numOfRows) + (pRes->numOfRows - 1) * pField->bytes);
// } else {
pRes->tsrow[i] = (pRes->data + offset * pRes->numOfRows);
// }
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
pRes->tsrow[i] = (pRes->data + offset * pRes->numOfRows);
}
return 0;
......@@ -2725,8 +2723,10 @@ static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
int32_t joinCondLen = (TSDB_METER_ID_LEN + sizeof(int16_t)) * 2;
int32_t elemSize = sizeof(SMetricMetaElemMsg) * pQueryInfo->numOfTables;
int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndexEx);
int32_t len = tagLen + joinCondLen + elemSize + defaultSize;
int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
}
......@@ -2854,6 +2854,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pDestCol->colIdx = htons(pCol->colIdx);
pDestCol->colId = htons(pDestCol->colId);
pDestCol->flag = htons(pDestCol->flag);
strncpy(pDestCol->name, pCol->name, tListLen(pCol->name));
pMsg += sizeof(SColIndexEx);
}
......@@ -3291,6 +3292,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
int32_t size = pMeta->numOfColumns * sizeof(SSchema) + sizeof(SMeterMeta);
pMeterMetaInfo->pMeterMeta =
(SMeterMeta *)taosAddDataIntoCache(tscCacheHandle, key, (char *)pMeta, size, tsMeterMetaKeepTimer);
pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
SSchema *pMeterSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);
......@@ -3301,6 +3303,9 @@ int tscProcessShowRsp(SSqlObj *pSql) {
index.columnIndex = i;
tscColumnBaseInfoInsert(pQueryInfo, &index);
tscFieldInfoSetValFromSchema(&pQueryInfo->fieldsInfo, i, &pMeterSchema[i]);
pQueryInfo->fieldsInfo.pSqlExpr[i] = tscSqlExprInsert(pQueryInfo, i, TSDB_FUNC_TS_DUMMY, &index,
pMeterSchema[i].type, pMeterSchema[i].bytes, pMeterSchema[i].bytes);
}
tscFieldInfoCalOffset(pQueryInfo);
......
......@@ -441,7 +441,10 @@ static void **doSetResultRowData(SSqlObj *pSql) {
int32_t num = 0;
for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
if (pQueryInfo->fieldsInfo.pSqlExpr[i] != NULL) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pRes->bytes[i] * pRes->row;
SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[i];
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pExpr->resBytes * pRes->row;
} else {
assert(0);
}
// primary key column cannot be null in interval query, no need to check
......
......@@ -246,8 +246,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
int32_t retry = tsProjectExecInterval;
tscError("%p stream:%p, retrieve no data, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retry);
tscClearSqlMetaInfoForce(&(pStream->pSql->cmd));
tscSetRetryTimer(pStream, pStream->pSql, retry);
return;
}
......
......@@ -336,35 +336,17 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
tfree(pQueryInfo->defaultVal);
}
void tscClearSqlMetaInfoForce(SSqlCmd* pCmd) {
/* remove the metermeta/metricmeta in cache */
// taosRemoveDataFromCache(tscCacheHandle, (void**)&(pCmd->pMeterMeta), true);
// taosRemoveDataFromCache(tscCacheHandle, (void**)&(pCmd->pMetricMeta), true);
}
int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
if (pRes->tsrow == NULL) {
//
int32_t numOfOutputCols = pQueryInfo->fieldsInfo.numOfOutputCols;
pRes->numOfnchar = numOfOutputCols;
// for (int32_t i = 0; i < numOfOutputCols; ++i) {
// TAOS_FIELD* pField = tscFieldInfoGetField(pQueryInfo, i);
// if (pField->type == TSDB_DATA_TYPE_NCHAR) {
// pRes->numOfnchar++;
// }
// }
pRes->numOfCols = numOfOutputCols;
pRes->tsrow = calloc(1, (POINTER_BYTES + sizeof(short)) * numOfOutputCols + POINTER_BYTES * pRes->numOfnchar);
pRes->bytes = calloc(numOfOutputCols, sizeof(short));
// if (pRes->numOfnchar > 0) {
pRes->buffer = calloc(POINTER_BYTES, numOfOutputCols);
// }
pRes->tsrow = calloc(POINTER_BYTES, numOfOutputCols);
pRes->buffer = calloc(POINTER_BYTES, numOfOutputCols);
// not enough memory
if (pRes->tsrow == NULL || pRes->bytes == NULL || (pRes->buffer == NULL && pRes->numOfnchar > 0)) {
if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) {
tfree(pRes->tsrow);
tfree(pRes->bytes);
tfree(pRes->buffer);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
......@@ -377,13 +359,12 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
void tscDestroyResPointerInfo(SSqlRes* pRes) {
if (pRes->buffer != NULL) {
// assert(pRes->numOfnchar > 0);
// free all buffers containing the multibyte string
for (int i = 0; i < pRes->numOfnchar; i++) {
for (int i = 0; i < pRes->numOfCols; i++) {
tfree(pRes->buffer[i]);
}
pRes->numOfnchar = 0;
pRes->numOfCols = 0;
}
tfree(pRes->pRsp);
......@@ -392,7 +373,6 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) {
tfree(pRes->pGroupRec);
tfree(pRes->pColumnIndex);
tfree(pRes->buffer);
tfree(pRes->bytes);
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
}
......@@ -930,10 +910,10 @@ void tscFieldInfoSetBinExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlFunctionE
void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo) {
SSqlExprInfo* pExprInfo = &pQueryInfo->exprsInfo;
pExprInfo->pExprs[0].offset = 0;
pExprInfo->pExprs[0]->offset = 0;
for (int32_t i = 1; i < pExprInfo->numOfExprs; ++i) {
pExprInfo->pExprs[i].offset = pExprInfo->pExprs[i - 1].offset + pExprInfo->pExprs[i - 1].resBytes;
pExprInfo->pExprs[i]->offset = pExprInfo->pExprs[i - 1]->offset + pExprInfo->pExprs[i - 1]->resBytes;
}
}
......@@ -957,10 +937,10 @@ void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) {
return;
}
pExprInfo->pExprs[0].offset = 0;
pExprInfo->pExprs[0]->offset = 0;
for (int32_t i = 1; i < pExprInfo->numOfExprs; ++i) {
pExprInfo->pExprs[i].offset = pExprInfo->pExprs[i - 1].offset + pExprInfo->pExprs[i - 1].resBytes;
pExprInfo->pExprs[i]->offset = pExprInfo->pExprs[i - 1]->offset + pExprInfo->pExprs[i - 1]->resBytes;
}
}
......@@ -976,6 +956,8 @@ void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList
for (int32_t i = 0; i < size; ++i) {
assert(indexList[i] >= 0 && indexList[i] <= src->numOfOutputCols);
tscFieldInfoSetValFromField(dst, i, &src->pFields[indexList[i]]);
dst->pVisibleCols[i] = src->pVisibleCols[indexList[i]];
dst->pSqlExpr[i] = src->pSqlExpr[indexList[i]];
}
}
}
......@@ -984,14 +966,14 @@ void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src) {
*dst = *src;
dst->pFields = malloc(sizeof(TAOS_FIELD) * dst->numOfAlloc);
// dst->pOffset = malloc(sizeof(short) * dst->numOfAlloc);
dst->pVisibleCols = malloc(sizeof(bool) * dst->numOfAlloc);
dst->pSqlExpr = malloc(POINTER_BYTES * dst->numOfAlloc);
dst->pExpr = malloc(POINTER_BYTES * dst->numOfAlloc);
memcpy(dst->pFields, src->pFields, sizeof(TAOS_FIELD) * dst->numOfOutputCols);
// memcpy(dst->pOffset, src->pOffset, sizeof(short) * dst->numOfOutputCols);
memcpy(dst->pVisibleCols, src->pVisibleCols, sizeof(bool) * dst->numOfOutputCols);
memcpy(dst->pSqlExpr, src->pSqlExpr, POINTER_BYTES * dst->numOfOutputCols);
memcpy(dst->pExpr, src->pExpr, POINTER_BYTES * dst->numOfOutputCols);
}
TAOS_FIELD* tscFieldInfoGetField(SQueryInfo* pQueryInfo, int32_t index) {
......@@ -1009,7 +991,7 @@ int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) {
return 0;
}
return pQueryInfo->exprsInfo.pExprs[index].offset;
return pQueryInfo->exprsInfo.pExprs[index]->offset;
}
int32_t tscFieldInfoCompare(SFieldInfo* pFieldInfo1, SFieldInfo* pFieldInfo2) {
......@@ -1039,7 +1021,7 @@ int32_t tscGetResRowLength(SQueryInfo* pQueryInfo) {
int32_t size = 0;
for(int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
size += pQueryInfo->exprsInfo.pExprs[i].resBytes;
size += pQueryInfo->exprsInfo.pExprs[i]->resBytes;
}
return size;
......@@ -1050,7 +1032,6 @@ void tscClearFieldInfo(SFieldInfo* pFieldInfo) {
return;
}
// tfree(pFieldInfo->pOffset);
tfree(pFieldInfo->pFields);
tfree(pFieldInfo->pVisibleCols);
tfree(pFieldInfo->pSqlExpr);
......@@ -1101,11 +1082,12 @@ SSqlExpr* tscSqlExprInsertEmpty(SQueryInfo* pQueryInfo, int32_t index, int16_t f
_exprCheckSpace(pExprInfo, pExprInfo->numOfExprs + 1);
_exprEvic(pExprInfo, index);
SSqlExpr* pExpr = &pExprInfo->pExprs[index];
SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr));
pExpr->functionId = functionId;
pExprInfo->numOfExprs++;
pExprInfo->pExprs[index] = pExpr;
return pExpr;
}
......@@ -1118,8 +1100,9 @@ SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
_exprCheckSpace(pExprInfo, pExprInfo->numOfExprs + 1);
_exprEvic(pExprInfo, index);
SSqlExpr* pExpr = &pExprInfo->pExprs[index];
SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr));
pExprInfo->pExprs[index] = pExpr;
pExpr->functionId = functionId;
int16_t numOfCols = pMeterMetaInfo->pMeterMeta->numOfColumns;
......@@ -1161,7 +1144,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
return NULL;
}
SSqlExpr* pExpr = &pExprInfo->pExprs[index];
SSqlExpr* pExpr = pExprInfo->pExprs[index];
pExpr->functionId = functionId;
......@@ -1196,7 +1179,7 @@ SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index) {
return NULL;
}
return &pQueryInfo->exprsInfo.pExprs[index];
return pQueryInfo->exprsInfo.pExprs[index];
}
void* tscSqlExprDestroy(SSqlExpr* pExpr) {
......@@ -1208,6 +1191,8 @@ void* tscSqlExprDestroy(SSqlExpr* pExpr) {
tVariantDestroy(&pExpr->param[i]);
}
tfree(pExpr);
return NULL;
}
......@@ -1219,8 +1204,8 @@ void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo) {
return;
}
for(int32_t i = 0; i < pExprInfo->numOfAlloc; ++i) {
tscSqlExprDestroy(&pExprInfo->pExprs[i]);
for(int32_t i = 0; i < pExprInfo->numOfExprs; ++i) {
tscSqlExprDestroy(pExprInfo->pExprs[i]);
}
tfree(pExprInfo->pExprs);
......@@ -1230,27 +1215,40 @@ void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo) {
}
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableuid) {
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableuid, bool deepcopy) {
if (src == NULL) {
return;
}
*dst = *src;
dst->pExprs = calloc(dst->numOfAlloc, sizeof(SSqlExpr));
dst->pExprs = calloc(dst->numOfAlloc, POINTER_BYTES);
int16_t num = 0;
for (int32_t i = 0; i < src->numOfExprs; ++i) {
if (src->pExprs[i].uid == tableuid) {
dst->pExprs[num++] = src->pExprs[i];
if (src->pExprs[i]->uid == tableuid) {
if (deepcopy) {
dst->pExprs[num] = calloc(1, sizeof(SSqlExpr));
*dst->pExprs[num] = *src->pExprs[i];
} else {
dst->pExprs[num] = src->pExprs[i];
}
num++;
}
}
dst->numOfExprs = num;
for (int32_t i = 0; i < dst->numOfExprs; ++i) {
for (int32_t j = 0; j < src->pExprs[i].numOfParams; ++j) {
tVariantAssign(&dst->pExprs[i].param[j], &src->pExprs[i].param[j]);
if (deepcopy) {
for (int32_t i = 0; i < dst->numOfExprs; ++i) {
for (int32_t j = 0; j < src->pExprs[i]->numOfParams; ++j) {
tVariantAssign(&dst->pExprs[i]->param[j], &src->pExprs[i]->param[j]);
}
}
}
}
static void clearVal(SColumnBase* pBase) {
......@@ -2005,7 +2003,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
}
uint64_t uid = pMeterMetaInfo->pMeterMeta->uid;
tscSqlExprCopy(&pNewQueryInfo->exprsInfo, &pQueryInfo->exprsInfo, uid);
tscSqlExprCopy(&pNewQueryInfo->exprsInfo, &pQueryInfo->exprsInfo, uid, true);
int32_t numOfOutputCols = pNewQueryInfo->exprsInfo.numOfExprs;
......@@ -2020,7 +2018,19 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pNewQueryInfo->fieldsInfo, indexList, numOfOutputCols);
free(indexList);
// make sure the the sqlExpr for each fields is correct
// todo handle the agg arithmetic expression
for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutputCols; ++f) {
char* name = pNewQueryInfo->fieldsInfo.pFields[f].name;
for(int32_t k1 = 0; k1 < pNewQueryInfo->exprsInfo.numOfExprs; ++k1) {
SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1);
if (strcmp(name, pExpr1->aliasName) == 0) {
pNewQueryInfo->fieldsInfo.pSqlExpr[f] = pExpr1;
}
}
}
tscFieldInfoUpdateOffsetForInterResult(pNewQueryInfo);
}
......
......@@ -298,6 +298,9 @@ void getStatistics(char *priData, char *data, int32_t size, int32_t numOfRow, in
bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *minval, char *maxval);
bool stableQueryFunctChanged(int32_t funcId);
void resetResultInfo(SResultInfo *pResInfo);
void initResultInfo(SResultInfo *pResInfo);
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable);
......
......@@ -1056,7 +1056,7 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) {
pExprMsg->functionId = htons(pExprMsg->functionId);
pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
pMsg += (sizeof(SSqlFuncExprMsg) - TSDB_COL_NAME_LEN);
pMsg += sizeof(SSqlFuncExprMsg);
for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) {
pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册