提交 2891d51a 编写于 作者: H Haojun Liao

[td-2895] fix bug found by regression test.

上级 2b9e364f
......@@ -1536,6 +1536,7 @@ bool isValidDistinctSql(SQueryInfo* pQueryInfo) {
}
return false;
}
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool intervalQuery) {
assert(pSelection != NULL && pCmd != NULL);
......@@ -1871,6 +1872,26 @@ void setResultColName(char* name, tSqlExprItem* pItem, int32_t functionId, SStrT
}
}
static void updateLastQueryInfoForGroupby(SQueryInfo* pQueryInfo, STableMeta* pTableMeta, int32_t functionId, int32_t index) {
if (functionId != TSDB_FUNC_LAST) { // todo refactor
return;
}
SSqlGroupbyExpr* pGroupBy = &pQueryInfo->groupbyExpr;
if (pGroupBy->numOfGroupCols > 0) {
for(int32_t k = 0; k < pGroupBy->numOfGroupCols; ++k) {
SColIndex* pIndex = taosArrayGet(pGroupBy->columnInfo, k);
if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < tscGetNumOfColumns(pTableMeta)) { // group by normal columns
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, index);
pExpr->numOfParams = 1;
pExpr->param->i64 = TSDB_ORDER_ASC;
return;
}
}
}
}
int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSqlExprItem* pItem, bool finalResult) {
STableMetaInfo* pTableMetaInfo = NULL;
int32_t functionId = pItem->pNode->functionId;
......@@ -2135,6 +2156,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[j], cvtFunc, name, colIndex++, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
updateLastQueryInfoForGroupby(pQueryInfo, pTableMetaInfo->pTableMeta, functionId, colIndex - 1);
}
} else {
......@@ -2150,7 +2173,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
char name[TSDB_COL_NAME_LEN] = {0};
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
bool multiColOutput = pItem->pNode->pParam->nExpr > 1;
......@@ -2160,21 +2182,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_TSC_INVALID_SQL;
}
if (functionId == TSDB_FUNC_LAST) { // todo refactor
SSqlGroupbyExpr* pGroupBy = &pQueryInfo->groupbyExpr;
if (pGroupBy->numOfGroupCols > 0) {
for(int32_t k = 0; k < pGroupBy->numOfGroupCols; ++k) {
SColIndex* pIndex = taosArrayGet(pGroupBy->columnInfo, k);
if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { // group by normal columns
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, colIndex + i);
pExpr->numOfParams = 1;
pExpr->param->i64 = TSDB_ORDER_ASC;
break;
}
}
}
}
updateLastQueryInfoForGroupby(pQueryInfo, pTableMetaInfo->pTableMeta, functionId, colIndex + i);
}
}
......@@ -2202,6 +2210,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_TSC_INVALID_SQL;
}
updateLastQueryInfoForGroupby(pQueryInfo, pTableMetaInfo->pTableMeta, functionId, colIndex);
colIndex++;
}
......@@ -2211,6 +2220,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_SUCCESS;
}
}
case TSDB_FUNC_TOP:
case TSDB_FUNC_BOTTOM:
case TSDB_FUNC_PERCT:
......
......@@ -35,6 +35,13 @@
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
#define CHECK_IF_QUERY_KILLED(_q) \
do { \
if (isQueryKilled((_q)->qinfo)) { \
longjmp((_q)->env, TSDB_CODE_TSC_QUERY_CANCELLED); \
} \
} while (0)
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
#define TIME_WINDOW_COPY(_dst, _src) do {\
......@@ -189,6 +196,9 @@ static bool isPointInterpoQuery(SQuery *pQuery);
static void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo);
static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput,
int32_t groupIndex);
// setup the output buffer for each operator
static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) {
......@@ -378,6 +388,32 @@ static bool hasNullRv(SColIndex* pColIndex, SDataStatis *pStatis) {
return true;
}
static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntimeEnv* pRuntimeEnv) {
// more than the capacity, reallocate the resources
if (pResultRowInfo->size < pResultRowInfo->capacity) {
return;
}
int64_t newCapacity = 0;
if (pResultRowInfo->capacity > 10000) {
newCapacity = (int64_t)(pResultRowInfo->capacity * 1.25);
} else {
newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5);
}
char *t = realloc(pResultRowInfo->pResult, (size_t)(newCapacity * POINTER_BYTES));
if (t == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pResultRowInfo->pResult = (SResultRow **)t;
int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity;
memset(&pResultRowInfo->pResult[pResultRowInfo->capacity], 0, POINTER_BYTES * inc);
pResultRowInfo->capacity = (int32_t)newCapacity;
}
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData,
int16_t bytes, bool masterscan, uint64_t uid) {
bool existed = false;
......@@ -408,28 +444,7 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
}
if (!existed) {
// TODO refactor
// more than the capacity, reallocate the resources
if (pResultRowInfo->size >= pResultRowInfo->capacity) {
int64_t newCapacity = 0;
if (pResultRowInfo->capacity > 10000) {
newCapacity = (int64_t)(pResultRowInfo->capacity * 1.25);
} else {
newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5);
}
char *t = realloc(pResultRowInfo->pResult, (size_t)(newCapacity * POINTER_BYTES));
if (t == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pResultRowInfo->pResult = (SResultRow **)t;
int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity;
memset(&pResultRowInfo->pResult[pResultRowInfo->capacity], 0, POINTER_BYTES * inc);
pResultRowInfo->capacity = (int32_t)newCapacity;
}
prepareResultListBuffer(pResultRowInfo, pRuntimeEnv);
SResultRow *pResult = NULL;
......@@ -2486,6 +2501,10 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
} else if (pQuery->stableQuery) { // stable aggregate, not interval aggregate or normal column aggregate
doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput,
pQuery->current->groupIndex);
}
(*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
......@@ -3101,7 +3120,9 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
int16_t offset = 0;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
if (pCtx->resultInfo->initialized && pCtx->resultInfo->complete) {
SResultRowCellInfo* pResInfo = pCtx[i].resultInfo;
if (pResInfo->initialized && pResInfo->complete) {
offset += pCtx[i].outputBytes;
continue;
}
......@@ -3114,25 +3135,17 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
}
if (!pCtx[i].resultInfo->initialized) {
if (!pResInfo->initialized) {
aAggs[functionId].init(&pCtx[i]);
}
}
}
void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t groupIndex,
TSKEY nextKey) {
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
// lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey;
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) {
return;
}
void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx,
int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t groupIndex) {
int64_t uid = 0;
SResultRow* pResultRow =
doPrepareResultRowFromKey(pRuntimeEnv, &pInfo->resultRowInfo, (char*)&groupIndex, sizeof(groupIndex), true, uid);
doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&groupIndex, sizeof(groupIndex), true, uid);
assert (pResultRow != NULL);
/*
......@@ -3140,16 +3153,29 @@ void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, i
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pResultRow->pageId == -1) {
if (addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->pQuery->resultRowSize) !=
TSDB_CODE_SUCCESS) {
int32_t ret = addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->pQuery->resultRowSize);
if (ret != TSDB_CODE_SUCCESS) {
return;
}
}
setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
}
void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t groupIndex,
TSKEY nextKey) {
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
// lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey;
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) {
return;
}
doSetTableGroupOutputBuf(pRuntimeEnv, &pInfo->resultRowInfo, pInfo->pCtx, pInfo->rowCellInfoOffset, numOfOutput, groupIndex);
// record the current active group id
pRuntimeEnv->prevGroupId = groupIndex;
setResultOutputBuf(pRuntimeEnv, pResultRow, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
initCtxOutputBuffer(pInfo->pCtx, numOfOutput);
}
void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
......@@ -3981,11 +4007,14 @@ static SSDataBlock* doTableScanImpl(void* param) {
STableGroupInfo* pTableGroupInfo = &pOperator->pRuntimeEnv->tableqinfoGroupInfo;
while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) {
pTableScanInfo->numOfBlocks += 1;
if (isQueryKilled(pOperator->pRuntimeEnv->qinfo)) {
longjmp(pOperator->pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
// todo check for query cancel
pTableScanInfo->numOfBlocks += 1;
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
// todo opt
if (pTableGroupInfo->numOfTables > 1 || (pQuery->current == NULL && pTableGroupInfo->numOfTables == 1)) {
STableQueryInfo** pTableQueryInfo =
(STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.tid, sizeof(pBlock->info.tid));
......@@ -4349,8 +4378,8 @@ static SSDataBlock* doSTableAggregate(void* param) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1;
setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pQuery->current->groupIndex, k);
TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1;
setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pQuery->current->groupIndex, key);
doAggregateImpl(pOperator, pQuery->window.skey, pInfo->pCtx, pBlock);
}
......
......@@ -107,3 +107,6 @@ sleep 100
run general/parser/function.sim
sleep 100
run general/parser/stableOp.sim
sleep 100
run general/parser/slimit_alter_tags.sim
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册