From 847a0d7de3fdaab765b610a5e943c4a7be7bb2a7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 28 May 2020 10:21:40 +0800 Subject: [PATCH] [td-225] fix bug while client does not retrieve or paritally retrieve result from vnode. --- src/client/src/tscAsync.c | 1 + src/client/src/tscFunctionImpl.c | 91 +++++++- src/client/src/tscServer.c | 6 +- src/client/src/tscSql.c | 39 ++-- src/client/src/tscUtil.c | 3 +- src/inc/tsdb.h | 2 + src/query/inc/tsqlfunction.h | 3 - src/query/src/qExecutor.c | 69 +++--- src/query/src/tvariant.c | 4 +- src/tsdb/src/tsdbMeta.c | 3 +- src/tsdb/src/tsdbRead.c | 225 ++++++++++++++++---- tests/script/general/parser/interp_test.sim | 2 + tests/script/general/parser/testSuite.sim | 16 +- 13 files changed, 352 insertions(+), 112 deletions(-) diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index d7224871da..96837e4dd4 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -57,6 +57,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const } pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1); + if (pSql->sqlstr == NULL) { tscError("%p failed to malloc sql string buffer", pSql); tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY); diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 81602d17f4..f56babca03 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -3003,6 +3003,7 @@ static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { output += VARSTR_HEADER_SIZE; } + // todo : handle the binary/nchar data tVariantDump(&pCtx->tag, output, pCtx->tag.nType); pCtx->aOutputBuf += pCtx->outputBytes; } @@ -3857,10 +3858,14 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) { * param[2]: next value of specified timestamp * param[3]: denotes if the result is a precious result or interpolation results * + * param[1]: denote the specified timestamp to generated the interp result + * param[2]: fill policy + * * @param pCtx */ static void interp_function(SQLFunctionCtx *pCtx) { // at this point, the value is existed, return directly +#if 0 if (pCtx->param[3].i64Key == 1) { char *pData = GET_INPUT_CHAR(pCtx); assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType); @@ -3955,6 +3960,88 @@ static void interp_function(SQLFunctionCtx *pCtx) { tVariantDestroy(&pCtx->param[2]); // data in the check operation are all null, not output + SET_VAL(pCtx, pCtx->size, 1); +#endif + + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SInterpInfoDetail* pInfo = pResInfo->interResultBuf; + + if (pCtx->size == 1) { + char *pData = GET_INPUT_CHAR(pCtx); + assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType); + } else { + /* + * use interpolation to generate the result. + * Note: the result of primary timestamp column uses the timestamp specified by user in the query sql + */ + assert(pCtx->size == 2); + if (pInfo->type == TSDB_FILL_NONE) { // set no output result + return; + } + + if (pInfo->primaryCol == 1) { + *(TSKEY *) pCtx->aOutputBuf = pInfo->ts; + } else { + if (pInfo->type == TSDB_FILL_NULL) { + if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pCtx->aOutputBuf, pCtx->outputType); + } else { + setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); + } + + SET_VAL(pCtx, pCtx->size, 1); + } else if (pInfo->type == TSDB_FILL_SET_VALUE) { + tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType); + } else if (pInfo->type == TSDB_FILL_PREV) { + char *data = GET_INPUT_CHAR_INDEX(pCtx, 0); + assignVal(pCtx->aOutputBuf, data, pCtx->outputBytes, pCtx->outputType); + + SET_VAL(pCtx, pCtx->size, 1); + } else if (pInfo->type == TSDB_FILL_LINEAR) { + char *data1 = GET_INPUT_CHAR_INDEX(pCtx, 0); + char *data2 = GET_INPUT_CHAR_INDEX(pCtx, 1); + + TSKEY key1 = pCtx->ptsList[0]; + TSKEY key2 = pCtx->ptsList[1]; + + SPoint point1 = {.key = key1, .val = data1}; + SPoint point2 = {.key = key2, .val = data2}; + + SPoint point = {.key = pInfo->ts, .val = pCtx->aOutputBuf}; + + int32_t srcType = pCtx->inputType; + if ((srcType >= TSDB_DATA_TYPE_TINYINT && srcType <= TSDB_DATA_TYPE_BIGINT) || + srcType == TSDB_DATA_TYPE_TIMESTAMP || srcType == TSDB_DATA_TYPE_DOUBLE) { + point1.val = data1; + point2.val = data2; + + if (isNull(data1, srcType) || isNull(data2, srcType)) { + setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes); + } else { + taosDoLinearInterpolation(pCtx->outputType, &point1, &point2, &point); + } + } else if (srcType == TSDB_DATA_TYPE_FLOAT) { + point1.val = data1; + point2.val = data2; + + if (isNull(data1, srcType) || isNull(data2, srcType)) { + setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes); + } else { + taosDoLinearInterpolation(pCtx->outputType, &point1, &point2, &point); + } + + } else { + if (srcType == TSDB_DATA_TYPE_BINARY || srcType == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pCtx->aOutputBuf, pCtx->inputType); + } else { + setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes); + } + } + } + } + + } + SET_VAL(pCtx, pCtx->size, 1); } @@ -4910,7 +4997,7 @@ SQLAggFuncElem aAggs[] = {{ "interp", TSDB_FUNC_INTERP, TSDB_FUNC_INTERP, - TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS , function_setup, interp_function, do_sum_f, // todo filter handle @@ -4918,7 +5005,7 @@ SQLAggFuncElem aAggs[] = {{ doFinalizer, noop1, copy_function, - no_data_info, + data_req_load_info, }, { // 28 diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 7d0ac09e66..38e7065a9a 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -220,7 +220,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { if (pSql->freed || pObj->signature != pObj) { tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed, pObj, pObj->signature); - tscFreeSqlObj(pSql); + if (pObj->pSql != pSql) { // it is not a main SqlObj, should be freed + tscFreeSqlObj(pSql); + } rpcFreeCont(rpcMsg->pCont); return; } @@ -1867,8 +1869,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } - free(pTableMeta); tscTrace("%p recv table meta: %"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name); + free(pTableMeta); return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 14605a571d..66ab4aa7dd 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -530,7 +530,6 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { SSqlObj *pSql = (SSqlObj *)res; SSqlRes *pRes = &pSql->res; - SSqlCmd *pCmd = &pSql->cmd; tscTrace("%p start to free result", pSql); @@ -562,10 +561,6 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { return; } - pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; - - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - /* * case 1. Partial data have been retrieved from vnodes, but not all data has been retrieved yet. * We need to recycle the connection by noticing the vnode return 0 results. @@ -576,27 +571,27 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { * for each subquery. Because the failure of execution tsProcessSql may trigger the callback function * be executed, and the retry efforts may result in double free the resources, e.g.,SRetrieveSupport */ - if ((pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_SHOW || pCmd->command == TSDB_SQL_RETRIEVE || - pCmd->command == TSDB_SQL_FETCH) && - (pRes->code != TSDB_CODE_QUERY_CANCELLED && ((pRes->numOfRows > 0 && pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) || - (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows == 0 && pCmd->command == TSDB_SQL_SELECT && - pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)))) { - pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; - - tscTrace("%p code:%d, numOfRows:%d, command:%d", pSql, pRes->code, pRes->numOfRows, pCmd->command); - - pSql->freed = 1; - tscProcessSql(pSql); +// if ((pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_SHOW || pCmd->command == TSDB_SQL_RETRIEVE || +// pCmd->command == TSDB_SQL_FETCH) && +// (pRes->code != TSDB_CODE_QUERY_CANCELLED && ((pRes->numOfRows > 0 && pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) || +// (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows == 0 && pCmd->command == TSDB_SQL_SELECT && +// pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)))) { +// pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; +// +// tscTrace("%p code:%d, numOfRows:%d, command:%d", pSql, pRes->code, pRes->numOfRows, pCmd->command); +// +// pSql->freed = 1; +// tscProcessSql(pSql); /* * If release connection msg is sent to vnode, the corresponding SqlObj for async query can not be freed instantly, * since its free operation is delegated to callback function, which is tscProcessMsgFromServer. */ - STscObj* pObj = pSql->pTscObj; - if (pObj->pSql == pSql) { - pObj->pSql = NULL; - } - } else { // if no free resource msg is sent to vnode, we free this object immediately. +// STscObj* pObj = pSql->pTscObj; +// if (pObj->pSql == pSql) { +// pObj->pSql = NULL; +// } +// } else { // if no free resource msg is sent to vnode, we free this object immediately. STscObj* pTscObj = pSql->pTscObj; if (pTscObj->pSql != pSql) { @@ -611,7 +606,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { tscTrace("%p sql result is freed by app", pSql); } } - } +// } } void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d66203ecbe..97ce0fbe23 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -421,8 +421,9 @@ void tscFreeSqlObj(SSqlObj* pSql) { memset(pCmd->payload, 0, (size_t)pCmd->allocSize); tfree(pCmd->payload); - pCmd->allocSize = 0; + + tfree(pSql->sqlstr); free(pSql); } diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index bee68b81f5..da806061dd 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -196,6 +196,8 @@ TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable */ TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo); +TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList); + /** * move to next block if exists * diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index 6e591b28d2..62b551f2f5 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -269,9 +269,6 @@ extern struct SQLAggFuncElem aAggs[]; /* compatible check array list */ extern int32_t funcCompatDefList[]; -void getStatistics(char *priData, char *data, int32_t size, int32_t numOfRow, int32_t type, int64_t *min, int64_t *max, - int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull); - bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *minval, char *maxval); bool stableQueryFunctChanged(int32_t funcId); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 4b0ff36d36..44788b6b9b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -110,8 +110,9 @@ static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInf static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow); -static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* pData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, - int32_t functionId, SDataStatis *pStatis, bool hasNull, void *param, int32_t scanFlag); +static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, + SDataStatis *pStatis, void *param, int32_t colIndex); + static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols); static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); @@ -954,16 +955,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - int32_t functionId = pQuery->pSelectExpr[k].base.functionId; - int32_t colId = pQuery->pSelectExpr[k].base.colInfo.colId; - - SDataStatis *tpField = NULL; - - bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo->numOfCols, pStatis, &tpField); char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); - - setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo, functionId, tpField, hasNull, - &sasArray[k], colId); + setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo, pStatis, &sasArray[k], k); } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -1176,16 +1169,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - int32_t functionId = pQuery->pSelectExpr[k].base.functionId; - int32_t colId = pQuery->pSelectExpr[k].base.colInfo.colId; - - SDataStatis *pColStatis = NULL; - - bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo->numOfCols, pStatis, &pColStatis); char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); - - setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo, functionId, pColStatis, hasNull, - &sasArray[k], colId); + setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo, pStatis, &sasArray[k], k); } // set the input column data @@ -1354,11 +1339,16 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl } void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, - int32_t functionId, SDataStatis *pStatis, bool hasNull, void *param, int32_t colId) { - pCtx->hasNull = hasNull; + SDataStatis *pStatis, void *param, int32_t colIndex) { + + int32_t functionId = pQuery->pSelectExpr[colIndex].base.functionId; + int32_t colId = pQuery->pSelectExpr[colIndex].base.colInfo.colId; + + SDataStatis *tpField = NULL; + pCtx->hasNull = hasNullValue(pQuery, colIndex, pBlockInfo->numOfCols, pStatis, &tpField); pCtx->aInputElemBuf = inputData; - if (pStatis != NULL) { + if (tpField != NULL) { pCtx->preAggVals.isSet = true; pCtx->preAggVals.statis = *pStatis; if (pCtx->preAggVals.statis.numOfNull == -1) { @@ -1404,6 +1394,19 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY pCtx->preAggVals.statis.min = pBlockInfo->window.skey; pCtx->preAggVals.statis.max = pBlockInfo->window.ekey; } + } else if (functionId == TSDB_FUNC_INTERP) { + SInterpInfoDetail *pInterpInfo = GET_RES_INFO(pCtx)->interResultBuf; + pInterpInfo->type = pQuery->fillType; + pInterpInfo->ts = pQuery->window.skey; + pInterpInfo->primaryCol = (colId == PRIMARYKEY_TIMESTAMP_COL_INDEX); + + if (pQuery->fillVal != NULL) { + if (isNull((const char*) &pQuery->fillVal[colIndex], pCtx->inputType)) { + pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; + } else { // todo refactor, tVariantCreateFromBinary should handle the NULL value + tVariantCreateFromBinary(&pCtx->param[1], (char*) &pQuery->fillVal[colIndex], pCtx->inputBytes, pCtx->inputType); + } + } } #if defined(_DEBUG_VIEW) @@ -1635,10 +1638,11 @@ static bool isFixedOutputQuery(SQuery *pQuery) { return false; } +// todo refactor with isLastRowQuery static bool isPointInterpoQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionID = pQuery->pSelectExpr[i].base.functionId; - if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) { + if (functionID == TSDB_FUNC_INTERP/* || functionID == TSDB_FUNC_LAST_ROW*/) { return true; } } @@ -1817,7 +1821,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { /* * todo add more parameters to check soon.. */ -bool vnodeParametersSafetyCheck(SQuery *pQuery) { +bool colIdCheck(SQuery *pQuery) { // load data column information is incorrect for (int32_t i = 0; i < pQuery->numOfCols - 1; ++i) { if (pQuery->colList[i].colId == pQuery->colList[i + 1].colId) { @@ -1825,6 +1829,7 @@ bool vnodeParametersSafetyCheck(SQuery *pQuery) { return false; } } + return true; } @@ -1851,7 +1856,7 @@ static bool onlyFirstQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSD static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); } -static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) { +static void changeExecuteScanOrder(SQuery *pQuery, bool stableQuery) { // in case of point-interpolation query, use asc order scan char msg[] = "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64 "-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64; @@ -1906,7 +1911,7 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) { } } else { // interval query - if (metricQuery) { + if (stableQuery) { if (onlyFirstQuery(pQuery)) { if (!QUERY_IS_ASC_QUERY(pQuery)) { qTrace(msg, GET_QINFO_ADDR(pQuery), "only-first stable", pQuery->order.order, TSDB_ORDER_ASC, @@ -3841,7 +3846,8 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo; - if (pQuery->fillType == TSDB_FILL_NONE) { + // todo refactor + if (pQuery->fillType == TSDB_FILL_NONE || (pQuery->fillType != TSDB_FILL_NONE && isPointInterpoQuery(pQuery))) { assert(pFillInfo == NULL); return false; } @@ -4195,7 +4201,6 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { return true; } - static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; @@ -4229,6 +4234,8 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { if (isFirstLastRowQuery(pQuery)) { pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo); + } else if (isPointInterpoQuery(pQuery)) { + pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQInfo->tableIdGroupInfo); } else { pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo); } @@ -4346,7 +4353,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool // pointInterpSupporterSetData(pQInfo, &interpInfo); // pointInterpSupporterDestroy(&interpInfo); - if (pQuery->fillType != TSDB_FILL_NONE) { + if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery); pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, 0, 0, pQuery->rec.capacity, pQuery->numOfOutput, pQuery->slidingTime, pQuery->fillType, pColInfo); @@ -5809,7 +5816,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, goto _cleanup; } - vnodeParametersSafetyCheck(pQuery); + colIdCheck(pQuery); qTrace("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo); return pQInfo; diff --git a/src/query/src/tvariant.c b/src/query/src/tvariant.c index c89e9dc5f2..f21f5d76d9 100644 --- a/src/query/src/tvariant.c +++ b/src/query/src/tvariant.c @@ -766,7 +766,7 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, char type) { case TSDB_DATA_TYPE_BINARY: { if (pVariant->nType == TSDB_DATA_TYPE_NULL) { - *payload = TSDB_DATA_BINARY_NULL; + setVardataNull(payload,TSDB_DATA_TYPE_BINARY); } else { if (pVariant->nType != TSDB_DATA_TYPE_BINARY) { toBinary(pVariant, &payload, &pVariant->nLen); @@ -786,7 +786,7 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, char type) { } case TSDB_DATA_TYPE_NCHAR: { if (pVariant->nType == TSDB_DATA_TYPE_NULL) { - *(uint32_t *) payload = TSDB_DATA_NCHAR_NULL; + setVardataNull(payload,TSDB_DATA_TYPE_NCHAR); } else { if (pVariant->nType != TSDB_DATA_TYPE_NCHAR) { toNchar(pVariant, &payload, &pVariant->nLen); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 95680f95c4..c09009b468 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -101,6 +101,7 @@ STable *tsdbDecodeTable(void *cont, int contLen) { pTable->schema = tdDecodeSchema(&ptr); } + pTable->lastKey = TSKEY_INITIAL_VAL; return pTable; } @@ -349,7 +350,7 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) { table->name = calloc(1, size + VARSTR_HEADER_SIZE + 1); STR_WITH_SIZE_TO_VARSTR(table->name, pCfg->name, size); - table->lastKey = 0; + table->lastKey = TSKEY_INITIAL_VAL; if (IS_CREATE_STABLE(pCfg)) { // TSDB_CHILD_TABLE table->type = TSDB_CHILD_TABLE; table->superUid = pCfg->superUid; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 595217debb..32af840a28 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -117,6 +117,7 @@ typedef struct STsdbQueryHandle { } STsdbQueryHandle; static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); +static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { pBlockLoadInfo->slot = -1; @@ -188,9 +189,6 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable } } - for(int32_t i = 0; i < numOfCols; ++i) { - } - uTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo)); tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); @@ -213,9 +211,9 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList); pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; - pQueryHandle->order = TSDB_ORDER_ASC; +// pQueryHandle->outputCapacity = 2; // only allowed two rows to be loaded -// changeQueryHandleForLastrowQuery(pQueryHandle); + changeQueryHandleForInterpQuery(pQueryHandle); return pQueryHandle; } @@ -581,18 +579,33 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* } SArray* sa = getDefaultLoadColumns(pQueryHandle, true); + doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa); taosArrayDestroy(sa); } else { - pQueryHandle->realNumOfRows = binfo.rows; - - cur->rows = binfo.rows; - cur->win = binfo.window; - cur->mixBlock = false; - cur->blockCompleted = true; - cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1); + /* + * no data in cache, only load data from file + * during the query processing, data in cache will not be checked anymore. + * + * Here the buffer is not enough, so only part of file block can be loaded into memory buffer + */ +// if (pQueryHandle->outputCapacity < binfo.rows) { +// SArray* sa = getDefaultLoadColumns(pQueryHandle, true); +// doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); +// +// doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa); +// taosArrayDestroy(sa); +// } else { + pQueryHandle->realNumOfRows = binfo.rows; + + cur->rows = binfo.rows; + cur->win = binfo.window; + cur->mixBlock = false; + cur->blockCompleted = true; + cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1); +// } } } @@ -622,15 +635,14 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); } } else { //desc order, query ended in current block - if (pQueryHandle->window.ekey > pBlock->keyFirst) { + if (pQueryHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast) { if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) { return false; } - - SDataCols* pDataCols = pCheckInfo->pDataCols; + + SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; if (pCheckInfo->lastKey < pBlock->keyLast) { - cur->pos = - binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order); + cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order); } else { cur->pos = pBlock->numOfRows - 1; } @@ -1011,7 +1023,7 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { firstPos = 0; lastPos = num - 1; - if (order == 0) { + if (order == TSDB_ORDER_DESC) { // find the first position which is smaller than the key while (1) { if (key >= keyList[lastPos]) return lastPos; @@ -1307,12 +1319,116 @@ static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) { } // handle data in cache situation -bool tsdbNextDataBlock(TsdbQueryHandleT* pqHandle) { - STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle; +bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { + STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle; size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); assert(numOfTables > 0); + if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) { + pQueryHandle->type = TSDB_QUERY_TYPE_ALL; + pQueryHandle->order = TSDB_ORDER_DESC; + + if (!tsdbNextDataBlock(pHandle)) { + return false; + } + + SArray* sa = getDefaultLoadColumns(pQueryHandle, true); + /*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle); + /*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, sa); + + if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) { + // data already retrieve, discard other data rows and return + int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); + memcpy(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes); + } + + pQueryHandle->cur.win = (STimeWindow){pQueryHandle->window.skey, pQueryHandle->window.skey}; + pQueryHandle->window = pQueryHandle->cur.win; + pQueryHandle->cur.rows = 1; + pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; + return true; + } else { + STsdbQueryHandle* pSecQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); + pSecQueryHandle->order = TSDB_ORDER_ASC; + pSecQueryHandle->window = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX}; + pSecQueryHandle->pTsdb = pQueryHandle->pTsdb; + pSecQueryHandle->type = TSDB_QUERY_TYPE_ALL; + pSecQueryHandle->cur.fid = -1; + pSecQueryHandle->cur.win = TSWINDOW_INITIALIZER; + pSecQueryHandle->checkFiles = true; + pSecQueryHandle->activeIndex = 0; + pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock; + + tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb); + + // allocate buffer in order to load data blocks from file + int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); + + pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); + pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData colInfo = {{0}, 0}; + SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); + + colInfo.info = pCol->info; + colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes); + taosArrayPush(pSecQueryHandle->pColumns, &colInfo); + pSecQueryHandle->statis[i].colId = colInfo.info.colId; + } + + size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo); + pSecQueryHandle->pTableCheckInfo = taosArrayInit(si, sizeof(STableCheckInfo)); + STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb); + assert(pMeta != NULL); + + for (int32_t j = 0; j < si; ++j) { + STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j); + + STableCheckInfo info = { + .lastKey = pSecQueryHandle->window.skey, + .tableId = pCheckInfo->tableId, + .pTableObj = pCheckInfo->pTableObj, + }; + + taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info); + } + + tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo); + tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo); + + bool ret = tsdbNextDataBlock((void*) pSecQueryHandle); + assert(ret); + + /*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle); + /*SArray *pDataBlock = */tsdbRetrieveDataBlock((void*) pSecQueryHandle, sa); + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); + memcpy(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes); + + SColumnInfoData* pCol1 = taosArrayGet(pSecQueryHandle->pColumns, i); + assert(pCol->info.colId == pCol1->info.colId); + + memcpy(pCol->pData + pCol->info.bytes, pCol1->pData, pCol1->info.bytes); + } + + SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0); + + pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]}; + pQueryHandle->window = pQueryHandle->cur.win; + pQueryHandle->cur.rows = 2; + + tsdbCleanupQueryHandle(pSecQueryHandle); + } + + pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; + return true; + } + if (pQueryHandle->checkFiles) { if (getDataBlocksInFiles(pQueryHandle)) { return true; @@ -1322,7 +1438,6 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pqHandle) { pQueryHandle->checkFiles = false; } - // TODO: opt by using lastKeyOnFile // TODO: opt by consider the scan order return doHasDataInBuffer(pQueryHandle); } @@ -1336,23 +1451,22 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { // todo consider the query time window, current last_row does not apply the query time window size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); - TSKEY key = 0; + TSKEY key = TSKEY_INITIAL_VAL; int32_t index = -1; for(int32_t i = 0; i < numOfTables; ++i) { STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); - if (pCheckInfo->pTableObj->lastKey > key) { //todo lastKey should not be 0 by default + if (pCheckInfo->pTableObj->lastKey > key) { key = pCheckInfo->pTableObj->lastKey; index = i; } } - // todo, there are no data in all the tables. opt performance if (index == -1) { return; } - // erase all other elements in array list, todo refactor + // erase all other elements in array list size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); for (int32_t i = 0; i < size; ++i) { if (i == index) { @@ -1371,9 +1485,7 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { } STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, index); - taosArrayDestroy(pQueryHandle->pTableCheckInfo); - - pQueryHandle->pTableCheckInfo = taosArrayInit(1, sizeof(STableCheckInfo)); + taosArrayClear(pQueryHandle->pTableCheckInfo); info.lastKey = key; taosArrayPush(pQueryHandle->pTableCheckInfo, &info); @@ -1382,6 +1494,43 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { pQueryHandle->window = (STimeWindow) {key, key}; } +static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) { + // filter the queried time stamp in the first place + STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle; + pQueryHandle->order = TSDB_ORDER_DESC; + + assert(pQueryHandle->window.skey == pQueryHandle->window.ekey); + + // starts from the buffer in case of descending timestamp order check data blocks + // todo consider the query time window, current last_row does not apply the query time window + size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); + + int32_t i = 0; + while(i < numOfTables) { + STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); + if (pQueryHandle->window.skey <= pCheckInfo->pTableObj->lastKey && + pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL) { + break; + } + + i++; + } + + // there are no data in all the tables + if (i == numOfTables) { + return; + } + + STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, i); + taosArrayClear(pQueryHandle->pTableCheckInfo); + + info.lastKey = pQueryHandle->window.skey; + taosArrayPush(pQueryHandle->pTableCheckInfo, &info); + + // update the query time window according to the chosen last timestamp + pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL}; +} + static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey, STsdbQueryHandle* pQueryHandle) { int numOfRows = 0; @@ -1469,14 +1618,15 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY // copy data from cache into data block SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) { STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle; - + STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); + SQueryFilePos* cur = &pHandle->cur; + + STable* pTable = pCheckInfo->pTableObj; int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1; // there are data in file if (pHandle->cur.fid >= 0) { - STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot]; - STable* pTable = pBlockInfo->pTableCheckInfo->pTableObj; - + SDataBlockInfo blockInfo = { .uid = pTable->tableId.uid, .tid = pTable->tableId.tid, @@ -1487,15 +1637,12 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) { return blockInfo; } else { - STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); - SQueryFilePos* cur = &pHandle->cur; - - STable* pTable = pCheckInfo->pTableObj; - if (pTable->mem != NULL) { // create mem table iterator if it is not created yet + // TODO move to next function + if (pTable->mem != NULL && pHandle->type != TSDB_QUERY_TYPE_EXTERNAL) { // create mem table iterator if it is not created yet assert(pCheckInfo->iter != NULL); STimeWindow* win = &cur->win; - pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey, + cur->rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey, pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API // update the last key value @@ -1505,7 +1652,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) { } if (!ASCENDING_TRAVERSE(pHandle->order)) { - SWAP(pHandle->cur.win.skey, pHandle->cur.win.ekey, TSKEY); + SWAP(cur->win.skey, cur->win.ekey, TSKEY); } SDataBlockInfo blockInfo = { diff --git a/tests/script/general/parser/interp_test.sim b/tests/script/general/parser/interp_test.sim index 3e6d5de85b..8bffae4af6 100644 --- a/tests/script/general/parser/interp_test.sim +++ b/tests/script/general/parser/interp_test.sim @@ -117,6 +117,7 @@ $tb = $tbPrefix . 0 return -1 endi if $data01 != NULL then + print expect NULL, actual $data01 return -1 endi if $data02 != NULL then @@ -213,6 +214,7 @@ $tb = $tbPrefix . 0 return -1 endi if $data03 != 0.00000 then + print expect 0.00000, actual:$data03 return -1 endi # if $data04 != NULL then diff --git a/tests/script/general/parser/testSuite.sim b/tests/script/general/parser/testSuite.sim index ac867c9f7f..a61cf84217 100644 --- a/tests/script/general/parser/testSuite.sim +++ b/tests/script/general/parser/testSuite.sim @@ -62,16 +62,19 @@ sleep 2000 run general/parser/tbnameIn.sim sleep 2000 run general/parser/projection_limit_offset.sim - sleep 2000 run general/parser/limit2.sim -sleep 2000 -run general/parser/slimit.sim - sleep 2000 run general/parser/fill.sim sleep 2000 run general/parser/fill_stb.sim +sleep 2000 +run general/parser/where.sim +sleep 2000 +run general/parser/slimit.sim +sleep 2000 +run general/parser/select_with_tags.sim + sleep 2000 run general/parser/tags_dynamically_specifiy.sim sleep 2000 @@ -86,8 +89,6 @@ run general/parser/stream_on_sys.sim sleep 2000 run general/parser/stream.sim -sleep 2000 -run general/parser/where.sim sleep 2000 #run general/parser/repeatAlter.sim sleep 2000 @@ -97,11 +98,8 @@ run general/parser/join.sim sleep 2000 run general/parser/join_multivnode.sim -sleep 2000 -run general/parser/select_with_tags.sim sleep 2000 run general/parser/groupby.sim - sleep 2000 run general/parser/binary_escapeCharacter.sim sleep 2000 -- GitLab