提交 c8b92c46 编写于 作者: H Haojun Liao

[TD-225]refactor

上级 59be5440
...@@ -356,7 +356,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) { ...@@ -356,7 +356,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
continue; continue;
} }
if ((aAggs[functId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { if ((aAggs[functId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
numOfSelectivity++; numOfSelectivity++;
} }
} }
...@@ -379,9 +379,9 @@ bool isProjQuery(SQuery *pQuery) { ...@@ -379,9 +379,9 @@ bool isProjQuery(SQuery *pQuery) {
return true; return true;
} }
bool isTSCompQuery(SQuery *pQuery) { return pQuery->pExpr1[0].base.functionId == TSDB_FUNC_TS_COMP; } bool isTsCompQuery(SQuery *pQuery) { return pQuery->pExpr1[0].base.functionId == TSDB_FUNC_TS_COMP; }
static bool limitResults(SQueryRuntimeEnv* pRuntimeEnv) { static bool limitOperator(SQueryRuntimeEnv* pRuntimeEnv) {
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
...@@ -835,7 +835,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow ...@@ -835,7 +835,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].size = forwardStep; pCtx[k].size = forwardStep;
pCtx[k].nStartQueryTimestamp = pWin->skey; pCtx[k].startTs = pWin->skey;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1); pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1);
int32_t functionId = pQuery->pExpr1[k].base.functionId; int32_t functionId = pQuery->pExpr1[k].base.functionId;
...@@ -860,7 +860,7 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow * ...@@ -860,7 +860,7 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey; pCtx[k].startTs = pWin->skey;
int32_t functionId = pQuery->pExpr1[k].base.functionId; int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
...@@ -1277,7 +1277,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -1277,7 +1277,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pExpr1[k].base.functionId; int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
pCtx[k].nStartQueryTimestamp = pQuery->window.skey; pCtx[k].startTs = pQuery->window.skey;
aAggs[functionId].xFunction(&pCtx[k]); aAggs[functionId].xFunction(&pCtx[k]);
} }
} }
...@@ -1793,7 +1793,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY ...@@ -1793,7 +1793,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos: (pQuery->pos - pCtx->size + 1); pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos: (pQuery->pos - pCtx->size + 1);
assert(pCtx->startOffset >= 0); assert(pCtx->startOffset >= 0);
uint32_t status = aAggs[functionId].nStatus; uint32_t status = aAggs[functionId].status;
if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) { if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) {
pCtx->ptsList = tsCol; pCtx->ptsList = tsCol;
} }
...@@ -1878,7 +1878,7 @@ static int32_t setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx ...@@ -1878,7 +1878,7 @@ static int32_t setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) { if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pCtx[i].outputBytes; tagLen += pCtx[i].outputBytes;
pTagCtx[num++] = &pCtx[i]; pTagCtx[num++] = &pCtx[i];
} else if ((aAggs[pSqlFuncMsg->functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { } else if ((aAggs[pSqlFuncMsg->functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
p = &pCtx[i]; p = &pCtx[i];
} else if (pSqlFuncMsg->functionId == TSDB_FUNC_TS || pSqlFuncMsg->functionId == TSDB_FUNC_TAG) { } else if (pSqlFuncMsg->functionId == TSDB_FUNC_TS || pSqlFuncMsg->functionId == TSDB_FUNC_TAG) {
// tag function may be the group by tag column // tag function may be the group by tag column
...@@ -2052,7 +2052,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2052,7 +2052,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
qDebug("QInfo:%p teardown runtime env", pQInfo); qDebug("QInfo:%p teardown runtime env", pQInfo);
cleanupResultRowInfo(&pRuntimeEnv->windowResInfo); cleanupResultRowInfo(&pRuntimeEnv->windowResInfo);
if (isTSCompQuery(pQuery)) { if (isTsCompQuery(pQuery)) {
FILE *f = *(FILE **)pQuery->sdata[0]->data; FILE *f = *(FILE **)pQuery->sdata[0]->data;
if (f) { if (f) {
...@@ -2154,7 +2154,7 @@ static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) { ...@@ -2154,7 +2154,7 @@ static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) {
continue; continue;
} }
if (!IS_MULTIOUTPUT(aAggs[pExprMsg->functionId].nStatus)) { if (!IS_MULTIOUTPUT(aAggs[pExprMsg->functionId].status)) {
return true; return true;
} }
} }
...@@ -2279,7 +2279,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { ...@@ -2279,7 +2279,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
continue; continue;
} }
hasMultioutput = IS_MULTIOUTPUT(aAggs[pExprMsg->functionId].nStatus); hasMultioutput = IS_MULTIOUTPUT(aAggs[pExprMsg->functionId].status);
if (!hasMultioutput) { if (!hasMultioutput) {
break; break;
} }
...@@ -2777,7 +2777,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa ...@@ -2777,7 +2777,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa
static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) { static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) {
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyColumn && !isFixedOutputQuery(pRuntimeEnv) && !isTSCompQuery(pQuery)) { if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyColumn && !isFixedOutputQuery(pRuntimeEnv) && !isTsCompQuery(pQuery)) {
SResultRec *pRec = &pQuery->rec; SResultRec *pRec = &pQuery->rec;
if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) { if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) {
...@@ -3503,7 +3503,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { ...@@ -3503,7 +3503,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
assert(functionId != TSDB_FUNC_DIFF); assert(functionId != TSDB_FUNC_DIFF);
// set next output position // set next output position
if (IS_OUTER_FORWARD(aAggs[functionId].nStatus)) { if (IS_OUTER_FORWARD(aAggs[functionId].status)) {
pRuntimeEnv->pCtx[j].aOutputBuf += pRuntimeEnv->pCtx[j].outputBytes * output; pRuntimeEnv->pCtx[j].aOutputBuf += pRuntimeEnv->pCtx[j].outputBytes * output;
} }
...@@ -3762,7 +3762,7 @@ static void handleInterpolationQuery(SQInfo* pQInfo) { ...@@ -3762,7 +3762,7 @@ static void handleInterpolationQuery(SQInfo* pQInfo) {
} }
pCtx->param[2].i64 = (int8_t)pQuery->fillType; pCtx->param[2].i64 = (int8_t)pQuery->fillType;
pCtx->nStartQueryTimestamp = pQuery->window.skey; pCtx->startTs = pQuery->window.skey;
if (pQuery->fillVal != NULL) { if (pQuery->fillVal != NULL) {
if (isNull((const char *)&pQuery->fillVal[i], pCtx->inputType)) { if (isNull((const char *)&pQuery->fillVal[i], pCtx->inputType)) {
pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
...@@ -4117,7 +4117,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { ...@@ -4117,7 +4117,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
bool requireTimestamp(SQuery *pQuery) { bool requireTimestamp(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; i++) { for (int32_t i = 0; i < pQuery->numOfOutput; i++) {
int32_t functionId = pQuery->pExpr1[i].base.functionId; int32_t functionId = pQuery->pExpr1[i].base.functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_NEED_TS) != 0) { if ((aAggs[functionId].status & TSDB_FUNCSTATE_NEED_TS) != 0) {
return true; return true;
} }
} }
...@@ -5305,7 +5305,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5305,7 +5305,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
break; break;
} }
} else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTsBuf == NULL && !isTSCompQuery(pQuery)) { } else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTsBuf == NULL && !isTsCompQuery(pQuery)) {
//super table projection query with identical query time range for all tables. //super table projection query with identical query time range for all tables.
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
resetDefaultResInfoOutputBuf(pRuntimeEnv); resetDefaultResInfoOutputBuf(pRuntimeEnv);
...@@ -5425,7 +5425,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5425,7 +5425,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
} else { } else {
// the limitation of output result is reached, set the query completed // the limitation of output result is reached, set the query completed
skipResults(pRuntimeEnv); skipResults(pRuntimeEnv);
if (limitResults(pRuntimeEnv)) { if (limitOperator(pRuntimeEnv)) {
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
SET_STABLE_QUERY_OVER(pQInfo); SET_STABLE_QUERY_OVER(pQInfo);
break; break;
...@@ -5493,7 +5493,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5493,7 +5493,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
skipResults(pRuntimeEnv); skipResults(pRuntimeEnv);
// the limitation of output result is reached, set the query completed // the limitation of output result is reached, set the query completed
if (limitResults(pRuntimeEnv)) { if (limitOperator(pRuntimeEnv)) {
SET_STABLE_QUERY_OVER(pQInfo); SET_STABLE_QUERY_OVER(pQInfo);
break; break;
} }
...@@ -5548,7 +5548,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5548,7 +5548,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
* *
* Only the ts-comp query requires the finalizer function to be executed here. * Only the ts-comp query requires the finalizer function to be executed here.
*/ */
if (isTSCompQuery(pQuery)) { if (isTsCompQuery(pQuery)) {
finalizeQueryResult(pRuntimeEnv); finalizeQueryResult(pRuntimeEnv);
} }
...@@ -5796,8 +5796,9 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -5796,8 +5796,9 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
// TODO limit/offset refactor to be one operator
skipResults(pRuntimeEnv); skipResults(pRuntimeEnv);
limitResults(pRuntimeEnv); limitOperator(pRuntimeEnv);
} }
static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
...@@ -5805,7 +5806,7 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -5805,7 +5806,7 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
// for ts_comp query, re-initialized is not allowed // for ts_comp query, re-initialized is not allowed
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
if (!isTSCompQuery(pQuery)) { if (!isTsCompQuery(pQuery)) {
resetDefaultResInfoOutputBuf(pRuntimeEnv); resetDefaultResInfoOutputBuf(pRuntimeEnv);
} }
...@@ -5839,7 +5840,7 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -5839,7 +5840,7 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
resetDefaultResInfoOutputBuf(pRuntimeEnv); resetDefaultResInfoOutputBuf(pRuntimeEnv);
} }
limitResults(pRuntimeEnv); limitOperator(pRuntimeEnv);
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
qDebug("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, qDebug("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo,
pQuery->current->lastKey, pQuery->window.ekey); pQuery->current->lastKey, pQuery->window.ekey);
...@@ -5848,7 +5849,7 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -5848,7 +5849,7 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
taosHashPut(pQInfo->arrTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid), &tidInfo, sizeof(STableIdInfo)); taosHashPut(pQInfo->arrTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid), &tidInfo, sizeof(STableIdInfo));
} }
if (!isTSCompQuery(pQuery)) { if (!isTsCompQuery(pQuery)) {
assert(pQuery->rec.rows <= pQuery->rec.capacity); assert(pQuery->rec.rows <= pQuery->rec.capacity);
} }
} }
...@@ -5888,7 +5889,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { ...@@ -5888,7 +5889,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
doSecondaryArithmeticProcess(pQuery); doSecondaryArithmeticProcess(pQuery);
limitResults(pRuntimeEnv); limitOperator(pRuntimeEnv);
} else { } else {
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
...@@ -5901,7 +5902,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { ...@@ -5901,7 +5902,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled); pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled);
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
limitResults(pRuntimeEnv); limitOperator(pRuntimeEnv);
} }
} }
} }
...@@ -5920,7 +5921,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { ...@@ -5920,7 +5921,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled); pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled);
if (pQuery->rec.rows > 0) { if (pQuery->rec.rows > 0) {
limitResults(pRuntimeEnv); limitOperator(pRuntimeEnv);
} }
qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total); qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total);
...@@ -7122,7 +7123,7 @@ static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { ...@@ -7122,7 +7123,7 @@ static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) {
* the returned row size is equalled to 1 * the returned row size is equalled to 1
* TODO handle the case that the file is too large to send back one time * TODO handle the case that the file is too large to send back one time
*/ */
if (isTSCompQuery(pQuery) && (*numOfRows) > 0) { if (isTsCompQuery(pQuery) && (*numOfRows) > 0) {
struct stat fStat; struct stat fStat;
FILE *f = *(FILE **)pQuery->sdata[0]->data; FILE *f = *(FILE **)pQuery->sdata[0]->data;
if ((f != NULL) && (fstat(fileno(f), &fStat) == 0)) { if ((f != NULL) && (fstat(fileno(f), &fStat) == 0)) {
...@@ -7142,7 +7143,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { ...@@ -7142,7 +7143,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
// load data from file to msg buffer // load data from file to msg buffer
if (isTSCompQuery(pQuery)) { if (isTsCompQuery(pQuery)) {
FILE *f = *(FILE **)pQuery->sdata[0]->data; // TODO refactor FILE *f = *(FILE **)pQuery->sdata[0]->data; // TODO refactor
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册