未验证 提交 659e0d21 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #4425 from taosdata/feature/query

Feature/query
...@@ -246,11 +246,14 @@ typedef struct SQueryInfo { ...@@ -246,11 +246,14 @@ typedef struct SQueryInfo {
int16_t fillType; // final result fill type int16_t fillType; // final result fill type
int16_t numOfTables; int16_t numOfTables;
STableMetaInfo **pTableMetaInfo; STableMetaInfo **pTableMetaInfo;
struct STSBuf * tsBuf; struct STSBuf *tsBuf;
int64_t * fillVal; // default value for fill int64_t * fillVal; // default value for fill
char * msg; // pointer to the pCmd->payload to keep error message temporarily char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t tableLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id int16_t resColumnId; // result column id
} SQueryInfo; } SQueryInfo;
......
...@@ -426,8 +426,7 @@ static void count_function_f(SQLFunctionCtx *pCtx, int32_t index) { ...@@ -426,8 +426,7 @@ static void count_function_f(SQLFunctionCtx *pCtx, int32_t index) {
} }
SET_VAL(pCtx, 1, 1); SET_VAL(pCtx, 1, 1);
*((int64_t *)pCtx->aOutputBuf) += pCtx->size;
*((int64_t *)pCtx->aOutputBuf) += 1;
// do not need it actually // do not need it actually
SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx);
...@@ -3632,114 +3631,119 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx) { ...@@ -3632,114 +3631,119 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx) {
return true; return true;
} }
static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t size) { static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t tsIndex, int32_t index, int32_t size) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
TSKEY *primaryKey = pCtx->ptsList; TSKEY *primaryKey = pCtx->ptsList;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t i = index; int32_t i = index;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
if (pCtx->start.key != INT64_MIN) { if (pCtx->start.key != INT64_MIN) {
assert(pCtx->start.key < primaryKey[index] && pInfo->lastKey == INT64_MIN); assert((pCtx->start.key < primaryKey[tsIndex + i] && pCtx->order == TSDB_ORDER_ASC) ||
(pCtx->start.key > primaryKey[tsIndex + i] && pCtx->order == TSDB_ORDER_DESC));
pInfo->lastKey = primaryKey[index]; assert(pInfo->lastKey == INT64_MIN);
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0));
pInfo->lastKey = primaryKey[tsIndex + i];
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index));
pInfo->dOutput += ((pInfo->lastValue + pCtx->start.val) / 2) * (pInfo->lastKey - pCtx->start.key); pInfo->dOutput += ((pInfo->lastValue + pCtx->start.val) / 2) * (pInfo->lastKey - pCtx->start.key);
pInfo->hasResult = DATA_SET_FLAG; pInfo->hasResult = DATA_SET_FLAG;
pInfo->win.skey = pCtx->start.key; pInfo->win.skey = pCtx->start.key;
notNullElems++; notNullElems++;
i += 1; i += step;
} else if (pInfo->lastKey == INT64_MIN) { } else if (pInfo->lastKey == INT64_MIN) {
pInfo->lastKey = primaryKey[index]; pInfo->lastKey = primaryKey[tsIndex + i];
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0)); GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index));
pInfo->hasResult = DATA_SET_FLAG; pInfo->hasResult = DATA_SET_FLAG;
pInfo->win.skey = pInfo->lastKey; pInfo->win.skey = pInfo->lastKey;
notNullElems++; notNullElems++;
i += 1; i += step;
} }
// calculate the value of // calculate the value of
switch(pCtx->inputType) { switch(pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
int8_t *val = (int8_t*) GET_INPUT_CHAR_INDEX(pCtx, index); int8_t *val = (int8_t*) GET_INPUT_CHAR_INDEX(pCtx, 0);
for (; i < size; i++) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue; continue;
} }
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey);
pInfo->lastValue = val[i]; pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i]; pInfo->lastKey = primaryKey[i + tsIndex];
} }
break; break;
} }
case TSDB_DATA_TYPE_SMALLINT: { case TSDB_DATA_TYPE_SMALLINT: {
int16_t *val = (int16_t*) GET_INPUT_CHAR_INDEX(pCtx, index); int16_t *val = (int16_t*) GET_INPUT_CHAR_INDEX(pCtx, 0);
for (; i < size; i++) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue; continue;
} }
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey);
pInfo->lastValue = val[i]; pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i]; pInfo->lastKey = primaryKey[i + tsIndex];
} }
break; break;
} }
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
int32_t *val = (int32_t*) GET_INPUT_CHAR_INDEX(pCtx, index); int32_t *val = (int32_t*) GET_INPUT_CHAR_INDEX(pCtx, 0);
for (; i < size; i++) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue; continue;
} }
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey);
pInfo->lastValue = val[i]; pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i]; pInfo->lastKey = primaryKey[i + tsIndex];
} }
break; break;
} }
case TSDB_DATA_TYPE_BIGINT: { case TSDB_DATA_TYPE_BIGINT: {
int64_t *val = (int64_t*) GET_INPUT_CHAR_INDEX(pCtx, index); int64_t *val = (int64_t*) GET_INPUT_CHAR_INDEX(pCtx, 0);
for (; i < size; i++) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue; continue;
} }
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey);
pInfo->lastValue = (double) val[i]; pInfo->lastValue = (double) val[i];
pInfo->lastKey = primaryKey[i]; pInfo->lastKey = primaryKey[i + tsIndex];
} }
break; break;
} }
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
float *val = (float*) GET_INPUT_CHAR_INDEX(pCtx, index); float *val = (float*) GET_INPUT_CHAR_INDEX(pCtx, 0);
for (; i < size; i++) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue; continue;
} }
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey);
pInfo->lastValue = val[i]; pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i]; pInfo->lastKey = primaryKey[i + tsIndex];
} }
break; break;
} }
case TSDB_DATA_TYPE_DOUBLE: { case TSDB_DATA_TYPE_DOUBLE: {
double *val = (double*) GET_INPUT_CHAR_INDEX(pCtx, index); double *val = (double*) GET_INPUT_CHAR_INDEX(pCtx, 0);
for (; i < size; i++) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue; continue;
} }
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey);
pInfo->lastValue = val[i]; pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i]; pInfo->lastKey = primaryKey[i + tsIndex];
} }
break; break;
} }
...@@ -3764,16 +3768,13 @@ static void twa_function(SQLFunctionCtx *pCtx) { ...@@ -3764,16 +3768,13 @@ static void twa_function(SQLFunctionCtx *pCtx) {
STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo); STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
// skip null value // skip null value
int32_t i = 0; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order == TSDB_ORDER_ASC)? 0:(pCtx->size - 1);
while (pCtx->hasNull && i < pCtx->size && isNull((char *)data + pCtx->inputBytes * i, pCtx->inputType)) { while (pCtx->hasNull && i < pCtx->size && isNull((char *)data + pCtx->inputBytes * i, pCtx->inputType)) {
i++; i += step;
} }
if (i >= pCtx->size) { int32_t notNullElems = twa_function_impl(pCtx, pCtx->startOffset, i, pCtx->size);
return;
}
int32_t notNullElems = twa_function_impl(pCtx, pCtx->startOffset, pCtx->size);
SET_VAL(pCtx, notNullElems, 1); SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) { if (notNullElems > 0) {
...@@ -3791,11 +3792,136 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { ...@@ -3791,11 +3792,136 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) {
return; return;
} }
int32_t notNullElems = twa_function_impl(pCtx, index, 1); int32_t notNullElems = 0;
TSKEY *primaryKey = pCtx->ptsList;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t i = pCtx->startOffset;
int32_t size = pCtx->size;
if (pCtx->start.key != INT64_MIN) {
assert(pInfo->lastKey == INT64_MIN);
pInfo->lastKey = primaryKey[index];
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index));
pInfo->dOutput += ((pInfo->lastValue + pCtx->start.val) / 2) * (pInfo->lastKey - pCtx->start.key);
pInfo->hasResult = DATA_SET_FLAG;
pInfo->win.skey = pCtx->start.key;
notNullElems++;
i += 1;
} else if (pInfo->lastKey == INT64_MIN) {
pInfo->lastKey = primaryKey[index];
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index));
pInfo->hasResult = DATA_SET_FLAG;
pInfo->win.skey = pInfo->lastKey;
notNullElems++;
i += 1;
}
// calculate the value of
switch(pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT: {
int8_t *val = (int8_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
for (; i < size; i++) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue;
}
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey);
pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i + index];
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *val = (int16_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
for (; i < size; i++) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue;
}
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey);
pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i + index];
}
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t *val = (int32_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
for (; i < size; i++) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue;
}
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey);
pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i + index];
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t *val = (int64_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
for (; i < size; i++) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue;
}
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey);
pInfo->lastValue = (double) val[i];
pInfo->lastKey = primaryKey[i + index];
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float *val = (float*) GET_INPUT_CHAR_INDEX(pCtx, index);
for (; i < size; i++) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue;
}
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey);
pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i + index];
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double *val = (double*) GET_INPUT_CHAR_INDEX(pCtx, index);
for (; i < size; i++) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue;
}
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey);
pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i + index];
}
break;
}
default: assert(0);
}
// the last interpolated time window value
if (pCtx->end.key != INT64_MIN) {
pInfo->dOutput += ((pInfo->lastValue + pCtx->end.val) / 2) * (pCtx->end.key - pInfo->lastKey);
pInfo->lastValue = pCtx->end.val;
pInfo->lastKey = pCtx->end.key;
}
pInfo->win.ekey = pInfo->lastKey;
SET_VAL(pCtx, notNullElems, 1); SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) {
pResInfo->hasResult = DATA_SET_FLAG;
}
if (pCtx->stableQuery) { if (pCtx->stableQuery) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(STwaInfo)); memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(STwaInfo));
} }
} }
......
...@@ -721,10 +721,16 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr ...@@ -721,10 +721,16 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
// final result depends on the fields number // final result depends on the fields number
memset(pSchema, 0, sizeof(SSchema) * size); memset(pSchema, 0, sizeof(SSchema) * size);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
SSchema *p1 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex); SSchema p1 = {0};
if (pExpr->colInfo.colIndex != TSDB_TBNAME_COLUMN_INDEX) {
p1 = *tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex);
} else {
p1 = tGetTableNameColumnSchema();
}
int32_t inter = 0; int32_t inter = 0;
int16_t type = -1; int16_t type = -1;
...@@ -743,7 +749,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr ...@@ -743,7 +749,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
functionId = TSDB_FUNC_LAST; functionId = TSDB_FUNC_LAST;
} }
getResultDataInfo(p1->type, p1->bytes, functionId, 0, &type, &bytes, &inter, 0, false); int32_t ret = getResultDataInfo(p1.type, p1.bytes, functionId, 0, &type, &bytes, &inter, 0, false);
assert(ret == TSDB_CODE_SUCCESS);
} }
pSchema[i].type = (uint8_t)type; pSchema[i].type = (uint8_t)type;
......
...@@ -5308,14 +5308,17 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn ...@@ -5308,14 +5308,17 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
// keep original limitation value in globalLimit // keep original limitation value in globalLimit
pQueryInfo->clauseLimit = pQueryInfo->limit.limit; pQueryInfo->clauseLimit = pQueryInfo->limit.limit;
pQueryInfo->prjOffset = pQueryInfo->limit.offset; pQueryInfo->prjOffset = pQueryInfo->limit.offset;
pQueryInfo->tableLimit = -1;
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
/* /*
* the limitation/offset value should be removed during retrieve data from virtual node, * the offset value should be removed during retrieve data from virtual node, since the
* since the global order are done in client side, so the limitation should also * global order are done in client side, so the offset is applied at the client side
* be done at the client side. * However, note that the maximum allowed number of result for each table should be less
* than or equal to the value of limit.
*/ */
if (pQueryInfo->limit.limit > 0) { if (pQueryInfo->limit.limit > 0) {
pQueryInfo->tableLimit = pQueryInfo->limit.limit + pQueryInfo->limit.offset;
pQueryInfo->limit.limit = -1; pQueryInfo->limit.limit = -1;
} }
......
...@@ -684,6 +684,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -684,6 +684,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->queryType = htonl(pQueryInfo->type); pQueryMsg->queryType = htonl(pQueryInfo->type);
pQueryMsg->tableLimit = htobe64(pQueryInfo->tableLimit);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number
......
...@@ -476,6 +476,8 @@ typedef struct { ...@@ -476,6 +476,8 @@ typedef struct {
int16_t numOfGroupCols; // num of group by columns int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx; int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx int16_t orderType; // used in group by xx order by xxx
int64_t tableLimit; // limit the number of rows for each table, used in order by + limit in stable projection query.
int16_t prjOrder; // global order in super table projection query.
int64_t limit; int64_t limit;
int64_t offset; int64_t offset;
uint32_t queryType; // denote another query process uint32_t queryType; // denote another query process
......
...@@ -83,16 +83,15 @@ typedef struct SResultRec { ...@@ -83,16 +83,15 @@ typedef struct SResultRec {
int32_t threshold; // result size threshold in rows. int32_t threshold; // result size threshold in rows.
} SResultRec; } SResultRec;
typedef struct SWindowResInfo { typedef struct SResultRowInfo {
SResultRow** pResult; // result list SResultRow** pResult; // result list
int16_t type:8; // data type for hash key int16_t type:8; // data type for hash key
int32_t size:24; // number of result set int32_t size:24; // number of result set
int32_t threshold; // threshold to halt query and return the generated results.
int32_t capacity; // max capacity int32_t capacity; // max capacity
int32_t curIndex; // current start active index int32_t curIndex; // current start active index
int64_t startTime; // start time of the first time window for sliding query int64_t startTime; // start time of the first time window for sliding query
int64_t prevSKey; // previous (not completed) sliding window start key int64_t prevSKey; // previous (not completed) sliding window start key
} SWindowResInfo; } SResultRowInfo;
typedef struct SColumnFilterElem { typedef struct SColumnFilterElem {
int16_t bytes; // column length int16_t bytes; // column length
...@@ -115,7 +114,7 @@ typedef struct STableQueryInfo { ...@@ -115,7 +114,7 @@ typedef struct STableQueryInfo {
STimeWindow win; STimeWindow win;
STSCursor cur; STSCursor cur;
void* pTable; // for retrieve the page id list void* pTable; // for retrieve the page id list
SWindowResInfo windowResInfo; SResultRowInfo windowResInfo;
} STableQueryInfo; } STableQueryInfo;
typedef struct SQueryCostInfo { typedef struct SQueryCostInfo {
...@@ -179,7 +178,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -179,7 +178,7 @@ typedef struct SQueryRuntimeEnv {
uint16_t* offset; uint16_t* offset;
uint16_t scanFlag; // denotes reversed scan of data or not uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo; SFillInfo* pFillInfo;
SWindowResInfo windowResInfo; SResultRowInfo windowResInfo;
STSBuf* pTSBuf; STSBuf* pTSBuf;
STSCursor cur; STSCursor cur;
SQueryCostInfo summary; SQueryCostInfo summary;
...@@ -190,6 +189,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -190,6 +189,7 @@ typedef struct SQueryRuntimeEnv {
bool groupbyNormalCol; // denote if this is a groupby normal column query bool groupbyNormalCol; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
int32_t prevGroupId; // previous executed group id int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
...@@ -217,7 +217,8 @@ typedef struct SQInfo { ...@@ -217,7 +217,8 @@ typedef struct SQInfo {
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo> STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
SArray* arrTableIdInfo; // SArray* arrTableIdInfo;
SHashObj* arrTableIdInfo;
int32_t groupIndex; int32_t groupIndex;
/* /*
......
...@@ -30,19 +30,19 @@ void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pRow, int16_t typ ...@@ -30,19 +30,19 @@ void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pRow, int16_t typ
void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type); void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type);
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index);
int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, int32_t size, int32_t threshold, int16_t type); int32_t initWindowResInfo(SResultRowInfo* pWindowResInfo, int32_t size, int16_t type);
void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo); void cleanupTimeWindowInfo(SResultRowInfo* pWindowResInfo);
void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo); void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pWindowResInfo);
void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num); void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num);
void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv);
int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo); int32_t numOfClosedTimeWindow(SResultRowInfo* pWindowResInfo);
void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); void closeTimeWindow(SResultRowInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo); void closeAllTimeWindow(SResultRowInfo* pWindowResInfo);
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order); void removeRedundantWindow(SResultRowInfo *pWindowResInfo, TSKEY lastKey, int32_t order);
static FORCE_INLINE SResultRow *getResultRow(SWindowResInfo *pWindowResInfo, int32_t slot) { static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pWindowResInfo, int32_t slot) {
assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size);
return pWindowResInfo->pResult[slot]; return pWindowResInfo->pResult[slot];
} }
...@@ -50,7 +50,7 @@ static FORCE_INLINE SResultRow *getResultRow(SWindowResInfo *pWindowResInfo, int ...@@ -50,7 +50,7 @@ static FORCE_INLINE SResultRow *getResultRow(SWindowResInfo *pWindowResInfo, int
#define curTimeWindowIndex(_winres) ((_winres)->curIndex) #define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1) #define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot); bool isWindowResClosed(SResultRowInfo *pWindowResInfo, int32_t slot);
int32_t initResultRow(SResultRow *pResultRow); int32_t initResultRow(SResultRow *pResultRow);
......
...@@ -196,6 +196,9 @@ static int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* ...@@ -196,6 +196,9 @@ static int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo*
static int32_t checkForQueryBuf(size_t numOfTables); static int32_t checkForQueryBuf(size_t numOfTables);
static void releaseQueryBuf(size_t numOfTables); static void releaseQueryBuf(size_t numOfTables);
static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order);
static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type);
static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery);
static STableIdInfo createTableIdInfo(SQuery* pQuery);
bool doFilterData(SQuery *pQuery, int32_t elemPos) { bool doFilterData(SQuery *pQuery, int32_t elemPos) {
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
...@@ -458,7 +461,7 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis ...@@ -458,7 +461,7 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis
return true; return true;
} }
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData, static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo, char *pData,
int16_t bytes, bool masterscan, uint64_t uid) { int16_t bytes, bool masterscan, uint64_t uid) {
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
int32_t *p1 = int32_t *p1 =
...@@ -515,7 +518,7 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin ...@@ -515,7 +518,7 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
} }
// get the correct time window according to the handled timestamp // get the correct time window according to the handled timestamp
static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t ts, SQuery *pQuery) { static STimeWindow getActiveTimeWindow(SResultRowInfo *pWindowResInfo, int64_t ts, SQuery *pQuery) {
STimeWindow w = {0}; STimeWindow w = {0};
if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value
...@@ -608,7 +611,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf ...@@ -608,7 +611,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
return 0; return 0;
} }
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, SDataBlockInfo* pBockInfo, static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo, SDataBlockInfo* pBockInfo,
STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) { STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) {
assert(win->skey <= win->ekey); assert(win->skey <= win->ekey);
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
...@@ -641,7 +644,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes ...@@ -641,7 +644,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool getResultRowStatus(SWindowResInfo *pWindowResInfo, int32_t slot) { static bool getResultRowStatus(SResultRowInfo *pWindowResInfo, int32_t slot) {
assert(slot >= 0 && slot < pWindowResInfo->size); assert(slot >= 0 && slot < pWindowResInfo->size);
return pWindowResInfo->pResult[slot]->closed; return pWindowResInfo->pResult[slot]->closed;
} }
...@@ -660,7 +663,7 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { ...@@ -660,7 +663,7 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
} }
} }
static bool isResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { static bool resultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP)); assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
if (type == RESULT_ROW_START_INTERP) { if (type == RESULT_ROW_START_INTERP) {
return pResult->startInterp == true; return pResult->startInterp == true;
...@@ -700,7 +703,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se ...@@ -700,7 +703,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
/** /**
* NOTE: the query status only set for the first scan of master scan. * NOTE: the query status only set for the first scan of master scan.
*/ */
static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SWindowResInfo *pWindowResInfo) { static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SResultRowInfo *pWindowResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
if (pRuntimeEnv->scanFlag != MASTER_SCAN) { if (pRuntimeEnv->scanFlag != MASTER_SCAN) {
return pWindowResInfo->size; return pWindowResInfo->size;
...@@ -758,7 +761,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe ...@@ -758,7 +761,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey; pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey;
// the number of completed slots are larger than the threshold, return current generated results to client. // the number of completed slots are larger than the threshold, return current generated results to client.
if (numOfClosed > pWindowResInfo->threshold) { if (numOfClosed > pQuery->rec.threshold) {
qDebug("QInfo:%p total result window:%d closed:%d, reached the output threshold %d, return", qDebug("QInfo:%p total result window:%d closed:%d, reached the output threshold %d, return",
GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, numOfClosed, pQuery->rec.threshold); GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, numOfClosed, pQuery->rec.threshold);
...@@ -989,7 +992,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas ...@@ -989,7 +992,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
if (functionId == TSDB_FUNC_ARITHM) { if (functionId == TSDB_FUNC_ARITHM) {
sas->pArithExpr = &pQuery->pExpr1[col]; sas->pArithExpr = &pQuery->pExpr1[col];
sas->offset = (QUERY_IS_ASC_QUERY(pQuery)) ? pQuery->pos : pQuery->pos - (size - 1); sas->offset = (QUERY_IS_ASC_QUERY(pQuery))? pQuery->pos : pQuery->pos - (size - 1);
sas->colList = pQuery->colList; sas->colList = pQuery->colList;
sas->numOfCols = pQuery->numOfCols; sas->numOfCols = pQuery->numOfCols;
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES); sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
...@@ -1032,85 +1035,89 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas ...@@ -1032,85 +1035,89 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
return dataBlock; return dataBlock;
} }
// window start key interpolation static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) { if (type == RESULT_ROW_START_INTERP) {
SQuery* pQuery = pRuntimeEnv->pQuery; for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].start.key = INT64_MIN;
TSKEY start = tsCols[pos];
TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0];
TSKEY prevTs = (pos == 0)? lastTs : tsCols[pos - 1];
// if lastTs == INT64_MIN, it is the first block, no need to do the start time interpolation
if (((lastTs != INT64_MIN && pos >= 0) || (lastTs == INT64_MIN && pos > 0)) && win->skey > lastTs &&
win->skey < start) {
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k);
if (k == 0 && pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
assert(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
continue;
} }
} else {
for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].end.key = INT64_MIN;
}
}
}
double v1 = 0, v2 = 0, v = 0; //static double getTSWindowInterpoVal(SColumnInfoData* pColInfo, int16_t srcColIndex, int16_t rowIndex, TSKEY key, char** prevRow, TSKEY* tsCols, int32_t step) {
// TSKEY start = tsCols[rowIndex];
// TSKEY prevTs = (rowIndex == 0)? *(TSKEY *) prevRow[0] : tsCols[rowIndex - step];
//
// double v1 = 0, v2 = 0, v = 0;
// char *prevVal = (rowIndex == 0)? prevRow[srcColIndex] : ((char*)pColInfo->pData) + (rowIndex - step) * pColInfo->info.bytes;
//
// GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)prevVal);
// GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + rowIndex * pColInfo->info.bytes);
//
// SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
// SPoint point2 = (SPoint){.key = start, .val = &v2};
// SPoint point = (SPoint){.key = key, .val = &v};
// taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point);
//
// return v;
//}
char *prevVal = pos == 0 ? pRuntimeEnv->prevRow[k] : ((char*)pColInfo->pData) + (pos - 1) * pColInfo->info.bytes; // window start key interpolation
static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, int32_t numOfRows, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) {
SQuery* pQuery = pRuntimeEnv->pQuery;
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)prevVal); TSKEY curTs = tsCols[pos];
GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + pos * pColInfo->info.bytes); TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0];
SPoint point1 = (SPoint){.key = prevTs, .val = &v1}; // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
SPoint point2 = (SPoint){.key = start, .val = &v2}; // start exactly from this point, no need to do interpolation
SPoint point = (SPoint){.key = win->skey, .val = &v}; TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->skey:win->ekey;
taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point); if (key == curTs) {
pRuntimeEnv->pCtx[k].start.key = point.key; setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
pRuntimeEnv->pCtx[k].start.val = v; return true;
} }
if (lastTs == INT64_MIN && ((pos == 0 && QUERY_IS_ASC_QUERY(pQuery)) || (pos == (numOfRows - 1) && !QUERY_IS_ASC_QUERY(pQuery)))) {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
return true; return true;
} else {
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
pRuntimeEnv->pCtx[k].start.key = INT64_MIN;
} }
return false; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
} TSKEY prevTs = ((pos == 0 && QUERY_IS_ASC_QUERY(pQuery)) || (pos == (numOfRows - 1) && !QUERY_IS_ASC_QUERY(pQuery)))?
lastTs:tsCols[pos - step];
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, prevTs, pos - step, curTs, pos, key, RESULT_ROW_START_INTERP);
return true;
} }
static bool setTimeWindowInterpolationEndTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, SArray* pDataBlock, TSKEY* tsCols, TSKEY ekey, STimeWindow* win) { static bool setTimeWindowInterpolationEndTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t endRowIndex, SArray* pDataBlock, TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) {
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY trueEndKey = tsCols[pos]; TSKEY actualEndKey = tsCols[endRowIndex];
if (win->ekey < ekey && win->ekey != trueEndKey) {
int32_t nextIndex = pos + 1;
TSKEY next = tsCols[nextIndex];
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k);
if (k == 0 && pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP &&
pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
continue;
}
double v1 = 0, v2 = 0, v = 0; TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->ekey:win->skey;
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + pos * pColInfo->info.bytes);
GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + nextIndex * pColInfo->info.bytes);
SPoint point1 = (SPoint){.key = trueEndKey, .val = &v1}; // not ended in current data block, do not invoke interpolation
SPoint point2 = (SPoint){.key = next, .val = &v2}; if ((key > blockEkey && QUERY_IS_ASC_QUERY(pQuery)) || (key < blockEkey && !QUERY_IS_ASC_QUERY(pQuery))) {
SPoint point = (SPoint){.key = win->ekey, .val = &v}; setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP);
taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point); return false;
pRuntimeEnv->pCtx[k].end.key = point.key;
pRuntimeEnv->pCtx[k].end.val = v;
} }
// there is actual end point of current time window, no interpolation need
if (key == actualEndKey) {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP);
return true; return true;
} else { // current time window does not ended in current data block, do nothing
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
pRuntimeEnv->pCtx[k].end.key = INT64_MIN;
} }
return false; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
} int32_t nextRowIndex = endRowIndex + step;
assert(nextRowIndex >= 0);
TSKEY nextKey = tsCols[nextRowIndex];
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key, RESULT_ROW_END_INTERP);
return true;
} }
static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock) { static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock) {
...@@ -1119,10 +1126,10 @@ static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* ...@@ -1119,10 +1126,10 @@ static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo*
} }
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->rows-1:0;
for (int32_t k = 0; k < pQuery->numOfCols; ++k) { for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k); SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k);
memcpy(pRuntimeEnv->prevRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * (pDataBlockInfo->rows - 1)), memcpy(pRuntimeEnv->prevRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * rowIndex), pColInfo->info.bytes);
pColInfo->info.bytes);
} }
} }
...@@ -1150,7 +1157,7 @@ static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, TSKEY ...@@ -1150,7 +1157,7 @@ static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, TSKEY
* such as count/min/max etc. * such as count/min/max etc.
*/ */
static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
SWindowResInfo *pWindowResInfo, __block_search_fn_t searchFn, SArray *pDataBlock) { SResultRowInfo *pWindowResInfo, __block_search_fn_t searchFn, SArray *pDataBlock) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
...@@ -1174,11 +1181,13 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -1174,11 +1181,13 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (QUERY_IS_INTERVAL_QUERY(pQuery)) { if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
int32_t prevIndex = curTimeWindowIndex(pWindowResInfo);
TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step); TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step);
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
bool hasTimeWindow = false; bool hasTimeWindow = false;
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult); int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
tfree(sasArray); tfree(sasArray);
...@@ -1193,20 +1202,47 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -1193,20 +1202,47 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
TSKEY ekey = reviseWindowEkey(pQuery, &win); TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true); forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
// prev time window not interpolation yet.
int32_t curIndex = curTimeWindowIndex(pWindowResInfo);
if (prevIndex != -1 && prevIndex < curIndex) {
for(int32_t j = prevIndex; j < curIndex; ++j) {
SResultRow *pRes = pWindowResInfo->pResult[j];
STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &w, masterScan, &hasTimeWindow, &pResult);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
int32_t p = QUERY_IS_ASC_QUERY(pQuery)? 0:pDataBlockInfo->rows-1;
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY*) pRuntimeEnv->prevRow[0], -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, closed, &w, startPos, 0, tsCols, pDataBlockInfo->rows);
}
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult);
assert (ret == TSDB_CODE_SUCCESS); // null data, too many state code
}
// window start key interpolation // window start key interpolation
if (pRuntimeEnv->timeWindowInterpo) { if (pRuntimeEnv->timeWindowInterpo) {
bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
if (!alreadyInterp) { if (!done) {
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, pQuery->pos, pDataBlock, tsCols, &win); int32_t startRowIndex = pQuery->pos;
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, &win);
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
} }
} }
alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
if (!alreadyInterp) { if (!done) {
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, pQuery->pos + forwardStep - 1, pDataBlock, tsCols, int32_t endRowIndex = pQuery->pos + (forwardStep - 1) * step;
pDataBlockInfo->window.ekey, &win);
TSKEY endKey = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey;
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, endRowIndex, pDataBlock, tsCols, endKey, &win);
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
} }
...@@ -1243,17 +1279,20 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -1243,17 +1279,20 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
// window start(end) key interpolation // window start(end) key interpolation
if (pRuntimeEnv->timeWindowInterpo) { if (pRuntimeEnv->timeWindowInterpo) {
bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
if (!alreadyInterp) { if (!done) {
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startPos, pDataBlock, tsCols, &nextWin); int32_t startRowIndex = startPos;
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, &nextWin);
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
} }
} }
alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
if (!alreadyInterp) { if (!done) {
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, startPos + forwardStep - 1, pDataBlock, tsCols, pDataBlockInfo->window.ekey, &nextWin); int32_t endRowIndex = startPos + (forwardStep - 1)*step;
TSKEY endKey = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey;
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, endRowIndex, pDataBlock, tsCols, endKey, &nextWin);
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
} }
...@@ -1459,8 +1498,84 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx ...@@ -1459,8 +1498,84 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
return true; return true;
} }
void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) {
SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionId != TSDB_FUNC_TWA) {
pRuntimeEnv->pCtx[k].start.key = INT64_MIN;
continue;
}
SColIndex* pColIndex = &pQuery->pExpr1[k].base.colInfo;
int16_t index = pColIndex->colIndex;
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, index);
assert(pColInfo->info.colId == pColIndex->colId && curTs != windowKey);
double v1 = 0, v2 = 0, v = 0;
if (prevRowIndex == -1) {
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pRuntimeEnv->prevRow[k]);
} else {
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes);
}
GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes);
SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
SPoint point2 = (SPoint){.key = curTs, .val = &v2};
SPoint point = (SPoint){.key = windowKey, .val = &v};
taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point);
if (type == RESULT_ROW_START_INTERP) {
pRuntimeEnv->pCtx[k].start.key = point.key;
pRuntimeEnv->pCtx[k].start.val = v;
} else {
pRuntimeEnv->pCtx[k].end.key = point.key;
pRuntimeEnv->pCtx[k].end.val = v;
}
}
}
static void setTimeWindowSKeyInterp(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY ts, int32_t offset, SResultRow* pResult, STimeWindow* win) {
SQuery* pQuery = pRuntimeEnv->pQuery;
bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
if (!done) {
TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->skey:win->ekey;
if (key == ts) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
} else if (prevTs != INT64_MIN && ((QUERY_IS_ASC_QUERY(pQuery) && prevTs < key) || (!QUERY_IS_ASC_QUERY(pQuery) && prevTs > key))) {
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, key, RESULT_ROW_START_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
}
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP);
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pRuntimeEnv->pCtx[k].size = 1;
}
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
}
}
static void setTimeWindowEKeyInterp(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY ts, int32_t offset, SResultRow* pResult, STimeWindow* win) {
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->ekey:win->skey;
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, key, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
pRuntimeEnv->pCtx[i].size = 0;
}
}
static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
SWindowResInfo *pWindowResInfo, SArray *pDataBlock) { SResultRowInfo *pWindowResInfo, SArray *pDataBlock) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
...@@ -1489,6 +1604,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1489,6 +1604,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId); setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId);
pCtx[k].size = 1;
} }
// set the input column data // set the input column data
...@@ -1508,7 +1624,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1508,7 +1624,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
} }
int32_t offset = -1; int32_t offset = -1;
// TSKEY prev = -1; TSKEY prevTs = *(TSKEY*) pRuntimeEnv->prevRow[0];
int32_t prevRowIndex = -1;
for (int32_t j = 0; j < pDataBlockInfo->rows; ++j) { for (int32_t j = 0; j < pDataBlockInfo->rows; ++j) {
offset = GET_COL_DATA_POS(pQuery, j, step); offset = GET_COL_DATA_POS(pQuery, j, step);
...@@ -1530,7 +1647,9 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1530,7 +1647,9 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
// interval window query, decide the time window according to the primary timestamp // interval window query, decide the time window according to the primary timestamp
if (QUERY_IS_INTERVAL_QUERY(pQuery)) { if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
int32_t prevWindowIndex = curTimeWindowIndex(pWindowResInfo);
int64_t ts = tsCols[offset]; int64_t ts = tsCols[offset];
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
bool hasTimeWindow = false; bool hasTimeWindow = false;
...@@ -1543,27 +1662,35 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1543,27 +1662,35 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
if (!hasTimeWindow) { if (!hasTimeWindow) {
continue; continue;
} }
/*
// window start key interpolation // window start key interpolation
if (pRuntimeEnv->timeWindowInterpo) { if (pRuntimeEnv->timeWindowInterpo) {
bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); // check for the time window end time interpolation
if (!alreadyInterp) { int32_t curIndex = curTimeWindowIndex(pWindowResInfo);
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, pos, pDataBlock, tsCols, &win); if (prevWindowIndex != -1 && prevWindowIndex < curIndex) {
if (interp) { for (int32_t k = prevWindowIndex; k < curIndex; ++k) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); SResultRow *pRes = pWindowResInfo->pResult[k];
}
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &pRes->win, masterScan, &hasTimeWindow, &pResult);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
setTimeWindowEKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &pRes->win);
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, closed, &pRes->win, offset);
} }
alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); // restore current time window
if (!alreadyInterp) { ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow,
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, pQuery->pos + forwardStep - 1, pDataBlock, tsCols, &pResult);
pDataBlockInfo->window.ekey, &win); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
if (interp) { continue;
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
} }
} }
setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &win);
} }
*/
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, closed, &win, offset); doRowwiseApplyFunctions(pRuntimeEnv, closed, &win, offset);
...@@ -1588,26 +1715,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1588,26 +1715,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
} }
if (hasTimeWindow) { if (hasTimeWindow) {
/* setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &nextWin);
// window start(end) key interpolation
if (pRuntimeEnv->timeWindowInterpo) {
bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
if (!alreadyInterp) {
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startPos, pDataBlock, tsCols, &nextWin);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
}
}
alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
if (!alreadyInterp) {
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, startPos + forwardStep - 1, pDataBlock, tsCols, pDataBlockInfo->window.ekey, &nextWin);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
}
}
*/
closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, offset); doRowwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, offset);
} }
...@@ -1633,7 +1741,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1633,7 +1741,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
} }
} }
// prev = tsCols[offset]; prevTs = tsCols[offset];
prevRowIndex = offset;
if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->pTSBuf != NULL) {
// if timestamp filter list is empty, quit current query // if timestamp filter list is empty, quit current query
...@@ -1672,7 +1781,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl ...@@ -1672,7 +1781,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQInfo = pQuery->current; STableQueryInfo* pTableQInfo = pQuery->current;
SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo; SResultRowInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) { if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
...@@ -2496,7 +2605,7 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) { ...@@ -2496,7 +2605,7 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) {
return false; return false;
} }
int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) { int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
*status = BLK_DATA_NO_NEEDED; *status = BLK_DATA_NO_NEEDED;
...@@ -2674,6 +2783,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa ...@@ -2674,6 +2783,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa
pQuery->rec.capacity = capacity; pQuery->rec.capacity = capacity;
} }
// TODO merge with enuserOutputBufferSimple
static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) { static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) {
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
...@@ -2718,7 +2828,7 @@ static void doSetInitialTimewindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo ...@@ -2718,7 +2828,7 @@ static void doSetInitialTimewindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo
if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) { if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) {
STimeWindow w = TSWINDOW_INITIALIZER; STimeWindow w = TSWINDOW_INITIALIZER;
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (QUERY_IS_ASC_QUERY(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) {
getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, pBlockInfo->window.skey, pQuery->window.ekey, &w); getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, pBlockInfo->window.skey, pQuery->window.ekey, &w);
...@@ -3082,14 +3192,14 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) ...@@ -3082,14 +3192,14 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
return -1; return -1;
} }
SWindowResInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo; SResultRowInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo;
SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos); SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos);
tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId); tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId);
char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1); char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1);
TSKEY leftTimestamp = GET_INT64_VAL(b1); TSKEY leftTimestamp = GET_INT64_VAL(b1);
SWindowResInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo; SResultRowInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo;
SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos); SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos);
tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId); tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId);
...@@ -3329,7 +3439,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -3329,7 +3439,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
int32_t pos = pTree->pNode[0].index; int32_t pos = pTree->pNode[0].index;
SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo; SResultRowInfo *pWindowResInfo = &pTableList[pos]->windowResInfo;
SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.position[pos]); SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.position[pos]);
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
...@@ -3477,17 +3587,9 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo * ...@@ -3477,17 +3587,9 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
// order has changed already // order has changed already
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// TODO validate the assertion
// if (!QUERY_IS_ASC_QUERY(pQuery)) {
// assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step);
// } else {
// assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step);
// }
if (pTableQueryInfo->lastKey == pTableQueryInfo->win.skey) { if (pTableQueryInfo->lastKey == pTableQueryInfo->win.skey) {
// do nothing, no results // do nothing, no results
} else { } else {// NOTE: even win.skey != lastKey, the results may not generated.
pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step; pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step;
} }
...@@ -3501,7 +3603,7 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo * ...@@ -3501,7 +3603,7 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
pTableQueryInfo->windowResInfo.curIndex = pTableQueryInfo->windowResInfo.size - 1; pTableQueryInfo->windowResInfo.curIndex = pTableQueryInfo->windowResInfo.size - 1;
} }
static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t order) { static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, int32_t order) {
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) { for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
...@@ -3533,7 +3635,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) { ...@@ -3533,7 +3635,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
int32_t order = pQuery->order.order; int32_t order = pQuery->order.order;
// group by normal columns and interval query on normal table // group by normal columns and interval query on normal table
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) { if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order); disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order);
} else { // for simple result of table query, } else { // for simple result of table query,
...@@ -3724,7 +3826,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3724,7 +3826,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
bool toContinue = false; bool toContinue = false;
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) { if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
// for each group result, call the finalize function for each column // for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) { for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SResultRow *pResult = getResultRow(pWindowResInfo, i); SResultRow *pResult = getResultRow(pWindowResInfo, i);
...@@ -3809,13 +3911,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI ...@@ -3809,13 +3911,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
SET_REVERSE_SCAN_FLAG(pRuntimeEnv); SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
STsdbQueryCond cond = { STsdbQueryCond cond = createTsdbQueryCond(pQuery);
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
TIME_WINDOW_COPY(cond.twindow, pQuery->window);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
switchCtxOrder(pRuntimeEnv); switchCtxOrder(pRuntimeEnv);
...@@ -3884,6 +3980,8 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { ...@@ -3884,6 +3980,8 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
// do nothing if no data blocks are found qualified during scan // do nothing if no data blocks are found qualified during scan
if (qstatus.lastKey != pTableQueryInfo->lastKey) { if (qstatus.lastKey != pTableQueryInfo->lastKey) {
qstatus.curWindow.ekey = pTableQueryInfo->lastKey - step; qstatus.curWindow.ekey = pTableQueryInfo->lastKey - step;
} else { // the lastkey does not increase, which means no data checked yet
qDebug("QInfo:%p no results generated in this scan", pQInfo);
} }
qstatus.lastKey = pTableQueryInfo->lastKey; qstatus.lastKey = pTableQueryInfo->lastKey;
...@@ -3898,18 +3996,11 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { ...@@ -3898,18 +3996,11 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
break; break;
} }
STsdbQueryCond cond = {
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
TIME_WINDOW_COPY(cond.twindow, qstatus.curWindow);
if (pRuntimeEnv->pSecQueryHandle != NULL) { if (pRuntimeEnv->pSecQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
} }
STsdbQueryCond cond = createTsdbQueryCond(pQuery);
restoreTimeWindow(&pQInfo->tableGroupInfo, &cond); restoreTimeWindow(&pQInfo->tableGroupInfo, &cond);
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
if (pRuntimeEnv->pSecQueryHandle == NULL) { if (pRuntimeEnv->pSecQueryHandle == NULL) {
...@@ -3947,7 +4038,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3947,7 +4038,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) { if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
// for each group result, call the finalize function for each column // for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pRuntimeEnv->groupbyNormalCol) { if (pRuntimeEnv->groupbyNormalCol) {
closeAllTimeWindow(pWindowResInfo); closeAllTimeWindow(pWindowResInfo);
} }
...@@ -4003,9 +4094,8 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void ...@@ -4003,9 +4094,8 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void
// set more initial size of interval/groupby query // set more initial size of interval/groupby query
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
int32_t initialSize = 16; int32_t initialSize = 128;
int32_t initialThreshold = 100; int32_t code = initWindowResInfo(&pTableQueryInfo->windowResInfo, initialSize, TSDB_DATA_TYPE_INT);
int32_t code = initWindowResInfo(&pTableQueryInfo->windowResInfo, initialSize, initialThreshold, TSDB_DATA_TYPE_INT);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
...@@ -4032,7 +4122,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { ...@@ -4032,7 +4122,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) {
void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
// lastKey needs to be updated // lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey; pTableQueryInfo->lastKey = nextKey;
...@@ -4200,7 +4290,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { ...@@ -4200,7 +4290,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
* operations involve. * operations involve.
*/ */
STimeWindow w = TSWINDOW_INITIALIZER; STimeWindow w = TSWINDOW_INITIALIZER;
SWindowResInfo *pWindowResInfo = &pTableQueryInfo->windowResInfo; SResultRowInfo *pWindowResInfo = &pTableQueryInfo->windowResInfo;
TSKEY sk = MIN(win.skey, win.ekey); TSKEY sk = MIN(win.skey, win.ekey);
TSKEY ek = MAX(win.skey, win.ekey); TSKEY ek = MAX(win.skey, win.ekey);
...@@ -4244,7 +4334,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) { ...@@ -4244,7 +4334,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
return loadPrimaryTS; return loadPrimaryTS;
} }
static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_t orderType) { static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_t orderType) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
...@@ -4321,7 +4411,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_ ...@@ -4321,7 +4411,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
* @param pQInfo * @param pQInfo
* @param result * @param result
*/ */
void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo) { void copyFromWindowResToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC; int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
...@@ -4360,7 +4450,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc ...@@ -4360,7 +4450,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current; STableQueryInfo* pTableQueryInfo = pQuery->current;
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; SResultRowInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) { if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
...@@ -4434,16 +4524,19 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -4434,16 +4524,19 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
} }
} }
int32_t numOfTables = (int32_t)taosArrayGetSize(pQInfo->arrTableIdInfo); int32_t numOfTables = (int32_t) taosHashGetSize(pQInfo->arrTableIdInfo);
*(int32_t*)data = htonl(numOfTables); *(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t); data += sizeof(int32_t);
for(int32_t i = 0; i < numOfTables; i++) {
STableIdInfo* pSrc = taosArrayGet(pQInfo->arrTableIdInfo, i); STableIdInfo* item = taosHashIterate(pQInfo->arrTableIdInfo, NULL);
while(item) {
STableIdInfo* pDst = (STableIdInfo*)data; STableIdInfo* pDst = (STableIdInfo*)data;
pDst->uid = htobe64(pSrc->uid); pDst->uid = htobe64(item->uid);
pDst->tid = htonl(pSrc->tid); pDst->tid = htonl(item->tid);
pDst->key = htobe64(pSrc->key); pDst->key = htobe64(item->key);
data += sizeof(STableIdInfo); data += sizeof(STableIdInfo);
item = taosHashIterate(pQInfo->arrTableIdInfo, item);
} }
// Check if query is completed or not for stable query or normal table query respectively. // Check if query is completed or not for stable query or normal table query respectively.
...@@ -4605,7 +4698,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -4605,7 +4698,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo, STableQueryInfo* pTableQueryInfo) { static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo, STableQueryInfo* pTableQueryInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
assert(pQuery->limit.offset == 0); assert(pQuery->limit.offset == 0);
STimeWindow tw = *win; STimeWindow tw = *win;
...@@ -4655,7 +4748,23 @@ static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* w ...@@ -4655,7 +4748,23 @@ static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* w
static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
// get the first unclosed time window
bool assign = false;
for(int32_t i = 0; i < pRuntimeEnv->windowResInfo.size; ++i) {
if (pRuntimeEnv->windowResInfo.pResult[i]->closed) {
continue;
}
assign = true;
*start = pRuntimeEnv->windowResInfo.pResult[i]->win.skey;
}
if (!assign) {
*start = pQuery->current->lastKey; *start = pQuery->current->lastKey;
}
assert(*start <= pQuery->current->lastKey);
// if queried with value filter, do NOT forward query start position // if queried with value filter, do NOT forward query start position
if (pQuery->limit.offset <= 0 || pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->pFillInfo != NULL) { if (pQuery->limit.offset <= 0 || pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->pFillInfo != NULL) {
...@@ -4671,7 +4780,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { ...@@ -4671,7 +4780,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
STimeWindow w = TSWINDOW_INITIALIZER; STimeWindow w = TSWINDOW_INITIALIZER;
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
STableQueryInfo *pTableQueryInfo = pQuery->current; STableQueryInfo *pTableQueryInfo = pQuery->current;
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
...@@ -4770,13 +4879,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) ...@@ -4770,13 +4879,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STsdbQueryCond cond = { STsdbQueryCond cond = createTsdbQueryCond(pQuery);
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
TIME_WINDOW_COPY(cond.twindow, pQuery->window);
if (!isSTableQuery if (!isSTableQuery
&& (pQInfo->tableqinfoGroupInfo.numOfTables == 1) && (pQInfo->tableqinfoGroupInfo.numOfTables == 1)
...@@ -4893,20 +4996,13 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4893,20 +4996,13 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) { if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
int16_t type = TSDB_DATA_TYPE_NULL; int16_t type = TSDB_DATA_TYPE_NULL;
int32_t threshold = 0;
if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags; if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags;
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
threshold = 4000;
} else { } else {
type = TSDB_DATA_TYPE_INT; // group id type = TSDB_DATA_TYPE_INT; // group id
threshold = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
if (threshold < 8) {
threshold = 8;
}
} }
code = initWindowResInfo(&pRuntimeEnv->windowResInfo, 8, threshold, type); code = initWindowResInfo(&pRuntimeEnv->windowResInfo, 8, type);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -4926,7 +5022,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4926,7 +5022,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
type = TSDB_DATA_TYPE_TIMESTAMP; type = TSDB_DATA_TYPE_TIMESTAMP;
} }
code = initWindowResInfo(&pRuntimeEnv->windowResInfo, numOfResultRows, 1024, type); code = initWindowResInfo(&pRuntimeEnv->windowResInfo, numOfResultRows, type);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -4984,6 +5080,20 @@ static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTa ...@@ -4984,6 +5080,20 @@ static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTa
} }
} }
static void doTableQueryInfoTimeWindowCheck(SQuery* pQuery, STableQueryInfo* pTableQueryInfo) {
if (QUERY_IS_ASC_QUERY(pQuery)) {
assert(
(pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) &&
(pTableQueryInfo->lastKey >= pTableQueryInfo->win.skey) &&
(pTableQueryInfo->win.skey >= pQuery->window.skey && pTableQueryInfo->win.ekey <= pQuery->window.ekey));
} else {
assert(
(pTableQueryInfo->win.skey >= pTableQueryInfo->win.ekey) &&
(pTableQueryInfo->lastKey <= pTableQueryInfo->win.skey) &&
(pTableQueryInfo->win.skey <= pQuery->window.skey && pTableQueryInfo->win.ekey >= pQuery->window.ekey));
}
}
static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
...@@ -5010,17 +5120,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { ...@@ -5010,17 +5120,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
} }
pQuery->current = *pTableQueryInfo; pQuery->current = *pTableQueryInfo;
if (QUERY_IS_ASC_QUERY(pQuery)) { doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo);
assert(
((*pTableQueryInfo)->win.skey <= (*pTableQueryInfo)->win.ekey) &&
((*pTableQueryInfo)->lastKey >= (*pTableQueryInfo)->win.skey) &&
((*pTableQueryInfo)->win.skey >= pQuery->window.skey && (*pTableQueryInfo)->win.ekey <= pQuery->window.ekey));
} else {
assert(
((*pTableQueryInfo)->win.skey >= (*pTableQueryInfo)->win.ekey) &&
((*pTableQueryInfo)->lastKey <= (*pTableQueryInfo)->win.skey) &&
((*pTableQueryInfo)->win.skey <= pQuery->window.skey && (*pTableQueryInfo)->win.ekey >= pQuery->window.ekey));
}
if (!pRuntimeEnv->groupbyNormalCol) { if (!pRuntimeEnv->groupbyNormalCol) {
setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo); setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo);
...@@ -5169,6 +5269,41 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { ...@@ -5169,6 +5269,41 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
return true; return true;
} }
STsdbQueryCond createTsdbQueryCond(SQuery* pQuery) {
STsdbQueryCond cond = {
.colList = pQuery->colList,
.order = pQuery->order.order,
.numOfCols = pQuery->numOfCols,
};
TIME_WINDOW_COPY(cond.twindow, pQuery->window);
return cond;
}
static STableIdInfo createTableIdInfo(SQuery* pQuery) {
assert(pQuery != NULL && pQuery->current != NULL);
STableIdInfo tidInfo;
STableId* id = TSDB_TABLEID(pQuery->current->pTable);
tidInfo.uid = id->uid;
tidInfo.tid = id->tid;
tidInfo.key = pQuery->current->lastKey;
return tidInfo;
}
static void updateTableIdInfo(SQuery* pQuery, SHashObj* pTableIdInfo) {
STableIdInfo tidInfo = createTableIdInfo(pQuery);
STableIdInfo* idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid));
if (idinfo != NULL) {
assert(idinfo->tid == tidInfo.tid && idinfo->uid == tidInfo.uid);
idinfo->key = tidInfo.key;
} else {
taosHashPut(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid), &tidInfo, sizeof(STableIdInfo));
}
}
/** /**
* super table query handler * super table query handler
* 1. super table projection query, group-by on normal columns query, ts-comp query * 1. super table projection query, group-by on normal columns query, ts-comp query
...@@ -5188,18 +5323,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5188,18 +5323,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
while (pQInfo->groupIndex < numOfGroups) { while (pQInfo->groupIndex < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex);
qDebug("QInfo:%p last_row query on group:%d, total group:%" PRIzu ", current group:%p", pQInfo, pQInfo->groupIndex,
numOfGroups, group);
STsdbQueryCond cond = {
.colList = pQuery->colList,
.order = pQuery->order.order,
.numOfCols = pQuery->numOfCols,
};
TIME_WINDOW_COPY(cond.twindow, pQuery->window); qDebug("QInfo:%p point interpolation query on group:%d, total group:%" PRIzu ", current group:%p", pQInfo,
pQInfo->groupIndex, numOfGroups, group);
STsdbQueryCond cond = createTsdbQueryCond(pQuery);
SArray *g1 = taosArrayInit(1, POINTER_BYTES); SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayClone(group); SArray *tx = taosArrayClone(group);
...@@ -5223,14 +5351,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5223,14 +5351,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
initCtxOutputBuf(pRuntimeEnv); initCtxOutputBuf(pRuntimeEnv);
SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); SArray *s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle);
assert(taosArrayGetSize(s) >= 1); assert(taosArrayGetSize(s) >= 1);
setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb); setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb);
taosArrayDestroy(s); taosArrayDestroy(s);
// here we simply set the first table as current table // here we simply set the first table as current table
SArray* first = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex); SArray *first = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex);
pQuery->current = taosArrayGetP(first, 0); pQuery->current = taosArrayGetP(first, 0);
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
...@@ -5254,17 +5382,12 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5254,17 +5382,12 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
} }
} else if (pRuntimeEnv->groupbyNormalCol) { // group-by on normal columns query } else if (pRuntimeEnv->groupbyNormalCol) { // group-by on normal columns query
while (pQInfo->groupIndex < numOfGroups) { while (pQInfo->groupIndex < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex);
qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pQInfo->groupIndex, numOfGroups);
STsdbQueryCond cond = { qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pQInfo->groupIndex,
.colList = pQuery->colList, numOfGroups);
.order = pQuery->order.order,
.numOfCols = pQuery->numOfCols,
};
TIME_WINDOW_COPY(cond.twindow, pQuery->window); STsdbQueryCond cond = createTsdbQueryCond(pQuery);
SArray *g1 = taosArrayInit(1, POINTER_BYTES); SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayClone(group); SArray *tx = taosArrayClone(group);
...@@ -5287,7 +5410,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5287,7 +5410,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
longjmp(pRuntimeEnv->env, terrno); longjmp(pRuntimeEnv->env, terrno);
} }
SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); SArray *s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle);
assert(taosArrayGetSize(s) >= 1); assert(taosArrayGetSize(s) >= 1);
setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb); setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb);
...@@ -5296,7 +5419,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5296,7 +5419,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
scanMultiTableDataBlocks(pQInfo); scanMultiTableDataBlocks(pQInfo);
pQInfo->groupIndex += 1; pQInfo->groupIndex += 1;
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
// no results generated for current group, continue to try the next group // no results generated for current group, continue to try the next group
taosArrayDestroy(s); taosArrayDestroy(s);
...@@ -5309,7 +5432,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5309,7 +5432,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
SResultRow *pResult = pWindowResInfo->pResult[i]; SResultRow *pResult = pWindowResInfo->pResult[i];
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
SResultRowCellInfo* pCell = getResultCell(pRuntimeEnv, pResult, j); SResultRowCellInfo *pCell = getResultCell(pRuntimeEnv, pResult, j);
pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pCell->numOfRes)); pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pCell->numOfRes));
} }
} }
...@@ -5324,16 +5447,109 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5324,16 +5447,109 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
ensureOutputBufferSimple(pRuntimeEnv, pWindowResInfo->size); ensureOutputBufferSimple(pRuntimeEnv, pWindowResInfo->size);
copyFromWindowResToSData(pQInfo, pWindowResInfo); copyFromWindowResToSData(pQInfo, pWindowResInfo);
pQInfo->groupIndex = currentGroupIndex; //restore the group index pQInfo->groupIndex = currentGroupIndex; // restore the group index
assert(pQuery->rec.rows == pWindowResInfo->size); assert(pQuery->rec.rows == pWindowResInfo->size);
clearClosedTimeWindow(pRuntimeEnv); clearClosedTimeWindow(pRuntimeEnv);
break; break;
} }
} else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTSBuf == NULL) {
//super table projection query with identical query time range for all tables.
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
resetDefaultResInfoOutputBuf(pRuntimeEnv);
SArray *group = GET_TABLEGROUP(pQInfo, 0);
assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables &&
1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList));
void *pQueryHandle = pRuntimeEnv->pQueryHandle;
if (pQueryHandle == NULL) {
STsdbQueryCond con = createTsdbQueryCond(pQuery);
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &con, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
pQueryHandle = pRuntimeEnv->pQueryHandle;
}
// skip blocks without load the actual data block from file if no filter condition present
// skipBlocks(&pQInfo->runtimeEnv);
// if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) {
// setQueryStatus(pQuery, QUERY_COMPLETED);
// return;
// }
bool hasMoreBlock = true;
SQueryCostInfo *summary = &pRuntimeEnv->summary;
while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) {
summary->totalBlocks += 1;
if (IS_QUERY_KILLED(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
STableQueryInfo **pTableQueryInfo =
(STableQueryInfo **)taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid));
if (pTableQueryInfo == NULL) {
break;
}
pQuery->current = *pTableQueryInfo;
doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo);
if (pRuntimeEnv->hasTagResults) {
setTagVal(pRuntimeEnv, pQuery->current->pTable, pQInfo->tsdb);
}
uint32_t status = 0;
SDataStatis *pStatis = NULL;
SArray *pDataBlock = NULL;
int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pQuery->current->windowResInfo, pQueryHandle, &blockInfo,
&pStatis, &pDataBlock, &status);
if (ret != TSDB_CODE_SUCCESS) {
break;
}
assert(status != BLK_DATA_DISCARD);
ensureOutputBuffer(pRuntimeEnv, &blockInfo);
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
summary->totalRows += blockInfo.rows;
qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64,
GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes,
pQuery->current->lastKey);
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
// the flag may be set by tableApplyFunctionsOnBlock, clear it here
CLEAR_QUERY_STATUS(pQuery, QUERY_COMPLETED);
updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo);
skipResults(pRuntimeEnv);
// the limitation of output result is reached, set the query completed
if (limitResults(pRuntimeEnv)) {
setQueryStatus(pQuery, QUERY_COMPLETED);
SET_STABLE_QUERY_OVER(pQInfo);
break;
}
// while the output buffer is full or limit/offset is applied, query may be paused here
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL|QUERY_COMPLETED)) {
break;
}
}
if (!hasMoreBlock) {
setQueryStatus(pQuery, QUERY_COMPLETED);
SET_STABLE_QUERY_OVER(pQInfo);
}
} else { } else {
/* /*
* 1. super table projection query, 2. ts-comp query * the following two cases handled here.
* if the subgroup index is larger than 0, results generated by group by tbname,k is existed. * 1. ts-comp query, and 2. the super table projection query with different query time range for each table.
* If the subgroup index is larger than 0, results generated by group by tbname,k is existed.
* we need to return it to client in the first place. * we need to return it to client in the first place.
*/ */
if (pQInfo->groupIndex > 0) { if (pQInfo->groupIndex > 0) {
...@@ -5396,14 +5612,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5396,14 +5612,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
* to ensure that, we can reset the query range once query on a meter is completed. * to ensure that, we can reset the query range once query on a meter is completed.
*/ */
pQInfo->tableIndex++; pQInfo->tableIndex++;
updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo);
STableIdInfo tidInfo = {0};
STableId* id = TSDB_TABLEID(pQuery->current->pTable);
tidInfo.uid = id->uid;
tidInfo.tid = id->tid;
tidInfo.key = pQuery->current->lastKey;
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
// if the buffer is full or group by each table, we need to jump out of the loop // if the buffer is full or group by each table, we need to jump out of the loop
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
...@@ -5430,7 +5639,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5430,7 +5639,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
if (pQInfo->tableIndex >= pQInfo->tableqinfoGroupInfo.numOfTables) { if (pQInfo->tableIndex >= pQInfo->tableqinfoGroupInfo.numOfTables) {
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
} }
}
/* /*
* 1. super table projection query, group-by on normal columns query, ts-comp query * 1. super table projection query, group-by on normal columns query, ts-comp query
...@@ -5451,10 +5659,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5451,10 +5659,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur; pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur;
} }
qDebug( qDebug("QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64
"QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64 " points returned, total:%" PRId64 ", offset:%" PRId64, " points returned, total:%" PRId64 ", offset:%" PRId64,
pQInfo, (uint64_t)pQInfo->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total, pQInfo, (uint64_t)pQInfo->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows,
pQuery->limit.offset); pQuery->rec.total, pQuery->limit.offset);
}
} }
static void doSaveContext(SQInfo *pQInfo) { static void doSaveContext(SQInfo *pQInfo) {
...@@ -5469,13 +5678,7 @@ static void doSaveContext(SQInfo *pQInfo) { ...@@ -5469,13 +5678,7 @@ static void doSaveContext(SQInfo *pQInfo) {
SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order); SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order);
} }
STsdbQueryCond cond = { STsdbQueryCond cond = createTsdbQueryCond(pQuery);
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
TIME_WINDOW_COPY(cond.twindow, pQuery->window);
// clean unused handle // clean unused handle
if (pRuntimeEnv->pSecQueryHandle != NULL) { if (pRuntimeEnv->pSecQueryHandle != NULL) {
...@@ -5748,13 +5951,8 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -5748,13 +5951,8 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
qDebug("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, qDebug("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo,
pQuery->current->lastKey, pQuery->window.ekey); pQuery->current->lastKey, pQuery->window.ekey);
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
STableIdInfo tidInfo; STableIdInfo tidInfo = createTableIdInfo(pQuery);
STableId* id = TSDB_TABLEID(pQuery->current->pTable); taosHashPut(pQInfo->arrTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid), &tidInfo, sizeof(STableIdInfo));
tidInfo.uid = id->uid;
tidInfo.tid = id->tid;
tidInfo.key = pQuery->current->lastKey;
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
} }
if (!isTSCompQuery(pQuery)) { if (!isTSCompQuery(pQuery)) {
...@@ -6076,11 +6274,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -6076,11 +6274,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->interval.interval = htobe64(pQueryMsg->interval.interval); pQueryMsg->interval.interval = htobe64(pQueryMsg->interval.interval);
pQueryMsg->interval.sliding = htobe64(pQueryMsg->interval.sliding); pQueryMsg->interval.sliding = htobe64(pQueryMsg->interval.sliding);
pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset); pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset);
// pQueryMsg->interval.intervalUnit = pQueryMsg->interval.intervalUnit;
// pQueryMsg->interval.slidingUnit = pQueryMsg->interval.slidingUnit;
// pQueryMsg->interval.offsetUnit = pQueryMsg->interval.offsetUnit;
pQueryMsg->limit = htobe64(pQueryMsg->limit); pQueryMsg->limit = htobe64(pQueryMsg->limit);
pQueryMsg->offset = htobe64(pQueryMsg->offset); pQueryMsg->offset = htobe64(pQueryMsg->offset);
pQueryMsg->tableLimit = htobe64(pQueryMsg->tableLimit);
pQueryMsg->order = htons(pQueryMsg->order); pQueryMsg->order = htons(pQueryMsg->order);
pQueryMsg->orderColId = htons(pQueryMsg->orderColId); pQueryMsg->orderColId = htons(pQueryMsg->orderColId);
...@@ -6752,8 +6948,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -6752,8 +6948,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
} }
int tableIndex = 0;
pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery); pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery);
pQInfo->runtimeEnv.summary.tableInfoSize += (pTableGroupInfo->numOfTables * sizeof(STableQueryInfo)); pQInfo->runtimeEnv.summary.tableInfoSize += (pTableGroupInfo->numOfTables * sizeof(STableQueryInfo));
...@@ -6775,7 +6969,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -6775,7 +6969,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
} }
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info // NOTE: pTableCheckInfo need to update the query time range and the lastKey info
pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo)); pQInfo->arrTableIdInfo = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
pQInfo->dataReady = QUERY_RESULT_NOT_READY; pQInfo->dataReady = QUERY_RESULT_NOT_READY;
pQInfo->rspContext = NULL; pQInfo->rspContext = NULL;
pthread_mutex_init(&pQInfo->lock, NULL); pthread_mutex_init(&pQInfo->lock, NULL);
...@@ -6785,10 +6979,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -6785,10 +6979,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery->window = pQueryMsg->window; pQuery->window = pQueryMsg->window;
changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery); changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery);
pQInfo->runtimeEnv.queryWindowIdentical = true;
STimeWindow window = pQuery->window; STimeWindow window = pQuery->window;
int32_t index = 0; int32_t index = 0;
for(int32_t i = 0; i < numOfGroups; ++i) { for(int32_t i = 0; i < numOfGroups; ++i) {
SArray* pa = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i); SArray* pa = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i);
...@@ -6803,9 +6997,12 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -6803,9 +6997,12 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
for(int32_t j = 0; j < s; ++j) { for(int32_t j = 0; j < s; ++j) {
STableKeyInfo* info = taosArrayGet(pa, j); STableKeyInfo* info = taosArrayGet(pa, j);
void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo);
window.skey = info->lastKey; window.skey = info->lastKey;
if (info->lastKey != pQuery->window.skey) {
pQInfo->runtimeEnv.queryWindowIdentical = false;
}
void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo);
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, info->pTable, window, buf); STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, info->pTable, window, buf);
if (item == NULL) { if (item == NULL) {
goto _cleanup; goto _cleanup;
...@@ -7019,7 +7216,7 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -7019,7 +7216,7 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree(pQInfo->pBuf); tfree(pQInfo->pBuf);
tsdbDestroyTableGroup(&pQInfo->tableGroupInfo); tsdbDestroyTableGroup(&pQInfo->tableGroupInfo);
taosArrayDestroy(pQInfo->arrTableIdInfo); taosHashCleanup(pQInfo->arrTableIdInfo);
pQInfo->signature = 0; pQInfo->signature = 0;
...@@ -7399,7 +7596,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -7399,7 +7596,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
size_t size = getResultSize(pQInfo, &pQuery->rec.rows); size_t size = getResultSize(pQInfo, &pQuery->rec.rows);
size += sizeof(int32_t); size += sizeof(int32_t);
size += sizeof(STableIdInfo) * taosArrayGetSize(pQInfo->arrTableIdInfo); size += sizeof(STableIdInfo) * taosHashGetSize(pQInfo->arrTableIdInfo);
*contLen = (int32_t)(size + sizeof(SRetrieveTableRsp)); *contLen = (int32_t)(size + sizeof(SRetrieveTableRsp));
......
...@@ -43,51 +43,48 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) { ...@@ -43,51 +43,48 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) {
return size; return size;
} }
int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, int32_t size, int32_t threshold, int16_t type) { int32_t initWindowResInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) {
pWindowResInfo->capacity = size; pResultRowInfo->capacity = size;
pWindowResInfo->threshold = threshold;
pWindowResInfo->type = type; pResultRowInfo->type = type;
pWindowResInfo->curIndex = -1; pResultRowInfo->curIndex = -1;
pWindowResInfo->size = 0; pResultRowInfo->size = 0;
pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
pWindowResInfo->pResult = calloc(pWindowResInfo->capacity, POINTER_BYTES); pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES);
if (pWindowResInfo->pResult == NULL) { if (pResultRowInfo->pResult == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) { void cleanupTimeWindowInfo(SResultRowInfo *pResultRowInfo) {
if (pWindowResInfo == NULL) { if (pResultRowInfo == NULL) {
return; return;
} }
if (pWindowResInfo->capacity == 0) { if (pResultRowInfo->capacity == 0) {
assert(pWindowResInfo->pResult == NULL); assert(pResultRowInfo->pResult == NULL);
return; return;
} }
if (pWindowResInfo->type == TSDB_DATA_TYPE_BINARY || pWindowResInfo->type == TSDB_DATA_TYPE_NCHAR) { if (pResultRowInfo->type == TSDB_DATA_TYPE_BINARY || pResultRowInfo->type == TSDB_DATA_TYPE_NCHAR) {
for(int32_t i = 0; i < pWindowResInfo->size; ++i) { for(int32_t i = 0; i < pResultRowInfo->size; ++i) {
tfree(pWindowResInfo->pResult[i]->key); tfree(pResultRowInfo->pResult[i]->key);
} }
} }
tfree(pWindowResInfo->pResult); tfree(pResultRowInfo->pResult);
} }
void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo) { void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) {
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) {
return; return;
} }
// assert(pWindowResInfo->size == 1); for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
SResultRow *pWindowRes = pResultRowInfo->pResult[i];
for (int32_t i = 0; i < pWindowResInfo->size; ++i) { clearResultRow(pRuntimeEnv, pWindowRes, pResultRowInfo->type);
SResultRow *pWindowRes = pWindowResInfo->pResult[i];
clearResultRow(pRuntimeEnv, pWindowRes, pWindowResInfo->type);
int32_t groupIndex = 0; int32_t groupIndex = 0;
int64_t uid = 0; int64_t uid = 0;
...@@ -96,30 +93,30 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR ...@@ -96,30 +93,30 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR
taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupIndex))); taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupIndex)));
} }
pWindowResInfo->curIndex = -1; pResultRowInfo->curIndex = -1;
pWindowResInfo->size = 0; pResultRowInfo->size = 0;
pWindowResInfo->startTime = TSKEY_INITIAL_VAL; pResultRowInfo->startTime = TSKEY_INITIAL_VAL;
pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
} }
void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SResultRowInfo *pResultRowInfo = &pRuntimeEnv->windowResInfo;
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0 || num == 0) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0 || num == 0) {
return; return;
} }
int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo); int32_t numOfClosed = numOfClosedTimeWindow(pResultRowInfo);
assert(num >= 0 && num <= numOfClosed); assert(num >= 0 && num <= numOfClosed);
int16_t type = pWindowResInfo->type; int16_t type = pResultRowInfo->type;
int64_t uid = getResultInfoUId(pRuntimeEnv); int64_t uid = getResultInfoUId(pRuntimeEnv);
char *key = NULL; char *key = NULL;
int16_t bytes = -1; int16_t bytes = -1;
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SResultRow *pResult = pWindowResInfo->pResult[i]; SResultRow *pResult = pResultRowInfo->pResult[i];
if (pResult->closed) { // remove the window slot from hash table if (pResult->closed) { // remove the window slot from hash table
getResultRowKeyInfo(pResult, type, &key, &bytes); getResultRowKeyInfo(pResult, type, &key, &bytes);
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
...@@ -129,23 +126,23 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { ...@@ -129,23 +126,23 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
} }
} }
int32_t remain = pWindowResInfo->size - num; int32_t remain = pResultRowInfo->size - num;
// clear all the closed windows from the window list // clear all the closed windows from the window list
for (int32_t k = 0; k < remain; ++k) { for (int32_t k = 0; k < remain; ++k) {
copyResultRow(pRuntimeEnv, pWindowResInfo->pResult[k], pWindowResInfo->pResult[num + k], type); copyResultRow(pRuntimeEnv, pResultRowInfo->pResult[k], pResultRowInfo->pResult[num + k], type);
} }
// move the unclosed window in the front of the window list // move the unclosed window in the front of the window list
for (int32_t k = remain; k < pWindowResInfo->size; ++k) { for (int32_t k = remain; k < pResultRowInfo->size; ++k) {
SResultRow *pWindowRes = pWindowResInfo->pResult[k]; SResultRow *pWindowRes = pResultRowInfo->pResult[k];
clearResultRow(pRuntimeEnv, pWindowRes, pWindowResInfo->type); clearResultRow(pRuntimeEnv, pWindowRes, pResultRowInfo->type);
} }
pWindowResInfo->size = remain; pResultRowInfo->size = remain;
for (int32_t k = 0; k < pWindowResInfo->size; ++k) { for (int32_t k = 0; k < pResultRowInfo->size; ++k) {
SResultRow *pResult = pWindowResInfo->pResult[k]; SResultRow *pResult = pResultRowInfo->pResult[k];
getResultRowKeyInfo(pResult, type, &key, &bytes); getResultRowKeyInfo(pResult, type, &key, &bytes);
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
...@@ -153,43 +150,43 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { ...@@ -153,43 +150,43 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
assert(p != NULL); assert(p != NULL);
int32_t v = (*p - num); int32_t v = (*p - num);
assert(v >= 0 && v <= pWindowResInfo->size); assert(v >= 0 && v <= pResultRowInfo->size);
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t)); taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t));
} }
pWindowResInfo->curIndex = -1; pResultRowInfo->curIndex = -1;
} }
void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) { void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) {
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SResultRowInfo *pResultRowInfo = &pRuntimeEnv->windowResInfo;
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0) {
return; return;
} }
int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo); int32_t numOfClosed = numOfClosedTimeWindow(pResultRowInfo);
clearFirstNWindowRes(pRuntimeEnv, numOfClosed); clearFirstNWindowRes(pRuntimeEnv, numOfClosed);
} }
int32_t numOfClosedTimeWindow(SWindowResInfo *pWindowResInfo) { int32_t numOfClosedTimeWindow(SResultRowInfo *pResultRowInfo) {
int32_t i = 0; int32_t i = 0;
while (i < pWindowResInfo->size && pWindowResInfo->pResult[i]->closed) { while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) {
++i; ++i;
} }
return i; return i;
} }
void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { void closeAllTimeWindow(SResultRowInfo *pResultRowInfo) {
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size); assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size);
for (int32_t i = 0; i < pWindowResInfo->size; ++i) { for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
if (pWindowResInfo->pResult[i]->closed) { if (pResultRowInfo->pResult[i]->closed) {
continue; continue;
} }
pWindowResInfo->pResult[i]->closed = true; pResultRowInfo->pResult[i]->closed = true;
} }
} }
...@@ -198,41 +195,41 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { ...@@ -198,41 +195,41 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time. * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
* NOTE: remove redundant, only when the result set order equals to traverse order * NOTE: remove redundant, only when the result set order equals to traverse order
*/ */
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order) { void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order) {
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size); assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size);
if (pWindowResInfo->size <= 1) { if (pResultRowInfo->size <= 1) {
return; return;
} }
// get the result order // get the result order
int32_t resultOrder = (pWindowResInfo->pResult[0]->win.skey < pWindowResInfo->pResult[1]->win.skey)? 1:-1; int32_t resultOrder = (pResultRowInfo->pResult[0]->win.skey < pResultRowInfo->pResult[1]->win.skey)? 1:-1;
if (order != resultOrder) { if (order != resultOrder) {
return; return;
} }
int32_t i = 0; int32_t i = 0;
if (order == QUERY_ASC_FORWARD_STEP) { if (order == QUERY_ASC_FORWARD_STEP) {
TSKEY ekey = pWindowResInfo->pResult[i]->win.ekey; TSKEY ekey = pResultRowInfo->pResult[i]->win.ekey;
while (i < pWindowResInfo->size && (ekey < lastKey)) { while (i < pResultRowInfo->size && (ekey < lastKey)) {
++i; ++i;
} }
} else if (order == QUERY_DESC_FORWARD_STEP) { } else if (order == QUERY_DESC_FORWARD_STEP) {
while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i]->win.skey > lastKey)) { while (i < pResultRowInfo->size && (pResultRowInfo->pResult[i]->win.skey > lastKey)) {
++i; ++i;
} }
} }
if (i < pWindowResInfo->size) { if (i < pResultRowInfo->size) {
pWindowResInfo->size = (i + 1); pResultRowInfo->size = (i + 1);
} }
} }
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) { bool isWindowResClosed(SResultRowInfo *pResultRowInfo, int32_t slot) {
return (getResultRow(pWindowResInfo, slot)->closed == true); return (getResultRow(pResultRowInfo, slot)->closed == true);
} }
void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) { void closeTimeWindow(SResultRowInfo *pResultRowInfo, int32_t slot) {
getResultRow(pWindowResInfo, slot)->closed = true; getResultRow(pResultRowInfo, slot)->closed = true;
} }
void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes, int16_t type) { void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes, int16_t type) {
......
...@@ -151,8 +151,9 @@ static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode); ...@@ -151,8 +151,9 @@ static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode);
*/ */
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) { SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) {
if (capacity == 0 || fn == NULL) { assert(fn != NULL);
return NULL; if (capacity == 0) {
capacity = 4;
} }
SHashObj *pHashObj = (SHashObj *)calloc(1, sizeof(SHashObj)); SHashObj *pHashObj = (SHashObj *)calloc(1, sizeof(SHashObj));
......
...@@ -47,6 +47,7 @@ while $i < $tbNum ...@@ -47,6 +47,7 @@ while $i < $tbNum
$i = $i + 1 $i = $i + 1
endw endw
$ts = $ts + 60000 $ts = $ts + 60000
$tb = $tbPrefix . 0 $tb = $tbPrefix . 0
sql insert into $tb (ts) values ( $ts ) sql insert into $tb (ts) values ( $ts )
...@@ -84,4 +85,43 @@ sleep 500 ...@@ -84,4 +85,43 @@ sleep 500
run general/parser/first_last_query.sim run general/parser/first_last_query.sim
print =================> insert data regression test
sql create database test keep 36500
sql use test
sql create table tm0 (ts timestamp, k int)
print =========================> td-2298
$ts0 = 1537146000000
$xs = 6000
$x = 0
while $x < 5000
$ts = $ts0 + $xs
$ts1 = $ts + $xs
$x1 = $x + 1
sql insert into tm0 values ( $ts , $x ) ( $ts1 , $x1 )
$x = $x1
$ts0 = $ts1
endw
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 3000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 500
sql use test
sql select count(*), last(ts) from tm0 interval(1s)
if $rows != 10000 then
print expect 10000, actual: $rows
return -1
endi
sql select last(ts) from tm0 interval(1s)
if $rows != 10000 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
...@@ -267,3 +267,5 @@ if $data14 != @test2@ then ...@@ -267,3 +267,5 @@ if $data14 != @test2@ then
print expect test2 , actual: $data14 print expect test2 , actual: $data14
return -1 return -1
endi endi
sql drop table stest
\ No newline at end of file
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c tableMetaKeepTimer -v 3
system sh/exec.sh -n dnode1 -s start
sleep 500
sql connect
$dbPrefix = m_func_db
$tbPrefix = m_func_tb
$mtPrefix = m_func_mt
$tbNum = 10
$rowNum = 5
$totalNum = $tbNum * $rowNum
$ts0 = 1537146000000
$delta = 600000
print ========== alter.sim
$i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql drop database if exists $db
sql create database $db
sql use $db
print =====================================> test case for twa in single block
sql create table t1 (ts timestamp, k float);
sql insert into t1 values('2015-08-18 00:00:00', 2.064);
sql insert into t1 values('2015-08-18 00:06:00', 2.116);
sql insert into t1 values('2015-08-18 00:12:00', 2.028);
sql insert into t1 values('2015-08-18 00:18:00', 2.126);
sql insert into t1 values('2015-08-18 00:24:00', 2.041);
sql insert into t1 values('2015-08-18 00:30:00', 2.051);
sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:05:00'
if $rows != 1 then
return -1
endi
if $data00 != 2.063999891 then
return -1
endi
if $data01 != 2.063999891 then
return -1
endi
if $data02 != 1 then
return -1
endi
sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:07:00'
if $rows != 1 then
return -1
endi
if $data00 != 2.089999914 then
return -1
endi
if $data01 != 2.089999914 then
return -1
endi
if $data02 != 2 then
return -1
endi
sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:07:00' interval(1m) order by ts asc
if $rows != 2 then
return -1
endi
if $data00 != @15-08-18 00:00:00.000@ then
return -1
endi
if $data01 != 2.068333156 then
return -1
endi
if $data02 != 2.063999891 then
return -1
endi
if $data03 != 1 then
return -1
endi
if $data10 != @15-08-18 00:06:00.000@ then
return -1
endi
if $data11 != 2.115999937 then
return -1
endi
if $data12 != 2.115999937 then
return -1
endi
if $data13 != 1 then
return -1
endi
sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:07:00' interval(1m) order by ts desc;
if $rows != 2 then
return -1
endi
if $data00 != @15-08-18 00:06:00.00@ then
return -1
endi
if $data01 != 2.115999937 then
return -1
endi
if $data02 != 2.115999937 then
return -1
endi
if $data03 != 1 then
return -1
endi
if $data11 != 2.068333156 then
return -1
endi
sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:27:00' interval(10m) order by ts asc
if $rows != 3 then
return -1
endi
if $data01 != 2.088666666 then
return -1
endi
if $data02 != 2.089999914 then
return -1
endi
if $data03 != 2 then
return -1
endi
if $data11 != 2.077099980 then
return -1
endi
if $data12 != 2.077000022 then
return -1
endi
if $data13 != 2 then
return -1
endi
if $data21 != 2.069333235 then
return -1
endi
if $data22 != 2.040999889 then
return -1
endi
if $data23 != 1 then
return -1
endi
sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:27:00' interval(10m) order by ts desc
if $rows != 3 then
return -1
endi
if $data01 != 2.069333235 then
return -1
endi
if $data11 != 2.077099980 then
return -1
endi
if $data21 != 2.088666666 then
return -1
endi
sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:30:00' order by ts asc
if $data00 != 2.073699975 then
return -1
endi
if $data01 != 2.070999980 then
return -1
endi
if $data02 != 6 then
return -1
endi
sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:30:00' order by ts desc
if $rows != 1 then
return -1
endi
if $data00 != 2.073699975 then
return -1
endi
if $data01 != 2.070999980 then
return -1
endi
if $data02 != 6 then
return -1
endi
sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:30:00' interval(10m) order by ts asc
sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:30:00' interval(10m) order by ts desc
#todo add test case while column filte exists.
select count(*),TWA(k) from tm0 where ts>='1970-1-1 13:43:00' and ts<='1970-1-1 13:44:10' interval(9s)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册