提交 875149dd 编写于 作者: H Haojun Liao

[TD-2060]

上级 07ecde07
......@@ -62,6 +62,7 @@ typedef struct SLocalReducer {
bool hasUnprocessedRow;
tOrderDescriptor * pDesc;
SColumnModel * resColModel;
SColumnModel* finalModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
SFillInfo* pFillInfo; // interpolation support structure
char* pFinalRes; // result data after interpo
......@@ -74,7 +75,8 @@ typedef struct SLocalReducer {
typedef struct SRetrieveSupport {
tExtMemBuffer ** pExtMemBuffer; // for build loser tree
tOrderDescriptor *pOrderDescriptor;
SColumnModel * pFinalColModel; // colModel for final result
SColumnModel* pFinalColModel; // colModel for final result
SColumnModel* pFFColModel;
int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
......@@ -82,7 +84,7 @@ typedef struct SRetrieveSupport {
} SRetrieveSupport;
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc,
SColumnModel **pFinalModel, uint32_t nBufferSize);
SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSize);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel,
int32_t numOfVnodes);
......
......@@ -41,6 +41,8 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql);
void tscBuildResFromSubqueries(SSqlObj *pSql);
TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult);
char *getArithemicInputSrc(void *param, const char *name, int32_t colId);
#ifdef __cplusplus
}
#endif
......
......@@ -125,6 +125,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t
*/
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
......@@ -158,7 +159,7 @@ SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t ind
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
void tscFieldInfoClear(SFieldInfo* pFieldInfo);
......@@ -167,15 +168,15 @@ static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQue
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
int32_t tscGetResRowLength(SArray* pExprList);
SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize, bool isTagCol);
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize, bool isTagCol);
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size);
......
......@@ -136,6 +136,7 @@ typedef struct SSqlExpr {
int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression.
int16_t resColId; // result column id
} SSqlExpr;
typedef struct SColumnIndex {
......@@ -251,6 +252,7 @@ typedef struct SQueryInfo {
int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id
} SQueryInfo;
typedef struct {
......@@ -462,17 +464,16 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex, int32_t offset) {
SInternalField* pInfo = (SInternalField*) TARRAY_GET_ELEM(pFieldInfo->internalField, columnIndex);
assert(pInfo->pSqlExpr != NULL);
int32_t type = pInfo->pSqlExpr->resType;
int32_t bytes = pInfo->pSqlExpr->resBytes;
int32_t type = pInfo->field.type;
int32_t bytes = pInfo->field.bytes;
char* pData = pRes->data + (int32_t)(pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row);
char* pData = pRes->data + (int32_t)(offset * pRes->numOfRows + bytes * pRes->row);
// user defined constant value output columns
if (TSDB_COL_IS_UD_COL(pInfo->pSqlExpr->colInfo.flag)) {
if (pInfo->pSqlExpr != NULL && TSDB_COL_IS_UD_COL(pInfo->pSqlExpr->colInfo.flag)) {
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
pData = pInfo->pSqlExpr->param[1].pz;
pRes->length[columnIndex] = pInfo->pSqlExpr->param[1].nLen;
......@@ -517,6 +518,7 @@ extern SRpcCorEpSet tscMgmtEpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
int16_t getNewResColId(SQueryInfo* pQueryInfo);
#ifdef __cplusplus
}
......
......@@ -351,7 +351,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SInternalField* pSup = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
if (pSup->pSqlExpr != NULL) {
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, 0);
} else {
// todo add
}
......
......@@ -162,7 +162,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, (TSDB_COL_NAME_LEN - 1), false);
(TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, -1000, (TSDB_COL_NAME_LEN - 1), false);
rowLen += ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE);
......@@ -172,7 +172,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE),
typeColLength, false);
-1000, typeColLength, false);
rowLen += typeColLength + VARSTR_HEADER_SIZE;
......@@ -182,7 +182,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t),
sizeof(int32_t), false);
-1000, sizeof(int32_t), false);
rowLen += sizeof(int32_t);
......@@ -192,7 +192,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE),
noteColLength, false);
-1000, noteColLength, false);
rowLen += noteColLength + VARSTR_HEADER_SIZE;
return rowLen;
......@@ -407,8 +407,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
}
SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
f.bytes, f.bytes - VARSTR_HEADER_SIZE, false);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, f.bytes, -1000, f.bytes - VARSTR_HEADER_SIZE, false);
rowLen += f.bytes;
......@@ -422,7 +421,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(int16_t)(ddlLen + VARSTR_HEADER_SIZE), ddlLen, false);
(int16_t)(ddlLen + VARSTR_HEADER_SIZE), -1000, ddlLen, false);
rowLen += ddlLen + VARSTR_HEADER_SIZE;
......
......@@ -13,14 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tscLocalMerge.h"
#include "tscSubquery.h"
#include "os.h"
#include "qAst.h"
#include "tlosertree.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tutil.h"
#include "tscLog.h"
#include "tscLocalMerge.h"
typedef struct SCompareParam {
SLocalDataSource **pLocalData;
......@@ -29,6 +30,8 @@ typedef struct SCompareParam {
int32_t groupOrderType;
} SCompareParam;
static void doArithmeticCalculate(SQueryInfo* pQueryInfo, char* pBuf, char* src, int32_t numOfRows);
int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
int32_t pLeftIdx = *(int32_t *)pLeft;
int32_t pRightIdx = *(int32_t *)pRight;
......@@ -132,21 +135,34 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
}
static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) {
int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
int32_t numOfCols = (int32_t)tscNumOfFields(pQueryInfo);
int32_t offset = 0;
SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo));
for(int32_t i = 0; i < numOfCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
pFillCol[i].col.bytes = pExpr->resBytes;
pFillCol[i].col.type = (int8_t)pExpr->resType;
pFillCol[i].col.colId = pExpr->colInfo.colId;
pFillCol[i].flag = pExpr->colInfo.flag;
pFillCol[i].col.offset = offset;
pFillCol[i].functionId = pExpr->functionId;
pFillCol[i].fillVal.i = pQueryInfo->fillVal[i];
offset += pExpr->resBytes;
SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
if (pIField->pArithExprInfo == NULL) {
SSqlExpr* pExpr = pIField->pSqlExpr;
pFillCol[i].col.bytes = pExpr->resBytes;
pFillCol[i].col.type = (int8_t)pExpr->resType;
pFillCol[i].col.colId = pExpr->colInfo.colId;
pFillCol[i].flag = pExpr->colInfo.flag;
pFillCol[i].col.offset = offset;
pFillCol[i].functionId = pExpr->functionId;
pFillCol[i].fillVal.i = pQueryInfo->fillVal[i];
} else {
pFillCol[i].col.bytes = pIField->field.bytes;
pFillCol[i].col.type = (int8_t)pIField->field.type;
pFillCol[i].col.colId = -100;
pFillCol[i].flag = TSDB_COL_NORMAL;
pFillCol[i].col.offset = offset;
pFillCol[i].functionId = -1;
pFillCol[i].fillVal.i = pQueryInfo->fillVal[i];
}
offset += pFillCol[i].col.bytes;
}
return pFillCol;
......@@ -342,8 +358,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
return;
}
size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo);
pReducer->pTempBuffer->num = 0;
tscCreateResPointerInfo(pRes, pQueryInfo);
......@@ -372,7 +386,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
4096, (int32_t)numOfCols, pQueryInfo->interval.sliding, pQueryInfo->interval.slidingUnit,
4096, (int32_t)pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->interval.sliding, pQueryInfo->interval.slidingUnit,
tinfo.precision, pQueryInfo->fillType, pFillCol, pSql);
}
}
......@@ -641,7 +655,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
}
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc,
SColumnModel **pFinalModel, uint32_t nBufferSizes) {
SColumnModel **pFinalModel, SColumnModel **pFFModel, uint32_t nBufferSizes) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -736,8 +750,19 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
}
*pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity);
tfree(pSchema);
memset(pSchema, 0, sizeof(SSchema) * pQueryInfo->fieldsInfo.numOfOutput);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
TAOS_FIELD* pField = &pIField->field;
pSchema[i].type = pField->type;
pSchema[i].bytes = pField->bytes;
strncpy(pSchema[i].name, pField->name, tListLen(pField->name));
}
*pFFModel = createColumnModel(pSchema, pQueryInfo->fieldsInfo.numOfOutput, capacity);
return TSDB_CODE_SUCCESS;
}
......@@ -966,10 +991,11 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pFillInfo);
}
int32_t offset = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
int16_t offset = getColumnModelOffset(pLocalReducer->resColModel, i);
memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i]->data, (size_t)(pField->bytes * pRes->numOfRows));
offset += pField->bytes;
}
pRes->numOfRowsGroup += pRes->numOfRows;
......@@ -1222,6 +1248,14 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
tColModelCompact(pModel, pResBuf, pModel->capacity);
if (tscIsSecondStageQuery(pQueryInfo)) {
char* pbuf = calloc(1,pResBuf->num * pModel->rowSize);
doArithmeticCalculate(pQueryInfo, pbuf, pResBuf->data, pResBuf->num);
memcpy(pResBuf->data, pbuf, pResBuf->num * pModel->rowSize);
free(pbuf);
}
#ifdef _DEBUG_VIEW
printf("final result before interpo:\n");
// tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num);
......@@ -1588,3 +1622,38 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
pRes->pLocalReducer->pResultBuf->num = numOfRes;
pRes->data = pRes->pLocalReducer->pResultBuf->data;
}
void doArithmeticCalculate(SQueryInfo* pQueryInfo, char* outputBuf, char* src, int32_t numOfRows) {
size_t size = tscNumOfFields(pQueryInfo);
SArithmeticSupport* pArithSup = (SArithmeticSupport*)calloc(1, sizeof(SArithmeticSupport));
int32_t rowIndex = 0;
int32_t offset = 0;
for (int i = 0; i < size; ++i) {
SInternalField* pSup = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
// calculate the result from several other columns
if (pSup->pArithExprInfo != NULL) {
// todo refactor
pArithSup->offset = 0;
pArithSup->pArithExpr = pSup->pArithExprInfo;
pArithSup->numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
pArithSup->exprList = pQueryInfo->exprList;
pArithSup->data = calloc(pArithSup->numOfCols, POINTER_BYTES);
for(int32_t k = 0; k < pArithSup->numOfCols; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
pArithSup->data[k] = (src + numOfRows* pExpr->offset) + rowIndex*pExpr->resBytes;
}
tExprTreeCalcTraverse(pArithSup->pArithExpr->pExpr, numOfRows, outputBuf + numOfRows*offset, pArithSup,
TSDB_ORDER_ASC, getArithemicInputSrc);
} else {
SSqlExpr* pExpr = pSup->pSqlExpr;
memcpy(outputBuf + numOfRows * offset, pExpr->offset * numOfRows + src, pExpr->resBytes * numOfRows);
}
offset += pSup->field.bytes;
}
}
\ No newline at end of file
......@@ -52,7 +52,8 @@ typedef struct SConvertFunc {
int32_t originFuncId;
int32_t execFuncId;
} SConvertFunc;
static SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t colIndex, int32_t tableIndex);
static SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tableIndex);
static int32_t setShowInfo(SSqlObj* pSql, SSqlInfo* pInfo);
static char* getAccountId(SSqlObj* pSql);
......@@ -127,6 +128,10 @@ static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index);
static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pSqlExpr, SQueryInfo* pQueryInfo, SArray* pCols, int64_t *uid);
int16_t getNewResColId(SQueryInfo* pQueryInfo) {
return pQueryInfo->resColumnId--;
}
static uint8_t convertOptr(SStrToken *pToken) {
switch (pToken->type) {
case TK_LT:
......@@ -1274,6 +1279,7 @@ static void tscInsertPrimaryTSSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex*
SColumnIndex tsCol = {.tableIndex = pIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnListInsert(pQueryInfo->colList, &tsCol);
}
static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t exprIndex, tSQLExprItem* pItem) {
const char* msg1 = "invalid column name, illegal column type, or columns in arithmetic expression from two tables";
const char* msg2 = "invalid arithmetic expression in select clause";
......@@ -1305,7 +1311,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
SColumnIndex index = {.tableIndex = tableIndex};
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, sizeof(double),
sizeof(double), false);
-1000, sizeof(double), false);
char* name = (pItem->aliasName != NULL)? pItem->aliasName:pItem->pNode->token.z;
size_t len = MIN(sizeof(pExpr->aliasName), pItem->pNode->token.n + 1);
......@@ -1321,6 +1327,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
// check for if there is a tag in the arithmetic express
size_t numOfNode = taosArrayGetSize(colList);
for(int32_t k = 0; k < numOfNode; ++k) {
SColIndex* pIndex = taosArrayGet(colList, k);
......@@ -1346,9 +1353,9 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
char* c = tbufGetData(&bw, false);
// set the serialized binary string as the parameter of arithmetic expression
addExprParams(pExpr, c, TSDB_DATA_TYPE_BINARY, (int32_t)len, index.tableIndex);
addExprParams(pExpr, c, TSDB_DATA_TYPE_BINARY, (int32_t)len);
insertResultField(pQueryInfo, exprIndex, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName, pExpr);
// add ts column
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index);
......@@ -1380,6 +1387,10 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
pArithExprInfo->interBytes = sizeof(double);
pArithExprInfo->type = TSDB_DATA_TYPE_DOUBLE;
pArithExprInfo->base.functionId = TSDB_FUNC_ARITHM;
pArithExprInfo->base.numOfParams = 1;
pArithExprInfo->base.resColId = getNewResColId(pQueryInfo);
int32_t ret = exprTreeFromSqlExpr(pCmd, &pArithExprInfo->pExpr, pItem->pNode, pQueryInfo, NULL, &pArithExprInfo->uid);
if (ret != TSDB_CODE_SUCCESS) {
tExprTreeDestroy(&pArithExprInfo->pExpr, NULL);
......@@ -1388,14 +1399,28 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
pInfo->pArithExprInfo = pArithExprInfo;
}
SBufferWriter bw = tbufInitWriter(NULL, false);
TRY(0) {
exprTreeToBinary(&bw, pInfo->pArithExprInfo->pExpr);
} CATCH(code) {
tbufCloseWriter(&bw);
UNUSED(code);
// TODO: other error handling
} END_TRY
pInfo->pArithExprInfo->base.arg[0].argBytes = tbufTell(&bw);
pInfo->pArithExprInfo->base.arg[0].argValue.pz = tbufGetData(&bw, true);
pInfo->pArithExprInfo->base.arg[0].argType = TSDB_DATA_TYPE_BINARY;
tbufCloseWriter(&bw);
}
return TSDB_CODE_SUCCESS;
}
static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumnIndex* pIndex, tSQLExprItem* pItem) {
SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, startPos, pIndex->columnIndex, pIndex->tableIndex);
SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, pIndex->columnIndex, pIndex->tableIndex);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pIndex->tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
......@@ -1540,7 +1565,7 @@ int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnLi
return TSDB_CODE_SUCCESS;
}
SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t colIndex, int32_t tableIndex) {
SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tableIndex) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int32_t numOfCols = tscGetNumOfColumns(pTableMeta);
......@@ -1552,20 +1577,22 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c
if (functionId == TSDB_FUNC_TAGPRJ) {
index.columnIndex = colIndex - tscGetNumOfColumns(pTableMeta);
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
} else {
index.columnIndex = colIndex;
}
return tscSqlExprAppend(pQueryInfo, functionId, &index, pSchema->type, pSchema->bytes,
pSchema->bytes, functionId == TSDB_FUNC_TAGPRJ);
int16_t colId = getNewResColId(pQueryInfo);
return tscSqlExprAppend(pQueryInfo, functionId, &index, pSchema->type, pSchema->bytes, colId, pSchema->bytes,
(functionId == TSDB_FUNC_TAGPRJ));
}
SSqlExpr* tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t flag) {
int16_t colId = getNewResColId(pQueryInfo);
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputColIndex, functionId, pIndex, pColSchema->type,
pColSchema->bytes, pColSchema->bytes, TSDB_COL_IS_TAG(flag));
pColSchema->bytes, colId, pColSchema->bytes, TSDB_COL_IS_TAG(flag));
tstrncpy(pExpr->aliasName, pColSchema->name, sizeof(pExpr->aliasName));
SColumnList ids = getColumnList(1, pIndex->tableIndex, pIndex->columnIndex);
......@@ -1601,7 +1628,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum
}
for (int32_t j = 0; j < numOfTotalColumns; ++j) {
SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, startPos + j, j, pIndex->tableIndex);
SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, j, pIndex->tableIndex);
tstrncpy(pExpr->aliasName, pSchema[j].name, sizeof(pExpr->aliasName));
pIndex->columnIndex = j;
......@@ -1710,7 +1737,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
bytes = pSchema->bytes;
}
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, bytes, false);
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, getNewResColId(pQueryInfo), bytes, false);
tstrncpy(pExpr->aliasName, name, tListLen(pExpr->aliasName));
if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != functionID) {
......@@ -1804,7 +1831,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize;
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false);
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pQueryInfo), size, false);
} else if (sqlOptr == TK_INTEGER) { // select count(1) from table1
char buf[8] = {0};
int64_t val = -1;
......@@ -1816,7 +1843,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (val == 1) {
index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize;
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false);
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pQueryInfo), size, false);
} else {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
......@@ -1836,12 +1863,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize;
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, isTag);
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pQueryInfo), size, isTag);
}
} else { // count(*) is equalled to count(primary_timestamp_key)
index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize;
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false);
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pQueryInfo), size, false);
}
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
......@@ -1928,7 +1955,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
colIndex += 1;
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE,
TSDB_KEYSIZE, false);
getNewResColId(pQueryInfo), TSDB_KEYSIZE, false);
SColumnList ids = getColumnList(1, 0, 0);
insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].aName, pExpr);
......@@ -1939,7 +1966,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, resultType, resultSize, resultSize, false);
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, resultType, resultSize, getNewResColId(pQueryInfo), resultSize, false);
if (optr == TK_LEASTSQUARES) {
/* set the leastsquares parameters */
......@@ -1948,14 +1975,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_TSC_INVALID_SQL;
}
addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, DOUBLE_BYTES, 0);
addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, DOUBLE_BYTES);
memset(val, 0, tListLen(val));
if (tVariantDump(&pParamElem[2].pNode->val, val, TSDB_DATA_TYPE_DOUBLE, true) < 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0);
addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
}
SColumnList ids = {0};
......@@ -2180,8 +2207,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index);
colIndex += 1; // the first column is ts
pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, resultSize, false);
addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0);
pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pQueryInfo), resultSize, false);
addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
} else {
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true);
......@@ -2198,8 +2225,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// todo REFACTOR
// set the first column ts for top/bottom query
SColumnIndex index1 = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE,
TSDB_KEYSIZE, false);
pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, getNewResColId(pQueryInfo),
TSDB_KEYSIZE, false);
tstrncpy(pExpr->aliasName, aAggs[TSDB_FUNC_TS].aName, sizeof(pExpr->aliasName));
const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX;
......@@ -2209,8 +2236,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
colIndex += 1; // the first column is ts
pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, resultSize, false);
addExprParams(pExpr, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0);
pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pQueryInfo), resultSize, false);
addExprParams(pExpr, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t));
}
memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName));
......@@ -2694,7 +2721,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
}
}
tscFieldInfoUpdateOffsetForInterResult(pQueryInfo);
tscFieldInfoUpdateOffset(pQueryInfo);
return TSDB_CODE_SUCCESS;
}
......@@ -2922,7 +2949,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd)
void setColumnOffsetValueInResultset(SQueryInfo* pQueryInfo) {
if (QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
tscFieldInfoUpdateOffsetForInterResult(pQueryInfo);
tscFieldInfoUpdateOffset(pQueryInfo);
} else {
tscFieldInfoUpdateOffset(pQueryInfo);
}
......@@ -4437,7 +4464,7 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
size_t size = tscNumOfFields(pQueryInfo);
if (pQueryInfo->fillVal == NULL) {
pQueryInfo->fillVal = calloc(size, sizeof(int64_t));
......@@ -4451,12 +4478,8 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
} else if (strncasecmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->fillType = TSDB_FILL_NULL;
for (int32_t i = START_INTERPO_COL_IDX; i < size; ++i) {
TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type);
} else {
setNull((char*)&pQueryInfo->fillVal[i], pFields->type, pFields->bytes);
};
TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
setNull((char*)&pQueryInfo->fillVal[i], pField->type, pField->bytes);
}
} else if (strncasecmp(pItem->pVar.pz, "prev", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->fillType = TSDB_FILL_PREV;
......@@ -4487,15 +4510,15 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
int32_t j = 1;
for (int32_t i = startPos; i < numOfFillVal; ++i, ++j) {
TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type);
if (pField->type == TSDB_DATA_TYPE_BINARY || pField->type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull((char*) &pQueryInfo->fillVal[i], pField->type);
continue;
}
tVariant* p = taosArrayGet(pFillToken, j);
int32_t ret = tVariantDump(p, (char*)&pQueryInfo->fillVal[i], pFields->type, true);
int32_t ret = tVariantDump(p, (char*)&pQueryInfo->fillVal[i], pField->type, true);
if (ret != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
}
......@@ -4505,12 +4528,12 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
tVariantListItem* lastItem = taosArrayGetLast(pFillToken);
for (int32_t i = numOfFillVal; i < size; ++i) {
TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type);
if (pField->type == TSDB_DATA_TYPE_BINARY || pField->type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull((char*) &pQueryInfo->fillVal[i], pField->type);
} else {
tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->fillVal[i], pFields->type, true);
tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->fillVal[i], pField->type, true);
}
}
}
......@@ -5447,7 +5470,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau
int16_t type = pTagSchema->type;
int16_t bytes = pTagSchema->bytes;
pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, bytes, true);
pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, getNewResColId(pQueryInfo), bytes, true);
pExpr->colInfo.flag = TSDB_COL_TAG;
// NOTE: tag column does not add to source column list
......@@ -5750,7 +5773,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex};
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, bytes, true);
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, getNewResColId(pQueryInfo), bytes, true);
memset(pExpr->aliasName, 0, sizeof(pExpr->aliasName));
tstrncpy(pExpr->aliasName, name, sizeof(pExpr->aliasName));
......@@ -5913,7 +5936,7 @@ int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ
SColumnIndex ind = {0};
SSqlExpr* pExpr1 = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG_DUMMY, &ind, TSDB_DATA_TYPE_INT,
tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, false);
tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, getNewResColId(pQueryInfo), tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, false);
const char* name = (pExprList->a[0].aliasName != NULL)? pExprList->a[0].aliasName:functionsInfo[index].name;
tstrncpy(pExpr1->aliasName, name, tListLen(pExpr1->aliasName));
......@@ -6585,6 +6608,7 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS
if (strcmp((*pExpr)->pSchema->name, p1->aliasName) == 0) {
(*pExpr)->pSchema->type = (uint8_t)p1->resType;
(*pExpr)->pSchema->bytes = p1->resBytes;
(*pExpr)->pSchema->colId = p1->resColId;
if (uid != NULL) {
*uid = p1->uid;
......
......@@ -702,7 +702,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->queryType = htonl(pQueryInfo->type);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number
// set column list ids
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
......@@ -764,12 +764,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_TSC_INVALID_SQL;
}
assert(pExpr->resColId < 0);
pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId);
pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag);
pSqlFuncExpr->functionId = htons(pExpr->functionId);
pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
pSqlFuncExpr->resColId = htons(pExpr->resColId);
pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
......@@ -787,7 +790,73 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
}
if(tscIsSecondStageQuery(pQueryInfo)) {
size_t output = tscNumOfFields(pQueryInfo);
pQueryMsg->secondStageOutput = htonl(output);
SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
for (int32_t i = 0; i < output; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
SSqlExpr *pExpr = pField->pSqlExpr;
if (pExpr != NULL) {
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
tscError("%p table schema is not matched with parsed sql", pSql);
return TSDB_CODE_TSC_INVALID_SQL;
}
pSqlFuncExpr1->colInfo.colId = htons(pExpr->colInfo.colId);
pSqlFuncExpr1->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
pSqlFuncExpr1->colInfo.flag = htons(pExpr->colInfo.flag);
pSqlFuncExpr1->functionId = htons(pExpr->functionId);
pSqlFuncExpr1->numOfParams = htons(pExpr->numOfParams);
pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
// todo add log
pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
pSqlFuncExpr1->arg[j].argBytes = htons(pExpr->param[j].nLen);
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
pMsg += pExpr->param[j].nLen;
} else {
pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
}
}
pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
} else {
assert(pField->pArithExprInfo != NULL);
SExprInfo* pExprInfo = pField->pArithExprInfo;
pSqlFuncExpr1->colInfo.colId = htons(pExprInfo->base.colInfo.colId);
pSqlFuncExpr1->functionId = htons(pExprInfo->base.functionId);
pSqlFuncExpr1->numOfParams = htons(pExprInfo->base.numOfParams);
pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
// todo add log
pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExprInfo->base.arg[j].argType);
pSqlFuncExpr1->arg[j].argBytes = htons(pExprInfo->base.arg[j].argBytes);
if (pExprInfo->base.arg[j].argType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExprInfo->base.arg[j].argValue.pz, pExprInfo->base.arg[j].argBytes);
pMsg += pExprInfo->base.arg[j].argBytes;
} else {
pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExprInfo->base.arg[j].argValue.i64);
}
}
pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
}
}
} else {
pQueryMsg->secondStageOutput = 0;
}
// serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
......@@ -814,7 +883,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
*((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
pMsg += sizeof(pQueryInfo->fillVal[0]);
}
......@@ -1950,7 +2019,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
pTableSchema[i].type, pTableSchema[i].bytes, getNewResColId(pQueryInfo), pTableSchema[i].bytes, false);
}
pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
......
......@@ -28,7 +28,6 @@
#include "tutil.h"
#include "ttimer.h"
#include "tscProfile.h"
#include "ttimer.h"
static bool validImpl(const char* str, size_t maxsize) {
if (str == NULL) {
......@@ -482,7 +481,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
assert(0);
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, 0);
}
*rows = pRes->tsrow;
......
......@@ -1642,9 +1642,10 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
}
tExtMemBuffer ** pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL;
SColumnModel * pModel = NULL;
tOrderDescriptor *pDesc = NULL;
SColumnModel *pModel = NULL;
SColumnModel *pFFModel = NULL;
pRes->qhandle = 0x1; // hack the qhandle check
const uint32_t nBufferSize = (1u << 16); // 64KB
......@@ -1662,7 +1663,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
assert(pState->numOfSub > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFFModel, nBufferSize);
if (ret != 0) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncRes(pSql);
......@@ -1707,6 +1708,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->subqueryIndex = i;
trs->pParentSql = pSql;
trs->pFinalColModel = pModel;
trs->pFFColModel = pFFModel;
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
if (pNew == NULL) {
......@@ -2418,7 +2420,7 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF
}
}
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *) param;
int32_t index = -1;
......@@ -2449,48 +2451,49 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
size_t size = tscNumOfFields(pQueryInfo);
int32_t offset = 0;
for (int i = 0; i < size; ++i) {
SInternalField* pSup = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
if (pSup->pSqlExpr != NULL) {
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
}
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, offset);
TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
offset += pField->bytes;
// primary key column cannot be null in interval query, no need to check
if (i == 0 && pQueryInfo->interval.interval > 0) {
continue;
}
TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
transferNcharData(pSql, i, pField);
}
// calculate the result from several other columns
if (pSup->pArithExprInfo != NULL) {
if (pRes->pArithSup == NULL) {
pRes->pArithSup = (SArithmeticSupport*)calloc(1, sizeof(SArithmeticSupport));
}
pRes->pArithSup->offset = 0;
pRes->pArithSup->pArithExpr = pSup->pArithExprInfo;
pRes->pArithSup->numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
pRes->pArithSup->exprList = pQueryInfo->exprList;
pRes->pArithSup->data = calloc(pRes->pArithSup->numOfCols, POINTER_BYTES);
if (pRes->buffer[i] == NULL) {
TAOS_FIELD* field = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pRes->buffer[i] = malloc(field->bytes);
}
for(int32_t k = 0; k < pRes->pArithSup->numOfCols; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
pRes->pArithSup->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
}
tExprTreeCalcTraverse(pRes->pArithSup->pArithExpr->pExpr, 1, pRes->buffer[i], pRes->pArithSup,
TSDB_ORDER_ASC, getArithemicInputSrc);
pRes->tsrow[i] = (unsigned char*)pRes->buffer[i];
}
// if (pSup->pArithExprInfo != NULL) {
// if (pRes->pArithSup == NULL) {
// pRes->pArithSup = (SArithmeticSupport*)calloc(1, sizeof(SArithmeticSupport));
// }
//
// pRes->pArithSup->offset = 0;
// pRes->pArithSup->pArithExpr = pSup->pArithExprInfo;
// pRes->pArithSup->numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
// pRes->pArithSup->exprList = pQueryInfo->exprList;
// pRes->pArithSup->data = calloc(pRes->pArithSup->numOfCols, POINTER_BYTES);
//
// if (pRes->buffer[i] == NULL) {
// TAOS_FIELD* field = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
// pRes->buffer[i] = malloc(field->bytes);
// }
//
// for(int32_t k = 0; k < pRes->pArithSup->numOfCols; ++k) {
// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
// pRes->pArithSup->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
// }
//
// tExprTreeCalcTraverse(pRes->pArithSup->pArithExpr->pExpr, 1, pRes->buffer[i], pRes->pArithSup,
// TSDB_ORDER_ASC, getArithemicInputSrc);
// pRes->tsrow[i] = (unsigned char*)pRes->buffer[i];
// }
}
pRes->row++; // index increase one-step
......
......@@ -219,6 +219,24 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
return true;
}
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo) {
size_t numOfOutput = tscNumOfFields(pQueryInfo);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
if (numOfOutput == numOfExprs) {
return false;
}
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i)->pArithExprInfo;
if (pExprInfo != NULL) {
return true;
}
}
return false;
}
bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
......@@ -855,28 +873,11 @@ void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) {
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->offset = 0;
for (int32_t i = 1; i < numOfExprs; ++i) {
SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprList, i - 1);
SSqlExpr* p = taosArrayGetP(pQueryInfo->exprList, i);
p->offset = prev->offset + prev->resBytes;
}
}
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) {
if (tscSqlExprNumOfExprs(pQueryInfo) == 0) {
return;
}
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->offset = 0;
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 1; i < numOfExprs; ++i) {
SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprList, i - 1);
SSqlExpr* p = taosArrayGetP(pQueryInfo->exprList, i);
p->offset = prev->offset + prev->resBytes;
}
}
......@@ -955,7 +956,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
}
static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize, int32_t colType) {
int16_t size, int16_t resColId, int16_t interSize, int32_t colType) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex);
SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr));
......@@ -988,8 +989,9 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
pExpr->resType = type;
pExpr->resBytes = size;
pExpr->resColId = resColId;
pExpr->interBytes = interSize;
if (pTableMetaInfo->pTableMeta) {
pExpr->uid = pTableMetaInfo->pTableMeta->id.uid;
}
......@@ -998,20 +1000,20 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
}
SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize, bool isTagCol) {
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol) {
int32_t num = (int32_t)taosArrayGetSize(pQueryInfo->exprList);
if (index == num) {
return tscSqlExprAppend(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);
return tscSqlExprAppend(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol);
}
SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);
SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol);
taosArrayInsert(pQueryInfo->exprList, index, &pExpr);
return pExpr;
}
SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize, bool isTagCol) {
SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol) {
SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol);
taosArrayPush(pQueryInfo->exprList, &pExpr);
return pExpr;
}
......@@ -1039,16 +1041,14 @@ size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) {
return taosArrayGetSize(pQueryInfo->exprList);
}
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex) {
if (pExpr == NULL || argument == NULL || bytes == 0) {
return;
}
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes) {
assert (pExpr != NULL || argument != NULL || bytes != 0);
// set parameter value
// transfer to tVariant from byte data/no ascii data
tVariantCreateFromBinary(&pExpr->param[pExpr->numOfParams], argument, bytes, type);
pExpr->numOfParams += 1;
assert(pExpr->numOfParams <= 3);
}
......@@ -1601,6 +1601,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX;
pQueryInfo->resColumnId= -1000;
}
int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
......
......@@ -392,6 +392,7 @@ typedef struct SColIndex {
typedef struct SSqlFuncMsg {
int16_t functionId;
int16_t numOfParams;
int16_t resColId; // result column id, id of the current output column
SColIndex colInfo;
struct ArgElem {
......@@ -461,11 +462,6 @@ typedef struct STimeWindow {
TSKEY ekey;
} STimeWindow;
/*
* the outputCols is equalled to or larger than numOfCols
* e.g., select min(colName), max(colName), avg(colName) from table
* the outputCols will be 3 while the numOfCols is 1.
*/
typedef struct {
SMsgHead head;
STimeWindow window;
......@@ -483,13 +479,14 @@ typedef struct {
uint32_t queryType; // denote another query process
int16_t numOfOutput; // final output columns numbers
int16_t tagNameRelType; // relation of tag criteria and tbname criteria
int16_t fillType; // interpolate type
uint64_t fillVal; // default value array list
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block
int32_t tsNumOfBlocks; // ts comp block numbers
int32_t tsOrder; // ts comp block order
int32_t numOfTags; // number of tags columns involved
int16_t fillType; // interpolate type
uint64_t fillVal; // default value array list
int32_t secondStageOutput;
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block
int32_t tsNumOfBlocks; // ts comp block numbers
int32_t tsOrder; // ts comp block order
int32_t numOfTags; // number of tags columns involved
SColumnInfo colList[];
} SQueryTableMsg;
......
......@@ -153,6 +153,9 @@ typedef struct SQuery {
int32_t rowSize;
SSqlGroupbyExpr* pGroupbyExpr;
SExprInfo* pSelectExpr;
SExprInfo* pExpr2;
int32_t numOfExpr2;
SColumnInfo* colList;
SColumnInfo* tagColList;
int32_t numOfFilterCols;
......
......@@ -128,7 +128,7 @@ typedef struct SArithmeticSupport {
SExprInfo *pArithExpr;
int32_t numOfCols;
SColumnInfo *colList;
SArray* exprList; // client side used
void *exprList; // client side used
int32_t offset;
char** data;
} SArithmeticSupport;
......
......@@ -4133,11 +4133,20 @@ bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) {
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
int32_t bytes = pQuery->pSelectExpr[col].bytes;
if (pQuery->pExpr2 == NULL) {
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
int32_t bytes = pQuery->pSelectExpr[col].bytes;
memmove(data, pQuery->sdata[col]->data, bytes * numOfRows);
data += bytes * numOfRows;
memmove(data, pQuery->sdata[col]->data, bytes * numOfRows);
data += bytes * numOfRows;
}
} else {
for (int32_t col = 0; col < pQuery->numOfExpr2; ++col) {
int32_t bytes = pQuery->pExpr2[col].bytes;
memmove(data, pQuery->sdata[col]->data, bytes * numOfRows);
data += bytes * numOfRows;
}
}
int32_t numOfTables = (int32_t)taosArrayGetSize(pQInfo->arrTableIdInfo);
......@@ -4549,8 +4558,8 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
return terrno;
}
static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
int32_t numOfCols = pQuery->numOfOutput;
static SFillColInfo* createFillColInfo(SQuery* pQuery) {
int32_t numOfCols = pQuery->pExpr2 == NULL? pQuery->numOfOutput:pQuery->numOfExpr2;
int32_t offset = 0;
SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo));
......@@ -4558,8 +4567,9 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
return NULL;
}
// TODO refactor
for(int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExprInfo = &pQuery->pSelectExpr[i];
SExprInfo* pExprInfo = (pQuery->pExpr2 == NULL)? &pQuery->pSelectExpr[i]:&pQuery->pExpr2[i];
pFillCol[i].col.bytes = pExprInfo->bytes;
pFillCol[i].col.type = (int8_t)pExprInfo->type;
......@@ -4664,14 +4674,15 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
}
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery);
SFillColInfo* pColInfo = createFillColInfo(pQuery);
STimeWindow w = TSWINDOW_INITIALIZER;
TSKEY sk = MIN(pQuery->window.skey, pQuery->window.ekey);
TSKEY ek = MAX(pQuery->window.skey, pQuery->window.ekey);
getAlignQueryTimeWindow(pQuery, pQuery->window.skey, sk, ek, &w);
pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pQuery->rec.capacity, pQuery->numOfOutput,
int32_t numOfCols = pQuery->pExpr2 == NULL? pQuery->numOfOutput:pQuery->numOfExpr2;
pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pQuery->rec.capacity, numOfCols,
pQuery->interval.sliding, pQuery->interval.slidingUnit, (int8_t)pQuery->precision,
pQuery->fillType, pColInfo, pQInfo);
}
......@@ -5324,6 +5335,69 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
qDebug("QInfo:%p points returned:%" PRId64 ", total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
}
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *) param;
SExprInfo* pExprInfo = (SExprInfo*) pSupport->exprList;
int32_t index = -1;
for (int32_t i = 0; i < pSupport->numOfCols; ++i) {
if (colId == pExprInfo[i].base.resColId) {
index = i;
break;
}
}
assert(index >= 0 && index < pSupport->numOfCols);
return pSupport->data[index] + pSupport->offset * pExprInfo[index].bytes;
}
static void doSecondaryArithmeticProcess(SQuery* pQuery) {
SArithmeticSupport *pArithSup = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport));
tFilePage **data = calloc(pQuery->numOfExpr2, POINTER_BYTES);
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
int32_t bytes = pQuery->pExpr2[i].bytes;
data[i] = (tFilePage *)malloc(bytes * pQuery->rec.rows + sizeof(tFilePage));
}
pArithSup->offset = 0;
pArithSup->numOfCols = (int32_t)pQuery->numOfOutput;
pArithSup->exprList = pQuery->pSelectExpr;
pArithSup->data = calloc(pArithSup->numOfCols, POINTER_BYTES);
for (int32_t k = 0; k < pArithSup->numOfCols; ++k) {
pArithSup->data[k] = pQuery->sdata[k]->data;
}
for (int i = 0; i < pQuery->numOfExpr2; ++i) {
SExprInfo *pExpr = &pQuery->pExpr2[i];
// calculate the result from several other columns
SSqlFuncMsg* pSqlFunc = &pExpr->base;
if (pSqlFunc->functionId != TSDB_FUNC_ARITHM) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
if (pSqlFunc->functionId == pQuery->pSelectExpr[j].base.functionId &&
pSqlFunc->colInfo.colId == pQuery->pSelectExpr[j].base.colInfo.colId) {
memcpy(data[i]->data, pQuery->sdata[j]->data, pQuery->pSelectExpr[j].bytes * pQuery->rec.rows);
break;
}
}
} else {
pArithSup->pArithExpr = pExpr;
tExprTreeCalcTraverse(pArithSup->pArithExpr->pExpr, pQuery->rec.rows, data[i]->data, pArithSup, TSDB_ORDER_ASC,
getArithemicInputSrc);
}
}
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
memcpy(pQuery->sdata[i]->data, data[i]->data, pQuery->pExpr2[i].bytes * pQuery->rec.rows);
}
tfree(pArithSup);
}
/*
* in each query, this function will be called only once, no retry for further result.
*
......@@ -5343,13 +5417,14 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey);
finalizeQueryResult(pRuntimeEnv);
// since the numOfRows must be identical for all sql functions that are allowed to be executed simutaneously.
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
doSecondaryArithmeticProcess(pQuery);
if (IS_QUERY_KILLED(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
// since the numOfRows must be identical for all sql functions that are allowed to be executed simutaneously.
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
skipResults(pRuntimeEnv);
limitResults(pRuntimeEnv);
}
......@@ -5469,8 +5544,15 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
}
// no result generated, abort
if (pQuery->rec.rows == 0) {
break;
}
doSecondaryArithmeticProcess(pQuery);
// the offset is handled at prepare stage if no interpolation involved
if (pQuery->fillType == TSDB_FILL_NONE || pQuery->rec.rows == 0) {
if (pQuery->fillType == TSDB_FILL_NONE) {
limitResults(pRuntimeEnv);
break;
} else {
......@@ -5693,7 +5775,7 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
* @param pExpr
* @return
*/
static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr,
static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr, SSqlFuncMsg ***pSecStageExpr,
char **tagCond, char** tbnameCond, SColIndex **groupbyCols, SColumnInfo** tagCols) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -5724,6 +5806,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks);
pQueryMsg->tsOrder = htonl(pQueryMsg->tsOrder);
pQueryMsg->numOfTags = htonl(pQueryMsg->numOfTags);
pQueryMsg->secondStageOutput = htonl(pQueryMsg->secondStageOutput);
// query msg safety check
if (!validateQueryMsg(pQueryMsg)) {
......@@ -5793,9 +5876,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex);
pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId);
pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag);
pExprMsg->functionId = htons(pExprMsg->functionId);
pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag);
pExprMsg->functionId = htons(pExprMsg->functionId);
pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
pExprMsg->resColId = htons(pExprMsg->resColId);
pMsg += sizeof(SSqlFuncMsg);
......@@ -5831,6 +5915,49 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
goto _cleanup;
}
if (pQueryMsg->secondStageOutput) {
pExprMsg = (SSqlFuncMsg *)pMsg;
*pSecStageExpr = calloc(pQueryMsg->secondStageOutput, POINTER_BYTES);
for (int32_t i = 0; i < pQueryMsg->secondStageOutput; ++i) {
(*pSecStageExpr)[i] = pExprMsg;
pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex);
pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId);
pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag);
pExprMsg->functionId = htons(pExprMsg->functionId);
pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) {
pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType);
pExprMsg->arg[j].argBytes = htons(pExprMsg->arg[j].argBytes);
if (pExprMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) {
pExprMsg->arg[j].argValue.pz = pMsg;
pMsg += pExprMsg->arg[j].argBytes; // one more for the string terminated char.
} else {
pExprMsg->arg[j].argValue.i64 = htobe64(pExprMsg->arg[j].argValue.i64);
}
}
int16_t functionId = pExprMsg->functionId;
if (functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG_DUMMY) {
if (!TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) { // ignore the column index check for arithmetic expression.
code = TSDB_CODE_QRY_INVALID_MSG;
goto _cleanup;
}
} else {
// if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) {
// return TSDB_CODE_QRY_INVALID_MSG;
// }
}
pExprMsg = (SSqlFuncMsg *)pMsg;
}
}
pMsg = createTableIdList(pQueryMsg, pMsg, pTableIdList);
if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns
......@@ -5936,8 +6063,8 @@ _cleanup:
return code;
}
static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
qDebug("qmsg:%p create arithmetic expr from binary string: %s", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz);
static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg);
tExprNode* pExprNode = NULL;
TRY(TSDB_MAX_TAG_CONDITIONS) {
......@@ -5957,7 +6084,7 @@ static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable
return TSDB_CODE_SUCCESS;
}
static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
SColumnInfo* pTagCols) {
*pExprInfo = NULL;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -5970,7 +6097,7 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo *
bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType);
int16_t tagLen = 0;
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
for (int32_t i = 0; i < numOfOutput; ++i) {
pExprs[i].base = *pExprMsg[i];
pExprs[i].bytes = 0;
......@@ -5979,7 +6106,7 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo *
// parse the arithmetic expression
if (pExprs[i].base.functionId == TSDB_FUNC_ARITHM) {
code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg);
code = buildArithmeticExprFromMsg(&pExprs[i], pQueryMsg);
if (code != TSDB_CODE_SUCCESS) {
tfree(pExprs);
......@@ -6032,7 +6159,7 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo *
}
// TODO refactor
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
for (int32_t i = 0; i < numOfOutput; ++i) {
pExprs[i].base = *pExprMsg[i];
int16_t functId = pExprs[i].base.functionId;
......@@ -6230,7 +6357,7 @@ static void calResultBufSize(SQuery* pQuery) {
}
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery) {
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery) {
int16_t numOfCols = pQueryMsg->numOfCols;
int16_t numOfOutput = pQueryMsg->numOfOutput;
......@@ -6257,6 +6384,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery->order.order = pQueryMsg->order;
pQuery->order.orderColId = pQueryMsg->orderColId;
pQuery->pSelectExpr = pExprs;
pQuery->pExpr2 = pSecExprs;
pQuery->numOfExpr2 = pQueryMsg->secondStageOutput;
pQuery->pGroupbyExpr = pGroupbyExpr;
memcpy(&pQuery->interval, &pQueryMsg->interval, sizeof(pQuery->interval));
pQuery->fillType = pQueryMsg->fillType;
......@@ -6296,7 +6425,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
// allocate additional memory for interResults that are usually larger then final results
size_t size = (size_t)((pQuery->rec.capacity + 1) * pExprs[col].bytes + pExprs[col].interBytes + sizeof(tFilePage));
int16_t bytes = 0;
if (pQuery->pExpr2 == NULL || col > pQuery->numOfExpr2) {
bytes = pExprs[col].bytes;
} else {
bytes = MAX(pQuery->pExpr2[col].bytes, pExprs[col].bytes);
}
size_t size = (size_t)((pQuery->rec.capacity + 1) * bytes + pExprs[col].interBytes + sizeof(tFilePage));
pQuery->sdata[col] = (tFilePage *)calloc(1, size);
if (pQuery->sdata[col] == NULL) {
goto _cleanup;
......@@ -6670,16 +6806,19 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
int32_t code = TSDB_CODE_SUCCESS;
char *tagCond = NULL;
char *tbnameCond = NULL;
char *tagCond = NULL;
char *tbnameCond = NULL;
SArray *pTableIdList = NULL;
SSqlFuncMsg **pExprMsg = NULL;
SExprInfo *pExprs = NULL;
SSqlFuncMsg **pExprMsg = NULL;
SSqlFuncMsg **pSecExprMsg = NULL;
SExprInfo *pExprs = NULL;
SExprInfo *pSecExprs = NULL;
SColIndex *pGroupColIndex = NULL;
SColumnInfo *pTagColumnInfo = NULL;
SSqlGroupbyExpr *pGroupbyExpr = NULL;
code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo);
code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &pSecExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
......@@ -6696,10 +6835,16 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
goto _over;
}
if ((code = createQFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
if ((code = createQFunctionExprFromMsg(pQueryMsg, pQueryMsg->numOfOutput, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
if (pSecExprMsg != NULL) {
if ((code = createQFunctionExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, &pSecExprs, pSecExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
}
pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, pGroupColIndex, &code);
if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
goto _over;
......@@ -6757,7 +6902,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
goto _over;
}
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &tableGroupInfo, pTagColumnInfo, isSTableQuery);
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, pSecExprs, &tableGroupInfo, pTagColumnInfo, isSTableQuery);
pExprs = NULL;
pGroupbyExpr = NULL;
pTagColumnInfo = NULL;
......
......@@ -851,11 +851,182 @@ endi
print =====================>td-1442
sql_error select count(*) from m_fl_tb0 interval(1s) fill(prev);
print =============== clear
sql drop database $db
sql show databases
if $rows != 0 then
print =====================> aggregation + arithmetic + fill
#sql select avg(cpu_taosd) - first(cpu_taosd) from dn1 where ts<'2020-11-13 11:00:00' and ts>'2020-11-13 10:50:00' interval(10s) fill(value, 99)
#sql select count(*), first(k), avg(k), avg(k)-first(k) from tm0 where ts>'2020-1-1 1:1:1' and ts<'2020-1-1 1:02:59' interval(10s) fill(value, 99);
#sql select count(*), first(k), avg(k), avg(k)-first(k) from tm0 where ts>'2020-1-1 1:1:1' and ts<'2020-1-1 1:02:59' interval(10s) fill(NULL);
print =====================> td-2060
sql create table m1 (ts timestamp, k int ) tags(a int);
sql create table if not exists tm0 using m1 tags(1);
sql insert into tm0 values('2020-1-1 1:1:1', 1);
sql insert into tm0 values('2020-1-1 1:1:2', 2);
sql insert into tm0 values('2020-1-1 1:1:3', 3);
sql insert into tm0 values('2020-1-1 1:2:4', 4);
sql insert into tm0 values('2020-1-1 1:2:5', 5);
sql insert into tm0 values('2020-1-1 1:2:6', 6);
sql insert into tm0 values('2020-1-1 1:3:7', 7);
sql insert into tm0 values('2020-1-1 1:3:8', 8);
sql insert into tm0 values('2020-1-1 1:3:9', 9);
sql insert into tm0 values('2020-1-1 1:4:10', 10);
sql select max(k)-min(k),last(k)-first(k),0-spread(k) from tm0 where ts>='2020-1-1 1:1:1' and ts<='2020-1-1 1:2:15' interval(10s) fill(value, 99,91,90,89,88,87,86,85);
if $rows != 8 then
return -1
endi
if $data00 != @20-01-01 01:01:00.000@ then
return -1
endi
if $data01 != 2.000000000 then
return -1
endi
if $data02 != 2.000000000 then
return -1
endi
if $data03 != -2.000000000 then
return -1
endi
if $data10 != @20-01-01 01:01:10.000@ then
return -1
endi
if $data11 != 99.000000000 then
return -1
endi
if $data12 != 91.000000000 then
return -1
endi
if $data13 != 90.000000000 then
return -1
endi
if $data60 != @20-01-01 01:02:00.000@ then
return -1
endi
if $data61 != 2.000000000 then
return -1
endi
if $data62 != 2.000000000 then
return -1
endi
if $data63 != -2.000000000 then
return -1
endi
if $data70 != @20-01-01 01:02:10.000@ then
return -1
endi
if $data71 != 99.000000000 then
return -1
endi
if $data72 != 91.000000000 then
return -1
endi
if $data73 != 90.000000000 then
return -1
endi
sql select first(k)-avg(k),0-spread(k) from tm0 where ts>='2020-1-1 1:1:1' and ts<='2020-1-1 1:2:15' interval(10s) fill(NULL);
if $rows != 8 then
return -1
endi
if $data00 != @20-01-01 01:01:00.000@ then
return -1
endi
if $data01 != -1.000000000 then
return -1
endi
if $data02 != -2.000000000 then
return -1
endi
if $data10 != @20-01-01 01:01:10.000@ then
return -1
endi
if $data11 != NULL then
return -1
endi
if $data12 != NULL then
return -1
endi
sql select max(k)-min(k),last(k)-first(k),0-spread(k) from tm0 where ts>='2020-1-1 1:1:1' and ts<='2020-1-1 4:2:15' interval(500a) fill(value, 99,91,90,89,88,87,86,85) order by ts asc;
if $rows != 21749 then
return -1
endi
sql select max(k)-min(k),last(k)-first(k),0-spread(k),count(1) from m1 where ts>='2020-1-1 1:1:1' and ts<='2020-1-1 1:2:15' interval(10s) fill(value, 99,91,90,89,88,87,86,85) order by ts asc;
if $rows != 8 then
return -1
endi
if $data00 != @20-01-01 01:01:00.000@ then
return -1
endi
if $data00 != @20-01-01 01:01:00.000@ then
return -1
endi
if $data1
if $data01 != 2.000000000 then
return -1
endi
if $data02 != 2.000000000 then
return -1
endi
if $data03 != -2.000000000 then
return -1
endi
if $data04 != 3 then
return -1
endi
if $data10 != @20-01-01 01:01:10.000@ then
return -1
endi
if $data11 != 99.000000000 then
return -1
endi
if $data12 != 91.000000000 then
return -1
endi
if $data13 != 90.000000000 then
return -1
endi
if $data14 != 89 then
return -1
endi
print =============== clear
#sql drop database $db
#sql show databases
#if $rows != 0 then
# return -1
#endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册