提交 60010d2e 编写于 作者: H Haojun Liao

[td-2819] refactor codes.

上级 9663eea7
...@@ -127,10 +127,10 @@ bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo); ...@@ -127,10 +127,10 @@ bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo); bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo); bool tscIsTopBotQuery(SQueryInfo* pQueryInfo);
bool hasTagValOutput(SQueryInfo* pQueryInfo); bool hasTagValOutput(SQueryInfo* pQueryInfo);
bool timeWindowInterpoRequired(SQueryInfo *pQueryNodeInfo); bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo);
bool isStabledev(SQueryInfo* pQueryInfo); bool isStabledev(SQueryInfo* pQueryInfo);
bool isTsCompQuery(SQueryInfo* pQueryNodeInfo); bool isTsCompQuery(SQueryInfo* pQueryInfo);
bool isSimpleAggregate(SQueryInfo* pQueryNodeInfo); bool isSimpleAggregate(SQueryInfo* pQueryInfo);
bool isBlockDistQuery(SQueryInfo* pQueryInfo); bool isBlockDistQuery(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex); bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
...@@ -300,11 +300,11 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta); ...@@ -300,11 +300,11 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t tscGetTableMetaMaxSize(); uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name); int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta); STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryNodeInfo, SQuery* pQuery); int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQuery* pQuery, void* addr);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchemaEx* pSchema); void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchemaEx* pSchema);
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo,
uint64_t* qId, char* sql); uint64_t* qId, char* sql, void* addr);
void* malloc_throw(size_t size); void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size); void* calloc_throw(size_t nmemb, size_t size);
......
...@@ -6642,10 +6642,6 @@ static STableMeta* extractTempTableMetaFromNestQuery(SQueryInfo* pUpstream) { ...@@ -6642,10 +6642,6 @@ static STableMeta* extractTempTableMetaFromNestQuery(SQueryInfo* pUpstream) {
return meta; return meta;
} }
//static SColumnInfo* getColumnInfoFromSchema(SQueryInfo* pUpstream) {
//
//}
int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index) { int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index) {
assert(pQuerySqlNode != NULL && (pQuerySqlNode->from == NULL || taosArrayGetSize(pQuerySqlNode->from->tableList) > 0)); assert(pQuerySqlNode != NULL && (pQuerySqlNode->from == NULL || taosArrayGetSize(pQuerySqlNode->from->tableList) > 0));
......
...@@ -426,15 +426,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -426,15 +426,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
(*pSql->fp)(pSql->param, pSql, rpcMsg->code); (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
} }
if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
taosRemoveRef(tscObjRef, handle); taosRemoveRef(tscObjRef, handle);
tscDebug("%p sqlObj is automatically freed", pSql); tscDebug("%p sqlObj is automatically freed", pSql);
} }
taosReleaseRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
} }
...@@ -707,35 +704,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -707,35 +704,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd);
SQuery query = {0}; SQuery query = {0};
tscCreateQueryFromQueryInfo(pQueryInfo, &query); tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql);
SArray* tableScanOperator = createTableScanPlan(&query); SArray* tableScanOperator = createTableScanPlan(&query);
SArray* queryOperator = createExecOperatorPlan(&query); SArray* queryOperator = createExecOperatorPlan(&query);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
/*
size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo) && !tscQueryBlockInfo(pQueryInfo)) {
tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols,
tscGetNumOfColumns(pTableMeta));
return TSDB_CODE_TSC_INVALID_SQL;
}
if (pQueryInfo->interval.interval < 0) {
tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, (int64_t)pQueryInfo->interval.interval);
return TSDB_CODE_TSC_INVALID_SQL;
}
if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
return TSDB_CODE_TSC_INVALID_SQL;
}
*/
{ {
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version)); tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
...@@ -764,17 +740,29 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -764,17 +740,29 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->offset = htobe64(query.limit.offset); pQueryMsg->offset = htobe64(query.limit.offset);
pQueryMsg->numOfCols = htons(query.numOfCols); pQueryMsg->numOfCols = htons(query.numOfCols);
pQueryMsg->interval.interval = htobe64(query.interval.interval); pQueryMsg->interval.interval = htobe64(query.interval.interval);
pQueryMsg->interval.sliding = htobe64(query.interval.sliding); pQueryMsg->interval.sliding = htobe64(query.interval.sliding);
pQueryMsg->interval.offset = htobe64(query.interval.offset); pQueryMsg->interval.offset = htobe64(query.interval.offset);
pQueryMsg->interval.intervalUnit = query.interval.intervalUnit; pQueryMsg->interval.intervalUnit = query.interval.intervalUnit;
pQueryMsg->interval.slidingUnit = query.interval.slidingUnit; pQueryMsg->interval.slidingUnit = query.interval.slidingUnit;
pQueryMsg->interval.offsetUnit = query.interval.offsetUnit; pQueryMsg->interval.offsetUnit = query.interval.offsetUnit;
pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->stableQuery = query.stableQuery;
pQueryMsg->sqlstrLen = htonl(sqlLen); pQueryMsg->topBotQuery = query.topBotQuery;
pQueryMsg->sw.gap = htobe64(query.sw.gap); pQueryMsg->groupbyColumn = query.groupbyColumn;
pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX); pQueryMsg->hasTagResults = query.hasTagResults;
pQueryMsg->timeWindowInterpo = query.timeWindowInterpo;
pQueryMsg->queryBlockDist = query.queryBlockDist;
pQueryMsg->stabledev = query.stabledev;
pQueryMsg->tsCompQuery = query.tsCompQuery;
pQueryMsg->simpleAgg = query.simpleAgg;
pQueryMsg->pointInterpQuery = query.pointInterpQuery;
pQueryMsg->needReverseScan = query.needReverseScan;
pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->sqlstrLen = htonl(sqlLen);
pQueryMsg->sw.gap = htobe64(query.sw.gap);
pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX);
pQueryMsg->secondStageOutput = htonl(query.numOfExpr2); pQueryMsg->secondStageOutput = htonl(query.numOfExpr2);
pQueryMsg->numOfOutput = htons((int16_t)query.numOfOutput); // this is the stage one output column number pQueryMsg->numOfOutput = htons((int16_t)query.numOfOutput); // this is the stage one output column number
...@@ -825,20 +813,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -825,20 +813,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
} }
{
pQueryMsg->stableQuery = query.stableQuery;
pQueryMsg->topBotQuery = query.topBotQuery;
pQueryMsg->groupbyColumn = query.groupbyColumn;
pQueryMsg->hasTagResults = query.hasTagResults;
pQueryMsg->timeWindowInterpo = query.timeWindowInterpo;
pQueryMsg->queryBlockDist = query.queryBlockDist;
pQueryMsg->stabledev = query.stabledev;
pQueryMsg->tsCompQuery = query.tsCompQuery;
pQueryMsg->simpleAgg = query.simpleAgg;
pQueryMsg->pointInterpQuery = query.pointInterpQuery;
pQueryMsg->needReverseScan = query.needReverseScan;
}
SSqlExpr *pSqlExpr = (SSqlExpr *)pMsg; SSqlExpr *pSqlExpr = (SSqlExpr *)pMsg;
for (int32_t i = 0; i < query.numOfOutput; ++i) { for (int32_t i = 0; i < query.numOfOutput; ++i) {
......
...@@ -487,11 +487,13 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -487,11 +487,13 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
if (pSql == NULL) { if (pSql == NULL) {
return NULL; return NULL;
} }
if (pSub->pSql->self != 0) { if (pSub->pSql->self != 0) {
taosReleaseRef(tscObjRef, pSub->pSql->self); taosReleaseRef(tscObjRef, pSub->pSql->self);
} else { } else {
tscFreeSqlObj(pSub->pSql); tscFreeSqlObj(pSub->pSql);
} }
pSub->pSql = pSql; pSub->pSql = pSql;
pSql->pSubscription = pSub; pSql->pSubscription = pSub;
} }
......
...@@ -3215,10 +3215,10 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { ...@@ -3215,10 +3215,10 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
return hasData; return hasData;
} }
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo,
uint64_t* qId, char* sql) { uint64_t* qId, char* sql, void* addr) {
assert(pQueryNodeInfo != NULL); assert(pQueryInfo != NULL);
int16_t numOfOutput = pQueryNodeInfo->fieldsInfo.numOfOutput; int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) { if (pQInfo == NULL) {
...@@ -3227,10 +3227,10 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs ...@@ -3227,10 +3227,10 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs
// to make sure third party won't overwrite this structure // to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo; pQInfo->signature = pQInfo;
SQuery *pQuery = &pQInfo->query;
tscCreateQueryFromQueryInfo(pQueryNodeInfo, pQuery); SQuery *pQuery = &pQInfo->query;
pQInfo->runtimeEnv.pQuery = pQuery; pQInfo->runtimeEnv.pQuery = pQuery;
tscCreateQueryFromQueryInfo(pQueryInfo, pQuery, addr);
// calculate the result row size // calculate the result row size
for (int16_t col = 0; col < numOfOutput; ++col) { for (int16_t col = 0; col < numOfOutput; ++col) {
......
...@@ -104,7 +104,6 @@ bool tscQueryBlockInfo(SQueryInfo* pQueryInfo) { ...@@ -104,7 +104,6 @@ bool tscQueryBlockInfo(SQueryInfo* pQueryInfo) {
SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i); SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i);
int32_t functId = pExpr->base.functionId; int32_t functId = pExpr->base.functionId;
// "select count(tbname)" query
if (functId == TSDB_FUNC_BLKINFO) { if (functId == TSDB_FUNC_BLKINFO) {
return true; return true;
} }
...@@ -568,7 +567,7 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { ...@@ -568,7 +567,7 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
taosArrayPush(tableGroupInfo.pGroupList, &group); taosArrayPush(tableGroupInfo.pGroupList, &group);
SQInfo* pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, 0, NULL); SQInfo* pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, 0, NULL, NULL);
printf("%p\n", pQInfo); printf("%p\n", pQInfo);
} }
} }
...@@ -3073,159 +3072,156 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) { ...@@ -3073,159 +3072,156 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) {
return p; return p;
} }
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQuery* pQuery) { static int32_t createSecondaryExpr(SQuery* pQuery, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo) {
memset(pQuery, 0, sizeof(SQuery)); if (!tscIsSecondStageQuery(pQueryInfo)) {
return TSDB_CODE_SUCCESS;
}
pQuery->tsdb = NULL; pQuery->numOfExpr2 = tscNumOfFields(pQueryInfo);
pQuery->topBotQuery = tscIsTopBotQuery(pQueryInfo); pQuery->pExpr2 = calloc(pQuery->numOfExpr2, sizeof(SExprInfo));
pQuery->hasTagResults = hasTagValOutput(pQueryInfo); if (pQuery->pExpr2 == NULL) {
pQuery->stabledev = isStabledev(pQueryInfo); return TSDB_CODE_TSC_OUT_OF_MEMORY;
pQuery->tsCompQuery = isTsCompQuery(pQueryInfo); }
pQuery->simpleAgg = isSimpleAggregate(pQueryInfo);
pQuery->needReverseScan = tscNeedReverseScan(pQueryInfo);
pQuery->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQuery->queryBlockDist = isBlockDistQuery(pQueryInfo);
pQuery->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
int16_t numOfCols = taosArrayGetSize(pQueryInfo->colList); for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput; SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
SExprInfo* pExpr = pField->pExpr;
pQuery->numOfCols = numOfCols;
pQuery->numOfOutput = numOfOutput;
pQuery->limit = pQueryInfo->limit;
pQuery->order = pQueryInfo->order;
pQuery->pExpr1 = NULL;
pQuery->pExpr2 = NULL; // not support yet.
pQuery->numOfExpr2 = 0;
pQuery->pGroupbyExpr = NULL;
pQuery->fillType = pQueryInfo->fillType;
pQuery->numOfTags = 0;
pQuery->tagColList = NULL;
memcpy(&pQuery->interval, &pQueryInfo->interval, sizeof(pQuery->interval)); SSqlExpr* pse = &pQuery->pExpr2[i].base;
pse->uid = pTableMetaInfo->pTableMeta->id.uid;
pse->resColId = pExpr->base.resColId;
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; if (pExpr->pExpr == NULL) { // this should be switched to projection query
pse->numOfParams = 0; // no params for projection query
pse->functionId = TSDB_FUNC_PRJ;
pse->colInfo.colId = pExpr->base.resColId;
pQuery->vgId = 0; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
pQuery->stableQuery = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); if (pQuery->pExpr1[j].base.resColId == pse->colInfo.colId) {
pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo); pse->colInfo.colIndex = j;
pQuery->window = pQueryInfo->window; }
}
pQuery->pGroupbyExpr = calloc(1, sizeof(SSqlGroupbyExpr)); pse->colInfo.flag = TSDB_COL_NORMAL;
*pQuery->pGroupbyExpr = pQueryInfo->groupbyExpr; pse->colType = pExpr->base.resType;
pse->colBytes = pExpr->base.resBytes;
pse->resType = pExpr->base.resType;
pse->resBytes = pExpr->base.resBytes;
} else { // arithmetic expression
pse->colInfo.colId = pExpr->base.colInfo.colId;
pse->colType = pExpr->base.colType;
pse->colBytes = pExpr->base.colBytes;
pse->resBytes = sizeof(double);
pse->resType = TSDB_DATA_TYPE_DOUBLE;
pse->functionId = pExpr->base.functionId;
pse->numOfParams = pExpr->base.numOfParams;
for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) {
tVariantAssign(&pse->param[j], &pExpr->base.param[j]);
}
}
}
{ return TSDB_CODE_SUCCESS;
pQuery->numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); }
pQuery->pExpr1 = calloc(pQuery->numOfOutput, sizeof(SExprInfo));
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { static int32_t createTagColumnInfo(SQuery* pQuery, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo) {
SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i); pQuery->numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
tscSqlExprAssign(&pQuery->pExpr1[i], pExpr); if (pQuery->numOfTags == 0) {
} return TSDB_CODE_SUCCESS;
}
pQuery->colList = calloc(numOfCols, sizeof(SColumnInfo)); STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i);
if (!isValidDataType(pCol->info.type) || pCol->info.type == TSDB_DATA_TYPE_NULL) {
assert(0);
}
pQuery->colList[i] = pCol->info; int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
pQuery->colList[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQuery->colList[i].numOfFilters);
} pQuery->tagColList = calloc(pQuery->numOfTags, sizeof(SColumnInfo));
if (pQuery->tagColList == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
{// for simple table, not for super table SSchema* pSchema = tscGetTableTagSchema(pTableMeta);
if (tscIsSecondStageQuery(pQueryInfo)) { for (int32_t i = 0; i < pQuery->numOfTags; ++i) {
pQuery->numOfExpr2 = tscNumOfFields(pQueryInfo); SColumn* pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
pQuery->pExpr2 = calloc(pQuery->numOfExpr2, sizeof(SExprInfo)); SSchema* pColSchema = &pSchema[pCol->colIndex.columnIndex];
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < TSDB_TBNAME_COLUMN_INDEX) ||
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); (!isValidDataType(pColSchema->type))) {
return TSDB_CODE_TSC_INVALID_SQL;
}
SExprInfo* pExpr = pField->pExpr; SColumnInfo* pTagCol = &pQuery->tagColList[i];
SSqlExpr* pse = &pQuery->pExpr2[i].base; pTagCol->colId = pColSchema->colId;
pse->uid = pTableMetaInfo->pTableMeta->id.uid; pTagCol->bytes = pColSchema->bytes;
pTagCol->type = pColSchema->type;
pTagCol->numOfFilters = 0;
}
// this should be switched to projection query return TSDB_CODE_SUCCESS;
if (pExpr->pExpr == NULL) { }
pse->numOfParams = 0; // no params for projection query
pse->functionId = TSDB_FUNC_PRJ;
pse->colInfo.colId = pExpr->base.resColId;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQuery* pQuery, void* addr) {
if (pQuery->pExpr1[j].base.resColId == pse->colInfo.colId) { memset(pQuery, 0, sizeof(SQuery));
pse->colInfo.colIndex = j;
}
}
pse->colInfo.flag = TSDB_COL_NORMAL;
pse->colType = pExpr->base.resType;
pse->colBytes = pExpr->base.resBytes;
pse->resType = pExpr->base.resType;
pse->resBytes = pExpr->base.resBytes;
} else {
assert(pField->pExpr->pExpr != NULL);
pse->colInfo.colId = pExpr->base.colInfo.colId; int16_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
pse->colType = pExpr->base.colType; int16_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pse->colBytes = pExpr->base.colBytes;
pse->resBytes = sizeof(double); pQuery->topBotQuery = tscIsTopBotQuery(pQueryInfo);
pse->resType = TSDB_DATA_TYPE_DOUBLE; pQuery->hasTagResults = hasTagValOutput(pQueryInfo);
pQuery->stabledev = isStabledev(pQueryInfo);
pQuery->tsCompQuery = isTsCompQuery(pQueryInfo);
pQuery->simpleAgg = isSimpleAggregate(pQueryInfo);
pQuery->needReverseScan = tscNeedReverseScan(pQueryInfo);
pQuery->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQuery->queryBlockDist = isBlockDistQuery(pQueryInfo);
pQuery->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
pQuery->numOfCols = numOfCols;
pQuery->numOfOutput = numOfOutput;
pQuery->limit = pQueryInfo->limit;
pQuery->order = pQueryInfo->order;
pQuery->fillType = pQueryInfo->fillType;
pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQuery->window = pQueryInfo->window;
pse->functionId = pExpr->base.functionId; memcpy(&pQuery->interval, &pQueryInfo->interval, sizeof(pQuery->interval));
pse->numOfParams = pExpr->base.numOfParams;
memset(pse->param, 0, sizeof(tVariant) * tListLen(pse->param)); STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) {
tVariantAssign(&pse->param[j], &pExpr->base.param[j]);
}
}
pse->resColId = pExpr->base.resColId; pQuery->pGroupbyExpr = calloc(1, sizeof(SSqlGroupbyExpr));
pse->uid = pTableMetaInfo->pTableMeta->id.uid; *(pQuery->pGroupbyExpr) = pQueryInfo->groupbyExpr;
}
} pQuery->pExpr1 = calloc(pQuery->numOfOutput, sizeof(SExprInfo));
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i);
tscSqlExprAssign(&pQuery->pExpr1[i], pExpr);
} }
// tag column info pQuery->colList = calloc(numOfCols, sizeof(SColumnInfo));
{ for(int32_t i = 0; i < numOfCols; ++i) {
pQuery->numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList); SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i);
if (pQuery->numOfTags > 0) { // todo index problem if (!isValidDataType(pCol->info.type) || pCol->info.type == TSDB_DATA_TYPE_NULL) {
STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta; assert(0);
}
// int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
// int32_t total = numOfTagColumns + numOfColumns;
SSchema* pSchema = tscGetTableTagSchema(pTableMeta);
pQuery->tagColList = calloc(pQuery->numOfTags, sizeof(SColumnInfo));
pQuery->numOfTags = pQuery->numOfTags;
for (int32_t i = 0; i < pQuery->numOfTags; ++i) {
SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
(!isValidDataType(pColSchema->type))) {
char n[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, n);
// tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
// pSql, pTableMeta->id.tid, pTableMeta->id.uid, n, total, numOfTagColumns, pCol->colIndex.columnIndex, pColSchema->name);
return TSDB_CODE_SUCCESS;
}
SColumnInfo* pTagCol = &pQuery->tagColList[i]; pQuery->colList[i] = pCol->info;
pQuery->colList[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQuery->colList[i].numOfFilters);
}
pTagCol->colId = pColSchema->colId; // for simple table, not for super table
pTagCol->bytes = pColSchema->bytes; int32_t code = createSecondaryExpr(pQuery, pQueryInfo, pTableMetaInfo);
pTagCol->type = pColSchema->type; if (code != TSDB_CODE_SUCCESS) {
pTagCol->numOfFilters = 0; return code;
} }
}
// tag column info
code = createTagColumnInfo(pQuery, pQueryInfo, pTableMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
} }
if (pQuery->fillType != TSDB_FILL_NONE) { if (pQuery->fillType != TSDB_FILL_NONE) {
...@@ -3234,14 +3230,33 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQuery* pQuery) { ...@@ -3234,14 +3230,33 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQuery* pQuery) {
} }
pQuery->srcRowSize = 0; pQuery->srcRowSize = 0;
pQuery->maxSrcColumnSize = 0; pQuery->maxTableColumnWidth = 0;
for (int16_t i = 0; i < numOfCols; ++i) { for (int16_t i = 0; i < numOfCols; ++i) {
pQuery->srcRowSize += pQuery->colList[i].bytes; pQuery->srcRowSize += pQuery->colList[i].bytes;
if (pQuery->maxSrcColumnSize < pQuery->colList[i].bytes) { if (pQuery->maxTableColumnWidth < pQuery->colList[i].bytes) {
pQuery->maxSrcColumnSize = pQuery->colList[i].bytes; pQuery->maxTableColumnWidth = pQuery->colList[i].bytes;
} }
} }
pQuery->interBufSize = getOutputInterResultBufSize(pQuery); pQuery->interBufSize = getOutputInterResultBufSize(pQuery);
if (pQuery->numOfCols <= 0 && !tscQueryTags(pQueryInfo) && !pQuery->queryBlockDist) {
tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", addr,
(uint64_t)pQuery->numOfCols, numOfCols);
return TSDB_CODE_TSC_INVALID_SQL;
}
if (pQuery->interval.interval < 0) {
tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, addr,
(int64_t)pQueryInfo->interval.interval);
return TSDB_CODE_TSC_INVALID_SQL;
}
if (pQuery->pGroupbyExpr->numOfGroupCols < 0) {
tscError("%p illegal value of numOfGroupCols in query msg: %d", addr, pQueryInfo->groupbyExpr.numOfGroupCols);
return TSDB_CODE_TSC_INVALID_SQL;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -209,7 +209,7 @@ typedef struct SQuery { ...@@ -209,7 +209,7 @@ typedef struct SQuery {
int32_t srcRowSize; // todo extract struct int32_t srcRowSize; // todo extract struct
int32_t resultRowSize; int32_t resultRowSize;
int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query. int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query.
int32_t maxSrcColumnSize; int32_t maxTableColumnWidth;
int32_t tagLen; // tag value length of current query int32_t tagLen; // tag value length of current query
SSqlGroupbyExpr* pGroupbyExpr; SSqlGroupbyExpr* pGroupbyExpr;
SExprInfo* pExpr1; SExprInfo* pExpr1;
...@@ -222,7 +222,6 @@ typedef struct SQuery { ...@@ -222,7 +222,6 @@ typedef struct SQuery {
SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query. SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query.
SSingleColumnFilterInfo* pFilterInfo; SSingleColumnFilterInfo* pFilterInfo;
STableQueryInfo* current;
void* tsdb; void* tsdb;
SMemRef memRef; SMemRef memRef;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo> STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
...@@ -263,6 +262,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -263,6 +262,7 @@ typedef struct SQueryRuntimeEnv {
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
int64_t currentOffset; // dynamic offset value int64_t currentOffset; // dynamic offset value
STableQueryInfo *current;
SRspResultInfo resultInfo; SRspResultInfo resultInfo;
SHashObj *pTableRetrieveTsMap; SHashObj *pTableRetrieveTsMap;
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
......
...@@ -275,37 +275,6 @@ static void clearNumOfRes(SQLFunctionCtx* pCtx, int32_t numOfOutput) { ...@@ -275,37 +275,6 @@ static void clearNumOfRes(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
} }
} }
//static bool isGroupbyColumn(SSqlGroupbyExpr *pGroupbyExpr) {
// if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) {
// return false;
// }
//
// for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) {
// SColIndex *pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, i);
// if (TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
// //make sure the normal column locates at the second position if tbname exists in group by clause
// if (pGroupbyExpr->numOfGroupCols > 1) {
// assert(pColIndex->colIndex > 0);
// }
//
// return true;
// }
// }
//
// return false;
//}
//static UNUSED_FUNC bool isStabledev(SQuery* pQuery) {
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t functId = pQuery->pExpr1[i].base.functionId;
// if (functId == TSDB_FUNC_STDDEV_DST) {
// return true;
// }
// }
//
// return false;
//}
static bool isSelectivityWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput) { static bool isSelectivityWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
bool hasTags = false; bool hasTags = false;
int32_t numOfSelectivity = 0; int32_t numOfSelectivity = 0;
...@@ -336,49 +305,6 @@ static bool isProjQuery(SQuery *pQuery) { ...@@ -336,49 +305,6 @@ static bool isProjQuery(SQuery *pQuery) {
return true; return true;
} }
//static bool isTsCompQuery(SQuery *pQuery) { return pQuery->pExpr1[0].base.functionId == TSDB_FUNC_TS_COMP; }
//static UNUSED_FUNC bool isTopBottomQuery(SQuery *pQuery) {
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t functionId = pQuery->pExpr1[i].base.functionId;
// if (functionId == TSDB_FUNC_TS) {
// continue;
// }
//
// if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
// return true;
// }
// }
//
// return false;
//}
//static UNUSED_FUNC bool timeWindowInterpoRequired(SQuery *pQuery) {
// for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t functionId = pQuery->pExpr1[i].base.functionId;
// if (functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_INTERP) {
// return true;
// }
// }
//
// return false;
//}
//static UNUSED_FUNC bool hasTagValOutput(SQuery* pQuery) {
// SExprInfo *pExprInfo = &pQuery->pExpr1[0];
// if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
// return true;
// } else { // set tag value, by which the results are aggregated.
// for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
// SExprInfo *pLocalExprInfo = &pQuery->pExpr1[idx];
//
// // ts_comp column required the tag value for join filter
// if (TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) {
// return true;
// }
// }
// }
//
// return false;
//}
static bool hasNullRv(SColIndex* pColIndex, SDataStatis *pStatis) { static bool hasNullRv(SColIndex* pColIndex, SDataStatis *pStatis) {
if (TSDB_COL_IS_TAG(pColIndex->flag) || TSDB_COL_IS_UD_COL(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (TSDB_COL_IS_TAG(pColIndex->flag) || TSDB_COL_IS_UD_COL(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return false; return false;
...@@ -736,16 +662,16 @@ static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQuer ...@@ -736,16 +662,16 @@ static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQuer
} }
} }
static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn, static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn,
int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) { int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) {
assert(startPos >= 0 && startPos < pDataBlockInfo->rows); assert(startPos >= 0 && startPos < pDataBlockInfo->rows);
SQuery* pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* item = pRuntimeEnv->current;
int32_t num = -1; int32_t num = -1;
int32_t order = pQuery->order.order; int32_t order = pQuery->order.order;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
STableQueryInfo* item = pQuery->current;
if (QUERY_IS_ASC_QUERY(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) {
if (ekey < pDataBlockInfo->window.ekey) { if (ekey < pDataBlockInfo->window.ekey) {
num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn); num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
...@@ -1246,7 +1172,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1246,7 +1172,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
int32_t forwardStep = 0; int32_t forwardStep = 0;
TSKEY ekey = reviseWindowEkey(pQuery, &win); TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = forwardStep =
getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// prev time window not interpolation yet. // prev time window not interpolation yet.
int32_t curIndex = curTimeWindowIndex(pResultRowInfo); int32_t curIndex = curTimeWindowIndex(pResultRowInfo);
...@@ -1305,7 +1231,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1305,7 +1231,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
ekey = reviseWindowEkey(pQuery, &nextWin); ekey = reviseWindowEkey(pQuery, &nextWin);
forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// window start(end) key interpolation // window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &nextWin, startPos, forwardStep); doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &nextWin, startPos, forwardStep);
...@@ -1317,12 +1243,12 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1317,12 +1243,12 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex); saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex);
} }
updateResultRowInfoActiveIndex(pResultRowInfo, pQuery, pQuery->current->lastKey); updateResultRowInfoActiveIndex(pResultRowInfo, pQuery, pRuntimeEnv->current->lastKey);
} }
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->pQuery->current; STableQueryInfo* item = pRuntimeEnv->current;
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex); SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
int16_t bytes = pColInfoData->info.bytes; int16_t bytes = pColInfoData->info.bytes;
...@@ -1366,7 +1292,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1366,7 +1292,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->pQuery->current; STableQueryInfo* item = pRuntimeEnv->current;
// primary timestamp column // primary timestamp column
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
...@@ -1719,7 +1645,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1719,7 +1645,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pRuntimeEnv->keyBuf = malloc(pQuery->maxSrcColumnSize + sizeof(int64_t)); pRuntimeEnv->keyBuf = malloc(pQuery->maxTableColumnWidth + sizeof(int64_t));
pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + pQuery->srcRowSize); pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + pQuery->srcRowSize);
pRuntimeEnv->tagVal = malloc(pQuery->tagLen); pRuntimeEnv->tagVal = malloc(pQuery->tagLen);
...@@ -1797,12 +1723,14 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1797,12 +1723,14 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Arithmetic: { case OP_Arithmetic: {
SOperatorInfo* prev = pRuntimeEnv->pTableScanner; SOperatorInfo* prev = pRuntimeEnv->pTableScanner;
if (i >= 1) { if (i == 0) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
} else {
prev = pRuntimeEnv->proot; prev = pRuntimeEnv->proot;
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQuery->pExpr2, pQuery->numOfExpr2);
} }
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
break; break;
} }
...@@ -2421,7 +2349,7 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf ...@@ -2421,7 +2349,7 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf
} }
// save the cursor status // save the cursor status
pRuntimeEnv->pQuery->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
} else { } else {
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
bool qualified = false; bool qualified = false;
...@@ -2566,7 +2494,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa ...@@ -2566,7 +2494,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
pBlock->pBlockStatis = NULL; pBlock->pBlockStatis = NULL;
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
int64_t groupId = pQuery->current->groupIndex; int64_t groupId = pRuntimeEnv->current->groupIndex;
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
SQInfo* pQInfo = pRuntimeEnv->qinfo; SQInfo* pQInfo = pRuntimeEnv->qinfo;
...@@ -2582,8 +2510,8 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa ...@@ -2582,8 +2510,8 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
// compare tag first // compare tag first
tVariant t = {0}; tVariant t = {0};
doSetTagValueInParam(pQuery->current->pTable, tagId, &t, pColInfo->type, pColInfo->bytes); doSetTagValueInParam(pRuntimeEnv->current->pTable, tagId, &t, pColInfo->type, pColInfo->bytes);
setTimestampListJoinInfo(pRuntimeEnv, &t, pQuery->current); setTimestampListJoinInfo(pRuntimeEnv, &t, pRuntimeEnv->current);
STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf); STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf);
if (!tsBufIsValidElem(&elem) || (tsBufIsValidElem(&elem) && (tVariantCompare(&t, elem.tag) != 0))) { if (!tsBufIsValidElem(&elem) || (tsBufIsValidElem(&elem) && (tVariantCompare(&t, elem.tag) != 0))) {
...@@ -2619,7 +2547,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa ...@@ -2619,7 +2547,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
} else if (pQuery->stableQuery && (!pQuery->tsCompQuery)) { // stable aggregate, not interval aggregate or normal column aggregate } else if (pQuery->stableQuery && (!pQuery->tsCompQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput, pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput,
pQuery->current->groupIndex); pRuntimeEnv->current->groupIndex);
} }
(*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock); (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
...@@ -3281,7 +3209,7 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe ...@@ -3281,7 +3209,7 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe
void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t groupIndex, void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t groupIndex,
TSKEY nextKey) { TSKEY nextKey) {
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
// lastKey needs to be updated // lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey; pTableQueryInfo->lastKey = nextKey;
...@@ -3430,7 +3358,7 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx ...@@ -3430,7 +3358,7 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
*/ */
void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) { void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo *pTableQueryInfo = pQuery->current; STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
SResultRowInfo *pWindowResInfo = &pTableQueryInfo->resInfo; SResultRowInfo *pWindowResInfo = &pTableQueryInfo->resInfo;
if (pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL) { if (pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL) {
...@@ -3662,7 +3590,7 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -3662,7 +3590,7 @@ void queryCostStatis(SQInfo *pQInfo) {
//static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) { //static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
// SQuery *pQuery = pRuntimeEnv->pQuery; // SQuery *pQuery = pRuntimeEnv->pQuery;
// STableQueryInfo* pTableQueryInfo = pQuery->current; // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// //
// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); // int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// //
...@@ -3693,7 +3621,7 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -3693,7 +3621,7 @@ void queryCostStatis(SQInfo *pQInfo) {
// int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock); // int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
// //
// qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, pRuntimeEnv->qinfo, // qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, pRuntimeEnv->qinfo,
// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey); // pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pRuntimeEnv->current->lastKey);
//} //}
//void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { //void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
...@@ -3706,7 +3634,7 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -3706,7 +3634,7 @@ void queryCostStatis(SQInfo *pQInfo) {
// pQuery->pos = 0; // pQuery->pos = 0;
// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); // int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// //
// STableQueryInfo* pTableQueryInfo = pQuery->current; // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle; // TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
// //
// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; // SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
...@@ -3772,7 +3700,7 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -3772,7 +3700,7 @@ void queryCostStatis(SQInfo *pQInfo) {
// //
// qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64, // qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64,
// pRuntimeEnv->qinfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, // pRuntimeEnv->qinfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
// pQuery->current->lastKey); // pRuntimeEnv->current->lastKey);
// //
// return key; // return key;
// } else { // do nothing // } else { // do nothing
...@@ -3789,9 +3717,9 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -3789,9 +3717,9 @@ void queryCostStatis(SQInfo *pQInfo) {
//static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { //static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
// SQuery *pQuery = pRuntimeEnv->pQuery; // SQuery *pQuery = pRuntimeEnv->pQuery;
// if (QUERY_IS_ASC_QUERY(pQuery)) { // if (QUERY_IS_ASC_QUERY(pQuery)) {
// assert(*start <= pQuery->current->lastKey); // assert(*start <= pRuntimeEnv->current->lastKey);
// } else { // } else {
// assert(*start >= pQuery->current->lastKey); // assert(*start >= pRuntimeEnv->current->lastKey);
// } // }
// //
// // if queried with value filter, do NOT forward query start position // // if queried with value filter, do NOT forward query start position
...@@ -3810,7 +3738,7 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -3810,7 +3738,7 @@ void queryCostStatis(SQInfo *pQInfo) {
// bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); // bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
// //
// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; // SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
// STableQueryInfo *pTableQueryInfo = pQuery->current; // STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
// //
// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; // SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
// while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { // while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
...@@ -4137,8 +4065,9 @@ static SSDataBlock* doTableScanImpl(void* param) { ...@@ -4137,8 +4065,9 @@ static SSDataBlock* doTableScanImpl(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
SSDataBlock* pBlock = &pTableScanInfo->block; SSDataBlock* pBlock = &pTableScanInfo->block;
SQuery* pQuery = pOperator->pRuntimeEnv->pQuery; SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
STableGroupInfo* pTableGroupInfo = &pOperator->pRuntimeEnv->tableqinfoGroupInfo; STableGroupInfo* pTableGroupInfo = &pOperator->pRuntimeEnv->tableqinfoGroupInfo;
while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) {
...@@ -4150,14 +4079,14 @@ static SSDataBlock* doTableScanImpl(void* param) { ...@@ -4150,14 +4079,14 @@ static SSDataBlock* doTableScanImpl(void* param) {
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
// todo opt // todo opt
if (pTableGroupInfo->numOfTables > 1 || (pQuery->current == NULL && pTableGroupInfo->numOfTables == 1)) { if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) {
STableQueryInfo** pTableQueryInfo = STableQueryInfo** pTableQueryInfo =
(STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.tid, sizeof(pBlock->info.tid)); (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.tid, sizeof(pBlock->info.tid));
if (pTableQueryInfo == NULL) { if (pTableQueryInfo == NULL) {
break; break;
} }
pQuery->current = *pTableQueryInfo; pRuntimeEnv->current = *pTableQueryInfo;
doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo); doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo);
} }
...@@ -4285,7 +4214,7 @@ static SSDataBlock* doBlockInfoScan(void* param) { ...@@ -4285,7 +4214,7 @@ static SSDataBlock* doBlockInfoScan(void* param) {
tbufCloseWriter(&bw); tbufCloseWriter(&bw);
SArray* g = GET_TABLEGROUP(pOperator->pRuntimeEnv, 0); SArray* g = GET_TABLEGROUP(pOperator->pRuntimeEnv, 0);
pOperator->pRuntimeEnv->pQuery->current = taosArrayGetP(g, 0); pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
return pBlock; return pBlock;
...@@ -4459,7 +4388,7 @@ static SSDataBlock* doAggregate(void* param) { ...@@ -4459,7 +4388,7 @@ static SSDataBlock* doAggregate(void* param) {
break; break;
} }
setTagValue(pOperator, pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput); setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
if (upstream->operatorType == OP_DataBlocksOptScan) { if (upstream->operatorType == OP_DataBlocksOptScan) {
STableScanInfo* pScanInfo = upstream->info; STableScanInfo* pScanInfo = upstream->info;
...@@ -4512,7 +4441,7 @@ static SSDataBlock* doSTableAggregate(void* param) { ...@@ -4512,7 +4441,7 @@ static SSDataBlock* doSTableAggregate(void* param) {
break; break;
} }
setTagValue(pOperator, pRuntimeEnv->pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput); setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
if (upstream->operatorType == OP_DataBlocksOptScan) { if (upstream->operatorType == OP_DataBlocksOptScan) {
STableScanInfo* pScanInfo = upstream->info; STableScanInfo* pScanInfo = upstream->info;
...@@ -4523,7 +4452,7 @@ static SSDataBlock* doSTableAggregate(void* param) { ...@@ -4523,7 +4452,7 @@ static SSDataBlock* doSTableAggregate(void* param) {
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1; TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1;
setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pQuery->current->groupIndex, key); setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pRuntimeEnv->current->groupIndex, key);
doAggregateImpl(pOperator, pQuery->window.skey, pInfo->pCtx, pBlock); doAggregateImpl(pOperator, pQuery->window.skey, pInfo->pCtx, pBlock);
} }
...@@ -4562,7 +4491,7 @@ static SSDataBlock* doArithmeticOperation(void* param) { ...@@ -4562,7 +4491,7 @@ static SSDataBlock* doArithmeticOperation(void* param) {
break; break;
} }
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->pQuery->current; STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// todo dynamic set tags // todo dynamic set tags
setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
...@@ -4736,7 +4665,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { ...@@ -4736,7 +4665,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->pQuery->current; STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order); setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order);
...@@ -4843,7 +4772,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param) { ...@@ -4843,7 +4772,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQuery->order.order); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQuery->order.order);
setTagValue(pOperator, pRuntimeEnv->pQuery->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
if (pInfo->colIndex == -1) { if (pInfo->colIndex == -1) {
pInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQuery->pGroupbyExpr, pBlock); pInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQuery->pGroupbyExpr, pBlock);
} }
...@@ -6226,14 +6155,14 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr ...@@ -6226,14 +6155,14 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
} }
pQuery->srcRowSize = 0; pQuery->srcRowSize = 0;
pQuery->maxSrcColumnSize = 0; pQuery->maxTableColumnWidth = 0;
for (int16_t i = 0; i < numOfCols; ++i) { for (int16_t i = 0; i < numOfCols; ++i) {
pQuery->colList[i] = pQueryMsg->colList[i]; pQuery->colList[i] = pQueryMsg->colList[i];
pQuery->colList[i].filterInfo = tFilterInfoDup(pQueryMsg->colList[i].filterInfo, pQuery->colList[i].numOfFilters); pQuery->colList[i].filterInfo = tFilterInfoDup(pQueryMsg->colList[i].filterInfo, pQuery->colList[i].numOfFilters);
pQuery->srcRowSize += pQuery->colList[i].bytes; pQuery->srcRowSize += pQuery->colList[i].bytes;
if (pQuery->maxSrcColumnSize < pQuery->colList[i].bytes) { if (pQuery->maxTableColumnWidth < pQuery->colList[i].bytes) {
pQuery->maxSrcColumnSize = pQuery->colList[i].bytes; pQuery->maxTableColumnWidth = pQuery->colList[i].bytes;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册