提交 ba6acfd4 编写于 作者: H hjxilinx

[TD-197] fix bugs in arithmetic expression calculation

上级 b2f17cb5
...@@ -78,7 +78,7 @@ typedef struct STableMetaInfo { ...@@ -78,7 +78,7 @@ typedef struct STableMetaInfo {
*/ */
int32_t vgroupIndex; int32_t vgroupIndex;
char name[TSDB_TABLE_ID_LEN]; // (super) table name char name[TSDB_TABLE_ID_LEN]; // (super) table name
SArray* tagColList; // involved tag columns SArray* tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo; } STableMetaInfo;
/* the structure for sql function in select clause */ /* the structure for sql function in select clause */
......
...@@ -3294,29 +3294,26 @@ static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) { ...@@ -3294,29 +3294,26 @@ static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) {
} }
} }
char *arithmetic_callback_function(void *param, char *name, int32_t colId) { char *getArithColumnData(void *param, const char* name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *)param; SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
SArithExprInfo *pExpr = pSupport->pArithExpr; int32_t index = -1;
int32_t colIndex = -1; for (int32_t i = 0; i < pSupport->numOfCols; ++i) {
if (colId == pSupport->colList[i].colId) {
for (int32_t i = 0; i < pExpr->binExprInfo.numOfCols; ++i) { index = i;
if (colId == pExpr->binExprInfo.pReqColumns[i].colId) {
colIndex = pExpr->binExprInfo.pReqColumns[i].colIndex;
break; break;
} }
} }
assert(colIndex >= 0 && colId >= 0); assert(index >= 0 && colId >= 0);
return pSupport->data[colIndex] + pSupport->offset * pSupport->elemSize[colIndex]; return pSupport->data[index] + pSupport->offset * pSupport->colList[index].bytes;
} }
static void arithmetic_function(SQLFunctionCtx *pCtx) { static void arithmetic_function(SQLFunctionCtx *pCtx) {
GET_RES_INFO(pCtx)->numOfRes += pCtx->size; GET_RES_INFO(pCtx)->numOfRes += pCtx->size;
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz; SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
tSQLBinaryExprCalcTraverse(sas->pArithExpr->binExprInfo.pBinExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order, tExprTreeCalcTraverse(sas->pArithExpr->pExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order, getArithColumnData);
arithmetic_callback_function);
pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size; pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size;
pCtx->param[1].pz = NULL; pCtx->param[1].pz = NULL;
...@@ -3327,8 +3324,7 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) { ...@@ -3327,8 +3324,7 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz; SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
sas->offset = index; sas->offset = index;
tSQLBinaryExprCalcTraverse(sas->pArithExpr->binExprInfo.pBinExpr, 1, pCtx->aOutputBuf, sas, pCtx->order, tExprTreeCalcTraverse(sas->pArithExpr->pExpr, 1, pCtx->aOutputBuf, sas, pCtx->order, getArithColumnData);
arithmetic_callback_function);
pCtx->aOutputBuf += pCtx->outputBytes; pCtx->aOutputBuf += pCtx->outputBytes;
} }
......
...@@ -101,7 +101,7 @@ static void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex); ...@@ -101,7 +101,7 @@ static void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex);
static int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t index, SQuerySQL* pQuerySql, SSqlObj* pSql); static int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t index, SQuerySQL* pQuerySql, SSqlObj* pSql);
static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql); static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql);
static int32_t getColumnIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t getColumnIndexByName(const SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t optrToString(tSQLExpr* pExpr, char** exprString); static int32_t optrToString(tSQLExpr* pExpr, char** exprString);
...@@ -116,7 +116,7 @@ static int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSql ...@@ -116,7 +116,7 @@ static int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSql
static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index); static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index);
static int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExprInfo, SQueryInfo* pQueryInfo, SArray* pCols); static int32_t exprTreeFromSqlExpr(tExprNode **pExpr, const tSQLExpr* pSqlExpr, SArray* pExprInfo, SQueryInfo* pQueryInfo, SArray* pCols);
/* /*
* Used during parsing query sql. Since the query sql usually small in length, error position * Used during parsing query sql. Since the query sql usually small in length, error position
...@@ -1171,13 +1171,32 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel ...@@ -1171,13 +1171,32 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
SColumnIndex index = {.tableIndex = tableIndex}; SColumnIndex index = {.tableIndex = tableIndex};
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE,
sizeof(double), sizeof(double), false); sizeof(double), sizeof(double), false);
addExprParams(pExpr, arithmeticExprStr, TSDB_DATA_TYPE_BINARY, strlen(arithmeticExprStr), index.tableIndex);
/* todo alias name should use the original sql string */ /* todo alias name should use the original sql string */
char* name = (pItem->aliasName != NULL)? pItem->aliasName:arithmeticExprStr; char* name = (pItem->aliasName != NULL)? pItem->aliasName:arithmeticExprStr;
strncpy(pExpr->aliasName, name, TSDB_COL_NAME_LEN); strncpy(pExpr->aliasName, name, TSDB_COL_NAME_LEN);
tExprNode* pNode = NULL;
SArray* colList = taosArrayInit(10, sizeof(SColIndex));
int32_t ret = exprTreeFromSqlExpr(&pNode, pItem->pNode, pQueryInfo->exprsInfo, pQueryInfo, colList);
if (ret != TSDB_CODE_SUCCESS) {
tExprTreeDestroy(&pNode, NULL);
return invalidSqlErrMsg(pQueryInfo->msg, "invalid arithmetic expression in select clause");
}
SBuffer buf = exprTreeToBinary(pNode);
size_t len = tbufTell(&buf);
char* c = tbufGetData(&buf, true);
// set the serialized binary string as the parameter of arithmetic expression
addExprParams(pExpr, c, TSDB_DATA_TYPE_BINARY, len, index.tableIndex);
insertResultField(pQueryInfo, i, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName, pExpr); insertResultField(pQueryInfo, i, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName, pExpr);
taosArrayDestroy(colList);
tExprTreeDestroy(&pNode, NULL);
} else { } else {
columnList.num = 0; columnList.num = 0;
columnList.ids[0] = (SColumnIndex) {0, 0}; columnList.ids[0] = (SColumnIndex) {0, 0};
...@@ -1196,8 +1215,6 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel ...@@ -1196,8 +1215,6 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
pFuncExpr->interResBytes = sizeof(double); pFuncExpr->interResBytes = sizeof(double);
pFuncExpr->type = TSDB_DATA_TYPE_DOUBLE; pFuncExpr->type = TSDB_DATA_TYPE_DOUBLE;
SExprInfo* pBinExprInfo = &pFuncExpr->binExprInfo;
tExprNode* pNode = NULL; tExprNode* pNode = NULL;
// SArray* colList = taosArrayInit(10, sizeof(SColIndex)); // SArray* colList = taosArrayInit(10, sizeof(SColIndex));
...@@ -1207,25 +1224,25 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel ...@@ -1207,25 +1224,25 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
return invalidSqlErrMsg(pQueryInfo->msg, "invalid expression in select clause"); return invalidSqlErrMsg(pQueryInfo->msg, "invalid expression in select clause");
} }
pBinExprInfo->pBinExpr = pNode; pFuncExpr->pExpr = pNode;
assert(0); assert(0);
// pBinExprInfo->pReqColumns = pColIndex; // pExprInfo->pReqColumns = pColIndex;
for(int32_t k = 0; k < pBinExprInfo->numOfCols; ++k) {
SColIndex* pCol = &pBinExprInfo->pReqColumns[k];
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for(int32_t f = 0; f < size; ++f) { // for(int32_t k = 0; k < pFuncExpr->numOfCols; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, f); // SColIndex* pCol = &pFuncExpr->colList[k];
if (strcmp(pExpr->aliasName, pCol->name) == 0) { // size_t size = tscSqlExprNumOfExprs(pQueryInfo);
pCol->colIndex = f; //
break; // for(int32_t f = 0; f < size; ++f) {
} // SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, f);
} // if (strcmp(pExpr->aliasName, pCol->name) == 0) {
// pCol->colIndex = f;
assert(pCol->colIndex >= 0 && pCol->colIndex < size); // break;
tfree(pNode); // }
} // }
//
// assert(pCol->colIndex >= 0 && pCol->colIndex < size);
// tfree(pNode);
// }
} }
} }
} else { } else {
...@@ -1317,28 +1334,6 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c ...@@ -1317,28 +1334,6 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c
void addRequiredTagColumn(STableMetaInfo* pTableMetaInfo, SColumnIndex* index) { void addRequiredTagColumn(STableMetaInfo* pTableMetaInfo, SColumnIndex* index) {
tscColumnListInsert(pTableMetaInfo->tagColList, index); tscColumnListInsert(pTableMetaInfo->tagColList, index);
// if (pTableMetaInfo->numOfTags == 0 || pTableMetaInfo->tagColumnIndex[pTableMetaInfo->numOfTags - 1] < tagColIndex) {
// pTableMetaInfo->tagColumnIndex[pTableMetaInfo->numOfTags++] = tagColIndex;
// } else { // find the appropriate position
// for (int32_t i = 0; i < pTableMetaInfo->numOfTags; ++i) {
// if (tagColIndex > pTableMetaInfo->tagColumnIndex[i]) {
// continue;
// } else if (tagColIndex == pTableMetaInfo->tagColumnIndex[i]) {
// break;
// } else {
// memmove(&pTableMetaInfo->tagColumnIndex[i + 1], &pTableMetaInfo->tagColumnIndex[i],
// sizeof(pTableMetaInfo->tagColumnIndex[0]) * (pTableMetaInfo->numOfTags - i));
//
// pTableMetaInfo->tagColumnIndex[i] = tagColIndex;
//
// pTableMetaInfo->numOfTags++;
// break;
// }
// }
// }
// plus one means tbname
// assert(tagColIndex >= -1 && tagColIndex < TSDB_MAX_TAGS && pTableMetaInfo->numOfTags <= TSDB_MAX_TAGS + 1);
} }
static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumnIndex* pIndex, tSQLExprItem* pItem) { static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumnIndex* pIndex, tSQLExprItem* pItem) {
...@@ -2047,7 +2042,7 @@ int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIn ...@@ -2047,7 +2042,7 @@ int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIn
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getColumnIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) { int32_t getColumnIndexByName(const SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) {
if (pQueryInfo->pTableMetaInfo == NULL || pQueryInfo->numOfTables == 0) { if (pQueryInfo->pTableMetaInfo == NULL || pQueryInfo->numOfTables == 0) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
...@@ -3754,12 +3749,23 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr, ...@@ -3754,12 +3749,23 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr,
tSQLExpr* p1 = extractExprForSTable(pExpr, pQueryInfo, i); tSQLExpr* p1 = extractExprForSTable(pExpr, pQueryInfo, i);
tExprNode* p = NULL; tExprNode* p = NULL;
ret = exprTreeFromSqlExpr(&p, p1, NULL, pQueryInfo, NULL); SArray* colList = taosArrayInit(10, sizeof(SColIndex));
ret = exprTreeFromSqlExpr(&p, p1, NULL, pQueryInfo, colList);
SBuffer buf = exprTreeToBinary(p); SBuffer buf = exprTreeToBinary(p);
int64_t uid = tscGetMetaInfo(pQueryInfo, i)->pTableMeta->uid; // add to source column list
tsSetSTableQueryCond(&pQueryInfo->tagCond, uid, &buf); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
int64_t uid = pTableMetaInfo->pTableMeta->uid;
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
size_t num = taosArrayGetSize(colList);
for(int32_t j = 0; j < num; ++j) {
SColIndex* pIndex = taosArrayGet(colList, j);
SColumnIndex index = {.tableIndex = i, .columnIndex = pIndex->colIndex - numOfCols};
addRequiredTagColumn(pTableMetaInfo, &index);
}
tsSetSTableQueryCond(&pQueryInfo->tagCond, uid, &buf);
doCompactQueryExpr(pExpr); doCompactQueryExpr(pExpr);
tSQLExprDestroy(p1); tSQLExprDestroy(p1);
...@@ -5871,7 +5877,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { ...@@ -5871,7 +5877,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return TSDB_CODE_SUCCESS; // Does not build query message here return TSDB_CODE_SUCCESS; // Does not build query message here
} }
int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExprInfo, SQueryInfo* pQueryInfo, SArray* pCols) { int32_t exprTreeFromSqlExpr(tExprNode **pExpr, const tSQLExpr* pSqlExpr, SArray* pExprInfo, SQueryInfo* pQueryInfo, SArray* pCols) {
tExprNode* pLeft = NULL; tExprNode* pLeft = NULL;
tExprNode* pRight= NULL; tExprNode* pRight= NULL;
...@@ -5896,8 +5902,9 @@ int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExpr ...@@ -5896,8 +5902,9 @@ int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExpr
(*pExpr)->pVal = calloc(1, sizeof(tVariant)); (*pExpr)->pVal = calloc(1, sizeof(tVariant));
tVariantAssign((*pExpr)->pVal, &pSqlExpr->val); tVariantAssign((*pExpr)->pVal, &pSqlExpr->val);
return TSDB_CODE_SUCCESS;
} else if (pSqlExpr->nSQLOptr >= TK_COUNT && pSqlExpr->nSQLOptr <= TK_AVG_IRATE) { } else if (pSqlExpr->nSQLOptr >= TK_COUNT && pSqlExpr->nSQLOptr <= TK_AVG_IRATE) {
// arithmetic expression on the results of aggregation functions
*pExpr = calloc(1, sizeof(tExprNode)); *pExpr = calloc(1, sizeof(tExprNode));
(*pExpr)->nodeType = TSQL_NODE_COL; (*pExpr)->nodeType = TSQL_NODE_COL;
(*pExpr)->pSchema = calloc(1, sizeof(SSchema)); (*pExpr)->pSchema = calloc(1, sizeof(SSchema));
...@@ -5915,7 +5922,7 @@ int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExpr ...@@ -5915,7 +5922,7 @@ int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExpr
break; break;
} }
} }
} else if (pSqlExpr->nSQLOptr == TK_ID) { // column name } else if (pSqlExpr->nSQLOptr == TK_ID) { // column name, normal column arithmetic expression
SColumnIndex index = {0}; SColumnIndex index = {0};
int32_t ret = getColumnIndexByName(&pSqlExpr->colInfo, pQueryInfo, &index); int32_t ret = getColumnIndexByName(&pSqlExpr->colInfo, pQueryInfo, &index);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -5930,17 +5937,20 @@ int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExpr ...@@ -5930,17 +5937,20 @@ int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExpr
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex); SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
*(*pExpr)->pSchema = *pSchema; *(*pExpr)->pSchema = *pSchema;
return TSDB_CODE_SUCCESS;
} else {
return TSDB_CODE_INVALID_SQL;
}
if (pCols != NULL) { // record the involved columns if (pCols != NULL) { // record the involved columns
SColIndex colIndex = {0}; SColIndex colIndex = {0};
strncpy(colIndex.name, pSqlExpr->operand.z, pSqlExpr->operand.n); strncpy(colIndex.name, pSchema->name, TSDB_COL_NAME_LEN);
colIndex.colId = pSchema->colId;
colIndex.colIndex = index.columnIndex;
taosArrayPush(pCols, &colIndex); taosArrayPush(pCols, &colIndex);
} }
return TSDB_CODE_SUCCESS;
} else {
return TSDB_CODE_INVALID_SQL;
}
} else { } else {
*pExpr = (tExprNode *)calloc(1, sizeof(tExprNode)); *pExpr = (tExprNode *)calloc(1, sizeof(tExprNode));
(*pExpr)->nodeType = TSQL_NODE_EXPR; (*pExpr)->nodeType = TSQL_NODE_EXPR;
......
...@@ -788,9 +788,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -788,9 +788,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) { if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen); memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
pMsg += pExpr->param[j].nLen;
// by plus one char to make the string null-terminated
pMsg += pExpr->param[j].nLen + 1;
} else { } else {
pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key); pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
} }
...@@ -1848,7 +1846,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1848,7 +1846,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT; pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
assert(msgLen + minMsgSize() <= size); assert(msgLen + minMsgSize() <= size);
return msgLen; return TSDB_CODE_SUCCESS;
} }
int tscProcessTableMetaRsp(SSqlObj *pSql) { int tscProcessTableMetaRsp(SSqlObj *pSql) {
......
...@@ -458,20 +458,21 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF ...@@ -458,20 +458,21 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF
} }
} }
static char *getArithemicInputSrc(void *param, char *name, int32_t colId) { static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *)param; // SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
SArithExprInfo * pExpr = pSupport->pArithExpr; // SArithExprInfo * pExpr = pSupport->pArithExpr;
int32_t index = -1; // int32_t index = -1;
for (int32_t i = 0; i < pExpr->binExprInfo.numOfCols; ++i) { // for (int32_t i = 0; i < pExpr->numOfCols; ++i) {
if (strcmp(name, pExpr->binExprInfo.pReqColumns[i].name) == 0) { // if (strcmp(name, pExpr->colList[i].name) == 0) {
index = i; // index = i;
break; // break;
} // }
} // }
//
assert(index >= 0 && index < pExpr->binExprInfo.numOfCols); // assert(index >= 0 && index < pExpr->numOfCols);
return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index]; // return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index];
return 0;
} }
static void **doSetResultRowData(SSqlObj *pSql) { static void **doSetResultRowData(SSqlObj *pSql) {
...@@ -521,21 +522,21 @@ static void **doSetResultRowData(SSqlObj *pSql) { ...@@ -521,21 +522,21 @@ static void **doSetResultRowData(SSqlObj *pSql) {
sas->offset = 0; sas->offset = 0;
sas->pArithExpr = pInfo->pArithExprInfo; sas->pArithExpr = pInfo->pArithExprInfo;
sas->numOfCols = sas->pArithExpr->binExprInfo.numOfCols; // sas->numOfCols = sas->pArithExpr->numOfCols;
if (pRes->buffer[i] == NULL) { if (pRes->buffer[i] == NULL) {
pRes->buffer[i] = malloc(tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i)->bytes); pRes->buffer[i] = malloc(tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i)->bytes);
} }
for(int32_t k = 0; k < sas->numOfCols; ++k) { for(int32_t k = 0; k < sas->numOfCols; ++k) {
int32_t columnIndex = sas->pArithExpr->binExprInfo.pReqColumns[k].colIndex; // int32_t columnIndex = sas->pArithExpr->colList[k].colIndex;
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex); // SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
//
sas->elemSize[k] = pExpr->resBytes; // sas->elemSize[k] = pExpr->resBytes;
sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes; // sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
} }
tSQLBinaryExprCalcTraverse(sas->pArithExpr->binExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSDB_ORDER_ASC, getArithemicInputSrc); tExprTreeCalcTraverse(sas->pArithExpr->pExpr, 1, pRes->buffer[i], sas, TSDB_ORDER_ASC, getArithemicInputSrc);
pRes->tsrow[i] = pRes->buffer[i]; pRes->tsrow[i] = pRes->buffer[i];
free(sas); //todo optimization free(sas); //todo optimization
...@@ -634,8 +635,6 @@ static UNUSED_FUNC void **tscBuildResFromSubqueries(SSqlObj *pSql) { ...@@ -634,8 +635,6 @@ static UNUSED_FUNC void **tscBuildResFromSubqueries(SSqlObj *pSql) {
} }
if (success) { // current row of final output has been built, return to app if (success) { // current row of final output has been built, return to app
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
int32_t tableIndex = pRes->pColumnIndex[i].tableIndex; int32_t tableIndex = pRes->pColumnIndex[i].tableIndex;
int32_t columnIndex = pRes->pColumnIndex[i].columnIndex; int32_t columnIndex = pRes->pColumnIndex[i].columnIndex;
......
...@@ -214,8 +214,8 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) { ...@@ -214,8 +214,8 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumn* pBase = taosArrayGet(pQueryInfo->colList, i); SColumn* base = taosArrayGet(pQueryInfo->colList, i);
if (pBase->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (base->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return true; return true;
} }
} }
......
...@@ -17,8 +17,6 @@ ...@@ -17,8 +17,6 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "tcache.h" #include "tcache.h"
#include "trpc.h" #include "trpc.h"
#include "taosdef.h"
#include "tsocket.h"
#include "tsystem.h" #include "tsystem.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -34,7 +32,6 @@ ...@@ -34,7 +32,6 @@
// global, not configurable // global, not configurable
void * pVnodeConn; void * pVnodeConn;
void * tscCacheHandle; void * tscCacheHandle;
int slaveIndex;
void * tscTmr; void * tscTmr;
void * tscQhandle; void * tscQhandle;
void * tscCheckDiskUsageTmr; void * tscCheckDiskUsageTmr;
...@@ -46,7 +43,7 @@ static pthread_once_t tscinit = PTHREAD_ONCE_INIT; ...@@ -46,7 +43,7 @@ static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet);
void tscCheckDiskUsage(void *para, void *unused) { void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
taosGetDisk(); taosGetDisk();
taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
} }
...@@ -156,7 +153,6 @@ void taos_init_imp() { ...@@ -156,7 +153,6 @@ void taos_init_imp() {
} }
tscInitMsgsFp(); tscInitMsgsFp();
slaveIndex = rand();
int queueSize = tsMaxVnodeConnections + tsMaxMeterConnections + tsMaxMgmtConnections + tsMaxMgmtConnections; int queueSize = tsMaxVnodeConnections + tsMaxMeterConnections + tsMaxMgmtConnections + tsMaxMgmtConnections;
if (tscEmbedded == 0) { if (tscEmbedded == 0) {
...@@ -379,7 +375,6 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) { ...@@ -379,7 +375,6 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
return 0; return 0;
} }
int taos_options(TSDB_OPTION option, const void *arg, ...) { int taos_options(TSDB_OPTION option, const void *arg, ...) {
static int32_t lock = 0; static int32_t lock = 0;
......
...@@ -939,8 +939,9 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { ...@@ -939,8 +939,9 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, i); SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, i);
if (pInfo->pArithExprInfo != NULL) { if (pInfo->pArithExprInfo != NULL) {
tExprTreeDestroy(&pInfo->pArithExprInfo->binExprInfo.pBinExpr, NULL); tExprTreeDestroy(&pInfo->pArithExprInfo->pExpr, NULL);
tfree(pInfo->pArithExprInfo->binExprInfo.pReqColumns); assert(0);
// tfree(pInfo->pArithExprInfo->colList);
} }
} }
...@@ -1678,7 +1679,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST ...@@ -1678,7 +1679,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
} }
if (pTagCols == NULL) { if (pTagCols == NULL) {
pTableMetaInfo->tagColList = taosArrayInit(4, sizeof(SColumnIndex)); pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES);
} else { } else {
pTableMetaInfo->tagColList = taosArrayClone(pTagCols); pTableMetaInfo->tagColList = taosArrayClone(pTagCols);
} }
......
...@@ -356,17 +356,9 @@ typedef struct { ...@@ -356,17 +356,9 @@ typedef struct {
} SMDDropVnodeMsg; } SMDDropVnodeMsg;
typedef struct SColIndex { typedef struct SColIndex {
int16_t colId; int16_t colId; // column id
/* int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
* colIdx is the index of column in latest schema of table uint16_t flag; // denote if it is a tag or a normal column
* it is available in the client side. Also used to determine
* whether current table schema is up-to-date.
*
* colIdxInBuf is used to denote the index of column in pQuery->colList,
* this value is invalid in client side, as well as in cache block of vnode either.
*/
int16_t colIndex;
uint16_t flag; // denote if it is a tag or not
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN];
} SColIndex; } SColIndex;
...@@ -388,15 +380,9 @@ typedef struct SSqlFuncMsg { ...@@ -388,15 +380,9 @@ typedef struct SSqlFuncMsg {
} arg[3]; } arg[3];
} SSqlFuncMsg; } SSqlFuncMsg;
typedef struct SExprInfo {
struct tExprNode *pBinExpr; /* for binary expression */
int32_t numOfCols; /* binary expression involves the readed number of columns*/
SColIndex * pReqColumns; /* source column list */
} SExprInfo;
typedef struct SArithExprInfo { typedef struct SArithExprInfo {
SSqlFuncMsg pBase; SSqlFuncMsg base;
SExprInfo binExprInfo; struct tExprNode* pExpr;
int16_t bytes; int16_t bytes;
int16_t type; int16_t type;
int16_t interResBytes; int16_t interResBytes;
...@@ -794,12 +780,12 @@ typedef struct { ...@@ -794,12 +780,12 @@ typedef struct {
typedef struct { typedef struct {
int32_t numOfQueries; int32_t numOfQueries;
SQueryDesc *qdesc; SQueryDesc qdesc[];
} SQqueryList; } SQqueryList;
typedef struct { typedef struct {
int32_t numOfStreams; int32_t numOfStreams;
SStreamDesc *sdesc; SStreamDesc sdesc[];
} SStreamList; } SStreamList;
typedef struct { typedef struct {
......
...@@ -82,10 +82,11 @@ void tExprTreeDestroy(tExprNode **pExprs, void (*fp)(void*)); ...@@ -82,10 +82,11 @@ void tExprTreeDestroy(tExprNode **pExprs, void (*fp)(void*));
void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param); void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param);
void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order, void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*cb)(void *, char *, int32_t)); char *(*cb)(void *, const char*, int32_t));
void tSQLBinaryExprTrv(tExprNode *pExprs, int32_t *val, int16_t *ids); // todo refactor: remove it
void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res);
uint8_t getBinaryExprOptr(SSQLToken *pToken); uint8_t getBinaryExprOptr(SSQLToken *pToken);
......
...@@ -115,10 +115,10 @@ enum { ...@@ -115,10 +115,10 @@ enum {
typedef struct SArithmeticSupport { typedef struct SArithmeticSupport {
SArithExprInfo *pArithExpr; SArithExprInfo *pArithExpr;
int32_t elemSize[TSDB_MAX_COLUMNS];
int32_t numOfCols; int32_t numOfCols;
SColumnInfo* colList;
int32_t offset; int32_t offset;
char * data[TSDB_MAX_COLUMNS]; char** data;
} SArithmeticSupport; } SArithmeticSupport;
typedef struct SQLPreAggVal { typedef struct SQLPreAggVal {
......
...@@ -38,7 +38,7 @@ void tVariantCreate(tVariant *pVar, SSQLToken *token); ...@@ -38,7 +38,7 @@ void tVariantCreate(tVariant *pVar, SSQLToken *token);
void tVariantCreateFromString(tVariant *pVar, char *pz, uint32_t len, uint32_t type); void tVariantCreateFromString(tVariant *pVar, char *pz, uint32_t len, uint32_t type);
void tVariantCreateFromBinary(tVariant *pVar, char *pz, uint32_t len, uint32_t type); void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32_t type);
void tVariantDestroy(tVariant *pV); void tVariantDestroy(tVariant *pV);
......
...@@ -609,17 +609,6 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty ...@@ -609,17 +609,6 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty
} }
} }
/*
* qsort comparator
* sort the result to ensure meters with the same gid is grouped together
*/
//static int32_t compareByAddr(const void *pLeft, const void *pRight) {
// int64_t p1 = (int64_t) * ((tSkipListNode **)pLeft);
// int64_t p2 = (int64_t) * ((tSkipListNode **)pRight);
//
// DEFAULT_COMP(p1, p2);
//}
int32_t merge(SArray *pLeft, SArray *pRight, SArray *pFinalRes) { int32_t merge(SArray *pLeft, SArray *pRight, SArray *pFinalRes) {
// assert(pFinalRes->pRes == 0); // assert(pFinalRes->pRes == 0);
// //
...@@ -881,8 +870,8 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S ...@@ -881,8 +870,8 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
} }
} }
void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order, void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*getSourceDataBlock)(void *, char *, int32_t)) { char *(*getSourceDataBlock)(void *, const char*, int32_t)) {
if (pExprs == NULL) { if (pExprs == NULL) {
return; return;
} }
...@@ -893,13 +882,13 @@ void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOut ...@@ -893,13 +882,13 @@ void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOut
/* the left output has result from the left child syntax tree */ /* the left output has result from the left child syntax tree */
char *pLeftOutput = (char*)malloc(sizeof(int64_t) * numOfRows); char *pLeftOutput = (char*)malloc(sizeof(int64_t) * numOfRows);
if (pLeft->nodeType == TSQL_NODE_EXPR) { if (pLeft->nodeType == TSQL_NODE_EXPR) {
tSQLBinaryExprCalcTraverse(pLeft, numOfRows, pLeftOutput, param, order, getSourceDataBlock); tExprTreeCalcTraverse(pLeft, numOfRows, pLeftOutput, param, order, getSourceDataBlock);
} }
/* the right output has result from the right child syntax tree */ /* the right output has result from the right child syntax tree */
char *pRightOutput = malloc(sizeof(int64_t) * numOfRows); char *pRightOutput = malloc(sizeof(int64_t) * numOfRows);
if (pRight->nodeType == TSQL_NODE_EXPR) { if (pRight->nodeType == TSQL_NODE_EXPR) {
tSQLBinaryExprCalcTraverse(pRight, numOfRows, pRightOutput, param, order, getSourceDataBlock); tExprTreeCalcTraverse(pRight, numOfRows, pRightOutput, param, order, getSourceDataBlock);
} }
if (pLeft->nodeType == TSQL_NODE_EXPR) { if (pLeft->nodeType == TSQL_NODE_EXPR) {
...@@ -961,7 +950,7 @@ void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOut ...@@ -961,7 +950,7 @@ void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOut
free(pRightOutput); free(pRightOutput);
} }
void tSQLBinaryExprTrv(tExprNode *pExprs, int32_t *val, int16_t *ids) { void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res) {
if (pExprs == NULL) { if (pExprs == NULL) {
return; return;
} }
...@@ -971,17 +960,15 @@ void tSQLBinaryExprTrv(tExprNode *pExprs, int32_t *val, int16_t *ids) { ...@@ -971,17 +960,15 @@ void tSQLBinaryExprTrv(tExprNode *pExprs, int32_t *val, int16_t *ids) {
// recursive traverse left child branch // recursive traverse left child branch
if (pLeft->nodeType == TSQL_NODE_EXPR) { if (pLeft->nodeType == TSQL_NODE_EXPR) {
tSQLBinaryExprTrv(pLeft, val, ids); tSQLBinaryExprTrv(pLeft, res);
} else if (pLeft->nodeType == TSQL_NODE_COL) { } else if (pLeft->nodeType == TSQL_NODE_COL) {
ids[*val] = pLeft->pSchema->colId; taosArrayPush(res, &pLeft->pSchema->colId);
(*val) += 1;
} }
if (pRight->nodeType == TSQL_NODE_EXPR) { if (pRight->nodeType == TSQL_NODE_EXPR) {
tSQLBinaryExprTrv(pRight, val, ids); tSQLBinaryExprTrv(pRight, res);
} else if (pRight->nodeType == TSQL_NODE_COL) { } else if (pRight->nodeType == TSQL_NODE_COL) {
ids[*val] = pRight->pSchema->colId; taosArrayPush(res, &pRight->pSchema->colId);
(*val) += 1;
} }
} }
......
...@@ -53,8 +53,8 @@ ...@@ -53,8 +53,8 @@
/* get the qinfo struct address from the query struct address */ /* get the qinfo struct address from the query struct address */
#define GET_COLUMN_BYTES(query, colidx) \ #define GET_COLUMN_BYTES(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].bytes) ((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].bytes)
#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].type) #define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].type)
typedef struct SPointInterpoSupporter { typedef struct SPointInterpoSupporter {
int32_t numOfCols; int32_t numOfCols;
...@@ -257,7 +257,7 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -257,7 +257,7 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
int64_t maxOutput = 0; int64_t maxOutput = 0;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
/* /*
* ts, tag, tagprj function can not decide the output number of current query * ts, tag, tagprj function can not decide the output number of current query
...@@ -334,7 +334,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) { ...@@ -334,7 +334,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
int32_t numOfSelectivity = 0; int32_t numOfSelectivity = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functId = pQuery->pSelectExpr[i].base.functionId;
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY) { if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY) {
hasTags = true; hasTags = true;
continue; continue;
...@@ -352,7 +352,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) { ...@@ -352,7 +352,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
return false; return false;
} }
bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functionId == TSDB_FUNC_TS_COMP; } bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.functionId == TSDB_FUNC_TS_COMP; }
static bool limitResults(SQInfo *pQInfo) { static bool limitResults(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
...@@ -370,7 +370,7 @@ static bool limitResults(SQInfo *pQInfo) { ...@@ -370,7 +370,7 @@ static bool limitResults(SQInfo *pQInfo) {
static bool isTopBottomQuery(SQuery *pQuery) { static bool isTopBottomQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TS) { if (functionId == TSDB_FUNC_TS) {
continue; continue;
} }
...@@ -385,7 +385,7 @@ static bool isTopBottomQuery(SQuery *pQuery) { ...@@ -385,7 +385,7 @@ static bool isTopBottomQuery(SQuery *pQuery) {
static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, int32_t index) { static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, int32_t index) {
// for a tag column, no corresponding field info // for a tag column, no corresponding field info
SColIndex *pColIndexEx = &pQuery->pSelectExpr[index].pBase.colInfo; SColIndex *pColIndexEx = &pQuery->pSelectExpr[index].base.colInfo;
if (TSDB_COL_IS_TAG(pColIndexEx->flag)) { if (TSDB_COL_IS_TAG(pColIndexEx->flag)) {
return NULL; return NULL;
} }
...@@ -413,7 +413,7 @@ static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlo ...@@ -413,7 +413,7 @@ static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlo
*/ */
static bool hasNullValue(SQuery *pQuery, int32_t col, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, static bool hasNullValue(SQuery *pQuery, int32_t col, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis,
SDataStatis **pColStatis) { SDataStatis **pColStatis) {
SColIndex *pColIndex = &pQuery->pSelectExpr[col].pBase.colInfo; SColIndex *pColIndex = &pQuery->pSelectExpr[col].base.colInfo;
if (TSDB_COL_IS_TAG(pColIndex->flag)) { if (TSDB_COL_IS_TAG(pColIndex->flag)) {
return false; return false;
} }
...@@ -717,7 +717,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat ...@@ -717,7 +717,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
pCtx[k].nStartQueryTimestamp = pWin->skey; pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].size = forwardStep; pCtx[k].size = forwardStep;
...@@ -743,7 +743,7 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus ...@@ -743,7 +743,7 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey; pCtx[k].nStartQueryTimestamp = pWin->skey;
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset); aAggs[functionId].xFunctionF(&pCtx[k], offset);
} }
...@@ -817,13 +817,12 @@ static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) { ...@@ -817,13 +817,12 @@ static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) {
char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size, char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size,
SArray *pDataBlock) { SArray *pDataBlock) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
char *dataBlock = NULL; char *dataBlock = NULL;
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t functionId = pQuery->pSelectExpr[col].pBase.functionId; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
int32_t functionId = pQuery->pSelectExpr[col].base.functionId;
if (functionId == TSDB_FUNC_ARITHM) { if (functionId == TSDB_FUNC_ARITHM) {
sas->pArithExpr = &pQuery->pSelectExpr[col]; sas->pArithExpr = &pQuery->pSelectExpr[col];
...@@ -834,18 +833,32 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3 ...@@ -834,18 +833,32 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
pCtx->startOffset = pQuery->pos - (size - 1); pCtx->startOffset = pQuery->pos - (size - 1);
} }
sas->offset = 0;
sas->colList = pQuery->colList;
sas->numOfCols = pQuery->numOfCols;
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
// here the pQuery->colList and sas->colList are identical
for (int32_t i = 0; i < pQuery->numOfCols; ++i) { for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfo *pColMsg = &pQuery->colList[i]; SColumnInfo *pColMsg = &pQuery->colList[i];
assert(0);
// char * pData = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pQuery->colList[i].colIdxInBuf); int32_t numOfCols = taosArrayGetSize(pDataBlock);
sas->elemSize[i] = pColMsg->bytes;
// sas->data[i] = pData + pCtx->startOffset * sas->elemSize[i]; // start from the offset dataBlock = NULL;
for (int32_t k = 0; k < numOfCols; ++k) { //todo refactor
SColumnInfoData *p = taosArrayGet(pDataBlock, k);
if (pColMsg->colId == p->info.colId) {
dataBlock = p->pData;
break;
}
}
assert(dataBlock != NULL);
sas->data[i] = dataBlock + pCtx->startOffset * pQuery->colList[i].bytes; // start from the offset
} }
sas->numOfCols = pQuery->numOfCols;
sas->offset = 0;
} else { // other type of query function } else { // other type of query function
SColIndex *pCol = &pQuery->pSelectExpr[col].pBase.colInfo; SColIndex *pCol = &pQuery->pSelectExpr[col].base.colInfo;
if (TSDB_COL_IS_TAG(pCol->flag) || pDataBlock == NULL) { if (TSDB_COL_IS_TAG(pCol->flag) || pDataBlock == NULL) {
dataBlock = NULL; dataBlock = NULL;
} else { } else {
...@@ -896,7 +909,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -896,7 +909,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
SDataStatis *tpField = NULL; SDataStatis *tpField = NULL;
...@@ -953,13 +966,21 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -953,13 +966,21 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
* tag_prj function are changed to be TSDB_FUNC_TAG_DUMMY * tag_prj function are changed to be TSDB_FUNC_TAG_DUMMY
*/ */
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]); aAggs[functionId].xFunction(&pCtx[k]);
} }
} }
} }
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) {
continue;
}
tfree(sasArray[i].data);
}
tfree(sasArray); tfree(sasArray);
} }
...@@ -1103,7 +1124,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1103,7 +1124,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
} }
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
SDataStatis *pColStatis = NULL; SDataStatis *pColStatis = NULL;
...@@ -1212,7 +1233,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1212,7 +1233,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
offset -= pCtx[0].startOffset; offset -= pCtx[0].startOffset;
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset); aAggs[functionId].xFunctionF(&pCtx[k], offset);
} }
...@@ -1229,6 +1250,16 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1229,6 +1250,16 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
} }
pQuery->lastKey = lastKey + step; pQuery->lastKey = lastKey + step;
// todo refactor: extract method
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) {
continue;
}
tfree(sasArray[i].data);
}
free(sasArray); free(sasArray);
} }
...@@ -1343,7 +1374,7 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) { ...@@ -1343,7 +1374,7 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutput, POINTER_BYTES); SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutput, POINTER_BYTES);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase; SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base;
if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) { if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pCtx[i].outputBytes; tagLen += pCtx[i].outputBytes;
pTagCtx[num++] = &pCtx[i]; pTagCtx[num++] = &pCtx[i];
...@@ -1383,7 +1414,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order ...@@ -1383,7 +1414,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
pRuntimeEnv->offset[0] = 0; pRuntimeEnv->offset[0] = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase; SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base;
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
SColIndex* pIndex = &pSqlFuncMsg->colInfo; SColIndex* pIndex = &pSqlFuncMsg->colInfo;
...@@ -1425,7 +1456,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order ...@@ -1425,7 +1456,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
int32_t functionId = pCtx->functionId; int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
int32_t f = pQuery->pSelectExpr[0].pBase.functionId; int32_t f = pQuery->pSelectExpr[0].base.functionId;
assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY); assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY);
pCtx->param[2].i64Key = order; pCtx->param[2].i64Key = order;
...@@ -1533,7 +1564,7 @@ bool isFixedOutputQuery(SQuery *pQuery) { ...@@ -1533,7 +1564,7 @@ bool isFixedOutputQuery(SQuery *pQuery) {
} }
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].pBase; SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].base;
// ignore the ts_comp function // ignore the ts_comp function
if (i == 0 && pExprMsg->functionId == TSDB_FUNC_PRJ && pExprMsg->numOfParams == 1 && if (i == 0 && pExprMsg->functionId == TSDB_FUNC_PRJ && pExprMsg->numOfParams == 1 &&
...@@ -1555,7 +1586,7 @@ bool isFixedOutputQuery(SQuery *pQuery) { ...@@ -1555,7 +1586,7 @@ bool isFixedOutputQuery(SQuery *pQuery) {
bool isPointInterpoQuery(SQuery *pQuery) { bool isPointInterpoQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionID = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionID = pQuery->pSelectExpr[i].base.functionId;
if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) { if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) {
return true; return true;
} }
...@@ -1567,7 +1598,7 @@ bool isPointInterpoQuery(SQuery *pQuery) { ...@@ -1567,7 +1598,7 @@ bool isPointInterpoQuery(SQuery *pQuery) {
// TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION // TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION
bool isSumAvgRateQuery(SQuery *pQuery) { bool isSumAvgRateQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TS) { if (functionId == TSDB_FUNC_TS) {
continue; continue;
} }
...@@ -1583,7 +1614,7 @@ bool isSumAvgRateQuery(SQuery *pQuery) { ...@@ -1583,7 +1614,7 @@ bool isSumAvgRateQuery(SQuery *pQuery) {
bool isFirstLastRowQuery(SQuery *pQuery) { bool isFirstLastRowQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionID = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionID = pQuery->pSelectExpr[i].base.functionId;
if (functionID == TSDB_FUNC_LAST_ROW) { if (functionID == TSDB_FUNC_LAST_ROW) {
return true; return true;
} }
...@@ -1599,7 +1630,7 @@ bool notHasQueryTimeRange(SQuery *pQuery) { ...@@ -1599,7 +1630,7 @@ bool notHasQueryTimeRange(SQuery *pQuery) {
static bool needReverseScan(SQuery *pQuery) { static bool needReverseScan(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG) { if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG) {
continue; continue;
} }
...@@ -1772,7 +1803,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { ...@@ -1772,7 +1803,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
} else { } else {
bool hasMultioutput = false; bool hasMultioutput = false;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].pBase; SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].base;
if (pExprMsg->functionId == TSDB_FUNC_TS || pExprMsg->functionId == TSDB_FUNC_TS_DUMMY) { if (pExprMsg->functionId == TSDB_FUNC_TS || pExprMsg->functionId == TSDB_FUNC_TS_DUMMY) {
continue; continue;
} }
...@@ -1805,7 +1836,7 @@ bool vnodeParametersSafetyCheck(SQuery *pQuery) { ...@@ -1805,7 +1836,7 @@ bool vnodeParametersSafetyCheck(SQuery *pQuery) {
// the scan order is not matter // the scan order is not matter
static bool onlyOneQueryType(SQuery *pQuery, int32_t functId, int32_t functIdDst) { static bool onlyOneQueryType(SQuery *pQuery, int32_t functId, int32_t functIdDst) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG || if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG ||
functionId == TSDB_FUNC_TAG_DUMMY) { functionId == TSDB_FUNC_TAG_DUMMY) {
...@@ -2008,7 +2039,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI ...@@ -2008,7 +2039,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
SInterpInfoDetail *pInterpDetail = pInterpInfo->pInterpDetail; SInterpInfoDetail *pInterpDetail = pInterpInfo->pInterpDetail;
// for primary timestamp column, set the flag // for primary timestamp column, set the flag
if (pQuery->pSelectExpr[i].pBase.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (pQuery->pSelectExpr[i].base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
pInterpDetail->primaryCol = 1; pInterpDetail->primaryCol = 1;
} }
...@@ -2031,11 +2062,11 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI ...@@ -2031,11 +2062,11 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
// tag column does not need the interp environment // tag column does not need the interp environment
if (pQuery->pSelectExpr[i].pBase.functionId == TSDB_FUNC_TAG) { if (pQuery->pSelectExpr[i].base.functionId == TSDB_FUNC_TAG) {
continue; continue;
} }
int32_t colInBuf = 0; // pQuery->pSelectExpr[i].pBase.colInfo.colIdxInBuf; int32_t colInBuf = 0; // pQuery->pSelectExpr[i].base.colInfo.colIdxInBuf;
SInterpInfo *pInterpInfo = (SInterpInfo *)pRuntimeEnv->pCtx[i].aOutputBuf; SInterpInfo *pInterpInfo = (SInterpInfo *)pRuntimeEnv->pCtx[i].aOutputBuf;
pInterpInfo->pInterpDetail = calloc(1, sizeof(SInterpInfoDetail)); pInterpInfo->pInterpDetail = calloc(1, sizeof(SInterpInfoDetail));
...@@ -2046,7 +2077,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI ...@@ -2046,7 +2077,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
assert(0); assert(0);
// for primary timestamp column, set the flag // for primary timestamp column, set the flag
if (pQuery->pSelectExpr[i].pBase.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (pQuery->pSelectExpr[i].base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
pInterpDetail->primaryCol = 1; pInterpDetail->primaryCol = 1;
} else { } else {
doSetInterpVal(pCtx, prevKey, type, 1, pPointInterpSupport->pPrevPoint[colInBuf]); doSetInterpVal(pCtx, prevKey, type, 1, pPointInterpSupport->pPrevPoint[colInBuf]);
...@@ -2145,7 +2176,7 @@ static int32_t getRowParamForMultiRowsOutput(SQuery *pQuery, bool isSTableQuery) ...@@ -2145,7 +2176,7 @@ static int32_t getRowParamForMultiRowsOutput(SQuery *pQuery, bool isSTableQuery)
int32_t rowparam = 1; int32_t rowparam = 1;
if (isTopBottomQuery(pQuery) && (!isSTableQuery)) { if (isTopBottomQuery(pQuery) && (!isSTableQuery)) {
rowparam = pQuery->pSelectExpr[1].pBase.arg->argValue.i64; rowparam = pQuery->pSelectExpr[1].base.arg->argValue.i64;
} }
return rowparam; return rowparam;
...@@ -2211,7 +2242,7 @@ UNUSED_FUNC void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, ...@@ -2211,7 +2242,7 @@ UNUSED_FUNC void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime,
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_SPREAD) { if (functionId == TSDB_FUNC_SPREAD) {
pRuntimeEnv->pCtx[i].param[1].dKey = stime; pRuntimeEnv->pCtx[i].param[1].dKey = stime;
...@@ -2270,7 +2301,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun ...@@ -2270,7 +2301,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
// todo disable this opt code block temporarily // todo disable this opt code block temporarily
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { // for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t functId = pQuery->pSelectExpr[i].pBase.functionId; // int32_t functId = pQuery->pSelectExpr[i].base.functionId;
// if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { // if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
// return top_bot_datablock_filter(&pCtx[i], functId, (char *)&pField[i].min, (char *)&pField[i].max); // return top_bot_datablock_filter(&pCtx[i], functId, (char *)&pField[i].min, (char *)&pField[i].max);
// } // }
...@@ -2298,8 +2329,8 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl ...@@ -2298,8 +2329,8 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl
r = BLK_DATA_ALL_NEEDED; r = BLK_DATA_ALL_NEEDED;
} else { } else {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t colId = pQuery->pSelectExpr[i].pBase.colInfo.colId; int32_t colId = pQuery->pSelectExpr[i].base.colInfo.colId;
r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pQuery->window.skey, pQuery->window.ekey, colId); r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pQuery->window.skey, pQuery->window.ekey, colId);
} }
...@@ -2533,14 +2564,14 @@ static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVar ...@@ -2533,14 +2564,14 @@ static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVar
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void *tsdb) { void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void *tsdb) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
SSqlFuncMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase; SSqlFuncMsg *pFuncMsg = &pQuery->pSelectExpr[0].base;
if (pQuery->numOfOutput == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) { if (pQuery->numOfOutput == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) {
assert(pFuncMsg->numOfParams == 1); assert(pFuncMsg->numOfParams == 1);
doSetTagValueInParam(tsdb, id, pFuncMsg->arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag); doSetTagValueInParam(tsdb, id, pFuncMsg->arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag);
} else { } else {
// set tag value, by which the results are aggregated. // set tag value, by which the results are aggregated.
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) { for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
SColIndex *pCol = &pQuery->pSelectExpr[idx].pBase.colInfo; SColIndex *pCol = &pQuery->pSelectExpr[idx].base.colInfo;
// ts_comp column required the tag value for join filter // ts_comp column required the tag value for join filter
if (!TSDB_COL_IS_TAG(pCol->flag)) { if (!TSDB_COL_IS_TAG(pCol->flag)) {
...@@ -2566,7 +2597,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes ...@@ -2566,7 +2597,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (!mergeFlag) { if (!mergeFlag) {
pCtx[i].aOutputBuf = pCtx[i].aOutputBuf + pCtx[i].outputBytes; pCtx[i].aOutputBuf = pCtx[i].aOutputBuf + pCtx[i].outputBytes;
pCtx[i].currentStage = FIRST_STAGE_MERGE; pCtx[i].currentStage = FIRST_STAGE_MERGE;
...@@ -2590,7 +2621,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes ...@@ -2590,7 +2621,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
} }
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY) { if (functionId == TSDB_FUNC_TAG_DUMMY) {
continue; continue;
} }
...@@ -2680,15 +2711,15 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf ...@@ -2680,15 +2711,15 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
switch (pQuery->pSelectExpr[i].type) { switch (pQuery->pSelectExpr[i].type) {
case TSDB_DATA_TYPE_BINARY: { case TSDB_DATA_TYPE_BINARY: {
int32_t colIndex = pQuery->pSelectExpr[i].pBase.colInfo.colIndex; int32_t colIndex = pQuery->pSelectExpr[i].base.colInfo.colIndex;
int32_t type = 0; int32_t type = 0;
if (TSDB_COL_IS_TAG(pQuery->pSelectExpr[i].pBase.colInfo.flag)) { if (TSDB_COL_IS_TAG(pQuery->pSelectExpr[i].base.colInfo.flag)) {
type = pQuery->pSelectExpr[i].type; type = pQuery->pSelectExpr[i].type;
} else { } else {
type = pMeterObj->schema[colIndex].type; type = pMeterObj->schema[colIndex].type;
} }
printBinaryData(pQuery->pSelectExpr[i].pBase.functionId, pdata[i]->data + pQuery->pSelectExpr[i].bytes * j, printBinaryData(pQuery->pSelectExpr[i].base.functionId, pdata[i]->data + pQuery->pSelectExpr[i].bytes * j,
type); type);
break; break;
} }
...@@ -2843,7 +2874,7 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW ...@@ -2843,7 +2874,7 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW
int64_t maxOutput = 0; int64_t maxOutput = 0;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
/* /*
* ts, tag, tagprj function can not decide the output number of current query * ts, tag, tagprj function can not decide the output number of current query
...@@ -3059,7 +3090,7 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo * ...@@ -3059,7 +3090,7 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *
// open/close the specified query for each group result // open/close the specified query for each group result
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; int32_t functId = pQuery->pSelectExpr[j].base.functionId;
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) ||
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) { ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) {
...@@ -3081,7 +3112,7 @@ void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3081,7 +3112,7 @@ void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) {
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
} else { // for simple result of table query, } else { // for simple result of table query,
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; int32_t functId = pQuery->pSelectExpr[j].base.functionId;
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j];
...@@ -3151,7 +3182,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3151,7 +3182,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
pCtx->resultInfo = &pRuntimeEnv->resultInfo[i]; pCtx->resultInfo = &pRuntimeEnv->resultInfo[i];
// set the timestamp output buffer for top/bottom/diff query // set the timestamp output buffer for top/bottom/diff query
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
} }
...@@ -3167,7 +3198,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { ...@@ -3167,7 +3198,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
// reset the execution contexts // reset the execution contexts
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
assert(functionId != TSDB_FUNC_DIFF); assert(functionId != TSDB_FUNC_DIFF);
// set next output position // set next output position
...@@ -3194,7 +3225,7 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3194,7 +3225,7 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
pRuntimeEnv->pCtx[j].currentStage = 0; pRuntimeEnv->pCtx[j].currentStage = 0;
aAggs[functionId].init(&pRuntimeEnv->pCtx[j]); aAggs[functionId].init(&pRuntimeEnv->pCtx[j]);
...@@ -3220,7 +3251,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3220,7 +3251,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery->rec.rows -= numOfSkip; pQuery->rec.rows -= numOfSkip;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->rec.rows * bytes); memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->rec.rows * bytes);
...@@ -3262,7 +3293,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3262,7 +3293,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
setWindowResOutputBuf(pRuntimeEnv, pResult); setWindowResOutputBuf(pRuntimeEnv, pResult);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int16_t functId = pQuery->pSelectExpr[j].pBase.functionId; int16_t functId = pQuery->pSelectExpr[j].base.functionId;
if (functId == TSDB_FUNC_TS) { if (functId == TSDB_FUNC_TS) {
continue; continue;
} }
...@@ -3275,7 +3306,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3275,7 +3306,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
} }
} else { } else {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int16_t functId = pQuery->pSelectExpr[j].pBase.functionId; int16_t functId = pQuery->pSelectExpr[j].base.functionId;
if (functId == TSDB_FUNC_TS) { if (functId == TSDB_FUNC_TS) {
continue; continue;
} }
...@@ -3443,7 +3474,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3443,7 +3474,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
setWindowResOutputBuf(pRuntimeEnv, buf); setWindowResOutputBuf(pRuntimeEnv, buf);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
aAggs[pQuery->pSelectExpr[j].pBase.functionId].xFinalize(&pRuntimeEnv->pCtx[j]); aAggs[pQuery->pSelectExpr[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
} }
/* /*
...@@ -3455,14 +3486,14 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3455,14 +3486,14 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
} else { } else {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
aAggs[pQuery->pSelectExpr[j].pBase.functionId].xFinalize(&pRuntimeEnv->pCtx[j]); aAggs[pQuery->pSelectExpr[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
} }
} }
} }
static bool hasMainOutput(SQuery *pQuery) { static bool hasMainOutput(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TAGPRJ) { if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TAGPRJ) {
return true; return true;
...@@ -3568,7 +3599,7 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * ...@@ -3568,7 +3599,7 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult); pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult);
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
} }
...@@ -3666,7 +3697,7 @@ void setIntervalQueryRange(STableQueryInfo *pTableQueryInfo, SQInfo *pQInfo, TSK ...@@ -3666,7 +3697,7 @@ void setIntervalQueryRange(STableQueryInfo *pTableQueryInfo, SQInfo *pQInfo, TSK
bool requireTimestamp(SQuery *pQuery) { bool requireTimestamp(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; i++) { for (int32_t i = 0; i < pQuery->numOfOutput; i++) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_NEED_TS) != 0) { if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_NEED_TS) != 0) {
return true; return true;
} }
...@@ -3883,7 +3914,7 @@ static UNUSED_FUNC int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, t ...@@ -3883,7 +3914,7 @@ static UNUSED_FUNC int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, t
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
srcData[i] = pDataSrc[i]->data; srcData[i] = pDataSrc[i]->data;
functions[i] = pQuery->pSelectExpr[i].pBase.functionId; functions[i] = pQuery->pSelectExpr[i].base.functionId;
} }
assert(0); assert(0);
...@@ -5337,9 +5368,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5337,9 +5368,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if (pDestFilterInfo->filterstr) { if (pDestFilterInfo->filterstr) {
pDestFilterInfo->len = htobe64(pFilterInfo->len); pDestFilterInfo->len = htobe64(pFilterInfo->len);
pDestFilterInfo->pz = (int64_t)calloc(1, pDestFilterInfo->len + 1); pDestFilterInfo->pz = (int64_t) calloc(1, pDestFilterInfo->len);
memcpy((void *)pDestFilterInfo->pz, pMsg, pDestFilterInfo->len + 1); memcpy((void *)pDestFilterInfo->pz, pMsg, pDestFilterInfo->len);
pMsg += (pDestFilterInfo->len + 1); pMsg += (pDestFilterInfo->len);
} else { } else {
pDestFilterInfo->lowerBndi = htobe64(pFilterInfo->lowerBndi); pDestFilterInfo->lowerBndi = htobe64(pFilterInfo->lowerBndi);
pDestFilterInfo->upperBndi = htobe64(pFilterInfo->upperBndi); pDestFilterInfo->upperBndi = htobe64(pFilterInfo->upperBndi);
...@@ -5372,7 +5403,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5372,7 +5403,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if (pExprMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) { if (pExprMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) {
pExprMsg->arg[j].argValue.pz = pMsg; pExprMsg->arg[j].argValue.pz = pMsg;
pMsg += pExprMsg->arg[j].argBytes + 1; // one more for the string terminated char. pMsg += pExprMsg->arg[j].argBytes; // one more for the string terminated char.
} else { } else {
pExprMsg->arg[j].argValue.i64 = htobe64(pExprMsg->arg[j].argValue.i64); pExprMsg->arg[j].argValue.i64 = htobe64(pExprMsg->arg[j].argValue.i64);
} }
...@@ -5436,13 +5467,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5436,13 +5467,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pMsg += sizeof(int64_t) * pQueryMsg->numOfOutput; pMsg += sizeof(int64_t) * pQueryMsg->numOfOutput;
} }
// the tag query condition expression string is located at the end of query msg
if (pQueryMsg->tagCondLen > 0) {
*tagCond = calloc(1, pQueryMsg->tagCondLen);
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen);
pMsg += pQueryMsg->tagCondLen;
}
if (pQueryMsg->numOfTags > 0) { if (pQueryMsg->numOfTags > 0) {
(*tagCols) = calloc(1, sizeof(SColumnInfo) * pQueryMsg->numOfTags); (*tagCols) = calloc(1, sizeof(SColumnInfo) * pQueryMsg->numOfTags);
for (int32_t i = 0; i < pQueryMsg->numOfTags; ++i) { for (int32_t i = 0; i < pQueryMsg->numOfTags; ++i) {
...@@ -5454,9 +5478,17 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5454,9 +5478,17 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pTagCol->numOfFilters = 0; pTagCol->numOfFilters = 0;
(*tagCols)[i] = *pTagCol; (*tagCols)[i] = *pTagCol;
pMsg += sizeof(SColumnInfo);
} }
} }
// the tag query condition expression string is located at the end of query msg
if (pQueryMsg->tagCondLen > 0) {
*tagCond = calloc(1, pQueryMsg->tagCondLen);
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen);
pMsg += pQueryMsg->tagCondLen;
}
qTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64 qTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64
", numOfGroupbyTagCols:%d, ts order:%d, " ", numOfGroupbyTagCols:%d, ts order:%d, "
"outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 "outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64
...@@ -5468,54 +5500,38 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5468,54 +5500,38 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
return 0; return 0;
} }
static int32_t buildAirthmeticExprFromMsg(SArithExprInfo *pExpr, SQueryTableMsg *pQueryMsg) { //static int32_t id_compar(const void* p1, const void* p2) {
// SExprInfo *pBinaryExprInfo = &pExpr->binExprInfo; // return GET_INT16_VAL(p1) - GET_INT16_VAL(p2);
// SColumnInfo * pColMsg = pQueryMsg->colList; //}
#if 0
tExprNode* pBinExpr = NULL;
SSchema* pSchema = toSchema(pQueryMsg, pColMsg, pQueryMsg->numOfCols);
qTrace("qmsg:%p create binary expr from string:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz); static int32_t buildAirthmeticExprFromMsg(SArithExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
tSQLBinaryExprFromString(&pBinExpr, pSchema, pQueryMsg->numOfCols, pExpr->pBase.arg[0].argValue.pz, tExprNode* pExprNode = NULL;
pExpr->pBase.arg[0].argBytes);
if (pBinExpr == NULL) { qTrace("qmsg:%p create arithmetic expr from binary string", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz);
qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
int32_t ret = exprTreeFromBinary(pArithExprInfo->base.arg[0].argValue.pz, pArithExprInfo->base.arg[0].argBytes, &pExprNode);
if (pExprNode == NULL || ret != TSDB_CODE_SUCCESS) {
qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
pBinaryExprInfo->pBinExpr = pBinExpr; pArithExprInfo->pExpr = pExprNode;
int32_t num = 0;
int16_t ids[TSDB_MAX_COLUMNS] = {0};
tSQLBinaryExprTrv(pBinExpr, &num, ids);
qsort(ids, num, sizeof(int16_t), id_compar);
int32_t i = 0, j = 0; // SArray* res = taosArrayInit(4, sizeof(int16_t));
// tSQLBinaryExprTrv(pExprNode, res);
while (i < num && j < num) { // size_t num = taosArrayGetSize(res);
if (ids[i] == ids[j]) { // qsort(res->pData, num, sizeof(int16_t), id_compar);
j++;
} else {
ids[++i] = ids[j++];
}
}
assert(i <= num);
// there may be duplicated referenced columns. // there may be duplicated referenced columns.
num = i + 1; // pArithExprInfo->colList = calloc(pQueryMsg->numOfCols, sizeof(SColIndex));
pBinaryExprInfo->pReqColumns = malloc(sizeof(SColIndex) * num);
for (int32_t k = 0; k < num; ++k) { // for (int32_t k = 0; k < pQueryMsg->numOfCols; ++k) {
SColIndex* pColIndex = &pBinaryExprInfo->pReqColumns[k]; // SColIndex* pColIndex = &pArithExprInfo->colList[k];
pColIndex->colId = ids[k]; // pColIndex->colId = pQueryMsg->colList[k].colId;
} // }
pBinaryExprInfo->numOfCols = num;
free(pSchema);
#endif
// pArithExprInfo->numOfCols = pQueryMsg->numOfCols;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5533,14 +5549,14 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp ...@@ -5533,14 +5549,14 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp
int16_t tagLen = 0; int16_t tagLen = 0;
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
pExprs[i].pBase = *pExprMsg[i]; pExprs[i].base = *pExprMsg[i];
pExprs[i].bytes = 0; pExprs[i].bytes = 0;
int16_t type = 0; int16_t type = 0;
int16_t bytes = 0; int16_t bytes = 0;
// parse the arithmetic expression // parse the arithmetic expression
if (pExprs[i].pBase.functionId == TSDB_FUNC_ARITHM) { if (pExprs[i].base.functionId == TSDB_FUNC_ARITHM) {
code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg); code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -5550,26 +5566,26 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp ...@@ -5550,26 +5566,26 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp
type = TSDB_DATA_TYPE_DOUBLE; type = TSDB_DATA_TYPE_DOUBLE;
bytes = tDataTypeDesc[type].nSize; bytes = tDataTypeDesc[type].nSize;
} else if (pExprs[i].pBase.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { // parse the normal column } else if (pExprs[i].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { // parse the normal column
type = TSDB_DATA_TYPE_BINARY; type = TSDB_DATA_TYPE_BINARY;
bytes = TSDB_TABLE_NAME_LEN; bytes = TSDB_TABLE_NAME_LEN;
} else{ } else{
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase, pTagCols); int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
assert(j < pQueryMsg->numOfCols); assert(j < pQueryMsg->numOfCols);
SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].pBase.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j]; SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j];
type = pCol->type; type = pCol->type;
bytes = pCol->bytes; bytes = pCol->bytes;
} }
int32_t param = pExprs[i].pBase.arg[0].argValue.i64; int32_t param = pExprs[i].base.arg[0].argValue.i64;
if (getResultDataInfo(type, bytes, pExprs[i].pBase.functionId, param, &pExprs[i].type, &pExprs[i].bytes, if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes,
&pExprs[i].interResBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) { &pExprs[i].interResBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) {
tfree(pExprs); tfree(pExprs);
return TSDB_CODE_INVALID_QUERY_MSG; return TSDB_CODE_INVALID_QUERY_MSG;
} }
if (pExprs[i].pBase.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].pBase.functionId == TSDB_FUNC_TS_DUMMY) { if (pExprs[i].base.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].base.functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pExprs[i].bytes; tagLen += pExprs[i].bytes;
} }
assert(isValidDataType(pExprs[i].type, pExprs[i].bytes)); assert(isValidDataType(pExprs[i].type, pExprs[i].bytes));
...@@ -5579,17 +5595,17 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp ...@@ -5579,17 +5595,17 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp
// TODO refactor // TODO refactor
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
pExprs[i].pBase = *pExprMsg[i]; pExprs[i].base = *pExprMsg[i];
int16_t functId = pExprs[i].pBase.functionId; int16_t functId = pExprs[i].base.functionId;
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase, pTagCols); int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
assert(j < pQueryMsg->numOfCols); assert(j < pQueryMsg->numOfCols);
SColumnInfo *pCol = &pQueryMsg->colList[j]; SColumnInfo *pCol = &pQueryMsg->colList[j];
int32_t ret = int32_t ret =
getResultDataInfo(pCol->type, pCol->bytes, functId, pExprs[i].pBase.arg[0].argValue.i64, getResultDataInfo(pCol->type, pCol->bytes, functId, pExprs[i].base.arg[0].argValue.i64,
&pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interResBytes, tagLen, isSuperTable); &pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interResBytes, tagLen, isSuperTable);
assert(ret == TSDB_CODE_SUCCESS); assert(ret == TSDB_CODE_SUCCESS);
} }
...@@ -5714,7 +5730,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { ...@@ -5714,7 +5730,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
assert(pQuery->pSelectExpr != NULL && pQuery != NULL); assert(pQuery->pSelectExpr != NULL && pQuery != NULL);
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
SSqlFuncMsg *pSqlExprMsg = &pQuery->pSelectExpr[k].pBase; SSqlFuncMsg *pSqlExprMsg = &pQuery->pSelectExpr[k].base;
if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM || pSqlExprMsg->colInfo.flag == TSDB_COL_TAG) { if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM || pSqlExprMsg->colInfo.flag == TSDB_COL_TAG) {
continue; continue;
} }
...@@ -5765,6 +5781,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -5765,6 +5781,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery->interpoType = pQueryMsg->interpoType; pQuery->interpoType = pQueryMsg->interpoType;
pQuery->numOfTags = pQueryMsg->numOfTags; pQuery->numOfTags = pQueryMsg->numOfTags;
// todo do not allocate ??
pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQuery->colList == NULL) { if (pQuery->colList == NULL) {
goto _cleanup; goto _cleanup;
...@@ -5942,12 +5959,12 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5942,12 +5959,12 @@ static void freeQInfo(SQInfo *pQInfo) {
if (pQuery->pSelectExpr != NULL) { if (pQuery->pSelectExpr != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo; // SExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].arithExprInfo;
if (pBinExprInfo->numOfCols > 0) { // if (pBinExprInfo->numOfCols > 0) {
tfree(pBinExprInfo->pReqColumns); // tfree(pBinExprInfo->pReqColumns);
tExprTreeDestroy(&pBinExprInfo->pBinExpr, NULL); // tExprTreeDestroy(&pBinExprInfo->pBinExpr, NULL);
} // }
} }
tfree(pQuery->pSelectExpr); tfree(pQuery->pSelectExpr);
......
...@@ -72,7 +72,7 @@ void tVariantCreateFromString(tVariant *pVar, char *pz, uint32_t len, uint32_t t ...@@ -72,7 +72,7 @@ void tVariantCreateFromString(tVariant *pVar, char *pz, uint32_t len, uint32_t t
* @param len * @param len
* @param type * @param type
*/ */
void tVariantCreateFromBinary(tVariant *pVar, char *pz, uint32_t len, uint32_t type) { void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32_t type) {
switch (type) { switch (type) {
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
...@@ -109,10 +109,10 @@ void tVariantCreateFromBinary(tVariant *pVar, char *pz, uint32_t len, uint32_t t ...@@ -109,10 +109,10 @@ void tVariantCreateFromBinary(tVariant *pVar, char *pz, uint32_t len, uint32_t t
break; break;
} }
case TSDB_DATA_TYPE_BINARY: { case TSDB_DATA_TYPE_BINARY: { // todo refactor, extract a method
pVar->pz = strndup(pz, len); pVar->pz = calloc(len, sizeof(char));
pVar->nLen = strdequote(pVar->pz); memcpy(pVar->pz, pz, len);
pVar->nLen = len;
break; break;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册