From e1a34a1bf7a815771614b694d910c99a32df3a6e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 25 May 2020 19:13:15 +0800 Subject: [PATCH] [td-225]fix bugs in fill for super table query --- src/client/inc/tsclient.h | 2 +- src/client/src/tscFunctionImpl.c | 39 ++++- src/client/src/tscParseInsert.c | 6 +- src/client/src/tscSQLParser.c | 20 ++- src/client/src/tscSecondaryMerge.c | 5 +- src/client/src/tscServer.c | 4 +- src/client/src/tscStream.c | 2 +- src/client/src/tscUtil.c | 10 +- src/common/src/ttypes.c | 14 +- src/inc/taosdef.h | 1 + src/inc/taosmsg.h | 2 +- src/query/inc/qExecutor.h | 2 +- src/query/inc/qfill.h | 8 +- src/query/src/qExecutor.c | 160 ++++++++++++------- src/query/src/qfill.c | 102 +++++++----- src/query/src/tvariant.c | 9 +- src/tsdb/src/tsdbRead.c | 11 +- tests/script/general/parser/limit2_query.sim | 1 + 18 files changed, 248 insertions(+), 150 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 13f5ebc86e..08536a505d 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -210,7 +210,7 @@ typedef struct SQueryInfo { int16_t numOfTables; STableMetaInfo **pTableMetaInfo; struct STSBuf * tsBuf; - int64_t * defaultVal; // default value for interpolation + int64_t * fillVal; // default value for interpolation char * msg; // pointer to the pCmd->payload to keep error message temporarily int64_t clauseLimit; // limit for current sub clause diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 52d904d314..a265243ce9 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -390,7 +390,11 @@ static void function_finalizer(SQLFunctionCtx *pCtx) { if (pResInfo->hasResult != DATA_SET_FLAG) { tscTrace("no result generated, result is set to NULL"); - setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); + if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pCtx->aOutputBuf, pCtx->outputType); + } else { + setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); + } } doFinalizer(pCtx); @@ -1864,12 +1868,22 @@ static void last_row_finalizer(SQLFunctionCtx *pCtx) { SResultInfo *pResInfo = GET_RES_INFO(pCtx); if (pCtx->currentStage == SECONDARY_STAGE_MERGE) { if (pResInfo->hasResult != DATA_SET_FLAG) { - setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); + if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pCtx->aOutputBuf, pCtx->outputType); + } else { + setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); + } + return; } } else { if (pResInfo->hasResult != DATA_SET_FLAG) { - setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); + if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pCtx->aOutputBuf, pCtx->outputType); + } else { + setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); + } + return; } } @@ -2885,7 +2899,12 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) { SLeastsquareInfo *pInfo = pResInfo->interResultBuf; if (pInfo->num == 0) { - setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); + if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pCtx->aOutputBuf, pCtx->outputType); + } else { + setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); + } + return; } @@ -3862,7 +3881,11 @@ static void interp_function(SQLFunctionCtx *pCtx) { *(TSKEY *)pCtx->aOutputBuf = pInfoDetail->ts; } else { if (pInfoDetail->type == TSDB_FILL_NULL) { - setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); + if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pCtx->aOutputBuf, pCtx->outputType); + } else { + setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); + } } else if (pInfoDetail->type == TSDB_FILL_SET_VALUE) { tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType); } else if (pInfoDetail->type == TSDB_FILL_PREV) { @@ -3914,7 +3937,11 @@ static void interp_function(SQLFunctionCtx *pCtx) { } } else { - setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes); + if (srcType == TSDB_DATA_TYPE_BINARY || srcType == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pCtx->aOutputBuf, pCtx->inputBytes); + } else { + setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes); + } } } } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 3e1f0787c3..9202203fac 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -312,8 +312,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, case TSDB_DATA_TYPE_BINARY: // binary data cannot be null-terminated char string, otherwise the last char of the string is lost if (pToken->type == TK_NULL) { - varDataSetLen(payload, sizeof(int8_t)); - *(uint8_t*) varDataVal(payload) = TSDB_DATA_BINARY_NULL; + setVardataNull(payload, TSDB_DATA_TYPE_BINARY); } else { // too long values will return invalid sql, not be truncated automatically if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { //todo refactor return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z); @@ -326,8 +325,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, case TSDB_DATA_TYPE_NCHAR: if (pToken->type == TK_NULL) { - varDataSetLen(payload, sizeof(int32_t)); - *(uint32_t*) varDataVal(payload) = TSDB_DATA_NCHAR_NULL; + setVardataNull(payload, TSDB_DATA_TYPE_NCHAR); } else { // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' size_t output = 0; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index d23d0e1860..35a4937fd7 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4015,9 +4015,9 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { size_t size = tscSqlExprNumOfExprs(pQueryInfo); - if (pQueryInfo->defaultVal == NULL) { - pQueryInfo->defaultVal = calloc(size, sizeof(int64_t)); - if (pQueryInfo->defaultVal == NULL) { + if (pQueryInfo->fillVal == NULL) { + pQueryInfo->fillVal = calloc(size, sizeof(int64_t)); + if (pQueryInfo->fillVal == NULL) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } } @@ -4028,7 +4028,11 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { pQueryInfo->fillType = TSDB_FILL_NULL; for (int32_t i = START_INTERPO_COL_IDX; i < size; ++i) { TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - setNull((char*)&pQueryInfo->defaultVal[i], pFields->type, pFields->bytes); + if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type); + } else { + setNull((char*)&pQueryInfo->fillVal[i], pFields->type, pFields->bytes); + }; } } else if (strncasecmp(pItem->pVar.pz, "prev", 4) == 0 && pItem->pVar.nLen == 4) { pQueryInfo->fillType = TSDB_FILL_PREV; @@ -4061,11 +4065,11 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) { - setNull((char*)(&pQueryInfo->defaultVal[i]), pFields->type, pFields->bytes); + setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type); continue; } - int32_t ret = tVariantDump(&pFillToken->a[j].pVar, (char*)&pQueryInfo->defaultVal[i], pFields->type); + int32_t ret = tVariantDump(&pFillToken->a[j].pVar, (char*)&pQueryInfo->fillVal[i], pFields->type); if (ret != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg); } @@ -4079,9 +4083,9 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) { - setNull((char*)(&pQueryInfo->defaultVal[i]), pFields->type, pFields->bytes); + setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type); } else { - tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->defaultVal[i], pFields->type); + tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->fillVal[i], pFields->type); } } } diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 35658d1867..7617621e5f 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -145,7 +145,7 @@ static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) { pFillCol[i].flag = pExpr->colInfo.flag; pFillCol[i].col.offset = offset; pFillCol[i].functionId = pExpr->functionId; - pFillCol[i].defaultVal.i = pQueryInfo->defaultVal[i]; + pFillCol[i].fillVal.i = pQueryInfo->fillVal[i]; offset += pExpr->resBytes; } @@ -946,8 +946,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo } while (1) { - int64_t newRows = -1; - taosGenerateDataBlock(pFillInfo, pResPages, &newRows, pLocalReducer->resColModel->capacity); + int64_t newRows = taosGenerateDataBlock(pFillInfo, pResPages, pLocalReducer->resColModel->capacity); if (pQueryInfo->limit.offset < newRows) { newRows -= pQueryInfo->limit.offset; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0aac5daa2e..7d0ac09e66 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -781,8 +781,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { if (pQueryInfo->fillType != TSDB_FILL_NONE) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { - *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]); - pMsg += sizeof(pQueryInfo->defaultVal[0]); + *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]); + pMsg += sizeof(pQueryInfo->fillVal[0]); } } diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index fd84a2b759..1b1aaa54c9 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -223,7 +223,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - assignVal(pSql->res.data + offset, (char *)(&pQueryInfo->defaultVal[i]), pField->bytes, pField->type); + assignVal(pSql->res.data + offset, (char *)(&pQueryInfo->fillVal[i]), pField->bytes, pField->type); row[i] = pSql->res.data + offset; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 296ffe3333..ec9908ae96 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -281,7 +281,7 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) { } pQueryInfo->fillType = TSDB_FILL_NONE; - tfree(pQueryInfo->defaultVal); + tfree(pQueryInfo->fillVal); } int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { @@ -1616,7 +1616,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) { pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf); - tfree(pQueryInfo->defaultVal); + tfree(pQueryInfo->fillVal); } void tscClearSubqueryInfo(SSqlCmd* pCmd) { @@ -1768,7 +1768,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pNewQueryInfo->order = pQueryInfo->order; pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit; pNewQueryInfo->pTableMetaInfo = NULL; - pNewQueryInfo->defaultVal = NULL; + pNewQueryInfo->fillVal = NULL; pNewQueryInfo->numOfTables = 0; pNewQueryInfo->tsBuf = NULL; @@ -1780,8 +1780,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond); if (pQueryInfo->fillType != TSDB_FILL_NONE) { - pNewQueryInfo->defaultVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); - memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); + pNewQueryInfo->fillVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); + memcpy(pNewQueryInfo->fillVal, pQueryInfo->fillVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); } if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { diff --git a/src/common/src/ttypes.c b/src/common/src/ttypes.c index 654cb65ec3..f97a146a1e 100644 --- a/src/common/src/ttypes.c +++ b/src/common/src/ttypes.c @@ -381,6 +381,18 @@ bool isNull(const char *val, int32_t type) { }; } +void setVardataNull(char* val, int32_t type) { + if (type == TSDB_DATA_TYPE_BINARY) { + varDataSetLen(val, sizeof(int8_t)); + *(uint8_t*) varDataVal(val) = TSDB_DATA_BINARY_NULL; + } else if (type == TSDB_DATA_TYPE_NCHAR) { + varDataSetLen(val, sizeof(int32_t)); + *(uint32_t*) varDataVal(val) = TSDB_DATA_NCHAR_NULL; + } else { + assert(0); + } +} + void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); } void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) { @@ -483,7 +495,7 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) { break; }; case TSDB_DATA_TYPE_NCHAR: { - wcsncpy((wchar_t*)val, (wchar_t*)src, len / TSDB_NCHAR_SIZE); + varDataCopy(val, src); break; }; default: { diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index cbb83d1028..b87a6b3118 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -157,6 +157,7 @@ extern tDataTypeDescriptor tDataTypeDesc[11]; bool isValidDataType(int32_t type, int32_t length); bool isNull(const char *val, int32_t type); +void setVardataNull(char* val, int32_t type); void setNull(char *val, int32_t type, int32_t bytes); void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index f550b1660f..17b975c193 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -443,7 +443,7 @@ typedef struct { int16_t numOfOutput; // final output columns numbers int16_t tagNameRelType; // relation of tag criteria and tbname criteria int16_t fillType; // interpolate type - uint64_t defaultVal; // default value array list + uint64_t fillVal; // default value array list int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed int32_t tsLen; // total length of ts comp block int32_t tsNumOfBlocks; // ts comp block numbers diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 3f4618bf1c..0996b643d1 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -138,7 +138,7 @@ typedef struct SQuery { SColumnInfo* colList; SColumnInfo* tagColList; int32_t numOfFilterCols; - int64_t* defaultVal; + int64_t* fillVal; uint32_t status; // query status SResultRec rec; int32_t pos; diff --git a/src/query/inc/qfill.h b/src/query/inc/qfill.h index d0ba8941b2..323ff7a812 100644 --- a/src/query/inc/qfill.h +++ b/src/query/inc/qfill.h @@ -28,7 +28,7 @@ typedef struct { STColumn col; // column info int16_t functionId; // sql function id int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN - union {int64_t i; double d;} defaultVal; + union {int64_t i; double d;} fillVal; } SFillColInfo; typedef struct SFillInfo { @@ -75,15 +75,13 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInpu TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision); -int32_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows); +int64_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows); int32_t taosNumOfRemainRows(SFillInfo *pFillInfo); -int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfRows, int32_t outputRows, char** srcData); - int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point); -void taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int64_t* outputRows, int32_t capacity); +int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity); #ifdef __cplusplus } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 0bd6f2ab34..47ad633e34 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -376,11 +376,16 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) { bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.functionId == TSDB_FUNC_TS_COMP; } -static bool limitResults(SQuery *pQuery) { +static bool limitResults(SQueryRuntimeEnv* pRuntimeEnv) { + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + SQuery* pQuery = pRuntimeEnv->pQuery; + if ((pQuery->limit.limit > 0) && (pQuery->rec.total + pQuery->rec.rows > pQuery->limit.limit)) { pQuery->rec.rows = pQuery->limit.limit - pQuery->rec.total; - assert(pQuery->rec.rows > 0); - + + qTrace("QInfo:%p discard remain data due to result limitation, limit:%"PRId64", current return:%d, total:%"PRId64, + pQInfo, pQuery->limit.limit, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); + assert(pQuery->rec.rows >= 0); setQueryStatus(pQuery, QUERY_COMPLETED); return true; } @@ -624,15 +629,17 @@ static int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t sea /** * NOTE: the query status only set for the first scan of master scan. */ -static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SWindowResInfo *pWindowResInfo) { +static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SWindowResInfo *pWindowResInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; if (pRuntimeEnv->scanFlag != MASTER_SCAN || (!isIntervalQuery(pQuery))) { - return; + return pWindowResInfo->size; } // no qualified results exist, abort check + int32_t numOfClosed = 0; + if (pWindowResInfo->size == 0) { - return; + return pWindowResInfo->size; } // query completed @@ -646,10 +653,10 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, int32_t i = 0; int64_t skey = TSKEY_INITIAL_VAL; - // TODO opt performance: get the closed time window here for (i = 0; i < pWindowResInfo->size; ++i) { SWindowResult *pResult = &pWindowResInfo->pResult[i]; if (pResult->status.closed) { + numOfClosed += 1; continue; } @@ -672,16 +679,26 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex].window.skey; - // the number of completed slots are larger than the threshold, dump to client immediately. - int32_t n = numOfClosedTimeWindow(pWindowResInfo); - if (n > pWindowResInfo->threshold) { + // the number of completed slots are larger than the threshold, return current generated results to client. + if (numOfClosed > pWindowResInfo->threshold) { + qTrace("QInfo:%p total result window:%d closed:%d, reached the output threshold %d, return", + GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, numOfClosed, pQuery->rec.threshold); + setQueryStatus(pQuery, QUERY_RESBUF_FULL); + } else { + qTrace("QInfo:%p total result window:%d already closed:%d", GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, + numOfClosed); } - - qTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, n); } - + + // output has reached the limitation, set query completed + if (pQuery->limit.limit > 0 && (pQuery->limit.limit + pQuery->limit.offset) <= numOfClosed && + pRuntimeEnv->scanFlag == MASTER_SCAN) { + setQueryStatus(pQuery, QUERY_COMPLETED); + } + assert(pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL); + return numOfClosed; } static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn, @@ -1309,28 +1326,27 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey; pTableQInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); // interval query with limit applied - if (isIntervalQuery(pQuery) && pQuery->limit.limit > 0 && - (pQuery->limit.limit + pQuery->limit.offset) <= numOfClosedTimeWindow(pWindowResInfo) && - pRuntimeEnv->scanFlag == MASTER_SCAN) { - setQueryStatus(pQuery, QUERY_COMPLETED); - } + int32_t numOfRes = 0; + + if (isIntervalQuery(pQuery)) { + numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); + } else { + numOfRes = getNumOfResult(pRuntimeEnv); - int32_t numOfRes = getNumOfResult(pRuntimeEnv); + // update the number of output result + if (numOfRes > 0 && pQuery->checkBuffer == 1) { + assert(numOfRes >= pQuery->rec.rows); + pQuery->rec.rows = numOfRes; - // update the number of output result - if (numOfRes > 0 && pQuery->checkBuffer == 1) { - assert(numOfRes >= pQuery->rec.rows); - pQuery->rec.rows = numOfRes; + if (numOfRes >= pQuery->rec.threshold) { + setQueryStatus(pQuery, QUERY_RESBUF_FULL); + } - if (numOfRes >= pQuery->rec.threshold) { - setQueryStatus(pQuery, QUERY_RESBUF_FULL); - } - - if ((pQuery->limit.limit >= 0) && numOfRes >= (pQuery->limit.limit + pQuery->limit.offset)) { - setQueryStatus(pQuery, QUERY_COMPLETED); + if ((pQuery->limit.limit >= 0) && (pQuery->limit.limit + pQuery->limit.offset) <= numOfRes) { + setQueryStatus(pQuery, QUERY_COMPLETED); + } } } @@ -2026,10 +2042,10 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI tVariantCreateFromBinary(&pCtx->param[3], (char *)&count, sizeof(count), TSDB_DATA_TYPE_INT); - if (isNull((char *)&pQuery->defaultVal[i], pCtx->inputType)) { + if (isNull((char *)&pQuery->fillVal[i], pCtx->inputType)) { pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; } else { - tVariantCreateFromBinary(&pCtx->param[1], (char *)&pQuery->defaultVal[i], pCtx->inputBytes, pCtx->inputType); + tVariantCreateFromBinary(&pCtx->param[1], (char *)&pQuery->fillVal[i], pCtx->inputBytes, pCtx->inputType); } pInterpDetail->ts = pQuery->window.skey; @@ -2471,8 +2487,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); - qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", GET_QINFO_ADDR(pRuntimeEnv), - blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes); + qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv), + blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, pQuery->current->lastKey); // while the output buffer is full or limit/offset is applied, query may be paused here if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL | QUERY_COMPLETED)) { @@ -3374,7 +3390,9 @@ static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus // during reverse scan pTableQueryInfo->lastKey = pStatus->lastKey; pQuery->status = pStatus->status; + pTableQueryInfo->win = pStatus->w; + pQuery->window = pTableQueryInfo->win; } void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { @@ -3396,6 +3414,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { if (pRuntimeEnv->scanFlag == MASTER_SCAN) { qstatus.status = pQuery->status; qstatus.curWindow.ekey = pTableQueryInfo->lastKey - step; + qstatus.lastKey = pTableQueryInfo->lastKey; } if (!needScanDataBlocksAgain(pRuntimeEnv)) { @@ -3423,6 +3442,9 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { setQueryStatus(pQuery, QUERY_NOT_COMPLETED); pRuntimeEnv->scanFlag = REPEAT_SCAN; + + qTrace("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%"PRId64"-%"PRId64, pQInfo, + cond.twindow.skey, cond.twindow.ekey); // check if query is killed or not if (isQueryKilled(pQInfo)) { @@ -3707,7 +3729,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde int32_t startIdx = 0; int32_t step = -1; - qTrace("QInfo:%p start to copy data from windowResInfo to query buf", GET_QINFO_ADDR(pQuery)); + qTrace("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo); int32_t totalSubset = getNumOfSubset(pQInfo); if (orderType == TSDB_ORDER_ASC) { @@ -3836,7 +3858,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) { } /* - * There are no results returned to client now. + * While the code reaches here, there are no results returned to client now. * If query is not completed yet, the gaps between two results blocks need to be handled after next data block * is retrieved from TSDB. * @@ -3890,18 +3912,24 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t numOfRows, int32_t *numOfInterpo) { + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SQuery *pQuery = pRuntimeEnv->pQuery; + SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo; + while (1) { - taosGenerateDataBlock(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata, &pQuery->rec.rows, pQuery->rec.capacity); - int32_t ret = pQuery->rec.rows; + int32_t ret = taosGenerateDataBlock(pFillInfo, (tFilePage**) pQuery->sdata, pQuery->rec.capacity); // todo apply limit output function /* reached the start position of according to offset value, return immediately */ if (pQuery->limit.offset == 0) { + qTrace("QInfo:%p initial numOfRows:%d, generate filled result:%d rows", pQInfo, pFillInfo->numOfRows, ret); return ret; } if (pQuery->limit.offset < ret) { + qTrace("QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%d. Discard due to offset, remain:%d, new offset:%d", + pQInfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, ret - pQuery->limit.offset, 0); + ret -= pQuery->limit.offset; // todo !!!!there exactly number of interpo is not valid. // todo refactor move to the beginning of buffer @@ -3909,10 +3937,16 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int memmove(pDst[i]->data, pDst[i]->data + pQuery->pSelectExpr[i].bytes * pQuery->limit.offset, ret * pQuery->pSelectExpr[i].bytes); } + pQuery->limit.offset = 0; return ret; } else { + qTrace("QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%d. Discard due to offset, " + "remain:%d, new offset:%d", pQInfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, 0, + pQuery->limit.offset - ret); + pQuery->limit.offset -= ret; + pQuery->rec.rows = 0; ret = 0; } @@ -3920,8 +3954,6 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int return ret; } } - - return 0; } void vnodePrintQueryStatistics(SQInfo *pQInfo) { @@ -4002,8 +4034,8 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock); - qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", GET_QINFO_ADDR(pRuntimeEnv), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes); + qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey); } void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { @@ -4120,8 +4152,9 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey, pDataBlock); pRuntimeEnv->windowResInfo.curIndex = index; // restore the window index - qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", - GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes); + qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, + GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, pQuery->current->lastKey); + return true; } else { // do nothing *start = tw.skey; @@ -4215,7 +4248,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) { pFillCol[i].col.offset = offset; pFillCol[i].flag = TSDB_COL_NORMAL; // always be ta normal column for table query pFillCol[i].functionId = pExprInfo->base.functionId; - pFillCol[i].defaultVal.i = pQuery->defaultVal[i]; + pFillCol[i].fillVal.i = pQuery->fillVal[i]; offset += pExprInfo->bytes; } @@ -4591,7 +4624,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { skipResults(pRuntimeEnv); // the limitation of output result is reached, set the query completed - if (limitResults(pQuery)) { + if (limitResults(pRuntimeEnv)) { pQInfo->tableIndex = pQInfo->groupInfo.numOfTables; break; } @@ -4846,7 +4879,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) pQuery->rec.rows = getNumOfResult(pRuntimeEnv); skipResults(pRuntimeEnv); - limitResults(pQuery); + limitResults(pRuntimeEnv); } static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { @@ -4894,7 +4927,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) resetCtxOutputBuf(pRuntimeEnv); } - limitResults(pQuery); + limitResults(pRuntimeEnv); if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, pQuery->current->lastKey, pQuery->window.ekey); @@ -4972,7 +5005,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { // the offset is handled at prepare stage if no interpolation involved if (pQuery->fillType == TSDB_FILL_NONE || pQuery->rec.rows == 0) { - limitResults(pQuery); + limitResults(pRuntimeEnv); break; } else { TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime, @@ -4980,11 +5013,10 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, ekey); taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata); numOfInterpo = 0; + pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, pQuery->rec.rows, &numOfInterpo); - - qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows); if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - limitResults(pQuery); + limitResults(pRuntimeEnv); break; } @@ -5017,9 +5049,8 @@ static void tableQueryImpl(SQInfo *pQInfo) { int32_t remain = taosNumOfRemainRows(pRuntimeEnv->pFillInfo); pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, remain, &numOfInterpo); - qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows); if (pQuery->rec.rows > 0) { - limitResults(pQuery); + limitResults(pRuntimeEnv); } qTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); @@ -5351,7 +5382,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg->fillType = htons(pQueryMsg->fillType); if (pQueryMsg->fillType != TSDB_FILL_NONE) { - pQueryMsg->defaultVal = (uint64_t)(pMsg); + pQueryMsg->fillVal = (uint64_t)(pMsg); int64_t *v = (int64_t *)pMsg; for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { @@ -5722,13 +5753,13 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, } if (pQuery->fillType != TSDB_FILL_NONE) { - pQuery->defaultVal = malloc(sizeof(int64_t) * pQuery->numOfOutput); - if (pQuery->defaultVal == NULL) { + pQuery->fillVal = malloc(sizeof(int64_t) * pQuery->numOfOutput); + if (pQuery->fillVal == NULL) { goto _cleanup; } // the first column is the timestamp - memcpy(pQuery->defaultVal, (char *)pQueryMsg->defaultVal, pQuery->numOfOutput * sizeof(int64_t)); + memcpy(pQuery->fillVal, (char *)pQueryMsg->fillVal, pQuery->numOfOutput * sizeof(int64_t)); } // to make sure third party won't overwrite this structure @@ -5785,7 +5816,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, return pQInfo; _cleanup: - tfree(pQuery->defaultVal); + tfree(pQuery->fillVal); if (pQuery->sdata != NULL) { for (int16_t col = 0; col < pQuery->numOfOutput; ++col) { @@ -5893,8 +5924,8 @@ static void freeQInfo(SQInfo *pQInfo) { tfree(pQuery->pSelectExpr); } - if (pQuery->defaultVal != NULL) { - tfree(pQuery->defaultVal); + if (pQuery->fillVal != NULL) { + tfree(pQuery->fillVal); } // todo refactor, extract method to destroytableDataInfo @@ -5997,8 +6028,13 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { } pQuery->rec.total += pQuery->rec.rows; - qTrace("QInfo:%p current:%d, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); + qTrace("QInfo:%p current numOfRes rows:%d, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); + if (pQuery->limit.limit > 0 && pQuery->limit.limit == pQuery->rec.total) { + qTrace("QInfo:%p results limitation reached, limitation:%"PRId64, pQInfo, pQuery->limit.limit); + setQueryStatus(pQuery, QUERY_OVER); + } + return TSDB_CODE_SUCCESS; // todo if interpolation exists, the result may be dump to client by several rounds diff --git a/src/query/src/qfill.c b/src/query/src/qfill.c index 4f69c44940..36ffc433ce 100644 --- a/src/query/src/qfill.c +++ b/src/query/src/qfill.c @@ -34,7 +34,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, ch * here we revised the start time of day according to the local time zone, * but in case of DST, the start time of one day need to be dynamically decided. * - * TODO dynamically decide the start time of a day + * TODO dynamically decide the start time of a day, move to common module */ // todo refactor to extract function that is available for Linux/Windows/Mac platform @@ -116,10 +116,9 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) return; } - pFillInfo->rowIdx = 0; + pFillInfo->rowIdx = 0; + pFillInfo->endKey = endKey; pFillInfo->numOfRows = numOfRows; - - pFillInfo->endKey = endKey; } void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) { @@ -131,6 +130,8 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput) { assert(pFillInfo->numOfRows == pInput->num); + int32_t t = 0; + for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; @@ -138,7 +139,7 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInpu memcpy(pFillInfo->pData[i], s, pInput->num * pCol->col.bytes); if (pCol->flag == TSDB_COL_TAG) { // copy the tag value - memcpy(pFillInfo->pTags[i], pFillInfo->pData[i], pCol->col.bytes); + memcpy(pFillInfo->pTags[t++], pFillInfo->pData[i], pCol->col.bytes); } } } @@ -170,7 +171,7 @@ static int32_t taosGetTotalNumOfFilledRes(SFillInfo* pFillInfo, const TSKEY* tsA } } -int32_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows) { +int64_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows) { int32_t numOfRes = taosGetTotalNumOfFilledRes(pFillInfo, (int64_t*) pFillInfo->pData[0], numOfRows, pFillInfo->slidingTime, ekey); return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes; @@ -193,7 +194,7 @@ static double linearInterpolationImpl(double v1, double v2, double k1, double k2 int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) { switch (type) { case TSDB_DATA_TYPE_INT: { - *(int32_t*)point->val = linearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key, + *(int32_t*)point->val = (int32_t) linearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key, point2->key, point->key); break; } @@ -209,17 +210,17 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi }; case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_BIGINT: { - *(int64_t*)point->val = linearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key, + *(int64_t*)point->val = (int64_t) linearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key, point2->key, point->key); break; }; case TSDB_DATA_TYPE_SMALLINT: { - *(int16_t*)point->val = linearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key, + *(int16_t*)point->val = (int16_t) linearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key, point2->key, point->key); break; }; case TSDB_DATA_TYPE_TINYINT: { - *(int8_t*)point->val = + *(int8_t*) point->val = (int8_t) linearInterpolationImpl(*(int8_t*)point1->val, *(int8_t*)point2->val, point1->key, point2->key, point->key); break; }; @@ -243,8 +244,8 @@ static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, char** pTags, in static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData, int64_t ts, char** pTags, bool outOfBound) { - char** prevValues = &pFillInfo->prevValues; - char** nextValues = &pFillInfo->nextValues; + char* prevValues = pFillInfo->prevValues; + char* nextValues = pFillInfo->nextValues; SPoint point1, point2, point; @@ -257,16 +258,21 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* // set the other values if (pFillInfo->fillType == TSDB_FILL_PREV) { - char* pInterpolationData = FILL_IS_ASC_FILL(pFillInfo) ? *prevValues : *nextValues; - if (pInterpolationData != NULL) { + char* p = FILL_IS_ASC_FILL(pFillInfo) ? prevValues : nextValues; + + if (p != NULL) { for (int32_t i = 1; i < numOfValCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num); - if (isNull(pInterpolationData + pCol->col.offset, pCol->col.type)) { - setNull(val1, pCol->col.type, pCol->col.bytes); + if (isNull(p + pCol->col.offset, pCol->col.type)) { + if (pCol->col.type == TSDB_DATA_TYPE_BINARY || pCol->col.type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(val1, pCol->col.type); + } else { + setNull(val1, pCol->col.type, pCol->col.bytes); + } } else { - assignVal(val1, pInterpolationData + pCol->col.offset, pCol->col.bytes, pCol->col.type); + assignVal(val1, p + pCol->col.offset, pCol->col.bytes, pCol->col.type); } } } else { // no prev value yet, set the value for NULL @@ -274,14 +280,18 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* SFillColInfo* pCol = &pFillInfo->pFillCol[i]; char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num); - setNull(val1, pCol->col.type, pCol->col.bytes); + if (pCol->col.type == TSDB_DATA_TYPE_BINARY||pCol->col.type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(val1, pCol->col.type); + } else { + setNull(val1, pCol->col.type, pCol->col.bytes); + } } } setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); } else if (pFillInfo->fillType == TSDB_FILL_LINEAR) { // TODO : linear interpolation supports NULL value - if (*prevValues != NULL && !outOfBound) { + if (prevValues != NULL && !outOfBound) { for (int32_t i = 1; i < numOfValCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; @@ -289,14 +299,17 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* int16_t bytes = pCol->col.bytes; char *val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num); - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) { + if (type == TSDB_DATA_TYPE_BINARY|| type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(val1, pCol->col.type); + continue; + } else if (type == TSDB_DATA_TYPE_BOOL) { setNull(val1, pCol->col.type, bytes); continue; } - point1 = (SPoint){.key = *(TSKEY*)(*prevValues), .val = *prevValues + pCol->col.offset}; + point1 = (SPoint){.key = *(TSKEY*)(prevValues), .val = prevValues + pCol->col.offset}; point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->rowIdx * bytes}; - point = (SPoint){.key = pFillInfo->start, .val = val1}; + point = (SPoint){.key = pFillInfo->start, .val = val1}; taosDoLinearInterpolation(type, &point1, &point2, &point); } @@ -307,7 +320,12 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* SFillColInfo* pCol = &pFillInfo->pFillCol[i]; char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num); - setNull(val1, pCol->col.type, pCol->col.bytes); + + if (pCol->col.type == TSDB_DATA_TYPE_BINARY || pCol->col.type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(val1, pCol->col.type); + } else { + setNull(val1, pCol->col.type, pCol->col.bytes); + } } setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); @@ -318,7 +336,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* SFillColInfo* pCol = &pFillInfo->pFillCol[i]; char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num); - assignVal(val1, (char*)&pCol->defaultVal.i, pCol->col.bytes, pCol->col.type); + assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); } setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); @@ -338,11 +356,16 @@ static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** nextValues) { *nextValues = calloc(1, pFillInfo->rowSize); for (int i = 1; i < pFillInfo->numOfCols; i++) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - setNull(*nextValues + pCol->col.offset, pCol->col.type, pCol->col.bytes); + + if (pCol->col.type == TSDB_DATA_TYPE_BINARY||pCol->col.type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(*nextValues + pCol->col.offset, pCol->col.type); + } else { + setNull(*nextValues + pCol->col.offset, pCol->col.type, pCol->col.bytes); + } } } -int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfRows, int32_t outputRows, char** srcData) { +int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfRows, int32_t outputRows, char** srcData) { int32_t num = 0; pFillInfo->numOfCurrent = 0; @@ -356,8 +379,8 @@ int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numO if (numOfRows == 0) { /* - * we need to rebuild whole result set - * NOTE:we need to keep the last saved data, to generated the filled data + * These data are generated according to fill strategy, since the current timestamp is out of time window of + * real result set. Note that we need to keep the direct previous result rows, to generated the filled data. */ while (num < outputRows) { doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, true); @@ -387,7 +410,7 @@ int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numO while (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) || (pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) { - doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, false); + doInterpoResultImpl(pFillInfo, data, &num, srcData, ts, pTags, false); } /* output buffer is full, abort */ @@ -420,7 +443,7 @@ int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numO assignVal(val1, src, pCol->col.bytes, pCol->col.type); memcpy(*prevValues + pCol->col.offset, src, pCol->col.bytes); } else { - assignVal(val1, (char*) &pCol->defaultVal.i, pCol->col.bytes, pCol->col.type); + assignVal(val1, (char*) &pCol->fillVal.i, pCol->col.bytes, pCol->col.type); } } } @@ -450,21 +473,12 @@ int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numO } } -void taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int64_t* outputRows, int32_t capacity) { +int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) { int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator? - -// TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime, -// pQuery->slidingTimeUnit, pQuery->precision); -// if (QUERY_IS_ASC_QUERY(pQuery)) { -// assert(ekey >= pQuery->window.ekey); -// } else { -// assert(ekey <= pQuery->window.ekey); -// } - - int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity); + int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity); - int32_t numOfRes = taosDoInterpoResult(pFillInfo, output, remain, rows, pFillInfo->pData); - *outputRows = rows; - + int32_t numOfRes = generateDataBlockImpl(pFillInfo, output, remain, rows, pFillInfo->pData); assert(numOfRes == rows); + + return numOfRes; } diff --git a/src/query/src/tvariant.c b/src/query/src/tvariant.c index 6f8d579936..c89e9dc5f2 100644 --- a/src/query/src/tvariant.c +++ b/src/query/src/tvariant.c @@ -101,11 +101,12 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32 break; } case TSDB_DATA_TYPE_NCHAR: { // here we get the nchar length from raw binary bits length - pVar->nLen = len / TSDB_NCHAR_SIZE; - pVar->wpz = calloc(1, (pVar->nLen + 1) * TSDB_NCHAR_SIZE); + int32_t lenInwchar = len / TSDB_NCHAR_SIZE; + pVar->wpz = calloc(1, (lenInwchar + 1) * TSDB_NCHAR_SIZE); - wcsncpy(pVar->wpz, (wchar_t *)pz, pVar->nLen); - pVar->wpz[pVar->nLen] = 0; + wcsncpy(pVar->wpz, (wchar_t *)pz, lenInwchar); + pVar->wpz[lenInwchar] = 0; + pVar->nLen = len; break; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 0e70ab2d7e..595217debb 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1539,14 +1539,21 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL); size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle); - memset(pHandle->statis, 0, sizeof(SDataStatis) * numOfCols); + for(int32_t i = 0; i < numOfCols; ++i) { + SDataStatis* st = &pHandle->statis[i]; + int32_t colId = st->colId; + + memset(st, 0, sizeof(SDataStatis)); + st->colId = colId; + } + tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols); *pBlockStatis = pHandle->statis; //update the number of NULL data rows for(int32_t i = 0; i < numOfCols; ++i) { - if (pHandle->statis[i].numOfNull == -1) { + if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows; } } diff --git a/tests/script/general/parser/limit2_query.sim b/tests/script/general/parser/limit2_query.sim index 57cd13abee..cbff2e946c 100644 --- a/tests/script/general/parser/limit2_query.sim +++ b/tests/script/general/parser/limit2_query.sim @@ -36,6 +36,7 @@ if $data01 != 2 then return -1 endi if $data02 != tb2 then + print expect tb2, actual: $data02 return -1 endi if $data03 != tb2 then -- GitLab