提交 fb8024fb 编写于 作者: H Haojun Liao

[td-2859] refactor.

上级 33f50347
......@@ -211,6 +211,8 @@ bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid);
SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema);
void tscColumnListDestroy(SArray* pColList);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscDequoteAndTrimToken(SStrToken* pToken);
int32_t tscValidateName(SStrToken* pToken);
......
......@@ -218,11 +218,11 @@ typedef struct SQueryInfo {
SQInfo* pQInfo; // global merge operator
SArray* pDSOperator; // data source operator
SArray* pPhyOperator; // physical query execution plan
SQueryAttr* pQueryAttr; // query object
SQueryAttr* pQueryAttr; // query object
struct SQueryInfo *sibling; // sibling
SArray *pUpstream; // SArray<struct SQueryInfo>
SArray *pDownstream; // SArray<struct SQueryInfo>
SArray *pUpstream; // SArray<struct SQueryInfo>
struct SQueryInfo *pDownstream;
} SQueryInfo;
typedef struct {
......
......@@ -77,7 +77,7 @@ static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SC
static uint8_t convertOptr(SStrToken *pToken);
static int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelNodeList, bool isSTable, bool joinQuery, bool timeWindowQuery);
static int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SQueryInfo* pQueryInfo, SArray* pSelNodeList, bool isSTable, bool joinQuery, bool timeWindowQuery);
static bool validateIpAddress(const char* ip, size_t size);
static bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
......@@ -1607,7 +1607,7 @@ bool isValidDistinctSql(SQueryInfo* pQueryInfo) {
return false;
}
int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelNodeList, bool isSTable, bool joinQuery,
int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SQueryInfo* pQueryInfo, SArray* pSelNodeList, bool isSTable, bool joinQuery,
bool timeWindowQuery) {
assert(pSelNodeList != NULL && pCmd != NULL);
......@@ -1617,8 +1617,6 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelN
const char* msg4 = "only support distinct one tag";
const char* msg5 = "invalid function name";
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex);
// too many result columns not support order by in query
if (taosArrayGetSize(pSelNodeList) > TSDB_MAX_COLUMNS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
......@@ -6710,7 +6708,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
}
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
if (validateSelectNodeList(&pSql->cmd, 0, pQuerySqlNode->pSelNodeList, isSTable, false, false) != TSDB_CODE_SUCCESS) {
if (validateSelectNodeList(&pSql->cmd, 0, pQueryInfo, pQuerySqlNode->pSelNodeList, isSTable, false, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
......@@ -6932,18 +6930,30 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind
}
if (pQuerySqlNode->from->type == SQL_NODE_FROM_SUBQUERY) {
// support only one nestquery
// parse the subquery in the first place
code = validateSqlNode(pSql, pQuerySqlNode->from->pNode.pClause[0], 0);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return code;
}
pQueryInfo = pCmd->pQueryInfo[0];
SQueryInfo* current = calloc(1, sizeof(SQueryInfo));
tscInitQueryInfo(current);
taosArrayPush(current->pUpstream, &pQueryInfo);
STableMeta* pTableMeta = extractTempTableMetaFromNestQuery(pQueryInfo);
STableMetaInfo* pTableMetaInfo1 = calloc(1, sizeof(STableMetaInfo));
STableMeta* pTableMeta = extractTempTableMetaFromNestQuery(taosArrayGetP(pQueryInfo->pUpstream, 0));
pTableMetaInfo1->pTableMeta = pTableMeta;
pQueryInfo->pTableMetaInfo = calloc(1, POINTER_BYTES);
pQueryInfo->pTableMetaInfo[0] = pTableMetaInfo1;
current->pTableMetaInfo = calloc(1, POINTER_BYTES);
current->pTableMetaInfo[0] = pTableMetaInfo1;
// parse the group by clause in the first place
if (validateSelectNodeList(pCmd, index, pQuerySqlNode->pSelNodeList, false, false, false) != TSDB_CODE_SUCCESS) {
pCmd->pQueryInfo[0] = current;
pQueryInfo->pDownstream = current;
if (validateSelectNodeList(pCmd, index, current, pQuerySqlNode->pSelNodeList, false, false, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
......@@ -7002,7 +7012,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind
int32_t timeWindowQuery =
(TPARSER_HAS_TOKEN(pQuerySqlNode->interval.interval) || TPARSER_HAS_TOKEN(pQuerySqlNode->sessionVal.gap));
if (validateSelectNodeList(pCmd, index, pQuerySqlNode->pSelNodeList, isSTable, joinQuery, timeWindowQuery) !=
if (validateSelectNodeList(pCmd, index, pQueryInfo, pQuerySqlNode->pSelNodeList, isSTable, joinQuery, timeWindowQuery) !=
TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
......
......@@ -615,8 +615,8 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
tableSerialize + sqlLen + 4096 + pQueryInfo->bufLen;
}
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg, int32_t *succeed) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, char *pMsg,
int32_t *succeed) {
TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
......@@ -878,7 +878,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t succeed = 1;
// serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg, &succeed);
pMsg = doSerializeTableInfo(pQueryMsg, pSql, pTableMetaInfo, pMsg, &succeed);
if (succeed == 0) {
return TSDB_CODE_TSC_APP_ERROR;
}
......@@ -1591,18 +1591,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
uint64_t localQueryId = 0;
qTableQuery(pQueryInfo->pQInfo, &localQueryId);
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
pRes->numOfRows = (p != NULL)? p->info.rows: 0;
//pRes->code = tscDoLocalMerge(pSql);
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
tscCreateResPointerInfo(pRes, pQueryInfo);
tscSetResRawPtrRv(pRes, pQueryInfo, p);
}
pRes->row = 0;
pRes->completed = (pRes->numOfRows == 0);
convertQueryResult(pRes, pQueryInfo);
code = pRes->code;
if (pRes->code == TSDB_CODE_SUCCESS) {
......
......@@ -584,7 +584,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
pQueryInfo->groupbyExpr = pSupporter->groupInfo;
pQueryInfo->pUpstream = taosArrayInit(4, sizeof(POINTER_BYTES));
pQueryInfo->pDownstream = taosArrayInit(4, sizeof(POINTER_BYTES));
assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
......@@ -3590,7 +3589,7 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
STsBufInfo bufInfo = {0};
SQueryParam param = {.pOperator = pa};
/*int32_t code = */initQInfo(&bufInfo, NULL, pQInfo, &param, NULL, 0, merger);
/*int32_t code = */initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, &param, NULL, 0, merger);
return pQInfo;
_cleanup:
......
......@@ -665,12 +665,24 @@ SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t
return pOptr;
}
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
// set the correct result
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
pRes->numOfRows = (p != NULL)? p->info.rows: 0;
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
tscCreateResPointerInfo(pRes, pQueryInfo);
tscSetResRawPtrRv(pRes, pQueryInfo, p);
}
pRes->row = 0;
pRes->completed = (pRes->numOfRows == 0);
}
void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
if (pQueryInfo->pDownstream != NULL && taosArrayGetSize(pQueryInfo->pDownstream) > 0) {
if (pQueryInfo->pDownstream != NULL) {
// handle the following query process
SQueryInfo* px = taosArrayGetP(pQueryInfo->pDownstream, 0);
printf("%d\n", px->type);
SQueryInfo *px = pQueryInfo->pDownstream;
SColumnInfo* colInfo = extractColumnInfoFromResult(px->pTableMetaInfo[0]->pTableMeta, px->colList);
int32_t numOfOutput = tscSqlExprNumOfExprs(px);
......@@ -696,11 +708,11 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
SOperatorInfo* pSourceOptr = createDummyInputOperator((char*)pRes, pSchema, numOfOutput);
SQInfo* pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL, MASTER_SCAN);
SSDataBlock* pres = pQInfo->runtimeEnv.outputBuf;
px->pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL, MASTER_SCAN);
// build result
pRes->numOfRows = pres->info.rows;
uint64_t qId = 0;
qTableQuery(px->pQInfo, &qId);
convertQueryResult(pRes, px);
}
}
......@@ -2172,7 +2184,6 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->slimit.limit = -1;
pQueryInfo->slimit.offset = 0;
pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES);
}
int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
......@@ -2223,10 +2234,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
tfree(pQueryInfo->buf);
taosArrayDestroy(pQueryInfo->pUpstream);
taosArrayDestroy(pQueryInfo->pDownstream);
pQueryInfo->pUpstream = NULL;
pQueryInfo->pDownstream = NULL;
}
void tscClearSubqueryInfo(SSqlCmd* pCmd) {
......@@ -2697,6 +2705,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
SQueryInfo* pq = taosArrayGetP(pQueryInfo->pUpstream, 0);
pSql->cmd.active = pq;
pSql->cmd.command = TSDB_SQL_SELECT;
executeQuery(pSql, pq);
// merge nest query result and generate final results
......
......@@ -520,7 +520,7 @@ SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId, char* sql, uint64_t *qId);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryParam* param, char* start,
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, SQueryParam* param, char* start,
int32_t prevResultLen, void* merger);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
......
......@@ -1715,7 +1715,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Aggregate: {
pRuntimeEnv->proot =
createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pRuntimeEnv->pTableScanner->operatorType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
}
break;
}
......@@ -3811,9 +3813,8 @@ void queryCostStatis(SQInfo *pQInfo) {
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
// TODO set the tags scan handle
if (onlyQueryTags(pQueryAttr)) {
......@@ -3839,7 +3840,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
terrno = TSDB_CODE_SUCCESS;
if (isFirstLastRowQuery(pQueryAttr)) {
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQueryAttr->tableGroupInfo, pQInfo->qId, &pQueryAttr->memRef);
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
// update the query time window
pQueryAttr->window = cond.twindow;
......@@ -3860,9 +3861,9 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
}
}
} else if (pQueryAttr->pointInterpQuery) {
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, pQInfo->qId, &pQueryAttr->memRef);
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
} else {
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQueryAttr->tableGroupInfo, pQInfo->qId, &pQueryAttr->memRef);
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
}
return terrno;
......@@ -3894,7 +3895,7 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in
return pFillCol;
}
int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* tsdb, int32_t tbScanner,
int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* tsdb, void* sourceOptr, int32_t tbScanner,
SArray* pOperator, void* param) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
......@@ -3904,13 +3905,12 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* ts
pRuntimeEnv->prevResult = prevResult;
if (tsdb != NULL) {
int32_t code = setupQueryHandle(tsdb, pQInfo, pQueryAttr->stableQuery);
int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
pQueryAttr->tsdb = tsdb;
pQueryAttr->interBufSize = getOutputInterResultBufSize(pQueryAttr);
pRuntimeEnv->groupResInfo.totalGroup = (int32_t) (pQueryAttr->stableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0);
......@@ -3941,6 +3941,12 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* ts
break;
}
}
if (sourceOptr != NULL) {
assert(pRuntimeEnv->pTableScanner == NULL);
pRuntimeEnv->pTableScanner = sourceOptr;
}
if (pTsBuf != NULL) {
int16_t order = (pQueryAttr->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order);
......@@ -4538,7 +4544,9 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
break;
}
setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
if (pRuntimeEnv->current != NULL) {
setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
}
if (upstream->operatorType == OP_DataBlocksOptScan) {
STableScanInfo* pScanInfo = upstream->info;
......@@ -6558,7 +6566,7 @@ bool isValidQInfo(void *param) {
return (sig == (uint64_t)pQInfo);
}
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryParam* param, char* start,
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, SQueryParam* param, char* start,
int32_t prevResultLen, void* merger) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -6604,7 +6612,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryPara
}
// filter the qualified
if ((code = doInitQInfo(pQInfo, pTsBuf, prevResult, tsdb, param->tableScanOperator, param->pOperator, merger)) != TSDB_CODE_SUCCESS) {
if ((code = doInitQInfo(pQInfo, pTsBuf, prevResult, tsdb, sourceOptr, param->tableScanOperator, param->pOperator, merger)) != TSDB_CODE_SUCCESS) {
goto _error;
}
......
......@@ -172,7 +172,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
goto _over;
}
code = initQInfo(&pQueryMsg->tsBuf, tsdb, *pQInfo, &param, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL);
code = initQInfo(&pQueryMsg->tsBuf, tsdb, NULL, *pQInfo, &param, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL);
_over:
if (param.pGroupbyExpr != NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册