diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index dfdf97ea432f276de0f73a102f87bb6fc42e4e3e..910a7b41126536eb0e7838e8aaf196b64642e5f4 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -365,6 +365,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { static void tscProcessAsyncError(SSchedMsg *pMsg) { void (*fp)() = pMsg->ahandle; terrno = *(int32_t*) pMsg->msg; + tfree(pMsg->msg); (*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg); } @@ -447,9 +448,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { assert(pCmd->command != TSDB_SQL_INSERT); - // in case of insert, redo parsing the sql string and build new submit data block for two reasons: - // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly. - // 2. vnode may need the schema information along with submit block to update its local table schema. if (pCmd->command == TSDB_SQL_SELECT) { tscDebug("%p redo parse sql string and proceed", pSql); pCmd->parseFinished = false; @@ -463,8 +461,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { } tscProcessSql(pSql); - - }else { // in all other cases, simple retry + } else { // in all other cases, simple retry tscProcessSql(pSql); } diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 66b9eed21174de02df9afc8ff0aa7e681bc36929..a422856fed8716e00a7905f73246eb8496170852 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -3993,13 +3993,12 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo); - assert(pInfo->win.ekey == pInfo->lastKey && pInfo->hasResult == pResInfo->hasResult); - if (pInfo->hasResult != DATA_SET_FLAG) { setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); return; } - + + assert(pInfo->win.ekey == pInfo->lastKey && pInfo->hasResult == pResInfo->hasResult); if (pInfo->win.ekey == pInfo->win.skey) { *(double *)pCtx->aOutputBuf = pInfo->lastValue; } else { diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 973f21c92b50570a69a4cab2b096877006e3d4e4..d2eb16795f88356633dc96768379fe058426ef12 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2172,6 +2172,15 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { return true; } +static void doFreeInsertSupporter(SSqlObj* pSqlObj) { + assert(pSqlObj != NULL && pSqlObj->subState.numOfSub > 0); + + for(int32_t i = 0; i < pSqlObj->subState.numOfSub; ++i) { + SSqlObj* pSql = pSqlObj->pSubs[i]; + tfree(pSql->param); + } +} + static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { SInsertSupporter *pSupporter = (SInsertSupporter *)param; SSqlObj* pParentObj = pSupporter->pSql; @@ -2203,10 +2212,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) if (pParentObj->res.code == TSDB_CODE_SUCCESS) { tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); - for(int32_t i = 0; i < numOfSub; ++i) { - SSqlObj* pSql = pParentObj->pSubs[i]; - tfree(pSql->param); - } + doFreeInsertSupporter(pParentObj); // todo remove this parameter in async callback function definition. // all data has been sent to vnode, call user function @@ -2214,6 +2220,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) (*pParentObj->fp)(pParentObj->param, pParentObj, v); } else { if (!needRetryInsert(pParentObj, numOfSub)) { + doFreeInsertSupporter(pParentObj); tscQueueAsyncRes(pParentObj); return; } @@ -2244,16 +2251,19 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) pParentObj->cmd.parseFinished = false; pParentObj->subState.numOfRemain = numOfFailed; - pParentObj->subState.numOfSub = numOfFailed; tscResetSqlCmdObj(&pParentObj->cmd, false); + // in case of insert, redo parsing the sql string and build new submit data block for two reasons: + // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly. + // 2. vnode may need the schema information along with submit block to update its local table schema. tscDebug("%p re-parse sql to generate submit data, retry:%d", pParentObj, pParentObj->retry++); int32_t code = tsParseSql(pParentObj, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; if (code != TSDB_CODE_SUCCESS) { pParentObj->res.code = code; + doFreeInsertSupporter(pParentObj); tscQueueAsyncRes(pParentObj); return; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 92de3fb84a30ffc194ee6880f22c969b91bc40eb..e83cafa0ac526b88546f7a29a144e3cdc86cbe50 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -703,65 +703,60 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se return forwardStep; } +static int32_t updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, TSKEY lastKey, bool ascQuery) { + int32_t i = 0; + int64_t skey = TSKEY_INITIAL_VAL; + + int32_t numOfClosed = 0; + for (i = 0; i < pWindowResInfo->size; ++i) { + SResultRow *pResult = pWindowResInfo->pResult[i]; + if (pResult->closed) { + numOfClosed += 1; + continue; + } + + TSKEY ekey = pResult->win.ekey; + if ((ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) { + closeTimeWindow(pWindowResInfo, i); + } else { + skey = pResult->win.skey; + break; + } + } + + // all windows are closed, set the last one to be the skey + if (skey == TSKEY_INITIAL_VAL) { + assert(i == pWindowResInfo->size); + pWindowResInfo->curIndex = pWindowResInfo->size - 1; + } else { + pWindowResInfo->curIndex = i; + pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey; + } + + return numOfClosed; +} + /** * NOTE: the query status only set for the first scan of master scan. */ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SResultRowInfo *pWindowResInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; - if (pRuntimeEnv->scanFlag != MASTER_SCAN) { - return pWindowResInfo->size; - } - - // for group by normal column query, close time window and return. - if (!QUERY_IS_INTERVAL_QUERY(pQuery)) { - closeAllTimeWindow(pWindowResInfo); + if (pRuntimeEnv->scanFlag != MASTER_SCAN || pWindowResInfo->size == 0) { return pWindowResInfo->size; } // no qualified results exist, abort check int32_t numOfClosed = 0; - - if (pWindowResInfo->size == 0) { - return pWindowResInfo->size; - } + bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); // query completed - if ((lastKey >= pQuery->current->win.ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (lastKey <= pQuery->current->win.ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + if ((lastKey >= pQuery->current->win.ekey && ascQuery) || (lastKey <= pQuery->current->win.ekey && (!ascQuery))) { closeAllTimeWindow(pWindowResInfo); pWindowResInfo->curIndex = pWindowResInfo->size - 1; setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); } else { // set the current index to be the last unclosed window - int32_t i = 0; - int64_t skey = TSKEY_INITIAL_VAL; - - for (i = 0; i < pWindowResInfo->size; ++i) { - SResultRow *pResult = pWindowResInfo->pResult[i]; - if (pResult->closed) { - numOfClosed += 1; - continue; - } - - TSKEY ekey = pResult->win.ekey; - if ((ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || - (pResult->win.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { - closeTimeWindow(pWindowResInfo, i); - } else { - skey = pResult->win.skey; - break; - } - } - - // all windows are closed, set the last one to be the skey - if (skey == TSKEY_INITIAL_VAL) { - assert(i == pWindowResInfo->size); - pWindowResInfo->curIndex = pWindowResInfo->size - 1; - } else { - pWindowResInfo->curIndex = i; - } - - pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey; + numOfClosed = updateResultRowCurrentIndex(pWindowResInfo, lastKey, ascQuery); // the number of completed slots are larger than the threshold, return current generated results to client. if (numOfClosed > pQuery->rec.threshold) { @@ -1050,24 +1045,6 @@ static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, in } } -//static double getTSWindowInterpoVal(SColumnInfoData* pColInfo, int16_t srcColIndex, int16_t rowIndex, TSKEY key, char** prevRow, TSKEY* tsCols, int32_t step) { -// TSKEY start = tsCols[rowIndex]; -// TSKEY prevTs = (rowIndex == 0)? *(TSKEY *) prevRow[0] : tsCols[rowIndex - step]; -// -// double v1 = 0, v2 = 0, v = 0; -// char *prevVal = (rowIndex == 0)? prevRow[srcColIndex] : ((char*)pColInfo->pData) + (rowIndex - step) * pColInfo->info.bytes; -// -// GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)prevVal); -// GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + rowIndex * pColInfo->info.bytes); -// -// SPoint point1 = (SPoint){.key = prevTs, .val = &v1}; -// SPoint point2 = (SPoint){.key = start, .val = &v2}; -// SPoint point = (SPoint){.key = key, .val = &v}; -// taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point); -// -// return v; -//} - // window start key interpolation static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, int32_t numOfRows, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) { SQuery* pQuery = pRuntimeEnv->pQuery; @@ -1238,6 +1215,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * if (interp) { setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); } + } else { + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); } done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); @@ -1249,6 +1228,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * if (interp) { setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } + } else { + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP); } } @@ -1289,6 +1270,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * if (interp) { setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); } + } else { + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); } done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); @@ -1299,6 +1282,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * if (interp) { setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } + } else { + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP); } } @@ -1802,9 +1787,12 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl // interval query with limit applied int32_t numOfRes = 0; - if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { + if (QUERY_IS_INTERVAL_QUERY(pQuery)) { numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); - } else { + } else if (pRuntimeEnv->groupbyNormalCol) { + closeAllTimeWindow(pWindowResInfo); + numOfRes = pWindowResInfo->size; + } else { // projection query numOfRes = (int32_t)getNumOfResult(pRuntimeEnv); // update the number of output result @@ -4487,6 +4475,18 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc } else { blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); } + + if (QUERY_IS_INTERVAL_QUERY(pQuery)) { + bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); + + // TODO refactor + if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) { + closeAllTimeWindow(pWindowResInfo); + pWindowResInfo->curIndex = pWindowResInfo->size - 1; + } else { + updateResultRowCurrentIndex(pWindowResInfo, pTableQueryInfo->lastKey, ascQuery); + } + } } bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) { diff --git a/tests/script/general/parser/function.sim b/tests/script/general/parser/function.sim index 34e9844f711313d9f7c1c2380177bfc2c7aa4740..79c620f90d06f1131c45d1705dccd5e5b311585e 100644 --- a/tests/script/general/parser/function.sim +++ b/tests/script/general/parser/function.sim @@ -111,7 +111,7 @@ if $rows != 2 then return -1 endi -if $data00 != @15-08-18 00:06:00.00@ then +if $data00 != @15-08-18 00:06:00.000@ then return -1 endi @@ -219,10 +219,15 @@ if $data02 != 6 then return -1 endi +sql select twa(k) from t1 where ts>'2015-8-18 00:00:00' and ts<'2015-8-18 00:00:1' +if $rows != 0 then + return -1 +endi + sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:30:00' interval(10m) order by ts asc sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:30:00' interval(10m) order by ts desc -#todo add test case while column filte exists. +#todo add test case while column filter exists. -select count(*),TWA(k) from tm0 where ts>='1970-1-1 13:43:00' and ts<='1970-1-1 13:44:10' interval(9s) +#sql select count(*),TWA(k) from tm0 where ts>='1970-1-1 13:43:00' and ts<='1970-1-1 13:44:10' interval(9s) diff --git a/tests/script/general/parser/testSuite.sim b/tests/script/general/parser/testSuite.sim index 8593400ce8c3d6ae5375993b0fcc1d7c36b86ca8..cea6d98679945b1d0e8d4dc82f3dd389b3d04a30 100644 --- a/tests/script/general/parser/testSuite.sim +++ b/tests/script/general/parser/testSuite.sim @@ -103,6 +103,8 @@ sleep 500 run general/parser/timestamp.sim sleep 500 run general/parser/sliding.sim +sleep 500 +run general/parser/function.sim #sleep 500 #run general/parser/repeatStream.sim