未验证 提交 f15bc71b 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #4522 from taosdata/feature/query

Feature/query
...@@ -365,6 +365,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { ...@@ -365,6 +365,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
static void tscProcessAsyncError(SSchedMsg *pMsg) { static void tscProcessAsyncError(SSchedMsg *pMsg) {
void (*fp)() = pMsg->ahandle; void (*fp)() = pMsg->ahandle;
terrno = *(int32_t*) pMsg->msg; terrno = *(int32_t*) pMsg->msg;
tfree(pMsg->msg);
(*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg); (*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg);
} }
...@@ -447,9 +448,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -447,9 +448,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert(pCmd->command != TSDB_SQL_INSERT); assert(pCmd->command != TSDB_SQL_INSERT);
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
// 2. vnode may need the schema information along with submit block to update its local table schema.
if (pCmd->command == TSDB_SQL_SELECT) { if (pCmd->command == TSDB_SQL_SELECT) {
tscDebug("%p redo parse sql string and proceed", pSql); tscDebug("%p redo parse sql string and proceed", pSql);
pCmd->parseFinished = false; pCmd->parseFinished = false;
...@@ -463,8 +461,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -463,8 +461,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} }
tscProcessSql(pSql); tscProcessSql(pSql);
} else { // in all other cases, simple retry
}else { // in all other cases, simple retry
tscProcessSql(pSql); tscProcessSql(pSql);
} }
......
...@@ -3993,13 +3993,12 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) { ...@@ -3993,13 +3993,12 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo); STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo);
assert(pInfo->win.ekey == pInfo->lastKey && pInfo->hasResult == pResInfo->hasResult);
if (pInfo->hasResult != DATA_SET_FLAG) { if (pInfo->hasResult != DATA_SET_FLAG) {
setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
return; return;
} }
assert(pInfo->win.ekey == pInfo->lastKey && pInfo->hasResult == pResInfo->hasResult);
if (pInfo->win.ekey == pInfo->win.skey) { if (pInfo->win.ekey == pInfo->win.skey) {
*(double *)pCtx->aOutputBuf = pInfo->lastValue; *(double *)pCtx->aOutputBuf = pInfo->lastValue;
} else { } else {
......
...@@ -2172,6 +2172,15 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { ...@@ -2172,6 +2172,15 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) {
return true; return true;
} }
static void doFreeInsertSupporter(SSqlObj* pSqlObj) {
assert(pSqlObj != NULL && pSqlObj->subState.numOfSub > 0);
for(int32_t i = 0; i < pSqlObj->subState.numOfSub; ++i) {
SSqlObj* pSql = pSqlObj->pSubs[i];
tfree(pSql->param);
}
}
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
SInsertSupporter *pSupporter = (SInsertSupporter *)param; SInsertSupporter *pSupporter = (SInsertSupporter *)param;
SSqlObj* pParentObj = pSupporter->pSql; SSqlObj* pParentObj = pSupporter->pSql;
...@@ -2203,10 +2212,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2203,10 +2212,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
if (pParentObj->res.code == TSDB_CODE_SUCCESS) { if (pParentObj->res.code == TSDB_CODE_SUCCESS) {
tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
for(int32_t i = 0; i < numOfSub; ++i) { doFreeInsertSupporter(pParentObj);
SSqlObj* pSql = pParentObj->pSubs[i];
tfree(pSql->param);
}
// todo remove this parameter in async callback function definition. // todo remove this parameter in async callback function definition.
// all data has been sent to vnode, call user function // all data has been sent to vnode, call user function
...@@ -2214,6 +2220,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2214,6 +2220,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
(*pParentObj->fp)(pParentObj->param, pParentObj, v); (*pParentObj->fp)(pParentObj->param, pParentObj, v);
} else { } else {
if (!needRetryInsert(pParentObj, numOfSub)) { if (!needRetryInsert(pParentObj, numOfSub)) {
doFreeInsertSupporter(pParentObj);
tscQueueAsyncRes(pParentObj); tscQueueAsyncRes(pParentObj);
return; return;
} }
...@@ -2244,16 +2251,19 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2244,16 +2251,19 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
pParentObj->cmd.parseFinished = false; pParentObj->cmd.parseFinished = false;
pParentObj->subState.numOfRemain = numOfFailed; pParentObj->subState.numOfRemain = numOfFailed;
pParentObj->subState.numOfSub = numOfFailed;
tscResetSqlCmdObj(&pParentObj->cmd, false); tscResetSqlCmdObj(&pParentObj->cmd, false);
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
// 2. vnode may need the schema information along with submit block to update its local table schema.
tscDebug("%p re-parse sql to generate submit data, retry:%d", pParentObj, pParentObj->retry++); tscDebug("%p re-parse sql to generate submit data, retry:%d", pParentObj, pParentObj->retry++);
int32_t code = tsParseSql(pParentObj, true); int32_t code = tsParseSql(pParentObj, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pParentObj->res.code = code; pParentObj->res.code = code;
doFreeInsertSupporter(pParentObj);
tscQueueAsyncRes(pParentObj); tscQueueAsyncRes(pParentObj);
return; return;
} }
......
...@@ -703,39 +703,11 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se ...@@ -703,39 +703,11 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
return forwardStep; return forwardStep;
} }
/** static int32_t updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, TSKEY lastKey, bool ascQuery) {
* NOTE: the query status only set for the first scan of master scan.
*/
static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SResultRowInfo *pWindowResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pRuntimeEnv->scanFlag != MASTER_SCAN) {
return pWindowResInfo->size;
}
// for group by normal column query, close time window and return.
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
closeAllTimeWindow(pWindowResInfo);
return pWindowResInfo->size;
}
// no qualified results exist, abort check
int32_t numOfClosed = 0;
if (pWindowResInfo->size == 0) {
return pWindowResInfo->size;
}
// query completed
if ((lastKey >= pQuery->current->win.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(lastKey <= pQuery->current->win.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeAllTimeWindow(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL);
} else { // set the current index to be the last unclosed window
int32_t i = 0; int32_t i = 0;
int64_t skey = TSKEY_INITIAL_VAL; int64_t skey = TSKEY_INITIAL_VAL;
int32_t numOfClosed = 0;
for (i = 0; i < pWindowResInfo->size; ++i) { for (i = 0; i < pWindowResInfo->size; ++i) {
SResultRow *pResult = pWindowResInfo->pResult[i]; SResultRow *pResult = pWindowResInfo->pResult[i];
if (pResult->closed) { if (pResult->closed) {
...@@ -744,8 +716,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe ...@@ -744,8 +716,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
} }
TSKEY ekey = pResult->win.ekey; TSKEY ekey = pResult->win.ekey;
if ((ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || if ((ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
(pResult->win.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeTimeWindow(pWindowResInfo, i); closeTimeWindow(pWindowResInfo, i);
} else { } else {
skey = pResult->win.skey; skey = pResult->win.skey;
...@@ -759,9 +730,33 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe ...@@ -759,9 +730,33 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
pWindowResInfo->curIndex = pWindowResInfo->size - 1; pWindowResInfo->curIndex = pWindowResInfo->size - 1;
} else { } else {
pWindowResInfo->curIndex = i; pWindowResInfo->curIndex = i;
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey;
} }
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey; return numOfClosed;
}
/**
* NOTE: the query status only set for the first scan of master scan.
*/
static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SResultRowInfo *pWindowResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pRuntimeEnv->scanFlag != MASTER_SCAN || pWindowResInfo->size == 0) {
return pWindowResInfo->size;
}
// no qualified results exist, abort check
int32_t numOfClosed = 0;
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
// query completed
if ((lastKey >= pQuery->current->win.ekey && ascQuery) || (lastKey <= pQuery->current->win.ekey && (!ascQuery))) {
closeAllTimeWindow(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL);
} else { // set the current index to be the last unclosed window
numOfClosed = updateResultRowCurrentIndex(pWindowResInfo, lastKey, ascQuery);
// the number of completed slots are larger than the threshold, return current generated results to client. // the number of completed slots are larger than the threshold, return current generated results to client.
if (numOfClosed > pQuery->rec.threshold) { if (numOfClosed > pQuery->rec.threshold) {
...@@ -1050,24 +1045,6 @@ static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, in ...@@ -1050,24 +1045,6 @@ static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, in
} }
} }
//static double getTSWindowInterpoVal(SColumnInfoData* pColInfo, int16_t srcColIndex, int16_t rowIndex, TSKEY key, char** prevRow, TSKEY* tsCols, int32_t step) {
// TSKEY start = tsCols[rowIndex];
// TSKEY prevTs = (rowIndex == 0)? *(TSKEY *) prevRow[0] : tsCols[rowIndex - step];
//
// double v1 = 0, v2 = 0, v = 0;
// char *prevVal = (rowIndex == 0)? prevRow[srcColIndex] : ((char*)pColInfo->pData) + (rowIndex - step) * pColInfo->info.bytes;
//
// GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)prevVal);
// GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + rowIndex * pColInfo->info.bytes);
//
// SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
// SPoint point2 = (SPoint){.key = start, .val = &v2};
// SPoint point = (SPoint){.key = key, .val = &v};
// taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point);
//
// return v;
//}
// window start key interpolation // window start key interpolation
static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, int32_t numOfRows, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) { static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, int32_t numOfRows, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) {
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
...@@ -1238,6 +1215,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -1238,6 +1215,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
} }
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
} }
done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
...@@ -1249,6 +1228,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -1249,6 +1228,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
} }
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP);
} }
} }
...@@ -1289,6 +1270,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -1289,6 +1270,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
} }
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
} }
done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
...@@ -1299,6 +1282,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -1299,6 +1282,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
} }
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP);
} }
} }
...@@ -1802,9 +1787,12 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl ...@@ -1802,9 +1787,12 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
// interval query with limit applied // interval query with limit applied
int32_t numOfRes = 0; int32_t numOfRes = 0;
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
} else { } else if (pRuntimeEnv->groupbyNormalCol) {
closeAllTimeWindow(pWindowResInfo);
numOfRes = pWindowResInfo->size;
} else { // projection query
numOfRes = (int32_t)getNumOfResult(pRuntimeEnv); numOfRes = (int32_t)getNumOfResult(pRuntimeEnv);
// update the number of output result // update the number of output result
...@@ -4487,6 +4475,18 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc ...@@ -4487,6 +4475,18 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc
} else { } else {
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
} }
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
// TODO refactor
if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) {
closeAllTimeWindow(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
} else {
updateResultRowCurrentIndex(pWindowResInfo, pTableQueryInfo->lastKey, ascQuery);
}
}
} }
bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) { bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) {
......
...@@ -111,7 +111,7 @@ if $rows != 2 then ...@@ -111,7 +111,7 @@ if $rows != 2 then
return -1 return -1
endi endi
if $data00 != @15-08-18 00:06:00.00@ then if $data00 != @15-08-18 00:06:00.000@ then
return -1 return -1
endi endi
...@@ -219,10 +219,15 @@ if $data02 != 6 then ...@@ -219,10 +219,15 @@ if $data02 != 6 then
return -1 return -1
endi endi
sql select twa(k) from t1 where ts>'2015-8-18 00:00:00' and ts<'2015-8-18 00:00:1'
if $rows != 0 then
return -1
endi
sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:30:00' interval(10m) order by ts asc sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:30:00' interval(10m) order by ts asc
sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:30:00' interval(10m) order by ts desc sql select twa(k),avg(k),count(1) from t1 where ts>='2015-8-18 00:00:00' and ts<='2015-8-18 00:30:00' interval(10m) order by ts desc
#todo add test case while column filte exists. #todo add test case while column filter exists.
select count(*),TWA(k) from tm0 where ts>='1970-1-1 13:43:00' and ts<='1970-1-1 13:44:10' interval(9s) #sql select count(*),TWA(k) from tm0 where ts>='1970-1-1 13:43:00' and ts<='1970-1-1 13:44:10' interval(9s)
...@@ -103,6 +103,8 @@ sleep 500 ...@@ -103,6 +103,8 @@ sleep 500
run general/parser/timestamp.sim run general/parser/timestamp.sim
sleep 500 sleep 500
run general/parser/sliding.sim run general/parser/sliding.sim
sleep 500
run general/parser/function.sim
#sleep 500 #sleep 500
#run general/parser/repeatStream.sim #run general/parser/repeatStream.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册