未验证 提交 e6c5251e 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2572 from taosdata/feature/query

Feature/query
...@@ -364,7 +364,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -364,7 +364,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey); TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
int64_t revisedSTime = int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision); taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
...@@ -831,7 +831,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo ...@@ -831,7 +831,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
if (pFillInfo != NULL) { if (pFillInfo != NULL) {
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t revisedSTime = int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, tinfo.precision); taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
taosResetFillInfo(pFillInfo, revisedSTime); taosResetFillInfo(pFillInfo, revisedSTime);
} }
...@@ -1301,9 +1301,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer ...@@ -1301,9 +1301,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
TSKEY skey = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey); TSKEY skey = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
int64_t newTime = int64_t newTime =
taosGetIntervalStartTimestamp(skey, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision); taosGetIntervalStartTimestamp(skey, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
// taosResetFillInfo(pLocalReducer->pFillInfo, pQueryInfo->order.order, newTime,
// pQueryInfo->groupbyExpr.numOfGroupCols, 4096, 0, NULL, pLocalReducer->rowSize);
taosResetFillInfo(pLocalReducer->pFillInfo, newTime); taosResetFillInfo(pLocalReducer->pFillInfo, newTime);
} }
} }
......
...@@ -4487,10 +4487,12 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -4487,10 +4487,12 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload; SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId); pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
pUpdateMsg->tid = htonl(pTableMeta->sid); pUpdateMsg->tid = htonl(pTableMeta->sid);
pUpdateMsg->uid = htobe64(pTableMeta->uid); pUpdateMsg->uid = htobe64(pTableMeta->uid);
pUpdateMsg->colId = htons(pTagsSchema->colId); pUpdateMsg->colId = htons(pTagsSchema->colId);
pUpdateMsg->tversion = htons(pTableMeta->tversion); pUpdateMsg->type = htons(pTagsSchema->type);
pUpdateMsg->bytes = htons(pTagsSchema->bytes);
pUpdateMsg->tversion = htons(pTableMeta->tversion);
pUpdateMsg->numOfTags = htons(numOfTags); pUpdateMsg->numOfTags = htons(numOfTags);
pUpdateMsg->schemaLen = htonl(schemaLen); pUpdateMsg->schemaLen = htonl(schemaLen);
......
...@@ -29,4 +29,6 @@ bool tscValidateTableNameLength(size_t len); ...@@ -29,4 +29,6 @@ bool tscValidateTableNameLength(size_t len);
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters); SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision);
#endif // TDENGINE_NAME_H #endif // TDENGINE_NAME_H
...@@ -75,3 +75,33 @@ SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numO ...@@ -75,3 +75,33 @@ SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numO
return pFilter; return pFilter;
} }
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) {
if (slidingTime == 0) {
return startTime;
}
int64_t start = ((startTime - intervalTime) / slidingTime + 1) * slidingTime;
if (!(timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h')) {
/*
* here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided.
*/
// todo refactor to extract function that is available for Linux/Windows/Mac platform
#if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t timezone = _timezone;
int32_t daylight = _daylight;
char** tzname = _tzname;
#endif
int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L;
start += timezone * t;
}
int64_t end = start + intervalTime - 1;
if (end < startTime) {
start += slidingTime;
}
return start;
}
...@@ -285,6 +285,8 @@ typedef struct { ...@@ -285,6 +285,8 @@ typedef struct {
int32_t tid; int32_t tid;
int16_t tversion; int16_t tversion;
int16_t colId; int16_t colId;
int16_t type;
int16_t bytes;
int32_t tagValLen; int32_t tagValLen;
int16_t numOfTags; int16_t numOfTags;
int32_t schemaLen; int32_t schemaLen;
......
...@@ -108,7 +108,9 @@ void tsdbClearTableCfg(STableCfg *config); ...@@ -108,7 +108,9 @@ void tsdbClearTableCfg(STableCfg *config);
void* tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes); void* tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes);
char* tsdbGetTableName(void *pTable); char* tsdbGetTableName(void *pTable);
STableId tsdbGetTableId(void *pTable);
#define TSDB_TABLEID(_table) ((STableId*) (_table))
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
......
...@@ -154,6 +154,7 @@ typedef struct SQuery { ...@@ -154,6 +154,7 @@ typedef struct SQuery {
} SQuery; } SQuery;
typedef struct SQueryRuntimeEnv { typedef struct SQueryRuntimeEnv {
jmp_buf env;
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery; SQuery* pQuery;
SQLFunctionCtx* pCtx; SQLFunctionCtx* pCtx;
...@@ -169,6 +170,8 @@ typedef struct SQueryRuntimeEnv { ...@@ -169,6 +170,8 @@ typedef struct SQueryRuntimeEnv {
void* pSecQueryHandle; // another thread for void* pSecQueryHandle; // another thread for
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
bool topBotQuery; // false bool topBotQuery; // false
bool groupbyNormalCol; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
int32_t prevGroupId; // previous executed group id int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
...@@ -197,8 +200,10 @@ typedef struct SQInfo { ...@@ -197,8 +200,10 @@ typedef struct SQInfo {
*/ */
int32_t tableIndex; int32_t tableIndex;
int32_t numOfGroupResultPages; int32_t numOfGroupResultPages;
_qinfo_free_fn_t freeFn; _qinfo_free_fn_t freeFn; //todo remove it
jmp_buf env;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
} SQInfo; } SQInfo;
#endif // TDENGINE_QUERYEXECUTOR_H #endif // TDENGINE_QUERYEXECUTOR_H
...@@ -60,8 +60,6 @@ typedef struct SPoint { ...@@ -60,8 +60,6 @@ typedef struct SPoint {
void * val; void * val;
} SPoint; } SPoint;
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision);
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
SFillColInfo* pFillCol); SFillColInfo* pFillCol);
......
...@@ -100,6 +100,16 @@ static UNUSED_FUNC void *u_malloc (size_t __size) { ...@@ -100,6 +100,16 @@ static UNUSED_FUNC void *u_malloc (size_t __size) {
} }
} }
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
uint32_t v = rand();
if (v % 5 <= 1) {
return NULL;
} else {
return calloc(num, __size);
}
}
#define calloc u_calloc
#define malloc u_malloc #define malloc u_malloc
#endif #endif
...@@ -109,6 +119,7 @@ static UNUSED_FUNC void *u_malloc (size_t __size) { ...@@ -109,6 +119,7 @@ static UNUSED_FUNC void *u_malloc (size_t __size) {
static void setQueryStatus(SQuery *pQuery, int8_t status); static void setQueryStatus(SQuery *pQuery, int8_t status);
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
// todo move to utility // todo move to utility
...@@ -314,6 +325,24 @@ static bool isTopBottomQuery(SQuery *pQuery) { ...@@ -314,6 +325,24 @@ static bool isTopBottomQuery(SQuery *pQuery) {
return false; return false;
} }
static bool hasTagValOutput(SQuery* pQuery) {
SExprInfo *pExprInfo = &pQuery->pSelectExpr[0];
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
return true;
} else { // set tag value, by which the results are aggregated.
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
SExprInfo *pLocalExprInfo = &pQuery->pSelectExpr[idx];
// ts_comp column required the tag value for join filter
if (TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) {
return true;
}
}
}
return false;
}
static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, int32_t numOfCols, int32_t index) { static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, int32_t numOfCols, int32_t index) {
// for a tag column, no corresponding field info // for a tag column, no corresponding field info
SColIndex *pColIndex = &pQuery->pSelectExpr[index].base.colInfo; SColIndex *pColIndex = &pQuery->pSelectExpr[index].base.colInfo;
...@@ -657,26 +686,24 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat ...@@ -657,26 +686,24 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
pCtx[k].nStartQueryTimestamp = pWin->skey; pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].size = forwardStep; pCtx[k].size = forwardStep;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1); pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1);
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
pCtx[k].ptsList = &tsBuf[offset]; pCtx[k].ptsList = &tsBuf[offset];
} }
// not a whole block involved in query processing, statistics data can not be used // not a whole block involved in query processing, statistics data can not be used
if (forwardStep != numOfTotal) { if (forwardStep != numOfTotal) {
pCtx[k].preAggVals.isSet = false; pCtx[k].preAggVals.isSet = false;
} }
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]); aAggs[functionId].xFunction(&pCtx[k]);
}
} }
} }
} }
...@@ -686,14 +713,12 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus ...@@ -686,14 +713,12 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].nStartQueryTimestamp = pWin->skey;
int32_t functionId = pQuery->pSelectExpr[k].base.functionId; int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset); aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
} }
} }
} }
...@@ -787,8 +812,8 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas ...@@ -787,8 +812,8 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
return NULL; return NULL;
} }
char *dataBlock = NULL; char *dataBlock = NULL;
SQuery *pQuery = pRuntimeEnv->pQuery;
SQuery *pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
int32_t functionId = pQuery->pSelectExpr[col].base.functionId; int32_t functionId = pQuery->pSelectExpr[col].base.functionId;
...@@ -807,6 +832,10 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas ...@@ -807,6 +832,10 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
sas->numOfCols = pQuery->numOfCols; sas->numOfCols = pQuery->numOfCols;
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES); sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
if (sas->data == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// here the pQuery->colList and sas->colList are identical // here the pQuery->colList and sas->colList are identical
int32_t numOfCols = taosArrayGetSize(pDataBlock); int32_t numOfCols = taosArrayGetSize(pDataBlock);
for (int32_t i = 0; i < pQuery->numOfCols; ++i) { for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
...@@ -860,6 +889,9 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -860,6 +889,9 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
} }
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (sasArray == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
...@@ -867,7 +899,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -867,7 +899,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
} }
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (isIntervalQuery(pQuery) && tsCols != NULL) { if (QUERY_IS_INTERVAL_QUERY(pQuery) && tsCols != NULL) {
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
TSKEY ts = tsCols[offset]; TSKEY ts = tsCols[offset];
...@@ -1051,6 +1083,11 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) { ...@@ -1051,6 +1083,11 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) {
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId) { static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx); SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
// in case of timestamp column, always generated results.
if (functionId == TSDB_FUNC_TS) {
return true;
}
if (pResInfo->complete || functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { if (pResInfo->complete || functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
return false; return false;
...@@ -1063,7 +1100,6 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx ...@@ -1063,7 +1100,6 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
// todo add comments // todo add comments
if ((functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_LAST)) { if ((functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_LAST)) {
return pCtx->param[0].i64Key == pQuery->order.order; return pCtx->param[0].i64Key == pQuery->order.order;
// return !QUERY_IS_ASC_QUERY(pQuery);
} }
// in the supplementary scan, only the following functions need to be executed // in the supplementary scan, only the following functions need to be executed
...@@ -1084,8 +1120,12 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1084,8 +1120,12 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0); SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0);
TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL; TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL;
bool groupbyColumnValue = isGroupbyNormalCol(pQuery->pGroupbyExpr); bool groupbyColumnValue = pRuntimeEnv->groupbyNormalCol;
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (sasArray == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int16_t type = 0; int16_t type = 0;
int16_t bytes = 0; int16_t bytes = 0;
...@@ -1231,7 +1271,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl ...@@ -1231,7 +1271,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
STableQueryInfo* pTableQInfo = pQuery->current; STableQueryInfo* pTableQInfo = pQuery->current;
SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo; SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
} else { } else {
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
...@@ -1352,14 +1392,16 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY ...@@ -1352,14 +1392,16 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
} }
// set the output buffer for the selectivity + tag query // set the output buffer for the selectivity + tag query
static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) { static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx) {
SQuery* pQuery = pRuntimeEnv->pQuery;
if (isSelectivityWithTagsQuery(pQuery)) { if (isSelectivityWithTagsQuery(pQuery)) {
int32_t num = 0; int32_t num = 0;
int16_t tagLen = 0; int16_t tagLen = 0;
SQLFunctionCtx *p = NULL; SQLFunctionCtx *p = NULL;
SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutput, POINTER_BYTES); SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutput, POINTER_BYTES);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base; SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base;
...@@ -1386,7 +1428,7 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) { ...@@ -1386,7 +1428,7 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
} }
} }
static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) { static FORCE_INLINE void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
assert(pQuery->pSelectExpr[i].interBytes <= DEFAULT_INTERN_BUF_PAGE_SIZE); assert(pQuery->pSelectExpr[i].interBytes <= DEFAULT_INTERN_BUF_PAGE_SIZE);
...@@ -1477,7 +1519,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order ...@@ -1477,7 +1519,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
resetCtxOutputBuf(pRuntimeEnv); resetCtxOutputBuf(pRuntimeEnv);
} }
setCtxTagColumnInfo(pQuery, pRuntimeEnv->pCtx); setCtxTagColumnInfo(pRuntimeEnv, pRuntimeEnv->pCtx);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_clean: _clean:
...@@ -1640,8 +1682,7 @@ static bool onlyQueryTags(SQuery* pQuery) { ...@@ -1640,8 +1682,7 @@ static bool onlyQueryTags(SQuery* pQuery) {
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *realWin, STimeWindow *win) { void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *realWin, STimeWindow *win) {
assert(key >= keyFirst && key <= keyLast && pQuery->slidingTime <= pQuery->intervalTime); assert(key >= keyFirst && key <= keyLast && pQuery->slidingTime <= pQuery->intervalTime);
win->skey = taosGetIntervalStartTimestamp(key, pQuery->slidingTime, pQuery->slidingTimeUnit, pQuery->precision); win->skey = taosGetIntervalStartTimestamp(key, pQuery->slidingTime, pQuery->intervalTime, pQuery->slidingTimeUnit, pQuery->precision);
if (keyFirst > (INT64_MAX - pQuery->intervalTime)) { if (keyFirst > (INT64_MAX - pQuery->intervalTime)) {
/* /*
* if the realSkey > INT64_MAX - pQuery->intervalTime, the query duration between * if the realSkey > INT64_MAX - pQuery->intervalTime, the query duration between
...@@ -2117,7 +2158,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa ...@@ -2117,7 +2158,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa
static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) { static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) {
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
if (!isIntervalQuery(pQuery) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFixedOutputQuery(pQuery)) { if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyNormalCol && !isFixedOutputQuery(pQuery)) {
SResultRec *pRec = &pQuery->rec; SResultRec *pRec = &pQuery->rec;
if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) { if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) {
...@@ -2171,7 +2212,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2171,7 +2212,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle); SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
// todo extract methods // todo extract methods
if (isIntervalQuery(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) { if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) {
STimeWindow realWin = TSWINDOW_INITIALIZER, w = TSWINDOW_INITIALIZER; STimeWindow realWin = TSWINDOW_INITIALIZER, w = TSWINDOW_INITIALIZER;
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
...@@ -2217,7 +2258,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2217,7 +2258,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
} }
if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) { if (QUERY_IS_INTERVAL_QUERY(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP; int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP;
...@@ -2638,7 +2679,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -2638,7 +2679,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
tfree(pTableList); tfree(pTableList);
qError("QInfo:%p failed alloc memory", pQInfo); qError("QInfo:%p failed alloc memory", pQInfo);
longjmp(pQInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
// todo opt for the case of one table per group // todo opt for the case of one table per group
...@@ -2646,7 +2687,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -2646,7 +2687,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
STableQueryInfo *item = taosArrayGetP(pGroup, i); STableQueryInfo *item = taosArrayGetP(pGroup, i);
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, tsdbGetTableId(item->pTable).tid); SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid);
if (list.size > 0 && item->windowResInfo.size > 0) { if (list.size > 0 && item->windowResInfo.size > 0) {
pTableList[numOfTables] = item; pTableList[numOfTables] = item;
numOfTables += 1; numOfTables += 1;
...@@ -2669,6 +2710,10 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -2669,6 +2710,10 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
SResultInfo *pResultInfo = calloc(pQuery->numOfOutput, sizeof(SResultInfo)); SResultInfo *pResultInfo = calloc(pQuery->numOfOutput, sizeof(SResultInfo));
if (pResultInfo == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery); setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery);
resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
...@@ -2869,7 +2914,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) { ...@@ -2869,7 +2914,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
// group by normal columns and interval query on normal table // group by normal columns and interval query on normal table
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
disableFuncInReverseScanImpl(pQInfo, pWindowResInfo, order); disableFuncInReverseScanImpl(pQInfo, pWindowResInfo, order);
} else { // for simple result of table query, } else { // for simple result of table query,
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor
...@@ -3044,7 +3089,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3044,7 +3089,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
bool toContinue = false; bool toContinue = false;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
// for each group result, call the finalize function for each column // for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
...@@ -3236,10 +3281,10 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { ...@@ -3236,10 +3281,10 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
// for each group result, call the finalize function for each column // for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (pRuntimeEnv->groupbyNormalCol) {
closeAllTimeWindow(pWindowResInfo); closeAllTimeWindow(pWindowResInfo);
} }
...@@ -3281,10 +3326,10 @@ static bool hasMainOutput(SQuery *pQuery) { ...@@ -3281,10 +3326,10 @@ static bool hasMainOutput(SQuery *pQuery) {
return false; return false;
} }
static STableQueryInfo *createTableQueryInfo( SQueryRuntimeEnv *pRuntimeEnv, void* pTable, STimeWindow win) { static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void* pTable, STimeWindow win, void* buf) {
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo)); STableQueryInfo *pTableQueryInfo = buf;//calloc(1, sizeof(STableQueryInfo));
pTableQueryInfo->win = win; pTableQueryInfo->win = win;
pTableQueryInfo->lastKey = win.skey; pTableQueryInfo->lastKey = win.skey;
...@@ -3295,7 +3340,8 @@ static STableQueryInfo *createTableQueryInfo( SQueryRuntimeEnv *pRuntimeEnv, voi ...@@ -3295,7 +3340,8 @@ static STableQueryInfo *createTableQueryInfo( SQueryRuntimeEnv *pRuntimeEnv, voi
int32_t initialSize = 1; int32_t initialSize = 1;
int32_t initialThreshold = 1; int32_t initialThreshold = 1;
if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // set more initial size of interval/groupby query
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
initialSize = 20; initialSize = 20;
initialThreshold = 100; initialThreshold = 100;
} }
...@@ -3310,7 +3356,6 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) ...@@ -3310,7 +3356,6 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
} }
cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo, numOfCols); cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo, numOfCols);
free(pTableQueryInfo);
} }
#define SET_CURRENT_QUERY_TABLE_INFO(_runtime, _tableInfo) \ #define SET_CURRENT_QUERY_TABLE_INFO(_runtime, _tableInfo) \
...@@ -3323,7 +3368,6 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) ...@@ -3323,7 +3368,6 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
/** /**
* set output buffer for different group * set output buffer for different group
* TODO opt performance if current group is identical to previous group
* @param pRuntimeEnv * @param pRuntimeEnv
* @param pDataBlockInfo * @param pDataBlockInfo
*/ */
...@@ -3334,7 +3378,10 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { ...@@ -3334,7 +3378,10 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
// lastKey needs to be updated // lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey; pTableQueryInfo->lastKey = nextKey;
setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTSBuf != NULL) {
setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
}
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) { if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) {
return; return;
...@@ -3523,7 +3570,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) { ...@@ -3523,7 +3570,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t totalSubset = 0; int32_t totalSubset = 0;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) { if (pQInfo->runtimeEnv.groupbyNormalCol || (isIntervalQuery(pQuery))) {
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo); totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
} else { } else {
totalSubset = GET_NUM_OF_TABLEGROUP(pQInfo); totalSubset = GET_NUM_OF_TABLEGROUP(pQInfo);
...@@ -3617,11 +3664,11 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) { ...@@ -3617,11 +3664,11 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
assert(pQuery->rec.rows <= pQuery->rec.capacity); assert(pQuery->rec.rows <= pQuery->rec.capacity);
} }
static UNUSED_FUNC void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) { static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
// update the number of result for each, only update the number of rows for the corresponding window result. // update the number of result for each, only update the number of rows for the corresponding window result.
if (pQuery->intervalTime == 0) { if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
for (int32_t i = 0; i < pRuntimeEnv->windowResInfo.size; ++i) { for (int32_t i = 0; i < pRuntimeEnv->windowResInfo.size; ++i) {
SWindowResult *pResult = &pRuntimeEnv->windowResInfo.pResult[i]; SWindowResult *pResult = &pRuntimeEnv->windowResInfo.pResult[i];
...@@ -3635,14 +3682,6 @@ static UNUSED_FUNC void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, S ...@@ -3635,14 +3682,6 @@ static UNUSED_FUNC void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, S
pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes); pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes);
} }
} }
// int32_t g = pTableQueryInfo->groupIndex;
// assert(pRuntimeEnv->windowResInfo.size > 0);
//
// SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&g, sizeof(g));
// if (pWindowRes->numOfRows == 0) {
// pWindowRes->numOfRows = getNumOfResult(pRuntimeEnv);
// }
} }
} }
...@@ -3654,7 +3693,7 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo * ...@@ -3654,7 +3693,7 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
} else { } else {
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
...@@ -3696,7 +3735,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) { ...@@ -3696,7 +3735,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
} else { } else {
// there are results waiting for returned to client. // there are results waiting for returned to client.
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) && if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) &&
(isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) && (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) &&
(pRuntimeEnv->windowResInfo.size > 0)) { (pRuntimeEnv->windowResInfo.size > 0)) {
return true; return true;
} }
...@@ -4101,6 +4140,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4101,6 +4140,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
pRuntimeEnv->cur.vgroupIndex = -1; pRuntimeEnv->cur.vgroupIndex = -1;
pRuntimeEnv->stableQuery = isSTableQuery; pRuntimeEnv->stableQuery = isSTableQuery;
pRuntimeEnv->prevGroupId = INT32_MIN; pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->groupbyNormalCol = isGroupbyNormalCol(pQuery->pGroupbyExpr);
if (pTsBuf != NULL) { if (pTsBuf != NULL) {
int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
...@@ -4125,7 +4165,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4125,7 +4165,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
if (pQuery->intervalTime == 0) { if (pQuery->intervalTime == 0) {
int16_t type = TSDB_DATA_TYPE_NULL; int16_t type = TSDB_DATA_TYPE_NULL;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags;
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
} else { } else {
type = TSDB_DATA_TYPE_INT; // group id type = TSDB_DATA_TYPE_INT; // group id
...@@ -4134,7 +4174,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4134,7 +4174,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type); initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type);
} }
} else if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { } else if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
int32_t rows = getInitialPageNum(pQInfo); int32_t rows = getInitialPageNum(pQInfo);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo); code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -4142,7 +4182,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4142,7 +4182,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
} }
int16_t type = TSDB_DATA_TYPE_NULL; int16_t type = TSDB_DATA_TYPE_NULL;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (pRuntimeEnv->groupbyNormalCol) {
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
} else { } else {
type = TSDB_DATA_TYPE_TIMESTAMP; type = TSDB_DATA_TYPE_TIMESTAMP;
...@@ -4160,8 +4200,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4160,8 +4200,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
// todo refactor // todo refactor
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery); pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4202,14 +4243,17 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { ...@@ -4202,14 +4243,17 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
SDataStatis *pStatis = NULL; SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (!pRuntimeEnv->groupbyNormalCol) {
if (!isIntervalQuery(pQuery)) { if (!isIntervalQuery(pQuery)) {
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1; int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
setExecutionContext(pQInfo, (*pTableQueryInfo)->groupIndex, blockInfo.window.ekey + step); setExecutionContext(pQInfo, (*pTableQueryInfo)->groupIndex, blockInfo.window.ekey + step);
} else { // interval query } else { // interval query
TSKEY nextKey = blockInfo.window.skey; TSKEY nextKey = blockInfo.window.skey;
setIntervalQueryRange(pQInfo, nextKey); setIntervalQueryRange(pQInfo, nextKey);
/*int32_t ret = */setAdditionalInfo(pQInfo, (*pTableQueryInfo)->pTable, *pTableQueryInfo);
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTSBuf != NULL) {
setAdditionalInfo(pQInfo, (*pTableQueryInfo)->pTable, *pTableQueryInfo);
}
} }
} }
...@@ -4234,9 +4278,9 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { ...@@ -4234,9 +4278,9 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
setTagVal(pRuntimeEnv, pCheckInfo->pTable, pQInfo->tsdb); setTagVal(pRuntimeEnv, pCheckInfo->pTable, pQInfo->tsdb);
STableId id = tsdbGetTableId(pCheckInfo->pTable); STableId* id = TSDB_TABLEID(pCheckInfo->pTable);
qDebug("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index, qDebug("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index,
id.uid, id.tid, pCheckInfo->lastKey, pCheckInfo->win.ekey); id->uid, id->tid, pCheckInfo->lastKey, pCheckInfo->win.ekey);
STsdbQueryCond cond = { STsdbQueryCond cond = {
.twindow = {pCheckInfo->lastKey, pCheckInfo->win.ekey}, .twindow = {pCheckInfo->lastKey, pCheckInfo->win.ekey},
...@@ -4365,7 +4409,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4365,7 +4409,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
break; break;
} }
} }
} else if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group-by on normal columns query } else if (pRuntimeEnv->groupbyNormalCol) { // group-by on normal columns query
while (pQInfo->groupIndex < numOfGroups) { while (pQInfo->groupIndex < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex);
...@@ -4503,11 +4547,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4503,11 +4547,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
*/ */
pQInfo->tableIndex++; pQInfo->tableIndex++;
STableIdInfo tidInfo; STableIdInfo tidInfo = {0};
STableId id = tsdbGetTableId(pQuery->current->pTable);
tidInfo.uid = id.uid; STableId* id = TSDB_TABLEID(pQuery->current->pTable);
tidInfo.tid = id.tid; tidInfo.uid = id->uid;
tidInfo.tid = id->tid;
tidInfo.key = pQuery->current->lastKey; tidInfo.key = pQuery->current->lastKey;
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
...@@ -4609,6 +4653,8 @@ static void doRestoreContext(SQInfo *pQInfo) { ...@@ -4609,6 +4653,8 @@ static void doRestoreContext(SQInfo *pQInfo) {
static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) { static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (isIntervalQuery(pQuery)) { if (isIntervalQuery(pQuery)) {
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo); size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
for (int32_t i = 0; i < numOfGroup; ++i) { for (int32_t i = 0; i < numOfGroup; ++i) {
...@@ -4618,6 +4664,7 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) { ...@@ -4618,6 +4664,7 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
for (int32_t j = 0; j < num; ++j) { for (int32_t j = 0; j < num; ++j) {
STableQueryInfo* item = taosArrayGetP(group, j); STableQueryInfo* item = taosArrayGetP(group, j);
closeAllTimeWindow(&item->windowResInfo); closeAllTimeWindow(&item->windowResInfo);
removeRedundantWindow(&item->windowResInfo, item->lastKey - step, step);
} }
} }
} else { // close results for group result } else { // close results for group result
...@@ -4669,6 +4716,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { ...@@ -4669,6 +4716,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
el = scanMultiTableDataBlocks(pQInfo); el = scanMultiTableDataBlocks(pQInfo);
qDebug("QInfo:%p reversed scan completed, elapsed time: %" PRId64 "ms", pQInfo, el); qDebug("QInfo:%p reversed scan completed, elapsed time: %" PRId64 "ms", pQInfo, el);
doCloseAllTimeWindowAfterScan(pQInfo);
doRestoreContext(pQInfo); doRestoreContext(pQInfo);
} else { } else {
qDebug("QInfo:%p no need to do reversed scan, query completed", pQInfo); qDebug("QInfo:%p no need to do reversed scan, query completed", pQInfo);
...@@ -4681,7 +4729,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { ...@@ -4681,7 +4729,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
return; return;
} }
if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) { if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) {
if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) { if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) {
copyResToQueryResultBuf(pQInfo, pQuery); copyResToQueryResultBuf(pQInfo, pQuery);
...@@ -4778,10 +4826,10 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -4778,10 +4826,10 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
pQuery->current->lastKey, pQuery->window.ekey); pQuery->current->lastKey, pQuery->window.ekey);
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
STableIdInfo tidInfo; STableIdInfo tidInfo;
STableId id = tsdbGetTableId(pQuery->current); STableId* id = TSDB_TABLEID(pQuery->current);
tidInfo.uid = id.uid; tidInfo.uid = id->uid;
tidInfo.tid = id.tid; tidInfo.tid = id->tid;
tidInfo.key = pQuery->current->lastKey; tidInfo.key = pQuery->current->lastKey;
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
} }
...@@ -4871,7 +4919,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { ...@@ -4871,7 +4919,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
} }
// all data scanned, the group by normal column can return // all data scanned, the group by normal column can return
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // todo refactor with merge interval time result if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result
pQInfo->groupIndex = 0; pQInfo->groupIndex = 0;
pQuery->rec.rows = 0; pQuery->rec.rows = 0;
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
...@@ -4932,7 +4980,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { ...@@ -4932,7 +4980,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
STableQueryInfo* item = taosArrayGetP(g, 0); STableQueryInfo* item = taosArrayGetP(g, 0);
// group by normal column, sliding window query, interval query are handled by interval query processor // group by normal column, sliding window query, interval query are handled by interval query processor
if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { // interval (down sampling operation)
tableIntervalProcess(pQInfo, item); tableIntervalProcess(pQInfo, item);
} else if (isFixedOutputQuery(pQuery)) { } else if (isFixedOutputQuery(pQuery)) {
tableFixedOutputProcess(pQInfo, item); tableFixedOutputProcess(pQInfo, item);
...@@ -4947,18 +4995,19 @@ static void tableQueryImpl(SQInfo *pQInfo) { ...@@ -4947,18 +4995,19 @@ static void tableQueryImpl(SQInfo *pQInfo) {
} }
static void stableQueryImpl(SQInfo *pQInfo) { static void stableQueryImpl(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->rec.rows = 0; pQuery->rec.rows = 0;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
if (isIntervalQuery(pQuery) || if (QUERY_IS_INTERVAL_QUERY(pQuery) ||
(isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !pRuntimeEnv->groupbyNormalCol &&
!isFirstLastRowQuery(pQuery))) { !isFirstLastRowQuery(pQuery))) {
multiTableQueryProcess(pQInfo); multiTableQueryProcess(pQInfo);
} else { } else {
assert((pQuery->checkBuffer == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) || assert((pQuery->checkBuffer == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) ||
isFirstLastRowQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)); isFirstLastRowQuery(pQuery) || pRuntimeEnv->groupbyNormalCol);
sequentialTableProcess(pQInfo); sequentialTableProcess(pQInfo);
} }
...@@ -5653,28 +5702,33 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5653,28 +5702,33 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
STimeWindow window = pQueryMsg->window; STimeWindow window = pQueryMsg->window;
taosArraySort(pTableIdList, compareTableIdInfo); taosArraySort(pTableIdList, compareTableIdInfo);
// TODO optimize the STableQueryInfo malloc strategy
pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
int32_t index = 0;
for(int32_t i = 0; i < numOfGroups; ++i) { for(int32_t i = 0; i < numOfGroups; ++i) {
SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i); SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i);
size_t s = taosArrayGetSize(pa);
size_t s = taosArrayGetSize(pa);
SArray* p1 = taosArrayInit(s, POINTER_BYTES); SArray* p1 = taosArrayInit(s, POINTER_BYTES);
for(int32_t j = 0; j < s; ++j) { for(int32_t j = 0; j < s; ++j) {
void* pTable = taosArrayGetP(pa, j); void* pTable = taosArrayGetP(pa, j);
STableId* id = TSDB_TABLEID(pTable);
// NOTE: compare STableIdInfo with STableId STableIdInfo* pTableId = taosArraySearch(pTableIdList, id, compareTableIdInfo);
STableId id = tsdbGetTableId(pTable);
STableIdInfo* pTableId = taosArraySearch(pTableIdList, &id, compareTableIdInfo);
if (pTableId != NULL ) { if (pTableId != NULL ) {
window.skey = pTableId->key; window.skey = pTableId->key;
} else { } else {
window.skey = pQueryMsg->window.skey; window.skey = pQueryMsg->window.skey;
} }
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window); void* buf = pQInfo->pBuf + index * sizeof(STableQueryInfo);
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window, buf);
item->groupIndex = i; item->groupIndex = i;
taosArrayPush(p1, &item); taosArrayPush(p1, &item);
taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id.tid, sizeof(id.tid), &item, POINTER_BYTES); taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES);
index += 1;
} }
taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1); taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1);
...@@ -5818,6 +5872,7 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5818,6 +5872,7 @@ static void freeQInfo(SQInfo *pQInfo) {
taosArrayDestroy(p); taosArrayDestroy(p);
} }
tfree(pQInfo->pBuf);
taosArrayDestroy(pQInfo->tableqinfoGroupInfo.pGroupList); taosArrayDestroy(pQInfo->tableqinfoGroupInfo.pGroupList);
taosHashCleanup(pQInfo->tableqinfoGroupInfo.map); taosHashCleanup(pQInfo->tableqinfoGroupInfo.map);
tsdbDestoryTableGroup(&pQInfo->tableGroupInfo); tsdbDestoryTableGroup(&pQInfo->tableGroupInfo);
...@@ -6094,7 +6149,8 @@ void qTableQuery(qinfo_t qinfo) { ...@@ -6094,7 +6149,8 @@ void qTableQuery(qinfo_t qinfo) {
return; return;
} }
int32_t ret = setjmp(pQInfo->env); int32_t ret = setjmp(pQInfo->runtimeEnv.env);
// error occurs, record the error code and return to client // error occurs, record the error code and return to client
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
pQInfo->code = ret; pQInfo->code = ret;
...@@ -6277,13 +6333,13 @@ static void buildTagQueryResult(SQInfo* pQInfo) { ...@@ -6277,13 +6333,13 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE); varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
output = varDataVal(output); output = varDataVal(output);
STableId id = tsdbGetTableId(item->pTable); STableId* id = TSDB_TABLEID(item->pTable);
*(int64_t *)output = id.uid; // memory align problem, todo serialize *(int64_t *)output = id->uid; // memory align problem, todo serialize
output += sizeof(id.uid); output += sizeof(id->uid);
*(int32_t *)output = id.tid; *(int32_t *)output = id->tid;
output += sizeof(id.tid); output += sizeof(id->tid);
*(int32_t *)output = pQInfo->vgId; *(int32_t *)output = pQInfo->vgId;
output += sizeof(pQInfo->vgId); output += sizeof(pQInfo->vgId);
......
...@@ -32,7 +32,6 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun ...@@ -32,7 +32,6 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo->threshold = threshold; pWindowResInfo->threshold = threshold;
pWindowResInfo->type = type; pWindowResInfo->type = type;
_hash_fn_t fn = taosGetDefaultHashFunction(type); _hash_fn_t fn = taosGetDefaultHashFunction(type);
pWindowResInfo->hashList = taosHashInit(threshold, fn, false); pWindowResInfo->hashList = taosHashInit(threshold, fn, false);
...@@ -54,7 +53,8 @@ void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) { ...@@ -54,7 +53,8 @@ void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) {
if (pWindowRes == NULL) { if (pWindowRes == NULL) {
return; return;
} }
// TODO opt malloc strategy
for (int32_t i = 0; i < nOutputCols; ++i) { for (int32_t i = 0; i < nOutputCols; ++i) {
free(pWindowRes->resultInfo[i].interResultBuf); free(pWindowRes->resultInfo[i].interResultBuf);
} }
...@@ -180,19 +180,34 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { ...@@ -180,19 +180,34 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
/* /*
* remove the results that are not the FIRST time window that spreads beyond the * remove the results that are not the FIRST time window that spreads beyond the
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
* NOTE: remove redundant, only when the result set order equals to traverse order
*/ */
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order) { void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order) {
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size); assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size);
if (pWindowResInfo->size <= 1) {
return;
}
// get the result order
int32_t resultOrder = (pWindowResInfo->pResult[0].window.skey < pWindowResInfo->pResult[1].window.skey)?
TSDB_ORDER_ASC:TSDB_ORDER_DESC;
if (order != resultOrder) {
return;
}
int32_t i = 0; int32_t i = 0;
while (i < pWindowResInfo->size && if (order == QUERY_ASC_FORWARD_STEP) {
((pWindowResInfo->pResult[i].window.ekey < lastKey && order == QUERY_ASC_FORWARD_STEP) || while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i].window.ekey < lastKey)) {
(pWindowResInfo->pResult[i].window.skey > lastKey && order == QUERY_DESC_FORWARD_STEP))) { ++i;
++i; }
} else if (order == QUERY_DESC_FORWARD_STEP) {
while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i].window.skey > lastKey)) {
++i;
}
} }
// assert(i < pWindowResInfo->size);
if (i < pWindowResInfo->size) { if (i < pWindowResInfo->size) {
pWindowResInfo->size = (i + 1); pWindowResInfo->size = (i + 1);
} }
......
...@@ -118,7 +118,7 @@ static bool tExtMemBufferAlloc(tExtMemBuffer *pMemBuffer) { ...@@ -118,7 +118,7 @@ static bool tExtMemBufferAlloc(tExtMemBuffer *pMemBuffer) {
* To flush data to disk to accommodate more data * To flush data to disk to accommodate more data
*/ */
if (pMemBuffer->numOfInMemPages > 0 && pMemBuffer->numOfInMemPages == pMemBuffer->inMemCapacity) { if (pMemBuffer->numOfInMemPages > 0 && pMemBuffer->numOfInMemPages == pMemBuffer->inMemCapacity) {
if (!tExtMemBufferFlush(pMemBuffer)) { if (tExtMemBufferFlush(pMemBuffer) != 0) {
return false; return false;
} }
} }
...@@ -268,6 +268,7 @@ int32_t tExtMemBufferFlush(tExtMemBuffer *pMemBuffer) { ...@@ -268,6 +268,7 @@ int32_t tExtMemBufferFlush(tExtMemBuffer *pMemBuffer) {
size_t retVal = fwrite((char *)&(first->item), pMemBuffer->pageSize, 1, pMemBuffer->file); size_t retVal = fwrite((char *)&(first->item), pMemBuffer->pageSize, 1, pMemBuffer->file);
if (retVal <= 0) { // failed to write to buffer, may be not enough space if (retVal <= 0) { // failed to write to buffer, may be not enough space
ret = TAOS_SYSTEM_ERROR(errno); ret = TAOS_SYSTEM_ERROR(errno);
return ret;
} }
pMemBuffer->fileMeta.numOfElemsInFile += first->item.num; pMemBuffer->fileMeta.numOfElemsInFile += first->item.num;
......
...@@ -22,41 +22,6 @@ ...@@ -22,41 +22,6 @@
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC) #define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision) {
if (slidingTime == 0) {
return startTime;
}
if (timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h') {
return (startTime / slidingTime) * slidingTime;
} else {
/*
* here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided.
*
* TODO dynamically decide the start time of a day, move to common module
*/
// todo refactor to extract function that is available for Linux/Windows/Mac platform
#if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t timezone = _timezone;
int32_t daylight = _daylight;
char** tzname = _tzname;
#endif
int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L;
int64_t revStartime = (startTime / slidingTime) * slidingTime + timezone * t;
int64_t revEndtime = revStartime + slidingTime - 1;
if (revEndtime < startTime) {
revStartime += slidingTime;
}
return revStartime;
}
}
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, SFillColInfo* pFillCol) { int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, SFillColInfo* pFillCol) {
if (fillType == TSDB_FILL_NONE) { if (fillType == TSDB_FILL_NONE) {
...@@ -128,7 +93,7 @@ static TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterva ...@@ -128,7 +93,7 @@ static TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterva
if (order == TSDB_ORDER_ASC) { if (order == TSDB_ORDER_ASC) {
return ekey; return ekey;
} else { } else {
return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision); return taosGetIntervalStartTimestamp(ekey, timeInterval, timeInterval, slidingTimeUnit, precision);
} }
} }
......
...@@ -47,9 +47,9 @@ extern int tsdbDebugFlag; ...@@ -47,9 +47,9 @@ extern int tsdbDebugFlag;
// Definitions // Definitions
// ------------------ tsdbMeta.c // ------------------ tsdbMeta.c
typedef struct STable { typedef struct STable {
STableId tableId;
ETableType type; ETableType type;
tstr* name; // NOTE: there a flexible string here tstr* name; // NOTE: there a flexible string here
STableId tableId;
uint64_t suid; uint64_t suid;
struct STable* pSuper; // super table pointer struct STable* pSuper; // super table pointer
uint8_t numOfSchemas; uint8_t numOfSchemas;
...@@ -294,14 +294,36 @@ typedef struct { ...@@ -294,14 +294,36 @@ typedef struct {
#define TABLE_SUID(t) (t)->suid #define TABLE_SUID(t) (t)->suid
#define TABLE_LASTKEY(t) (t)->lastKey #define TABLE_LASTKEY(t) (t)->lastKey
static FORCE_INLINE STSchema *tsdbGetTableSchema(STable *pTable) {
if (pTable->type == TSDB_CHILD_TABLE) { // check child table first
STable *pSuper = pTable->pSuper;
if (pSuper == NULL) return NULL;
return pSuper->schema[pSuper->numOfSchemas - 1];
} else if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) {
return pTable->schema[pTable->numOfSchemas - 1];
} else {
return NULL;
}
}
static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
if (pTable->type == TSDB_CHILD_TABLE) { // check child table first
STable *pSuper = pTable->pSuper;
if (pSuper == NULL) return NULL;
return pSuper->tagSchema;
} else if (pTable->type == TSDB_SUPER_TABLE) {
return pTable->tagSchema;
} else {
return NULL;
}
}
STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
void tsdbFreeMeta(STsdbMeta* pMeta); void tsdbFreeMeta(STsdbMeta* pMeta);
int tsdbOpenMeta(STsdbRepo* pRepo); int tsdbOpenMeta(STsdbRepo* pRepo);
int tsdbCloseMeta(STsdbRepo* pRepo); int tsdbCloseMeta(STsdbRepo* pRepo);
STSchema* tsdbGetTableSchema(STable* pTable);
STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid); STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid);
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version); STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version);
STSchema* tsdbGetTableTagSchema(STable* pTable);
int tsdbUpdateTable(STsdbRepo* pRepo, STable* pTable, STableCfg* pCfg); int tsdbUpdateTable(STsdbRepo* pRepo, STable* pTable, STableCfg* pCfg);
int tsdbWLockRepoMeta(STsdbRepo* pRepo); int tsdbWLockRepoMeta(STsdbRepo* pRepo);
int tsdbRLockRepoMeta(STsdbRepo* pRepo); int tsdbRLockRepoMeta(STsdbRepo* pRepo);
......
...@@ -274,6 +274,8 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { ...@@ -274,6 +274,8 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
pMsg->tid = htonl(pMsg->tid); pMsg->tid = htonl(pMsg->tid);
pMsg->tversion = htons(pMsg->tversion); pMsg->tversion = htons(pMsg->tversion);
pMsg->colId = htons(pMsg->colId); pMsg->colId = htons(pMsg->colId);
pMsg->type = htons(pMsg->type);
pMsg->bytes = htons(pMsg->bytes);
pMsg->tagValLen = htonl(pMsg->tagValLen); pMsg->tagValLen = htonl(pMsg->tagValLen);
pMsg->numOfTags = htons(pMsg->numOfTags); pMsg->numOfTags = htons(pMsg->numOfTags);
pMsg->schemaLen = htonl(pMsg->schemaLen); pMsg->schemaLen = htonl(pMsg->schemaLen);
...@@ -449,18 +451,6 @@ int tsdbCloseMeta(STsdbRepo *pRepo) { ...@@ -449,18 +451,6 @@ int tsdbCloseMeta(STsdbRepo *pRepo) {
return 0; return 0;
} }
STSchema *tsdbGetTableSchema(STable *pTable) {
if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) {
return pTable->schema[pTable->numOfSchemas - 1];
} else if (pTable->type == TSDB_CHILD_TABLE) {
STable *pSuper = pTable->pSuper;
if (pSuper == NULL) return NULL;
return pSuper->schema[pSuper->numOfSchemas - 1];
} else {
return NULL;
}
}
STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) { STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) {
void *ptr = taosHashGet(pMeta->uidMap, (char *)(&uid), sizeof(uid)); void *ptr = taosHashGet(pMeta->uidMap, (char *)(&uid), sizeof(uid));
...@@ -480,18 +470,6 @@ STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t version) { ...@@ -480,18 +470,6 @@ STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t version) {
return *(STSchema **)ptr; return *(STSchema **)ptr;
} }
STSchema *tsdbGetTableTagSchema(STable *pTable) {
if (pTable->type == TSDB_SUPER_TABLE) {
return pTable->tagSchema;
} else if (pTable->type == TSDB_CHILD_TABLE) {
STable *pSuper = pTable->pSuper;
if (pSuper == NULL) return NULL;
return pSuper->tagSchema;
} else {
return NULL;
}
}
int tsdbUpdateTable(STsdbRepo *pRepo, STable *pTable, STableCfg *pCfg) { int tsdbUpdateTable(STsdbRepo *pRepo, STable *pTable, STableCfg *pCfg) {
// TODO: this function can only be called when there is no query and commit on this table // TODO: this function can only be called when there is no query and commit on this table
ASSERT(TABLE_TYPE(pTable) != TSDB_CHILD_TABLE); ASSERT(TABLE_TYPE(pTable) != TSDB_CHILD_TABLE);
......
...@@ -110,13 +110,13 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -110,13 +110,13 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// current connect is broken // current connect is broken
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
// add lock here
handle = qRegisterQInfo(pVnode->qMgmt, pQInfo); handle = qRegisterQInfo(pVnode->qMgmt, pQInfo);
if (handle == NULL) { // failed to register qhandle if (handle == NULL) { // failed to register qhandle
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE; pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
qKillQuery(pQInfo); qKillQuery(pQInfo);
qKillQuery(pQInfo); qKillQuery(pQInfo);
pQInfo = NULL;
} else { } else {
assert(*handle == pQInfo); assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t) (handle)); pRsp->qhandle = htobe64((uint64_t) (handle));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册