提交 cdd7eed7 编写于 作者: H Haojun Liao

[td-2819] refactor codes.

上级 60010d2e
......@@ -300,7 +300,7 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQuery* pQuery, void* addr);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchemaEx* pSchema);
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo,
......
......@@ -216,7 +216,7 @@ typedef struct SQueryInfo {
SArray* pDSOperator; // data source operator
SArray* pPhyOperator; // physical query execution plan
SQuery* pQuery; // query object
SQueryAttr* pQueryAttr; // query object
struct SQueryInfo *sibling; // sibling
SArray *pUpstream; // SArray<struct SQueryInfo>
......
......@@ -703,7 +703,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd);
SQuery query = {0};
SQueryAttr query = {0};
tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql);
SArray* tableScanOperator = createTableScanPlan(&query);
SArray* queryOperator = createExecOperatorPlan(&query);
......@@ -2584,10 +2584,10 @@ int tscProcessShowCreateRsp(SSqlObj *pSql) {
int tscProcessQueryRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
pQuery->qid = htobe64(pQuery->qid);
SQueryTableRsp *pQueryAttr = (SQueryTableRsp *)pRes->pRsp;
pQueryAttr->qid = htobe64(pQueryAttr->qid);
pRes->qid = pQuery->qid;
pRes->qid = pQueryAttr->qid;
pRes->data = NULL;
tscResetForNextRetrieve(pRes);
......
......@@ -2018,7 +2018,6 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
}
pQueryInfo1->round = 1;
// tscDoQuery(pParent);
executeQuery(pParent, pQueryInfo1);
}
......@@ -3228,23 +3227,23 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
// to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo;
SQuery *pQuery = &pQInfo->query;
pQInfo->runtimeEnv.pQuery = pQuery;
tscCreateQueryFromQueryInfo(pQueryInfo, pQuery, addr);
SQueryAttr *pQueryAttr = &pQInfo->query;
pQInfo->runtimeEnv.pQueryAttr = pQueryAttr;
tscCreateQueryFromQueryInfo(pQueryInfo, pQueryAttr, addr);
// calculate the result row size
for (int16_t col = 0; col < numOfOutput; ++col) {
assert(pExprs[col].base.resBytes > 0);
pQuery->resultRowSize += pExprs[col].base.resBytes;
pQueryAttr->resultRowSize += pExprs[col].base.resBytes;
// keep the tag length
if (TSDB_COL_IS_TAG(pExprs[col].base.colInfo.flag)) {
pQuery->tagLen += pExprs[col].base.resBytes;
pQueryAttr->tagLen += pExprs[col].base.resBytes;
}
}
// doUpdateExprColumnIndex(pQuery);
// int32_t ret = createFilterInfo(pQInfo, pQuery);
// doUpdateExprColumnIndex(pQueryAttr);
// int32_t ret = createFilterInfo(pQInfo, pQueryAttr);
// if (ret != TSDB_CODE_SUCCESS) {
// goto _cleanup;
// }
......@@ -3275,11 +3274,11 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STimeWindow window = pQuery->window;
STimeWindow window = pQueryAttr->window;
int32_t index = 0;
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray* pa = taosArrayGetP(pQuery->tableGroupInfo.pGroupList, i);
SArray* pa = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);
size_t s = taosArrayGetSize(pa);
SArray* p1 = taosArrayInit(s, POINTER_BYTES);
......@@ -3294,7 +3293,7 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
window.skey = info->lastKey;
void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo);
STableQueryInfo* item = createTableQueryInfo(pQuery, info->pTable, pQuery->groupbyColumn, window, buf);
STableQueryInfo* item = createTableQueryInfo(pQueryAttr, info->pTable, pQueryAttr->groupbyColumn, window, buf);
if (item == NULL) {
goto _cleanup;
}
......@@ -3308,7 +3307,7 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
}
}
// colIdCheck(pQuery, pQInfo);
// colIdCheck(pQueryAttr, pQInfo);
pQInfo->qId = 0;
if (qId != NULL) {
......
......@@ -3072,22 +3072,22 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) {
return p;
}
static int32_t createSecondaryExpr(SQuery* pQuery, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo) {
static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo) {
if (!tscIsSecondStageQuery(pQueryInfo)) {
return TSDB_CODE_SUCCESS;
}
pQuery->numOfExpr2 = tscNumOfFields(pQueryInfo);
pQuery->pExpr2 = calloc(pQuery->numOfExpr2, sizeof(SExprInfo));
if (pQuery->pExpr2 == NULL) {
pQueryAttr->numOfExpr2 = tscNumOfFields(pQueryInfo);
pQueryAttr->pExpr2 = calloc(pQueryAttr->numOfExpr2, sizeof(SExprInfo));
if (pQueryAttr->pExpr2 == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
for (int32_t i = 0; i < pQueryAttr->numOfExpr2; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
SExprInfo* pExpr = pField->pExpr;
SSqlExpr* pse = &pQuery->pExpr2[i].base;
SSqlExpr* pse = &pQueryAttr->pExpr2[i].base;
pse->uid = pTableMetaInfo->pTableMeta->id.uid;
pse->resColId = pExpr->base.resColId;
......@@ -3096,8 +3096,8 @@ static int32_t createSecondaryExpr(SQuery* pQuery, SQueryInfo* pQueryInfo, STabl
pse->functionId = TSDB_FUNC_PRJ;
pse->colInfo.colId = pExpr->base.resColId;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
if (pQuery->pExpr1[j].base.resColId == pse->colInfo.colId) {
for (int32_t j = 0; j < pQueryAttr->numOfOutput; ++j) {
if (pQueryAttr->pExpr1[j].base.resColId == pse->colInfo.colId) {
pse->colInfo.colIndex = j;
}
}
......@@ -3126,9 +3126,9 @@ static int32_t createSecondaryExpr(SQuery* pQuery, SQueryInfo* pQueryInfo, STabl
return TSDB_CODE_SUCCESS;
}
static int32_t createTagColumnInfo(SQuery* pQuery, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo) {
pQuery->numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
if (pQuery->numOfTags == 0) {
static int32_t createTagColumnInfo(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo) {
pQueryAttr->numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
if (pQueryAttr->numOfTags == 0) {
return TSDB_CODE_SUCCESS;
}
......@@ -3136,13 +3136,13 @@ static int32_t createTagColumnInfo(SQuery* pQuery, SQueryInfo* pQueryInfo, STabl
int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
pQuery->tagColList = calloc(pQuery->numOfTags, sizeof(SColumnInfo));
if (pQuery->tagColList == NULL) {
pQueryAttr->tagColList = calloc(pQueryAttr->numOfTags, sizeof(SColumnInfo));
if (pQueryAttr->tagColList == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SSchema* pSchema = tscGetTableTagSchema(pTableMeta);
for (int32_t i = 0; i < pQuery->numOfTags; ++i) {
for (int32_t i = 0; i < pQueryAttr->numOfTags; ++i) {
SColumn* pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
SSchema* pColSchema = &pSchema[pCol->colIndex.columnIndex];
......@@ -3151,7 +3151,7 @@ static int32_t createTagColumnInfo(SQuery* pQuery, SQueryInfo* pQueryInfo, STabl
return TSDB_CODE_TSC_INVALID_SQL;
}
SColumnInfo* pTagCol = &pQuery->tagColList[i];
SColumnInfo* pTagCol = &pQueryAttr->tagColList[i];
pTagCol->colId = pColSchema->colId;
pTagCol->bytes = pColSchema->bytes;
......@@ -3162,98 +3162,98 @@ static int32_t createTagColumnInfo(SQuery* pQuery, SQueryInfo* pQueryInfo, STabl
return TSDB_CODE_SUCCESS;
}
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQuery* pQuery, void* addr) {
memset(pQuery, 0, sizeof(SQuery));
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr) {
memset(pQueryAttr, 0, sizeof(SQueryAttr));
int16_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
int16_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQuery->topBotQuery = tscIsTopBotQuery(pQueryInfo);
pQuery->hasTagResults = hasTagValOutput(pQueryInfo);
pQuery->stabledev = isStabledev(pQueryInfo);
pQuery->tsCompQuery = isTsCompQuery(pQueryInfo);
pQuery->simpleAgg = isSimpleAggregate(pQueryInfo);
pQuery->needReverseScan = tscNeedReverseScan(pQueryInfo);
pQuery->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQuery->queryBlockDist = isBlockDistQuery(pQueryInfo);
pQuery->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
pQuery->numOfCols = numOfCols;
pQuery->numOfOutput = numOfOutput;
pQuery->limit = pQueryInfo->limit;
pQuery->order = pQueryInfo->order;
pQuery->fillType = pQueryInfo->fillType;
pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQuery->window = pQueryInfo->window;
memcpy(&pQuery->interval, &pQueryInfo->interval, sizeof(pQuery->interval));
pQueryAttr->topBotQuery = tscIsTopBotQuery(pQueryInfo);
pQueryAttr->hasTagResults = hasTagValOutput(pQueryInfo);
pQueryAttr->stabledev = isStabledev(pQueryInfo);
pQueryAttr->tsCompQuery = isTsCompQuery(pQueryInfo);
pQueryAttr->simpleAgg = isSimpleAggregate(pQueryInfo);
pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo);
pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput;
pQueryAttr->limit = pQueryInfo->limit;
pQueryAttr->order = pQueryInfo->order;
pQueryAttr->fillType = pQueryInfo->fillType;
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQueryAttr->window = pQueryInfo->window;
memcpy(&pQueryAttr->interval, &pQueryInfo->interval, sizeof(pQueryAttr->interval));
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
pQuery->pGroupbyExpr = calloc(1, sizeof(SSqlGroupbyExpr));
*(pQuery->pGroupbyExpr) = pQueryInfo->groupbyExpr;
pQueryAttr->pGroupbyExpr = calloc(1, sizeof(SSqlGroupbyExpr));
*(pQueryAttr->pGroupbyExpr) = pQueryInfo->groupbyExpr;
pQuery->pExpr1 = calloc(pQuery->numOfOutput, sizeof(SExprInfo));
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
pQueryAttr->pExpr1 = calloc(pQueryAttr->numOfOutput, sizeof(SExprInfo));
for(int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i);
tscSqlExprAssign(&pQuery->pExpr1[i], pExpr);
tscSqlExprAssign(&pQueryAttr->pExpr1[i], pExpr);
}
pQuery->colList = calloc(numOfCols, sizeof(SColumnInfo));
pQueryAttr->colList = calloc(numOfCols, sizeof(SColumnInfo));
for(int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i);
if (!isValidDataType(pCol->info.type) || pCol->info.type == TSDB_DATA_TYPE_NULL) {
assert(0);
}
pQuery->colList[i] = pCol->info;
pQuery->colList[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQuery->colList[i].numOfFilters);
pQueryAttr->colList[i] = pCol->info;
pQueryAttr->colList[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQueryAttr->colList[i].numOfFilters);
}
// for simple table, not for super table
int32_t code = createSecondaryExpr(pQuery, pQueryInfo, pTableMetaInfo);
int32_t code = createSecondaryExpr(pQueryAttr, pQueryInfo, pTableMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// tag column info
code = createTagColumnInfo(pQuery, pQueryInfo, pTableMetaInfo);
code = createTagColumnInfo(pQueryAttr, pQueryInfo, pTableMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pQuery->fillType != TSDB_FILL_NONE) {
pQuery->fillVal = calloc(pQuery->numOfOutput, sizeof(int64_t));
memcpy(pQuery->fillVal, pQueryInfo->fillVal, pQuery->numOfOutput * sizeof(int64_t));
if (pQueryAttr->fillType != TSDB_FILL_NONE) {
pQueryAttr->fillVal = calloc(pQueryAttr->numOfOutput, sizeof(int64_t));
memcpy(pQueryAttr->fillVal, pQueryInfo->fillVal, pQueryAttr->numOfOutput * sizeof(int64_t));
}
pQuery->srcRowSize = 0;
pQuery->maxTableColumnWidth = 0;
pQueryAttr->srcRowSize = 0;
pQueryAttr->maxTableColumnWidth = 0;
for (int16_t i = 0; i < numOfCols; ++i) {
pQuery->srcRowSize += pQuery->colList[i].bytes;
if (pQuery->maxTableColumnWidth < pQuery->colList[i].bytes) {
pQuery->maxTableColumnWidth = pQuery->colList[i].bytes;
pQueryAttr->srcRowSize += pQueryAttr->colList[i].bytes;
if (pQueryAttr->maxTableColumnWidth < pQueryAttr->colList[i].bytes) {
pQueryAttr->maxTableColumnWidth = pQueryAttr->colList[i].bytes;
}
}
pQuery->interBufSize = getOutputInterResultBufSize(pQuery);
pQueryAttr->interBufSize = getOutputInterResultBufSize(pQueryAttr);
if (pQuery->numOfCols <= 0 && !tscQueryTags(pQueryInfo) && !pQuery->queryBlockDist) {
if (pQueryAttr->numOfCols <= 0 && !tscQueryTags(pQueryInfo) && !pQueryAttr->queryBlockDist) {
tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", addr,
(uint64_t)pQuery->numOfCols, numOfCols);
(uint64_t)pQueryAttr->numOfCols, numOfCols);
return TSDB_CODE_TSC_INVALID_SQL;
}
if (pQuery->interval.interval < 0) {
if (pQueryAttr->interval.interval < 0) {
tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, addr,
(int64_t)pQueryInfo->interval.interval);
return TSDB_CODE_TSC_INVALID_SQL;
}
if (pQuery->pGroupbyExpr->numOfGroupCols < 0) {
if (pQueryAttr->pGroupbyExpr->numOfGroupCols < 0) {
tscError("%p illegal value of numOfGroupCols in query msg: %d", addr, pQueryInfo->groupbyExpr.numOfGroupCols);
return TSDB_CODE_TSC_INVALID_SQL;
}
......
......@@ -18,7 +18,7 @@ TEST(testCase, parse_time) {
deltaToUtcInitOnce();
// window: 1500000001000, 1500002000000
// pQuery->interval: interval: 86400000, sliding:3600000
// pQueryAttr->interval: interval: 86400000, sliding:3600000
int64_t key = 1500000001000;
SInterval interval = {0};
interval.interval = 86400000;
......
......@@ -179,7 +179,7 @@ typedef struct SSDataBlock {
// The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node.
typedef struct SQuery {
typedef struct SQueryAttr {
SLimitVal limit;
bool stableQuery; // super table query or not
......@@ -226,7 +226,7 @@ typedef struct SQuery {
SMemRef memRef;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
int32_t vgId;
} SQuery;
} SQueryAttr;
typedef SSDataBlock* (*__operator_fn_t)(void* param);
typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num);
......@@ -235,7 +235,7 @@ struct SOperatorInfo;
typedef struct SQueryRuntimeEnv {
jmp_buf env;
SQuery* pQuery;
SQueryAttr* pQueryAttr;
uint32_t status; // query status
void* qinfo;
uint8_t scanFlag; // denotes reversed scan of data or not
......@@ -324,7 +324,7 @@ typedef struct SQInfo {
int64_t owner; // if it is in execution
SQueryRuntimeEnv runtimeEnv;
SQuery query;
SQueryAttr query;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
pthread_mutex_t lock; // used to synchronize the rsp/query threads
......@@ -451,14 +451,14 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryPara
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
STableQueryInfo *createTableQueryInfo(SQuery* pQuery, void* pTable, bool groupbyColumn, STimeWindow win, void* buf);
STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf);
bool isQueryKilled(SQInfo *pQInfo);
int32_t checkForQueryBuf(size_t numOfTables);
bool doBuildResCheck(SQInfo* pQInfo);
void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status);
bool onlyQueryTags(SQuery* pQuery);
bool onlyQueryTags(SQueryAttr* pQueryAttr);
bool isValidQInfo(void *param);
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QPLAN_H
#define TDENGINE_QPLAN_H
//TODO refactor
SArray* createTableScanPlan(SQueryAttr* pQueryAttr);
SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr);
#endif // TDENGINE_QPLAN_H
......@@ -29,7 +29,7 @@
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!(sq)))? (_q)->pExpr1[1].base.param[0].i64:1)
int32_t getOutputInterResultBufSize(SQuery* pQuery);
int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr);
size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv);
int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type);
......@@ -51,10 +51,10 @@ static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int
return pResultRowInfo->pResult[slot];
}
static FORCE_INLINE char *getPosInResultPage(SQuery *pQuery, tFilePage* page, int32_t rowOffset, int16_t offset) {
assert(rowOffset >= 0 && pQuery != NULL);
static FORCE_INLINE char *getPosInResultPage(SQueryAttr *pQueryAttr, tFilePage* page, int32_t rowOffset, int16_t offset) {
assert(rowOffset >= 0 && pQueryAttr != NULL);
int32_t numOfRows = (int32_t)GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery);
int32_t numOfRows = (int32_t)GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
return ((char *)page->data) + rowOffset + offset * numOfRows;
}
......
......@@ -3318,8 +3318,6 @@ static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
INC_INIT_VAL(pCtx, 1);
char *pData = GET_INPUT_DATA(pCtx, index);
memcpy(pCtx->pOutput, pData, pCtx->inputBytes);
pCtx->pOutput += pCtx->inputBytes;
}
/**
......
此差异已折叠。
......@@ -36,17 +36,17 @@ SQueryNode* queryPlanFromString() {
return NULL;
}
SArray* createTableScanPlan(SQuery* pQuery) {
SArray* createTableScanPlan(SQueryAttr* pQueryAttr) {
SArray* plan = taosArrayInit(4, sizeof(int32_t));
int32_t op = 0;
if (onlyQueryTags(pQuery)) {
if (onlyQueryTags(pQueryAttr)) {
// op = OP_TagScan;
} else if (pQuery->queryBlockDist) {
} else if (pQueryAttr->queryBlockDist) {
op = OP_TableBlockInfoScan;
} else if (pQuery->tsCompQuery || pQuery->pointInterpQuery) {
} else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) {
op = OP_TableSeqScan;
} else if (pQuery->needReverseScan) {
} else if (pQueryAttr->needReverseScan) {
op = OP_DataBlocksOptScan;
} else {
op = OP_TableScan;
......@@ -56,50 +56,50 @@ SArray* createTableScanPlan(SQuery* pQuery) {
return plan;
}
SArray* createExecOperatorPlan(SQuery* pQuery) {
SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
SArray* plan = taosArrayInit(4, sizeof(int32_t));
int32_t op = 0;
if (onlyQueryTags(pQuery)) { // do nothing for tags query
if (onlyQueryTags(pQueryAttr)) { // do nothing for tags query
op = OP_TagScan;
taosArrayPush(plan, &op);
} else if (pQuery->interval.interval > 0) {
if (pQuery->stableQuery) {
} else if (pQueryAttr->interval.interval > 0) {
if (pQueryAttr->stableQuery) {
op = OP_MultiTableTimeInterval;
taosArrayPush(plan, &op);
} else {
op = OP_TimeWindow;
taosArrayPush(plan, &op);
if (pQuery->pExpr2 != NULL) {
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
}
if (pQuery->fillType != TSDB_FILL_NONE && (!pQuery->pointInterpQuery)) {
if (pQueryAttr->fillType != TSDB_FILL_NONE && (!pQueryAttr->pointInterpQuery)) {
op = OP_Fill;
taosArrayPush(plan, &op);
}
}
} else if (pQuery->groupbyColumn) {
} else if (pQueryAttr->groupbyColumn) {
op = OP_Groupby;
taosArrayPush(plan, &op);
if (pQuery->pExpr2 != NULL) {
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
}
} else if (pQuery->sw.gap > 0) {
} else if (pQueryAttr->sw.gap > 0) {
op = OP_SessionWindow;
taosArrayPush(plan, &op);
if (pQuery->pExpr2 != NULL) {
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
}
} else if (pQuery->simpleAgg) {
if (pQuery->stableQuery && !pQuery->tsCompQuery) {
} else if (pQueryAttr->simpleAgg) {
if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery) {
op = OP_MultiTableAggregate;
} else {
op = OP_Aggregate;
......@@ -107,7 +107,7 @@ SArray* createExecOperatorPlan(SQuery* pQuery) {
taosArrayPush(plan, &op);
if (pQuery->pExpr2 != NULL && !pQuery->stableQuery) {
if (pQueryAttr->pExpr2 != NULL && !pQueryAttr->stableQuery) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
}
......@@ -116,12 +116,12 @@ SArray* createExecOperatorPlan(SQuery* pQuery) {
taosArrayPush(plan, &op);
}
if (pQuery->limit.offset > 0) {
if (pQueryAttr->limit.offset > 0) {
op = OP_Offset;
taosArrayPush(plan, &op);
}
if (pQuery->limit.limit > 0) {
if (pQueryAttr->limit.limit > 0) {
op = OP_Limit;
taosArrayPush(plan, &op);
}
......
......@@ -30,11 +30,11 @@ typedef struct SCompSupporter {
int32_t order;
} SCompSupporter;
int32_t getOutputInterResultBufSize(SQuery* pQuery) {
int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr) {
int32_t size = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
size += pQuery->pExpr1[i].base.interBytes;
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
size += pQueryAttr->pExpr1[i].base.interBytes;
}
assert(size >= 0);
......@@ -136,11 +136,11 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId);
int16_t offset = 0;
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) {
for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) {
SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i];
int16_t size = pRuntimeEnv->pQuery->pExpr1[i].base.resType;
char * s = getPosInResultPage(pRuntimeEnv->pQuery, page, pResultRow->offset, offset);
int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resType;
char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset);
memset(s, 0, size);
offset += size;
......@@ -167,8 +167,8 @@ SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t
}
size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv) {
SQuery* pQuery = pRuntimeEnv->pQuery;
return (pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pQuery->interBufSize + sizeof(SResultRow);
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
return (pQueryAttr->numOfOutput * sizeof(SResultRowCellInfo)) + pQueryAttr->interBufSize + sizeof(SResultRow);
}
SResultRowPool* initResultRowPool(size_t size) {
......@@ -385,10 +385,10 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
}
static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow, int32_t* rowCellInfoOffset) {
SQuery* pQuery = pRuntimeEnv->pQuery;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pExpr1[j].base.functionId;
for (int32_t j = 0; j < pQueryAttr->numOfOutput; ++j) {
int32_t functionId = pQueryAttr->pExpr1[j].base.functionId;
/*
* ts, tag, tagprj function can not decide the output number of current query
......@@ -451,7 +451,7 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *
static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList,
int32_t* rowCellInfoOffset) {
bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQuery);
bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr);
int32_t code = TSDB_CODE_SUCCESS;
......@@ -487,7 +487,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes
goto _end;
}
SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQuery->order.order};
SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQueryAttr->order.order};
int32_t ret = tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
if (ret != TSDB_CODE_SUCCESS) {
......
......@@ -273,14 +273,14 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
code = pQInfo->code;
} else {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
pthread_mutex_lock(&pQInfo->lock);
assert(pQInfo->rspContext == NULL);
if (pQInfo->dataReady == QUERY_RESULT_READY) {
*buildRes = true;
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo, pQuery->resultRowSize,
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo, pQueryAttr->resultRowSize,
GET_NUM_OF_RESULTS(pRuntimeEnv), tstrerror(pQInfo->code));
} else {
*buildRes = false;
......@@ -303,11 +303,11 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
int32_t s = GET_NUM_OF_RESULTS(pRuntimeEnv);
size_t size = pQuery->resultRowSize * s;
size_t size = pQueryAttr->resultRowSize * s;
size += sizeof(int32_t);
size += sizeof(STableIdInfo) * taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap);
......@@ -329,7 +329,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
(*pRsp)->useconds = htobe64(pQInfo->summary.elapsedTime);
}
(*pRsp)->precision = htons(pQuery->precision);
(*pRsp)->precision = htons(pQueryAttr->precision);
if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
doDumpQueryResult(pQInfo, (*pRsp)->data);
} else {
......
......@@ -123,7 +123,7 @@ typedef struct STsdbQueryHandle {
SMemRef *pMemRef;
SArray *defaultLoadColumn;// default load column
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
SArray *prev; // previous row which is before than time window
SArray *next; // next row which is after the query time window
......
run general/parser/alter.sim
run general/parser/alter1.sim
run general/parser/alter_stable.sim
run general/parser/auto_create_tb.sim
run general/parser/auto_create_tb_drop_tb.sim
run general/parser/col_arithmetic_operation.sim
run general/parser/columnValue.sim
run general/parser/commit.sim
run general/parser/create_db.sim
run general/parser/create_mt.sim
run general/parser/create_tb.sim
run general/parser/dbtbnameValidate.sim
run general/parser/fill.sim
run general/parser/fill_stb.sim
#run general/parser/fill_us.sim #
run general/parser/first_last.sim
run general/parser/import_commit1.sim
run general/parser/import_commit2.sim
run general/parser/import_commit3.sim
#run general/parser/import_file.sim
run general/parser/insert_tb.sim
run general/parser/tags_dynamically_specifiy.sim
run general/parser/interp.sim
run general/parser/lastrow.sim
run general/parser/limit.sim
run general/parser/limit1.sim
run general/parser/limit1_tblocks100.sim
run general/parser/limit2.sim
run general/parser/mixed_blocks.sim
run general/parser/nchar.sim
run general/parser/null_char.sim
run general/parser/selectResNum.sim
run general/parser/select_across_vnodes.sim
run general/parser/select_from_cache_disk.sim
run general/parser/set_tag_vals.sim
run general/parser/single_row_in_tb.sim
run general/parser/slimit.sim
run general/parser/slimit1.sim
run general/parser/slimit_alter_tags.sim
run general/parser/tbnameIn.sim
run general/parser/slimit_alter_tags.sim # persistent failed
run general/parser/join.sim
run general/parser/join_multivnode.sim
run general/parser/join_manyblocks.sim
#run general/parser/alter.sim
#run general/parser/alter1.sim
#run general/parser/alter_stable.sim
#run general/parser/auto_create_tb.sim
#run general/parser/auto_create_tb_drop_tb.sim
#run general/parser/col_arithmetic_operation.sim
#run general/parser/columnValue.sim
#run general/parser/commit.sim
#run general/parser/create_db.sim
#run general/parser/create_mt.sim
#run general/parser/create_tb.sim
#run general/parser/dbtbnameValidate.sim
#run general/parser/fill.sim
#run general/parser/fill_stb.sim
##run general/parser/fill_us.sim #
#run general/parser/first_last.sim
#run general/parser/import_commit1.sim
#run general/parser/import_commit2.sim
#run general/parser/import_commit3.sim
##run general/parser/import_file.sim
#run general/parser/insert_tb.sim
#run general/parser/tags_dynamically_specifiy.sim
#run general/parser/interp.sim
#run general/parser/lastrow.sim
#run general/parser/limit.sim
#run general/parser/limit1.sim
#run general/parser/limit1_tblocks100.sim
#run general/parser/limit2.sim
#run general/parser/mixed_blocks.sim
#run general/parser/nchar.sim
#run general/parser/null_char.sim
#run general/parser/selectResNum.sim
#run general/parser/select_across_vnodes.sim
#run general/parser/select_from_cache_disk.sim
#run general/parser/set_tag_vals.sim
#run general/parser/single_row_in_tb.sim
#run general/parser/slimit.sim
#run general/parser/slimit1.sim
#run general/parser/slimit_alter_tags.sim
#run general/parser/tbnameIn.sim
#run general/parser/slimit_alter_tags.sim # persistent failed
#run general/parser/join.sim
#run general/parser/join_multivnode.sim
#run general/parser/join_manyblocks.sim
run general/parser/projection_limit_offset.sim
run general/parser/select_with_tags.sim
run general/parser/groupby.sim
#run general/parser/groupby.sim
run general/parser/tags_filter.sim
run general/parser/topbot.sim
run general/parser/union.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册