提交 9663eea7 编写于 作者: H Haojun Liao

[td-2819]

上级 f5bfcc0b
......@@ -300,9 +300,9 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
SQuery* tscCreateQueryFromQueryNodeInfo(SQueryInfo* pQueryNodeInfo);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryNodeInfo, SQuery* pQuery);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchemaEx* pSchema);
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo,
uint64_t* qId, char* sql);
......
......@@ -57,7 +57,7 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
}
// todo merge with vnode side function
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx) {
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchemaEx* pSchema) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
......@@ -66,9 +66,15 @@ void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx) {
pCtx[i].order = pQueryInfo->order.order;
pCtx[i].functionId = pExpr->base.functionId;
pCtx[i].order = pQueryInfo->order.order;
pCtx[i].functionId = pExpr->base.functionId;
// input buffer hold only one point data
pCtx[i].inputType = pExpr->base.colType;
pCtx[i].inputBytes = pExpr->base.colBytes;
SSchema *s = &pSchema[i].field;
// input data format comes from pModel
pCtx[i].inputType = s->type;
pCtx[i].inputBytes = s->bytes;
pCtx[i].outputBytes = pExpr->base.resBytes;
pCtx[i].outputType = pExpr->base.resType;
......@@ -133,6 +139,11 @@ static void setCtxInputOutputBuffer(SQueryInfo* pQueryInfo, SQLFunctionCtx *pCtx
// input buffer hold only one point data
int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i);
pCtx[i].pInput = pReducer->pTempBuffer->data + offset;
int32_t functionId = pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
}
}
}
......@@ -144,7 +155,7 @@ static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) {
for(int32_t i = 0; i < numOfCols; ++i) {
SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
if (pIField->pExpr->pExpr != NULL) {
if (pIField->pExpr->pExpr == NULL) {
SExprInfo* pExpr = pIField->pExpr;
pFillCol[i].col.bytes = pExpr->base.resBytes;
......@@ -363,8 +374,9 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
pReducer->pTempBuffer->num = 0;
tscCreateResPointerInfo(pRes, pQueryInfo);
tsCreateSQLFunctionCtx(pQueryInfo, pReducer->pCtx);
tsCreateSQLFunctionCtx(pQueryInfo, pReducer->pCtx, pDesc->pColumnModel->pFields);
setCtxInputOutputBuffer(pQueryInfo, pReducer->pCtx, pReducer, pDesc);
// we change the capacity of schema to denote that there is only one row in temp buffer
pReducer->pDesc->pColumnModel->capacity = 1;
......
......@@ -5283,7 +5283,11 @@ bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo) {
return true;
}
return (pQueryInfo->window.skey == pQueryInfo->window.ekey) && (pQueryInfo->window.skey != 0);
if (pQueryInfo->window.skey == INT64_MIN || pQueryInfo->window.ekey == INT64_MAX) {
return false;
}
return !(pQueryInfo->window.skey != pQueryInfo->window.ekey && pQueryInfo->interval.interval == 0);
}
int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySqlNode* pQuerySqlNode, SSqlObj* pSql) {
......
......@@ -24,6 +24,7 @@
#include "tsclient.h"
#include "ttimer.h"
#include "tlockfree.h"
#include "qPlan.h"
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
......@@ -705,8 +706,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd);
SQuery* pQuery = tscCreateQueryFromQueryNodeInfo(pQueryInfo);
UNUSED(pQuery);
SQuery query = {0};
tscCreateQueryFromQueryInfo(pQueryInfo, &query);
SArray* tableScanOperator = createTableScanPlan(&query);
SArray* queryOperator = createExecOperatorPlan(&query);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
......@@ -731,54 +734,63 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
*/
{
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
int32_t numOfTags = pQuery->numOfTags;
int32_t numOfTags = query.numOfTags;
int32_t sqlLen = (int32_t) strlen(pSql->sqlstr);
if (pQuery->order.order == TSDB_ORDER_ASC) {
pQueryMsg->window.skey = htobe64(pQuery->window.skey);
pQueryMsg->window.ekey = htobe64(pQuery->window.ekey);
if (taosArrayGetSize(tableScanOperator) == 0) {
pQueryMsg->tableScanOperator = htonl(-1);
} else {
int32_t* tablescanOp = taosArrayGet(tableScanOperator, 0);
pQueryMsg->tableScanOperator = htonl(*tablescanOp);
}
if (query.order.order == TSDB_ORDER_ASC) {
pQueryMsg->window.skey = htobe64(query.window.skey);
pQueryMsg->window.ekey = htobe64(query.window.ekey);
} else {
pQueryMsg->window.skey = htobe64(pQuery->window.ekey);
pQueryMsg->window.ekey = htobe64(pQuery->window.skey);
pQueryMsg->window.skey = htobe64(query.window.ekey);
pQueryMsg->window.ekey = htobe64(query.window.skey);
}
pQueryMsg->order = htons(pQuery->order.order);
pQueryMsg->orderColId = htons(pQuery->order.orderColId);
pQueryMsg->fillType = htons(pQuery->fillType);
pQueryMsg->limit = htobe64(pQuery->limit.limit);
pQueryMsg->offset = htobe64(pQuery->limit.offset);
pQueryMsg->order = htons(query.order.order);
pQueryMsg->orderColId = htons(query.order.orderColId);
pQueryMsg->fillType = htons(query.fillType);
pQueryMsg->limit = htobe64(query.limit.limit);
pQueryMsg->offset = htobe64(query.limit.offset);
pQueryMsg->numOfCols = htons(query.numOfCols);
pQueryMsg->interval.interval = htobe64(query.interval.interval);
pQueryMsg->interval.sliding = htobe64(query.interval.sliding);
pQueryMsg->interval.offset = htobe64(query.interval.offset);
pQueryMsg->interval.intervalUnit = query.interval.intervalUnit;
pQueryMsg->interval.slidingUnit = query.interval.slidingUnit;
pQueryMsg->interval.offsetUnit = query.interval.offsetUnit;
pQueryMsg->numOfCols = htons(pQuery->numOfCols);
pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->sqlstrLen = htonl(sqlLen);
pQueryMsg->sw.gap = htobe64(query.sw.gap);
pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX);
pQueryMsg->interval.interval = htobe64(pQuery->interval.interval);
pQueryMsg->interval.sliding = htobe64(pQuery->interval.sliding);
pQueryMsg->interval.offset = htobe64(pQuery->interval.offset);
pQueryMsg->interval.intervalUnit = pQuery->interval.intervalUnit;
pQueryMsg->interval.slidingUnit = pQuery->interval.slidingUnit;
pQueryMsg->interval.offsetUnit = pQuery->interval.offsetUnit;
pQueryMsg->secondStageOutput = htonl(query.numOfExpr2);
pQueryMsg->numOfOutput = htons((int16_t)query.numOfOutput); // this is the stage one output column number
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
pQueryMsg->tbnameCondLen = htonl(pQueryInfo->tagCond.tbnameCond.len);
pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->queryType = htonl(pQueryInfo->type);
// pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit);
pQueryMsg->sqlstrLen = htonl(sqlLen);
pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen);
pQueryMsg->sw.gap = htobe64(pQuery->sw.gap);
pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX);
pQueryMsg->numOfOutput = htons((int16_t)pQuery->numOfOutput); // this is the stage one output column number
// set column list ids
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfo *pCol = &pQuery->colList[i];
SColumnInfo *pCol = &query.colList[i];
pQueryMsg->colList[i].colId = htons(pCol->colId);
pQueryMsg->colList[i].bytes = htons(pCol->bytes);
......@@ -814,23 +826,23 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
{
pQueryMsg->stableQuery = pQuery->stableQuery;
pQueryMsg->topBotQuery = pQuery->topBotQuery;
pQueryMsg->groupbyColumn = pQuery->groupbyColumn;
pQueryMsg->hasTagResults = pQuery->hasTagResults;
pQueryMsg->timeWindowInterpo = pQuery->timeWindowInterpo;
pQueryMsg->queryBlockDist = pQuery->queryBlockDist;
pQueryMsg->stabledev = pQuery->stabledev;
pQueryMsg->tsCompQuery = pQuery->tsCompQuery;
pQueryMsg->simpleAgg = pQuery->simpleAgg;
pQueryMsg->pointInterpQuery = pQuery->pointInterpQuery;
pQueryMsg->needReverseScan = pQuery->needReverseScan;
pQueryMsg->stableQuery = query.stableQuery;
pQueryMsg->topBotQuery = query.topBotQuery;
pQueryMsg->groupbyColumn = query.groupbyColumn;
pQueryMsg->hasTagResults = query.hasTagResults;
pQueryMsg->timeWindowInterpo = query.timeWindowInterpo;
pQueryMsg->queryBlockDist = query.queryBlockDist;
pQueryMsg->stabledev = query.stabledev;
pQueryMsg->tsCompQuery = query.tsCompQuery;
pQueryMsg->simpleAgg = query.simpleAgg;
pQueryMsg->pointInterpQuery = query.pointInterpQuery;
pQueryMsg->needReverseScan = query.needReverseScan;
}
SSqlExpr *pSqlExpr = (SSqlExpr *)pMsg;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlExpr *pExpr = &pQuery->pExpr1[i].base;
for (int32_t i = 0; i < query.numOfOutput; ++i) {
SSqlExpr *pExpr = &query.pExpr1[i].base;
// the queried table has been removed and a new table with the same name has already been created already
// return error msg
......@@ -849,17 +861,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlExpr->colInfo.colId = htons(pExpr->colInfo.colId);
pSqlExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
pSqlExpr->colInfo.flag = htons(pExpr->colInfo.flag);
pSqlExpr->uid = htobe64(pExpr->uid);
pSqlExpr->colType = htons(pExpr->colType);
pSqlExpr->colBytes = htons(pExpr->colBytes);
pSqlExpr->resType = htons(pExpr->resType);
pSqlExpr->resBytes = htons(pExpr->resBytes);
pSqlExpr->functionId = htons(pExpr->functionId);
pSqlExpr->numOfParams = htons(pExpr->numOfParams);
pSqlExpr->resColId = htons(pExpr->resColId);
pSqlExpr->colType = htons(pExpr->colType);
pSqlExpr->colBytes = htons(pExpr->colBytes);
pSqlExpr->resType = htons(pExpr->resType);
pSqlExpr->resBytes = htons(pExpr->resBytes);
pSqlExpr->functionId = htons(pExpr->functionId);
pSqlExpr->numOfParams = htons(pExpr->numOfParams);
pSqlExpr->resColId = htons(pExpr->resColId);
pMsg += sizeof(SSqlExpr);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType);
pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen);
......@@ -875,9 +886,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlExpr = (SSqlExpr *)pMsg;
}
if (pQuery->numOfExpr2 > 0) {
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
SSqlExpr *pExpr = &pQuery->pExpr2[i].base;
if (query.numOfExpr2 > 0) {
for (int32_t i = 0; i < query.numOfExpr2; ++i) {
SSqlExpr *pExpr = &query.pExpr2[i].base;
// the queried table has been removed and a new table with the same name has already been created already
// return error msg
......@@ -886,23 +897,19 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
}
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
tscError("%p table schema is not matched with parsed sql", pSql);
return TSDB_CODE_TSC_INVALID_SQL;
}
assert(pExpr->resColId < 0);
pSqlExpr->colInfo.colId = htons(pExpr->colInfo.colId);
pSqlExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
pSqlExpr->colInfo.flag = htons(pExpr->colInfo.flag);
pSqlExpr->uid = htobe64(pExpr->uid);
pSqlExpr->colType = htons(pExpr->colType);
pSqlExpr->colType = htons(pExpr->colType);
pSqlExpr->colBytes = htons(pExpr->colBytes);
pSqlExpr->resType = htons(pExpr->resType);
pSqlExpr->resType = htons(pExpr->resType);
pSqlExpr->resBytes = htons(pExpr->resBytes);
pSqlExpr->functionId = htons(pExpr->functionId);
pSqlExpr->functionId = htons(pExpr->functionId);
pSqlExpr->numOfParams = htons(pExpr->numOfParams);
pSqlExpr->resColId = htons(pExpr->resColId);
pMsg += sizeof(SSqlExpr);
......@@ -921,14 +928,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlExpr = (SSqlExpr *)pMsg;
}
} else {
pQueryMsg->secondStageOutput = 0;
}
// serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr;
SSqlGroupbyExpr *pGroupbyExpr = query.pGroupbyExpr;
if (pGroupbyExpr != NULL && pGroupbyExpr->numOfGroupCols > 0) {
pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
pQueryMsg->orderType = htons(pGroupbyExpr->orderType);
......@@ -950,16 +955,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
if (pQuery->fillType != TSDB_FILL_NONE) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
*((int64_t *)pMsg) = htobe64(pQuery->fillVal[i]);
pMsg += sizeof(pQuery->fillVal[0]);
if (query.fillType != TSDB_FILL_NONE) {
for (int32_t i = 0; i < query.numOfOutput; ++i) {
*((int64_t *)pMsg) = htobe64(query.fillVal[i]);
pMsg += sizeof(query.fillVal[0]);
}
}
if (pQuery->numOfTags > 0) {
for (int32_t i = 0; i < pQuery->numOfTags; ++i) {
SColumnInfo* pTag = &pQuery->tagColList[i];
if (query.numOfTags > 0) {
for (int32_t i = 0; i < query.numOfTags; ++i) {
SColumnInfo* pTag = &query.tagColList[i];
SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
pTagCol->colId = htons(pTag->colId);
......@@ -1013,6 +1018,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks);
}
int32_t numOfOperator = taosArrayGetSize(queryOperator);
pQueryMsg->numOfOperator = htonl(numOfOperator);
for(int32_t i = 0; i < numOfOperator; ++i) {
int32_t *operator = taosArrayGet(queryOperator, i);
*(int32_t*)pMsg = htonl(*operator);
pMsg += sizeof(int32_t);
}
memcpy(pMsg, pSql->sqlstr, sqlLen);
pMsg += sqlLen;
......
......@@ -3218,8 +3218,6 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo,
uint64_t* qId, char* sql) {
assert(pQueryNodeInfo != NULL);
int16_t numOfCols = taosArrayGetSize(pQueryNodeInfo->colList);
int16_t numOfOutput = pQueryNodeInfo->fieldsInfo.numOfOutput;
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
......@@ -3229,42 +3227,10 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs
// to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo;
SQuery* pQuery = &pQInfo->query;
pQInfo->runtimeEnv.pQuery = pQuery;
pQuery->tableGroupInfo = *pTableGroupInfo;
pQuery->numOfCols = numOfCols;
pQuery->numOfOutput = numOfOutput;
pQuery->limit = pQueryNodeInfo->limit;
pQuery->order = pQueryNodeInfo->order;
pQuery->pExpr1 = pExprs;
pQuery->pExpr2 = NULL; // not support yet.
pQuery->numOfExpr2 = 0;
pQuery->pGroupbyExpr = NULL;
memcpy(&pQuery->interval, &pQueryNodeInfo->interval, sizeof(pQuery->interval));
pQuery->fillType = pQueryNodeInfo->fillType;
pQuery->numOfTags = 0;
pQuery->tagColList = NULL;
// pQuery->prjInfo.vgroupLimit = pQueryNodeInfo->vgroupLimit;
pQuery->prjInfo.ts = (pQueryNodeInfo->order.order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX;
pQuery->sw = pQueryNodeInfo->sessionWindow;
pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQuery->colList == NULL) {
goto _cleanup;
}
pQuery->srcRowSize = 0;
pQuery->maxSrcColumnSize = 0;
for (int16_t i = 0; i < numOfCols; ++i) {
SColumn* pCol = taosArrayGet(pQueryNodeInfo->colList, i);
pQuery->colList[i] = pCol->info;
pQuery->colList[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQuery->colList[i].numOfFilters);
SQuery *pQuery = &pQInfo->query;
pQuery->srcRowSize += pQuery->colList[i].bytes;
if (pQuery->maxSrcColumnSize < pQuery->colList[i].bytes) {
pQuery->maxSrcColumnSize = pQuery->colList[i].bytes;
}
}
tscCreateQueryFromQueryInfo(pQueryNodeInfo, pQuery);
pQInfo->runtimeEnv.pQuery = pQuery;
// calculate the result row size
for (int16_t col = 0; col < numOfOutput; ++col) {
......@@ -3283,16 +3249,6 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs
// goto _cleanup;
// }
if (pQuery->fillType != TSDB_FILL_NONE) {
pQuery->fillVal = malloc(sizeof(int64_t) * pQuery->numOfOutput);
if (pQuery->fillVal == NULL) {
goto _cleanup;
}
// the first column is the timestamp
memcpy(pQuery->fillVal, (char *)pQueryNodeInfo->fillVal, pQuery->numOfOutput * sizeof(int64_t));
}
size_t numOfGroups = 0;
if (pTableGroupInfo->pGroupList != NULL) {
numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList);
......@@ -3311,14 +3267,13 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
pQInfo->rspContext = NULL;
pQInfo->sql = sql;
pthread_mutex_init(&pQInfo->lock, NULL);
tsem_init(&pQInfo->ready, 0, 0);
pQuery->window = pQueryNodeInfo->window;
// changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
bool groupByCol = false;//isGroupbyColumn(pQuery->pGroupbyExpr);
STimeWindow window = pQuery->window;
......@@ -3339,7 +3294,7 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs
window.skey = info->lastKey;
void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo);
STableQueryInfo* item = createTableQueryInfo(pQuery, info->pTable, groupByCol, window, buf);
STableQueryInfo* item = createTableQueryInfo(pQuery, info->pTable, pQuery->groupbyColumn, window, buf);
if (item == NULL) {
goto _cleanup;
}
......@@ -3347,7 +3302,6 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs
item->groupIndex = i;
taosArrayPush(p1, &item);
// STableId* id = TSDB_TABLEID(info->pTable);
STableId id = {.tid = 0, .uid = 0};
taosHashPut(pRuntimeEnv->tableqinfoGroupInfo.map, &id.tid, sizeof(id.tid), &item, POINTER_BYTES);
index += 1;
......@@ -3356,10 +3310,7 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs
// colIdCheck(pQuery, pQInfo);
// todo refactor
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
pQInfo->qId = 0;//atomic_add_fetch_64(&queryHandleId, 1);
pQInfo->qId = 0;
if (qId != NULL) {
*qId = pQInfo->qId;
}
......@@ -3384,7 +3335,7 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs
STsBufInfo bufInfo = {0};
SQueryParam param = {0};
/*int32_t code = */initQInfo(&bufInfo, NULL, 0, pQInfo, &param, NULL, 0, false);
/*int32_t code = */initQInfo(&bufInfo, NULL, pQInfo, &param, NULL, 0);
qTableQuery(pQInfo);
return pQInfo;
......
......@@ -211,7 +211,7 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
assert(pExpr != NULL);
int32_t functionId = pExpr->base.functionId;
if (functionId == TSDB_FUNC_TAG) {
if (functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS) {
continue;
}
......@@ -555,7 +555,7 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
int32_t numOfCols = taosArrayGetSize(px->colList);
SQueriedTableInfo info = {.colList = colInfo, .numOfCols = numOfCols,};
/*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL);
tsCreateSQLFunctionCtx(px, pCtx);
tsCreateSQLFunctionCtx(px, pCtx, NULL);
STableGroupInfo tableGroupInfo = {0};
tableGroupInfo.numOfTables = 1;
......@@ -3073,20 +3073,24 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) {
return p;
}
SQuery* tscCreateQueryFromQueryNodeInfo(SQueryInfo* pQueryInfo) {
SQuery* pQuery = calloc(1, sizeof(SQuery));
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQuery* pQuery) {
memset(pQuery, 0, sizeof(SQuery));
int16_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
pQuery->tsdb = NULL;
pQuery->topBotQuery = tscIsTopBotQuery(pQueryInfo);
pQuery->hasTagResults = hasTagValOutput(pQueryInfo);
pQuery->tsdb = NULL;
pQuery->topBotQuery = tscIsTopBotQuery(pQueryInfo);
pQuery->hasTagResults = hasTagValOutput(pQueryInfo);
pQuery->stabledev = isStabledev(pQueryInfo);
pQuery->tsCompQuery = isTsCompQuery(pQueryInfo);
pQuery->simpleAgg = isSimpleAggregate(pQueryInfo);
pQuery->needReverseScan = tscNeedReverseScan(pQueryInfo);
pQuery->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQuery->queryBlockDist = isBlockDistQuery(pQueryInfo);
pQuery->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
pQuery->stabledev = isStabledev(pQueryInfo);
pQuery->tsCompQuery = isTsCompQuery(pQueryInfo);
pQuery->simpleAgg = isSimpleAggregate(pQueryInfo);
pQuery->needReverseScan = tscNeedReverseScan(pQueryInfo);
int16_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
pQuery->numOfCols = numOfCols;
pQuery->numOfOutput = numOfOutput;
......@@ -3096,18 +3100,19 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryInfo* pQueryInfo) {
pQuery->pExpr2 = NULL; // not support yet.
pQuery->numOfExpr2 = 0;
pQuery->pGroupbyExpr = NULL;
memcpy(&pQuery->interval, &pQueryInfo->interval, sizeof(pQuery->interval));
pQuery->fillType = pQueryInfo->fillType;
pQuery->numOfTags = 0;
pQuery->tagColList = NULL;
memcpy(&pQuery->interval, &pQueryInfo->interval, sizeof(pQuery->interval));
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
pQuery->vgId = 0;
pQuery->stableQuery = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQuery->window = pQueryInfo->window;
pQuery->pGroupbyExpr = calloc(1, sizeof(SSqlGroupbyExpr));
*pQuery->pGroupbyExpr = pQueryInfo->groupbyExpr;
......@@ -3128,6 +3133,7 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryInfo* pQueryInfo) {
}
pQuery->colList[i] = pCol->info;
pQuery->colList[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQuery->colList[i].numOfFilters);
}
}
......@@ -3139,34 +3145,48 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
SExprInfo *pExpr = pField->pExpr;
SSqlExpr *pse = &pQuery->pExpr2[i].base;
SExprInfo* pExpr = pField->pExpr;
SSqlExpr* pse = &pQuery->pExpr2[i].base;
pse->uid = pTableMetaInfo->pTableMeta->id.uid;
// this should be switched to projection query
if (pExpr->pExpr == NULL) {
pse->numOfParams = 0; // no params for projection query
pse->functionId = TSDB_FUNC_PRJ;
pse->numOfParams = 0; // no params for projection query
pse->functionId = TSDB_FUNC_PRJ;
pse->colInfo.colId = pExpr->base.resColId;
pse->colInfo.flag = TSDB_COL_NORMAL;
pse->colType = pExpr->base.resType;
pse->colBytes = pExpr->base.resBytes;
pse->resType = pExpr->base.resType;
pse->resBytes = pExpr->base.resBytes;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
if (pQuery->pExpr1[j].base.resColId == pse->colInfo.colId) {
pse->colInfo.colIndex = j;
}
}
pse->colInfo.flag = TSDB_COL_NORMAL;
pse->colType = pExpr->base.resType;
pse->colBytes = pExpr->base.resBytes;
pse->resType = pExpr->base.resType;
pse->resBytes = pExpr->base.resBytes;
} else {
assert(pField->pExpr->pExpr != NULL);
pse->colInfo.colId = pExpr->base.colInfo.colId;
pse->colType = pExpr->base.colType;
pse->colType = pExpr->base.colType;
pse->colBytes = pExpr->base.colBytes;
pse->resBytes = sizeof(double);
pse->resType = TSDB_DATA_TYPE_DOUBLE;
pse->resType = TSDB_DATA_TYPE_DOUBLE;
pse->functionId = pExpr->base.functionId;
pse->functionId = pExpr->base.functionId;
pse->numOfParams = pExpr->base.numOfParams;
memset(pse->param, 0, sizeof(tVariant) * tListLen(pse->param));
for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) {
tVariantAssign(&pse->param[j], &pExpr->base.param[j]);
}
}
pse->resColId = pExpr->base.resColId;
pse->uid = pTableMetaInfo->pTableMeta->id.uid;
}
} else {
pQuery->numOfExpr2 = 0;
}
}
......@@ -3195,7 +3215,7 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryInfo* pQueryInfo) {
// tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
// pSql, pTableMeta->id.tid, pTableMeta->id.uid, n, total, numOfTagColumns, pCol->colIndex.columnIndex, pColSchema->name);
return NULL;
return TSDB_CODE_SUCCESS;
}
SColumnInfo* pTagCol = &pQuery->tagColList[i];
......@@ -3208,20 +3228,20 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryInfo* pQueryInfo) {
}
}
pQuery->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
pQuery->topBotQuery = tscIsTopBotQuery(pQueryInfo);
pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQuery->hasTagResults = hasTagValOutput(pQueryInfo);
pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
pQuery->queryBlockDist = isBlockDistQuery(pQueryInfo);
pQuery->stabledev = isStabledev(pQueryInfo);
pQuery->tsCompQuery = isTsCompQuery(pQueryInfo);
pQuery->simpleAgg = isSimpleAggregate(pQueryInfo);
pQuery->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
pQuery->needReverseScan = tscNeedReverseScan(pQueryInfo);
if (pQuery->fillType != TSDB_FILL_NONE) {
pQuery->fillVal = calloc(pQuery->numOfOutput, sizeof(int64_t));
memcpy(pQuery->fillVal, pQueryInfo->fillVal, pQuery->numOfOutput * sizeof(int64_t));
}
pQuery->srcRowSize = 0;
pQuery->maxSrcColumnSize = 0;
for (int16_t i = 0; i < numOfCols; ++i) {
pQuery->srcRowSize += pQuery->colList[i].bytes;
if (pQuery->maxSrcColumnSize < pQuery->colList[i].bytes) {
pQuery->maxSrcColumnSize = pQuery->colList[i].bytes;
}
}
pQuery->interBufSize = getOutputInterResultBufSize(pQuery);
return pQuery;
return TSDB_CODE_SUCCESS;
}
......@@ -493,6 +493,8 @@ typedef struct {
int32_t numOfTags; // number of tags columns involved
int32_t sqlstrLen; // sql query string
int32_t prevResultLen; // previous result length
int32_t numOfOperator;
int32_t tableScanOperator;// table scan operator. -1 means no scan operator
SColumnInfo colList[];
} SQueryTableMsg;
......
......@@ -350,6 +350,8 @@ typedef struct SQueryParam {
SColIndex *pGroupColIndex;
SColumnInfo *pTagColumnInfo;
SSqlGroupbyExpr *pGroupbyExpr;
int32_t tableScanOperator;
SArray *pOperator;
} SQueryParam;
typedef struct STableScanInfo {
......@@ -442,9 +444,10 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql, uint64_t *qId);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, int32_t vgId, SQInfo* pQInfo, SQueryParam* param, char* start,
int32_t prevResultLen, bool isSTable);
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId, char* sql, uint64_t *qId);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryParam* param, char* start,
int32_t prevResultLen);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
......
......@@ -1711,7 +1711,7 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
return NULL;
}
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables) {
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, SArray* pOperator) {
qDebug("QInfo:%p setup runtime env", pRuntimeEnv->qinfo);
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -1749,6 +1749,85 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
// group by normal column, sliding window query, interval query are handled by interval query processor
// interval (down sampling operation)
int32_t numOfOperator = taosArrayGetSize(pOperator);
for(int32_t i = 0; i < numOfOperator; ++i) {
int32_t* op = taosArrayGet(pOperator, i);
switch (*op) {
case OP_TagScan: {
pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput);
break;
}
case OP_MultiTableTimeInterval: {
pRuntimeEnv->proot =
createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
break;
}
case OP_TimeWindow: {
pRuntimeEnv->proot =
createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
break;
}
case OP_Groupby: {
pRuntimeEnv->proot =
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
break;
}
case OP_SessionWindow: {
pRuntimeEnv->proot =
createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
break;
}
case OP_MultiTableAggregate: {
pRuntimeEnv->proot =
createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
break;
}
case OP_Aggregate: {
pRuntimeEnv->proot =
createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
break;
}
case OP_Arithmetic: {
SOperatorInfo* prev = pRuntimeEnv->pTableScanner;
if (i >= 1) {
prev = pRuntimeEnv->proot;
}
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
break;
}
case OP_Limit: {
pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
break;
}
case OP_Offset: {
pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
break;
}
case OP_Fill: {
SOperatorInfo* pInfo = pRuntimeEnv->proot;
pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput);
break;
}
default: {
assert(0);
}
}
}
/*
if (onlyQueryTags(pQuery)) { // do nothing for tags query
} else if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
......@@ -1816,7 +1895,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
if (pQuery->limit.limit > 0) {
pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
}
*/
return TSDB_CODE_SUCCESS;
_clean:
......@@ -1966,29 +2045,6 @@ static bool isFirstLastRowQuery(SQuery *pQuery) {
return false;
}
static bool needReverseScan(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG) {
continue;
}
if ((functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_FIRST_DST) && !QUERY_IS_ASC_QUERY(pQuery)) {
return true;
}
if (functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_LAST_DST) {
// the scan order to acquire the last result of the specified column
int32_t order = (int32_t)pQuery->pExpr1[i].base.param[0].i64;
if (order != pQuery->order.order) {
return true;
}
}
}
return false;
}
/**
* The following 4 kinds of query are treated as the tags query
* tagprj, tid_tag query, count(tbname), 'abc' (user defined constant value column) query
......@@ -3911,7 +3967,7 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in
SExprInfo* pExprInfo = &pExpr[i];
pFillCol[i].col.bytes = pExprInfo->base.resBytes;
pFillCol[i].col.type = (int8_t)pExprInfo->base.resBytes;
pFillCol[i].col.type = (int8_t)pExprInfo->base.resType;
pFillCol[i].col.offset = offset;
pFillCol[i].tagIndex = -2;
pFillCol[i].flag = TSDB_COL_NORMAL; // always be ta normal column for table query
......@@ -3924,42 +3980,36 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in
return pFillCol;
}
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *tsdb, int32_t vgId, bool isSTableQuery) {
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *tsdb, int32_t tbScanner, SArray* pOperator) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
pQuery->tsdb = tsdb;
// pQuery->topBotQuery = isTopBottomQuery(pQuery);
// pQuery->hasTagResults = hasTagValOutput(pQuery);
// pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQuery);
// pQuery->stabledev = isStabledev(pQuery);
pRuntimeEnv->prevResult = prevResult;
pRuntimeEnv->qinfo = pQInfo;
if (tsdb != NULL) {
int32_t code = setupQueryHandle(tsdb, pQInfo, isSTableQuery);
int32_t code = setupQueryHandle(tsdb, pQInfo, pQuery->stableQuery);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
pQuery->tsdb = tsdb;
pQuery->vgId = vgId;
pQuery->stableQuery = isSTableQuery;
pQuery->interBufSize = getOutputInterResultBufSize(pQuery);
pRuntimeEnv->groupResInfo.totalGroup = (int32_t) (isSTableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0);
pRuntimeEnv->groupResInfo.totalGroup = (int32_t) (pQuery->stableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0);
pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTsBuf = pTsBuf;
pRuntimeEnv->cur.vgroupIndex = -1;
setResultBufSize(pQuery, &pRuntimeEnv->resultInfo);
/*
if (onlyQueryTags(pQuery)) {
pRuntimeEnv->resultInfo.capacity = 4096;
pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput);
// pRuntimeEnv->resultInfo.capacity = 4096;
// pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput);
} else if (pQuery->queryBlockDist) {
pRuntimeEnv->pTableScanner = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
} else if (pQuery->tsCompQuery || pQuery->pointInterpQuery) {
......@@ -3969,7 +4019,29 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
} else {
pRuntimeEnv->pTableScanner = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery));
}
*/
switch(tbScanner) {
case OP_TableBlockInfoScan: {
pRuntimeEnv->pTableScanner = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
break;
}
case OP_TableSeqScan: {
pRuntimeEnv->pTableScanner = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
break;
}
case OP_DataBlocksOptScan: {
pRuntimeEnv->pTableScanner = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1);
break;
}
case OP_TableScan: {
pRuntimeEnv->pTableScanner = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery));
break;
}
default: {
// do nothing
break;
}
}
if (pTsBuf != NULL) {
int16_t order = (pQuery->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order);
......@@ -3987,7 +4059,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
// create runtime environment
int32_t numOfTables = (int32_t)pQuery->tableGroupInfo.numOfTables;
pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo));
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQuery->tableGroupInfo.numOfTables);
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQuery->tableGroupInfo.numOfTables, pOperator);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -5446,7 +5518,6 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
}
pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables);
pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey);
pQueryMsg->window.ekey = htobe64(pQueryMsg->window.ekey);
pQueryMsg->interval.interval = htobe64(pQueryMsg->interval.interval);
......@@ -5476,6 +5547,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pQueryMsg->prevResultLen = htonl(pQueryMsg->prevResultLen);
pQueryMsg->sw.gap = htobe64(pQueryMsg->sw.gap);
pQueryMsg->sw.primaryColId = htonl(pQueryMsg->sw.primaryColId);
pQueryMsg->tableScanOperator = htonl(pQueryMsg->tableScanOperator);
pQueryMsg->numOfOperator = htonl(pQueryMsg->numOfOperator);
// query msg safety check
if (!validateQueryMsg(pQueryMsg)) {
......@@ -5536,6 +5609,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
}
}
param->tableScanOperator = pQueryMsg->tableScanOperator;
param->pExpr = calloc(pQueryMsg->numOfOutput, POINTER_BYTES);
if (param->pExpr == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
......@@ -5727,6 +5801,14 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pMsg = (char *)pQueryMsg + pQueryMsg->tsBuf.tsOffset + pQueryMsg->tsBuf.tsLen;
}
param->pOperator = taosArrayInit(pQueryMsg->numOfOperator, sizeof(int32_t));
for(int32_t i = 0; i < pQueryMsg->numOfOperator; ++i) {
int32_t op = htonl(*(int32_t*)pMsg);
taosArrayPush(param->pOperator, &op);
pMsg += sizeof(int32_t);
}
param->sql = strndup(pMsg, pQueryMsg->sqlstrLen);
SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->colList};
......@@ -6091,7 +6173,7 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
}
SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs,
SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery,
SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId,
char* sql, uint64_t *qId) {
int16_t numOfCols = pQueryMsg->numOfCols;
int16_t numOfOutput = pQueryMsg->numOfOutput;
......@@ -6136,6 +6218,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
pQuery->simpleAgg = pQueryMsg->simpleAgg;
pQuery->pointInterpQuery = pQueryMsg->pointInterpQuery;
pQuery->needReverseScan = pQueryMsg->needReverseScan;
pQuery->vgId = vgId;
pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQuery->colList == NULL) {
......@@ -6203,11 +6286,9 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
tsem_init(&pQInfo->ready, 0, 0);
pQuery->window = pQueryMsg->window;
changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery);
changeExecuteScanOrder(pQInfo, pQueryMsg, pQuery->stableQuery);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
// bool groupByCol = isGroupbyColumn(pQuery->pGroupbyExpr);
STimeWindow window = pQuery->window;
int32_t index = 0;
......@@ -6243,9 +6324,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
colIdCheck(pQuery, pQInfo);
// todo refactor
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
pQInfo->qId = atomic_add_fetch_64(&queryHandleId, 1);
*qId = pQInfo->qId;
qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
......@@ -6289,8 +6367,7 @@ bool isValidQInfo(void *param) {
return (sig == (uint64_t)pQInfo);
}
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, int32_t vgId, SQInfo* pQInfo, SQueryParam* param, char* start,
int32_t prevResultLen, bool isSTable) {
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryParam* param, char* start, int32_t prevResultLen) {
int32_t code = TSDB_CODE_SUCCESS;
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
......@@ -6299,7 +6376,8 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, int32_t vgId, SQInfo* pQIn
STSBuf *pTsBuf = NULL;
if (pTsBufInfo->tsLen > 0) { // open new file to save the result
char *tsBlock = start + pTsBufInfo->tsOffset;
pTsBuf = tsBufCreateFromCompBlocks(tsBlock, pTsBufInfo->tsNumOfBlocks, pTsBufInfo->tsLen, pTsBufInfo->tsOrder, vgId);
pTsBuf = tsBufCreateFromCompBlocks(tsBlock, pTsBufInfo->tsNumOfBlocks, pTsBufInfo->tsLen, pTsBufInfo->tsOrder,
pQuery->vgId);
tsBufResetPos(pTsBuf);
bool ret = tsBufNextPos(pTsBuf);
......@@ -6332,7 +6410,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, int32_t vgId, SQInfo* pQIn
}
// filter the qualified
if ((code = doInitQInfo(pQInfo, pTsBuf, prevResult, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) {
if ((code = doInitQInfo(pQInfo, pTsBuf, prevResult, tsdb, param->tableScanOperator, param->pOperator)) != TSDB_CODE_SUCCESS) {
goto _error;
}
......
......@@ -36,7 +36,7 @@ SQueryNode* queryPlanFromString() {
return NULL;
}
UNUSED_FUNC SArray* createTableScanPlan(SQuery* pQuery) {
SArray* createTableScanPlan(SQuery* pQuery) {
SArray* plan = taosArrayInit(4, sizeof(int32_t));
int32_t op = 0;
......@@ -56,7 +56,7 @@ UNUSED_FUNC SArray* createTableScanPlan(SQuery* pQuery) {
return plan;
}
UNUSED_FUNC SArray* createExecOperatorPlan(SQuery* pQuery) {
SArray* createExecOperatorPlan(SQuery* pQuery) {
SArray* plan = taosArrayInit(4, sizeof(int32_t));
int32_t op = 0;
......
......@@ -157,7 +157,9 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
goto _over;
}
(*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql, qId);
assert(pQueryMsg->stableQuery == isSTableQuery);
(*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo,
param.pTagColumnInfo, vgId, param.sql, qId);
param.sql = NULL;
param.pExprs = NULL;
......@@ -170,7 +172,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
goto _over;
}
code = initQInfo(&pQueryMsg->tsBuf, tsdb, vgId, *pQInfo, &param, (char*)pQueryMsg, pQueryMsg->prevResultLen, isSTableQuery);
code = initQInfo(&pQueryMsg->tsBuf, tsdb, *pQInfo, &param, (char*)pQueryMsg, pQueryMsg->prevResultLen);
_over:
if (param.pGroupbyExpr != NULL) {
......
......@@ -414,6 +414,7 @@ if $rows != 1 then
endi
if $data00 != 0.204545455 then
print expect 0.204545455, actual: $data00
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册