提交 ec2071fa 编写于 作者: H Haojun Liao

[td-2895] refactor.

上级 11f07ea0
...@@ -443,7 +443,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -443,7 +443,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pSubQueryInfo->tsBuf = NULL; pSubQueryInfo->tsBuf = NULL;
// free result for async object will also free sqlObj // free result for async object will also free sqlObj
assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one resutl columns assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one result columns
taos_free_result(pPrevSub); taos_free_result(pPrevSub);
SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL); SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
...@@ -834,6 +834,8 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -834,6 +834,8 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// todo, the type may not include TSDB_QUERY_TYPE_TAG_FILTER_QUERY
assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)); assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
...@@ -2605,12 +2607,17 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo ...@@ -2605,12 +2607,17 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
// clear the limit/offset info, since it should not be sent to vnode to be executed.
pQueryInfo->limit.limit = -1;
pQueryInfo->limit.offset = 0;
assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub); assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub);
// launch subquery for each vnode, so the subquery index equals to the vgroupIndex. // launch subquery for each vnode, so the subquery index equals to the vgroupIndex.
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index);
pTableMetaInfo->vgroupIndex = trsupport->subqueryIndex; pTableMetaInfo->vgroupIndex = trsupport->subqueryIndex;
pSql->pSubs[trsupport->subqueryIndex] = pNew; pSql->pSubs[trsupport->subqueryIndex] = pNew;
} }
......
...@@ -1722,10 +1722,15 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { ...@@ -1722,10 +1722,15 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->fieldsInfo.internalField = taosArrayInit(4, sizeof(SInternalField)); pQueryInfo->fieldsInfo.internalField = taosArrayInit(4, sizeof(SInternalField));
assert(pQueryInfo->exprList == NULL); assert(pQueryInfo->exprList == NULL);
pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX; pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX;
pQueryInfo->resColumnId= -1000; pQueryInfo->resColumnId = -1000;
pQueryInfo->limit.limit = -1;
pQueryInfo->limit.offset = 0;
pQueryInfo->slimit.limit = -1;
pQueryInfo->slimit.offset = 0;
} }
int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) { int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
......
...@@ -228,8 +228,6 @@ typedef struct SQuery { ...@@ -228,8 +228,6 @@ typedef struct SQuery {
uint32_t status; // query status uint32_t status; // query status
STableQueryInfo* current; STableQueryInfo* current;
int32_t numOfCheckedBlocks; // number of check data blocks
void* tsdb; void* tsdb;
SMemRef memRef; SMemRef memRef;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo> STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
......
...@@ -1121,8 +1121,8 @@ static void setArithParams(SArithmeticSupport* sas, SExprInfo *pExprInfo, SSData ...@@ -1121,8 +1121,8 @@ static void setArithParams(SArithmeticSupport* sas, SExprInfo *pExprInfo, SSData
static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pCtx[i].size = pBlock->info.rows;
pCtx[i].order = order; pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows;
pCtx[i].currentStage = pOperator->pRuntimeEnv->scanFlag; pCtx[i].currentStage = pOperator->pRuntimeEnv->scanFlag;
setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo); setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo);
...@@ -1148,8 +1148,8 @@ static void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SS ...@@ -1148,8 +1148,8 @@ static void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SS
static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pCtx[i].size = pBlock->info.rows;
pCtx[i].order = order; pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows;
pCtx[i].currentStage = pOperator->pRuntimeEnv->scanFlag; pCtx[i].currentStage = pOperator->pRuntimeEnv->scanFlag;
setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo); setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo);
...@@ -1163,7 +1163,6 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, ...@@ -1163,7 +1163,6 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex); SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
pCtx[i].pInput = p->pData; pCtx[i].pInput = p->pData;
assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type && pCtx[i].inputBytes == p->info.bytes); assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type && pCtx[i].inputBytes == p->info.bytes);
uint32_t status = aAggs[pCtx[i].functionId].status; uint32_t status = aAggs[pCtx[i].functionId].status;
...@@ -1254,8 +1253,7 @@ void doRowwiseTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* ...@@ -1254,8 +1253,7 @@ void doRowwiseTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo*
} }
static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx, static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx,
int32_t pos, int32_t numOfRows, int32_t pos, int32_t numOfRows, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win, int16_t type) {
SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win, int16_t type) {
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
...@@ -2376,22 +2374,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2376,22 +2374,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} }
} else { // diff/add/multiply/subtract/division } else { // diff/add/multiply/subtract/division
assert(pQuery->checkResultBuf == 1); assert(pQuery->checkResultBuf == 1);
// if (isTsCompQuery(pQuery)) { if (!onlyQueryTags(pQuery)) {
// pRuntimeEnv->proot = createSeqTableBlockScanOperator(pRuntimeEnv->pTableScanner, pRuntimeEnv);
/*} else*/ if (!onlyQueryTags(pQuery)) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
} }
} }
if (!pQuery->stableQuery || isProjQuery(pQuery)) { // TODO this problem should be handed at the client side if (pQuery->limit.offset > 0) {
if (pQuery->limit.offset > 0) { pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); }
}
if (pQuery->limit.limit > 0) { if (pQuery->limit.limit > 0) {
pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -3460,6 +3454,7 @@ void setTagVal_rv(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pC ...@@ -3460,6 +3454,7 @@ void setTagVal_rv(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pC
// set tag value, by which the results are aggregated. // set tag value, by which the results are aggregated.
int32_t offset = 0; int32_t offset = 0;
memset(pRuntimeEnv->tagVal, 0, pQuery->tagLen); memset(pRuntimeEnv->tagVal, 0, pQuery->tagLen);
for (int32_t idx = 0; idx < numOfOutput; ++idx) { for (int32_t idx = 0; idx < numOfOutput; ++idx) {
SExprInfo* pLocalExprInfo = &pExpr[idx]; SExprInfo* pLocalExprInfo = &pExpr[idx];
...@@ -3481,6 +3476,36 @@ void setTagVal_rv(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pC ...@@ -3481,6 +3476,36 @@ void setTagVal_rv(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pC
offset += pLocalExprInfo->bytes; offset += pLocalExprInfo->bytes;
} }
if (pQuery->stableQuery && pQuery->stabledev && (pRuntimeEnv->prevResult != NULL)) {
//todo : use index to avoid iterator all possible output columns
for(int32_t i = 0; i < numOfOutput; ++i) {
if(pExpr[i].base.functionId != TSDB_FUNC_STDDEV_DST) {
continue;
}
SSqlFuncMsg* pFuncMsg = &pExpr[i].base;
pCtx[i].param[0].arr = NULL;
pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// TODO use hash to speedup this loop
int32_t numOfGroup = (int32_t) taosArrayGetSize(pRuntimeEnv->prevResult);
for(int32_t j = 0; j < numOfGroup; ++j) {
SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, j);
if (pQuery->tagLen == 0 || memcmp(p->tags, pRuntimeEnv->tagVal, pQuery->tagLen) == 0) {
int32_t numOfCols = (int32_t) taosArrayGetSize(p->pResult);
for(int32_t k = 0; k < numOfCols; ++k) {
SStddevInterResult* pres = taosArrayGet(p->pResult, k);
if (pres->colId == pFuncMsg->colInfo.colId) {
pCtx[i].param[0].arr = pres->pResult;
break;
}
}
}
}
}
}
// set the join tag for first column // set the join tag for first column
SSqlFuncMsg* pFuncMsg = &pExprInfo->base; SSqlFuncMsg* pFuncMsg = &pExprInfo->base;
if (pQuery->stableQuery && if (pQuery->stableQuery &&
...@@ -4545,44 +4570,45 @@ void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, S ...@@ -4545,44 +4570,45 @@ void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, S
// return 0; // return 0;
//} //}
//int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv) { int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput,
// SQuery* pQuery = pRuntimeEnv->pQuery; SExprInfo* pExpr) {
// SQuery* pQuery = pRuntimeEnv->pQuery;
// if (pRuntimeEnv->prevResult == NULL || pQuery->groupbyColumn) {
// return TSDB_CODE_SUCCESS; if (pRuntimeEnv->prevResult == NULL || pQuery->groupbyColumn) {
// } return TSDB_CODE_SUCCESS;
// }
// int32_t numOfExprs = pQuery->numOfOutput;
// for(int32_t i = 0; i < numOfExprs; ++i) { int32_t numOfExprs = pQuery->numOfOutput;
// SExprInfo* pExprInfo = &(pQuery->pExpr1[i]); for(int32_t i = 0; i < numOfExprs; ++i) {
// if(pExprInfo->base.functionId != TSDB_FUNC_STDDEV_DST) { SExprInfo* pExprInfo = &(pExpr[i]);
// continue; if(pExprInfo->base.functionId != TSDB_FUNC_STDDEV_DST) {
// } continue;
// }
// SSqlFuncMsg* pFuncMsg = &pExprInfo->base;
// SSqlFuncMsg* pFuncMsg = &pExprInfo->base;
// pRuntimeEnv->pCtx[i].param[0].arr = NULL;
// pRuntimeEnv->pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int pCtx[i].param[0].arr = NULL;
// pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// int32_t numOfGroup = (int32_t) taosArrayGetSize(pRuntimeEnv->prevResult);
// for(int32_t j = 0; j < numOfGroup; ++j) { // TODO use hash to speedup this loop
// SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, j); int32_t numOfGroup = (int32_t) taosArrayGetSize(pRuntimeEnv->prevResult);
// if (pQuery->tagLen == 0 || memcmp(p->tags, pRuntimeEnv->tagVal, pQuery->tagLen) == 0) { for(int32_t j = 0; j < numOfGroup; ++j) {
// SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, j);
// int32_t numOfCols = (int32_t) taosArrayGetSize(p->pResult); if (pQuery->tagLen == 0 || memcmp(p->tags, pRuntimeEnv->tagVal, pQuery->tagLen) == 0) {
// for(int32_t k = 0; k < numOfCols; ++k) { int32_t numOfCols = (int32_t) taosArrayGetSize(p->pResult);
// SStddevInterResult* pres = taosArrayGet(p->pResult, k); for(int32_t k = 0; k < numOfCols; ++k) {
// if (pres->colId == pFuncMsg->colInfo.colId) { SStddevInterResult* pres = taosArrayGet(p->pResult, k);
// pRuntimeEnv->pCtx[i].param[0].arr = pres->pResult; if (pres->colId == pFuncMsg->colInfo.colId) {
// break; pCtx[i].param[0].arr = pres->pResult;
// } break;
// } }
// } }
// } }
// } }
// }
// return 0;
//} return 0;
}
/* /*
* There are two cases to handle: * There are two cases to handle:
...@@ -5447,6 +5473,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts ...@@ -5447,6 +5473,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pQuery->topBotQuery = isTopBottomQuery(pQuery); pQuery->topBotQuery = isTopBottomQuery(pQuery);
pQuery->hasTagResults = hasTagValOutput(pQuery); pQuery->hasTagResults = hasTagValOutput(pQuery);
pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQuery); pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQuery);
pQuery->stabledev = isStabledev(pQuery);
pRuntimeEnv->prevResult = prevResult; pRuntimeEnv->prevResult = prevResult;
pRuntimeEnv->qinfo = pQInfo; pRuntimeEnv->qinfo = pQInfo;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册