未验证 提交 8fce74f0 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2654 from taosdata/feature/query

Feature/query
...@@ -45,6 +45,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -45,6 +45,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA; pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->fp = fp; pSql->fp = fp;
pSql->fetchFp = fp;
pSql->sqlstr = calloc(1, sqlLen + 1); pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
...@@ -159,7 +160,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo ...@@ -159,7 +160,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
pRes->code = numOfRows; pRes->code = numOfRows;
} }
tscQueueAsyncError(pSql->fetchFp, param, pRes->code); tscQueueAsyncRes(pSql);
return; return;
} }
...@@ -167,6 +168,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo ...@@ -167,6 +168,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
} }
tscProcessSql(pSql); tscProcessSql(pSql);
} }
...@@ -346,31 +348,32 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { ...@@ -346,31 +348,32 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
void tscProcessAsyncRes(SSchedMsg *pMsg) { void tscProcessAsyncRes(SSchedMsg *pMsg) {
SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
SSqlCmd *pCmd = &pSql->cmd; // SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
void *taosres = pSql; // void *taosres = pSql;
// pCmd may be released, so cache pCmd->command // pCmd may be released, so cache pCmd->command
int cmd = pCmd->command; // int cmd = pCmd->command;
int code = pRes->code; // int code = pRes->code;
// in case of async insert, restore the user specified callback function // in case of async insert, restore the user specified callback function
bool shouldFree = tscShouldBeFreed(pSql); // bool shouldFree = tscShouldBeFreed(pSql);
if (cmd == TSDB_SQL_INSERT) { // if (pCmd->command == TSDB_SQL_INSERT) {
assert(pSql->fp != NULL); // assert(pSql->fp != NULL);
pSql->fp = pSql->fetchFp; assert(pSql->fp != NULL && pSql->fetchFp != NULL);
} // }
if (pSql->fp) { // if (pSql->fp) {
(*pSql->fp)(pSql->param, taosres, code); pSql->fp = pSql->fetchFp;
} (*pSql->fp)(pSql->param, pSql, pRes->code);
// }
if (shouldFree) { // if (shouldFree) {
tscDebug("%p sqlObj is automatically freed in async res", pSql); // tscDebug("%p sqlObj is automatically freed in async res", pSql);
tscFreeSqlObj(pSql); // tscFreeSqlObj(pSql);
} // }
} }
static void tscProcessAsyncError(SSchedMsg *pMsg) { static void tscProcessAsyncError(SSchedMsg *pMsg) {
...@@ -420,14 +423,14 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -420,14 +423,14 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
if (code != TSDB_CODE_SUCCESS) {
pRes->code = code; pRes->code = code;
tscQueueAsyncRes(pSql);
return;
}
if (code != TSDB_CODE_SUCCESS) {
tscError("%p ge tableMeta failed, code:%s", pSql, tstrerror(code));
goto _error;
} else {
tscDebug("%p get tableMeta successfully", pSql); tscDebug("%p get tableMeta successfully", pSql);
}
if (pSql->pStream == NULL) { if (pSql->pStream == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
...@@ -453,11 +456,9 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -453,11 +456,9 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex && assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL); pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL);
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) { // tscProcessSql can add error into async res
tscProcessSql(pSql);
return; return;
}
goto _error;
} else { // continue to process normal async query } else { // continue to process normal async query
if (pCmd->parseFinished) { if (pCmd->parseFinished) {
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql); tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);
...@@ -481,26 +482,21 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -481,26 +482,21 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return; return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
} }
if (code == TSDB_CODE_SUCCESS) {
/* /*
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks, * Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
* and send the required submit block according to index value in supporter to server. * and send the required submit block according to index value in supporter to server.
*/ */
pSql->fp = pSql->fetchFp; // restore the fp pSql->fp = pSql->fetchFp; // restore the fp
if ((code = tscHandleInsertRetry(pSql)) == TSDB_CODE_SUCCESS) { tscHandleInsertRetry(pSql);
return;
}
}
} else {// in case of other query type, continue } else {// in case of other query type, continue
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) { tscProcessSql(pSql);
return;
}
} }
goto _error; return;
} else { } else {
tscDebug("%p continue parse sql after get table meta", pSql); tscDebug("%p continue parse sql after get table meta", pSql);
...@@ -538,7 +534,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -538,7 +534,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error; goto _error;
} }
if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex); code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return; return;
......
...@@ -330,10 +330,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -330,10 +330,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
bool stableQueryFunctChanged(int32_t funcId) {
return (aAggs[funcId].stableFuncId != funcId);
}
/** /**
* the numOfRes should be kept, since it may be used later * the numOfRes should be kept, since it may be used later
* and allow the ResultInfo to be re initialized * and allow the ResultInfo to be re initialized
...@@ -361,7 +357,6 @@ static bool function_setup(SQLFunctionCtx *pCtx) { ...@@ -361,7 +357,6 @@ static bool function_setup(SQLFunctionCtx *pCtx) {
} }
memset(pCtx->aOutputBuf, 0, (size_t)pCtx->outputBytes); memset(pCtx->aOutputBuf, 0, (size_t)pCtx->outputBytes);
initResultInfo(pResInfo); initResultInfo(pResInfo);
return true; return true;
} }
...@@ -675,16 +670,16 @@ static void sum_func_second_merge(SQLFunctionCtx *pCtx) { ...@@ -675,16 +670,16 @@ static void sum_func_second_merge(SQLFunctionCtx *pCtx) {
} }
} }
static int32_t precal_req_load_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { static int32_t statisRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
return BLK_DATA_STATIS_NEEDED; return BLK_DATA_STATIS_NEEDED;
} }
static int32_t data_req_load_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { static int32_t dataBlockRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
return BLK_DATA_ALL_NEEDED; return BLK_DATA_ALL_NEEDED;
} }
// todo: if column in current data block are null, opt for this case // todo: if column in current data block are null, opt for this case
static int32_t first_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} }
...@@ -697,7 +692,7 @@ static int32_t first_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, ...@@ -697,7 +692,7 @@ static int32_t first_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end,
} }
} }
static int32_t last_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
if (pCtx->order != pCtx->param[0].i64Key) { if (pCtx->order != pCtx->param[0].i64Key) {
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} }
...@@ -709,34 +704,40 @@ static int32_t last_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, ...@@ -709,34 +704,40 @@ static int32_t last_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end,
} }
} }
static int32_t first_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} }
// result buffer has not been set yet. // not initialized yet, it is the first block, load it.
if (pCtx->aOutputBuf == NULL) {
return BLK_DATA_ALL_NEEDED; return BLK_DATA_ALL_NEEDED;
//todo optimize the filter info }
// SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
// if (pInfo->hasResult != DATA_SET_FLAG) { SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
// return BLK_DATA_ALL_NEEDED; if (pInfo->hasResult != DATA_SET_FLAG) {
// } else { // data in current block is not earlier than current result return BLK_DATA_ALL_NEEDED;
// return (pInfo->ts <= start) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; } else { // data in current block is not earlier than current result
// } return (pInfo->ts <= start) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
}
} }
static int32_t last_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
if (pCtx->order != pCtx->param[0].i64Key) { if (pCtx->order != pCtx->param[0].i64Key) {
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} }
// not initialized yet, it is the first block, load it.
if (pCtx->aOutputBuf == NULL) {
return BLK_DATA_ALL_NEEDED; return BLK_DATA_ALL_NEEDED;
// SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes); }
// if (pInfo->hasResult != DATA_SET_FLAG) {
// return BLK_DATA_ALL_NEEDED; SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
// } else { if (pInfo->hasResult != DATA_SET_FLAG) {
// return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; return BLK_DATA_ALL_NEEDED;
// } } else {
return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
}
} }
////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////
...@@ -1549,6 +1550,8 @@ static void first_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t in ...@@ -1549,6 +1550,8 @@ static void first_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t in
* to decide if the value is earlier than current intermediate result * to decide if the value is earlier than current intermediate result
*/ */
static void first_dist_function(SQLFunctionCtx *pCtx) { static void first_dist_function(SQLFunctionCtx *pCtx) {
assert(pCtx->size > 0);
if (pCtx->size == 0) { if (pCtx->size == 0) {
return; return;
} }
...@@ -1564,6 +1567,11 @@ static void first_dist_function(SQLFunctionCtx *pCtx) { ...@@ -1564,6 +1567,11 @@ static void first_dist_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
// data block is discard, not loaded, do not need to check it
if (!pCtx->preAggVals.dataBlockLoaded) {
return;
}
// find the first not null value // find the first not null value
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_CHAR_INDEX(pCtx, i); char *data = GET_INPUT_CHAR_INDEX(pCtx, i);
...@@ -1584,10 +1592,6 @@ static void first_dist_function(SQLFunctionCtx *pCtx) { ...@@ -1584,10 +1592,6 @@ static void first_dist_function(SQLFunctionCtx *pCtx) {
} }
static void first_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) { static void first_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) {
if (pCtx->size == 0) {
return;
}
char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); char *pData = GET_INPUT_CHAR_INDEX(pCtx, index);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
return; return;
...@@ -1715,10 +1719,6 @@ static void last_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t ind ...@@ -1715,10 +1719,6 @@ static void last_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t ind
} }
static void last_dist_function(SQLFunctionCtx *pCtx) { static void last_dist_function(SQLFunctionCtx *pCtx) {
if (pCtx->size == 0) {
return;
}
/* /*
* 1. for scan data in asc order, no need to check data * 1. for scan data in asc order, no need to check data
* 2. for data blocks that are not loaded, no need to check data * 2. for data blocks that are not loaded, no need to check data
...@@ -1727,6 +1727,11 @@ static void last_dist_function(SQLFunctionCtx *pCtx) { ...@@ -1727,6 +1727,11 @@ static void last_dist_function(SQLFunctionCtx *pCtx) {
return; return;
} }
// data block is discard, not loaded, do not need to check it
if (!pCtx->preAggVals.dataBlockLoaded) {
return;
}
int32_t notNullElems = 0; int32_t notNullElems = 0;
for (int32_t i = pCtx->size - 1; i >= 0; --i) { for (int32_t i = pCtx->size - 1; i >= 0; --i) {
...@@ -2123,74 +2128,72 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) { ...@@ -2123,74 +2128,72 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
tfree(pData); tfree(pData);
} }
bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *minval, char *maxval) { /*
STopBotInfo *pTopBotInfo = (STopBotInfo *)GET_RES_INFO(pCtx)->interResultBuf; * Parameters values:
* 1. param[0]: maximum allowable results
* 2. param[1]: order by type (time or value)
* 3. param[2]: asc/desc order
*
* top/bottom use the intermediate result buffer to keep the intermediate result
*/
static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
// only the first_stage_merge is directly written data into final output buffer
if (pResInfo->superTableQ && pCtx->currentStage != SECONDARY_STAGE_MERGE) {
return (STopBotInfo*) pCtx->aOutputBuf;
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return pResInfo->interResultBuf;
}
}
int32_t numOfExistsRes = pTopBotInfo->num; bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval) {
STopBotInfo *pTopBotInfo = getTopBotOutputInfo(pCtx);
// required number of results are not reached, continue load data block // required number of results are not reached, continue load data block
if (numOfExistsRes < pCtx->param[0].i64Key) { if (pTopBotInfo->num < pCtx->param[0].i64Key) {
return true; return true;
} }
tValuePair *pRes = (tValuePair*) pTopBotInfo->res; tValuePair **pRes = (tValuePair**) pTopBotInfo->res;
if (functionId == TSDB_FUNC_TOP) { if (functionId == TSDB_FUNC_TOP) {
switch (pCtx->inputType) { switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
return GET_INT8_VAL(maxval) > pRes[0].v.i64Key; return GET_INT8_VAL(maxval) > pRes[0]->v.i64Key;
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
return GET_INT16_VAL(maxval) > pRes[0].v.i64Key; return GET_INT16_VAL(maxval) > pRes[0]->v.i64Key;
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
return GET_INT32_VAL(maxval) > pRes[0].v.i64Key; return GET_INT32_VAL(maxval) > pRes[0]->v.i64Key;
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
return GET_INT64_VAL(maxval) > pRes[0].v.i64Key; return GET_INT64_VAL(maxval) > pRes[0]->v.i64Key;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
return GET_FLOAT_VAL(maxval) > pRes[0].v.dKey; return GET_FLOAT_VAL(maxval) > pRes[0]->v.dKey;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
return GET_DOUBLE_VAL(maxval) > pRes[0].v.dKey; return GET_DOUBLE_VAL(maxval) > pRes[0]->v.dKey;
default: default:
return true; return true;
} }
} else { } else {
switch (pCtx->inputType) { switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
return GET_INT8_VAL(minval) < pRes[0].v.i64Key; return GET_INT8_VAL(minval) < pRes[0]->v.i64Key;
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
return GET_INT16_VAL(minval) < pRes[0].v.i64Key; return GET_INT16_VAL(minval) < pRes[0]->v.i64Key;
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
return GET_INT32_VAL(minval) < pRes[0].v.i64Key; return GET_INT32_VAL(minval) < pRes[0]->v.i64Key;
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
return GET_INT64_VAL(minval) < pRes[0].v.i64Key; return GET_INT64_VAL(minval) < pRes[0]->v.i64Key;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
return GET_FLOAT_VAL(minval) < pRes[0].v.dKey; return GET_FLOAT_VAL(minval) < pRes[0]->v.dKey;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
return GET_DOUBLE_VAL(minval) < pRes[0].v.dKey; return GET_DOUBLE_VAL(minval) < pRes[0]->v.dKey;
default: default:
return true; return true;
} }
} }
} }
/*
* Parameters values:
* 1. param[0]: maximum allowable results
* 2. param[1]: order by type (time or value)
* 3. param[2]: asc/desc order
*
* top/bottom use the intermediate result buffer to keep the intermediate result
*/
static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
// only the first_stage_merge is directly written data into final output buffer
if (pResInfo->superTableQ && pCtx->currentStage != SECONDARY_STAGE_MERGE) {
return (STopBotInfo*) pCtx->aOutputBuf;
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return pResInfo->interResultBuf;
}
}
/* /*
* keep the intermediate results during scan data blocks in the format of: * keep the intermediate results during scan data blocks in the format of:
* +-----------------------------------+-------------one value pair-----------+------------next value pair-----------+ * +-----------------------------------+-------------one value pair-----------+------------next value pair-----------+
...@@ -3376,7 +3379,7 @@ static void spread_function(SQLFunctionCtx *pCtx) { ...@@ -3376,7 +3379,7 @@ static void spread_function(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx); SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SSpreadInfo *pInfo = pResInfo->interResultBuf; SSpreadInfo *pInfo = pResInfo->interResultBuf;
int32_t numOfElems = pCtx->size; int32_t numOfElems = 0;
// todo : opt with pre-calculated result // todo : opt with pre-calculated result
// column missing cause the hasNull to be true // column missing cause the hasNull to be true
...@@ -4412,7 +4415,7 @@ static void sumrate_finalizer(SQLFunctionCtx *pCtx) { ...@@ -4412,7 +4415,7 @@ static void sumrate_finalizer(SQLFunctionCtx *pCtx) {
* e.g., count/sum/avg/min/max/stddev/percentile/apercentile/first/last... * e.g., count/sum/avg/min/max/stddev/percentile/apercentile/first/last...
* *
*/ */
int32_t funcCompatDefList[] = { int32_t functionCompatList[] = {
// count, sum, avg, min, max, stddev, percentile, apercentile, first, last // count, sum, avg, min, max, stddev, percentile, apercentile, first, last
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
// last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_z // last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_z
...@@ -4451,7 +4454,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4451,7 +4454,7 @@ SQLAggFuncElem aAggs[] = {{
function_finalizer, function_finalizer,
sum_func_merge, sum_func_merge,
sum_func_second_merge, sum_func_second_merge,
precal_req_load_info, statisRequired,
}, },
{ {
// 2 // 2
...@@ -4466,7 +4469,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4466,7 +4469,7 @@ SQLAggFuncElem aAggs[] = {{
avg_finalizer, avg_finalizer,
avg_func_merge, avg_func_merge,
avg_func_second_merge, avg_func_second_merge,
precal_req_load_info, statisRequired,
}, },
{ {
// 3 // 3
...@@ -4481,7 +4484,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4481,7 +4484,7 @@ SQLAggFuncElem aAggs[] = {{
function_finalizer, function_finalizer,
min_func_merge, min_func_merge,
min_func_second_merge, min_func_second_merge,
precal_req_load_info, statisRequired,
}, },
{ {
// 4 // 4
...@@ -4496,7 +4499,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4496,7 +4499,7 @@ SQLAggFuncElem aAggs[] = {{
function_finalizer, function_finalizer,
max_func_merge, max_func_merge,
max_func_second_merge, max_func_second_merge,
precal_req_load_info, statisRequired,
}, },
{ {
// 5 // 5
...@@ -4511,7 +4514,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4511,7 +4514,7 @@ SQLAggFuncElem aAggs[] = {{
stddev_finalizer, stddev_finalizer,
noop1, noop1,
noop1, noop1,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 6 // 6
...@@ -4526,7 +4529,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4526,7 +4529,7 @@ SQLAggFuncElem aAggs[] = {{
percentile_finalizer, percentile_finalizer,
noop1, noop1,
noop1, noop1,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 7 // 7
...@@ -4541,7 +4544,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4541,7 +4544,7 @@ SQLAggFuncElem aAggs[] = {{
apercentile_finalizer, apercentile_finalizer,
apercentile_func_merge, apercentile_func_merge,
apercentile_func_second_merge, apercentile_func_second_merge,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 8 // 8
...@@ -4556,7 +4559,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4556,7 +4559,7 @@ SQLAggFuncElem aAggs[] = {{
function_finalizer, function_finalizer,
noop1, noop1,
noop1, noop1,
first_data_req_info, firstFuncRequired,
}, },
{ {
// 9 // 9
...@@ -4571,7 +4574,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4571,7 +4574,7 @@ SQLAggFuncElem aAggs[] = {{
function_finalizer, function_finalizer,
noop1, noop1,
noop1, noop1,
last_data_req_info, lastFuncRequired,
}, },
{ {
// 10 // 10
...@@ -4587,7 +4590,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4587,7 +4590,7 @@ SQLAggFuncElem aAggs[] = {{
last_row_finalizer, last_row_finalizer,
noop1, noop1,
last_dist_func_second_merge, last_dist_func_second_merge,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 11 // 11
...@@ -4603,7 +4606,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4603,7 +4606,7 @@ SQLAggFuncElem aAggs[] = {{
top_bottom_func_finalizer, top_bottom_func_finalizer,
top_func_merge, top_func_merge,
top_func_second_merge, top_func_second_merge,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 12 // 12
...@@ -4619,7 +4622,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4619,7 +4622,7 @@ SQLAggFuncElem aAggs[] = {{
top_bottom_func_finalizer, top_bottom_func_finalizer,
bottom_func_merge, bottom_func_merge,
bottom_func_second_merge, bottom_func_second_merge,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 13 // 13
...@@ -4649,7 +4652,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4649,7 +4652,7 @@ SQLAggFuncElem aAggs[] = {{
twa_function_finalizer, twa_function_finalizer,
twa_func_merge, twa_func_merge,
twa_function_copy, twa_function_copy,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 15 // 15
...@@ -4664,7 +4667,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4664,7 +4667,7 @@ SQLAggFuncElem aAggs[] = {{
leastsquares_finalizer, leastsquares_finalizer,
noop1, noop1,
noop1, noop1,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 16 // 16
...@@ -4694,7 +4697,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4694,7 +4697,7 @@ SQLAggFuncElem aAggs[] = {{
doFinalizer, doFinalizer,
copy_function, copy_function,
copy_function, copy_function,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 18 // 18
...@@ -4724,7 +4727,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4724,7 +4727,7 @@ SQLAggFuncElem aAggs[] = {{
ts_comp_finalize, ts_comp_finalize,
copy_function, copy_function,
copy_function, copy_function,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 20 // 20
...@@ -4754,7 +4757,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4754,7 +4757,7 @@ SQLAggFuncElem aAggs[] = {{
doFinalizer, doFinalizer,
copy_function, copy_function,
copy_function, copy_function,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 22, multi-output, tag function has only one result // 22, multi-output, tag function has only one result
...@@ -4784,7 +4787,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4784,7 +4787,7 @@ SQLAggFuncElem aAggs[] = {{
doFinalizer, doFinalizer,
copy_function, copy_function,
copy_function, copy_function,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 24 // 24
...@@ -4799,7 +4802,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4799,7 +4802,7 @@ SQLAggFuncElem aAggs[] = {{
doFinalizer, doFinalizer,
noop1, noop1,
noop1, noop1,
data_req_load_info, dataBlockRequired,
}, },
// distributed version used in two-stage aggregation processes // distributed version used in two-stage aggregation processes
{ {
...@@ -4815,7 +4818,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4815,7 +4818,7 @@ SQLAggFuncElem aAggs[] = {{
function_finalizer, function_finalizer,
first_dist_func_merge, first_dist_func_merge,
first_dist_func_second_merge, first_dist_func_second_merge,
first_dist_data_req_info, firstDistFuncRequired,
}, },
{ {
// 26 // 26
...@@ -4830,7 +4833,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4830,7 +4833,7 @@ SQLAggFuncElem aAggs[] = {{
function_finalizer, function_finalizer,
last_dist_func_merge, last_dist_func_merge,
last_dist_func_second_merge, last_dist_func_second_merge,
last_dist_data_req_info, lastDistFuncRequired,
}, },
{ {
// 27 // 27
...@@ -4845,7 +4848,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4845,7 +4848,7 @@ SQLAggFuncElem aAggs[] = {{
doFinalizer, doFinalizer,
noop1, noop1,
copy_function, copy_function,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 28 // 28
...@@ -4860,7 +4863,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4860,7 +4863,7 @@ SQLAggFuncElem aAggs[] = {{
rate_finalizer, rate_finalizer,
rate_func_merge, rate_func_merge,
rate_func_copy, rate_func_copy,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 29 // 29
...@@ -4875,7 +4878,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4875,7 +4878,7 @@ SQLAggFuncElem aAggs[] = {{
rate_finalizer, rate_finalizer,
rate_func_merge, rate_func_merge,
rate_func_copy, rate_func_copy,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 30 // 30
...@@ -4890,7 +4893,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4890,7 +4893,7 @@ SQLAggFuncElem aAggs[] = {{
sumrate_finalizer, sumrate_finalizer,
sumrate_func_merge, sumrate_func_merge,
sumrate_func_second_merge, sumrate_func_second_merge,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 31 // 31
...@@ -4905,7 +4908,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4905,7 +4908,7 @@ SQLAggFuncElem aAggs[] = {{
sumrate_finalizer, sumrate_finalizer,
sumrate_func_merge, sumrate_func_merge,
sumrate_func_second_merge, sumrate_func_second_merge,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 32 // 32
...@@ -4920,7 +4923,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4920,7 +4923,7 @@ SQLAggFuncElem aAggs[] = {{
sumrate_finalizer, sumrate_finalizer,
sumrate_func_merge, sumrate_func_merge,
sumrate_func_second_merge, sumrate_func_second_merge,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 33 // 33
...@@ -4935,7 +4938,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4935,7 +4938,7 @@ SQLAggFuncElem aAggs[] = {{
sumrate_finalizer, sumrate_finalizer,
sumrate_func_merge, sumrate_func_merge,
sumrate_func_second_merge, sumrate_func_second_merge,
data_req_load_info, dataBlockRequired,
}, },
{ {
// 34 // 34
...@@ -4950,5 +4953,5 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4950,5 +4953,5 @@ SQLAggFuncElem aAggs[] = {{
noop1, noop1,
noop1, noop1,
noop1, noop1,
data_req_load_info, dataBlockRequired,
}}; }};
...@@ -2471,7 +2471,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo) { ...@@ -2471,7 +2471,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo) {
startIdx++; startIdx++;
} }
int32_t factor = funcCompatDefList[tscSqlExprGet(pQueryInfo, startIdx)->functionId]; int32_t factor = functionCompatList[tscSqlExprGet(pQueryInfo, startIdx)->functionId];
// diff function cannot be executed with other function // diff function cannot be executed with other function
// arithmetic function can be executed with other arithmetic functions // arithmetic function can be executed with other arithmetic functions
...@@ -2489,7 +2489,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo) { ...@@ -2489,7 +2489,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo) {
continue; continue;
} }
if (funcCompatDefList[functionId] != factor) { if (functionCompatList[functionId] != factor) {
return false; return false;
} }
} }
......
...@@ -339,7 +339,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { ...@@ -339,7 +339,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
} }
if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? pRes->numOfRows: pRes->code; rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS)? pRes->numOfRows: pRes->code;
bool shouldFree = tscShouldBeFreed(pSql); bool shouldFree = tscShouldBeFreed(pSql);
(*pSql->fp)(pSql->param, pSql, rpcMsg->code); (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
...@@ -412,7 +412,7 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -412,7 +412,7 @@ int tscProcessSql(SSqlObj *pSql) {
return pSql->res.code; return pSql->res.code;
} }
} else if (pCmd->command < TSDB_SQL_LOCAL) { } else if (pCmd->command < TSDB_SQL_LOCAL) {
pSql->ipList = tscMgmtIpSet; //? pSql->ipList = tscMgmtIpSet;
} else { // local handler } else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql); return (*tscProcessMsgRsp[pCmd->command])(pSql);
} }
...@@ -476,6 +476,8 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -476,6 +476,8 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t vgIndex = pTableMetaInfo->vgroupIndex; int32_t vgIndex = pTableMetaInfo->vgroupIndex;
SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList; SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList;
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId); pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
} else { } else {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
...@@ -549,6 +551,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -549,6 +551,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
assert(index >= 0); assert(index >= 0);
if (pTableMetaInfo->vgroupList->numOfVgroups > 0) { if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index]; pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
} }
tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups); tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
...@@ -1372,7 +1375,6 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) { ...@@ -1372,7 +1375,6 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
if (pRes->rspType == 0) { if (pRes->rspType == 0) {
pRes->numOfRows = numOfRes; pRes->numOfRows = numOfRes;
pRes->row = 0; pRes->row = 0;
......
...@@ -484,22 +484,11 @@ static bool tscFreeQhandleInVnode(SSqlObj* pSql) { ...@@ -484,22 +484,11 @@ static bool tscFreeQhandleInVnode(SSqlObj* pSql) {
pCmd->command == TSDB_SQL_SHOW || pCmd->command == TSDB_SQL_SHOW ||
pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH) && pCmd->command == TSDB_SQL_FETCH) &&
(pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)) { (pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscDebug("%p send msg to dnode to free qhandle ASAP, command:%s", pSql, sqlCmd[pCmd->command]); tscDebug("%p send msg to dnode to free qhandle ASAP, command:%s, ", pSql, sqlCmd[pCmd->command]);
tscProcessSql(pSql); tscProcessSql(pSql);
// in case of sync model query, waits for response and then goes on
// if (pSql->fp == waitForQueryRsp || pSql->fp == waitForRetrieveRsp) {
// sem_wait(&pSql->rspSem);
// tscFreeSqlObj(pSql);
// tscDebug("%p sqlObj is freed by app", pSql);
// } else {
tscDebug("%p sqlObj will be freed while rsp received", pSql);
// }
return true; return true;
} }
......
...@@ -1895,9 +1895,11 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) { ...@@ -1895,9 +1895,11 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) {
assert(pSupporter->index < pSupporter->pState->numOfTotal); assert(pSupporter->index < pSupporter->pState->numOfTotal);
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index);
pRes->code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);
if (pRes->code != TSDB_CODE_SUCCESS) {
return pRes->code; if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql);
return code; // here the pSql may have been released already.
} }
return tscProcessSql(pSql); return tscProcessSql(pSql);
......
...@@ -1648,6 +1648,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm ...@@ -1648,6 +1648,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
} }
pNew->fp = fp; pNew->fp = fp;
pNew->fetchFp = fp;
pNew->param = param; pNew->param = param;
pNew->maxRetry = TSDB_MAX_REPLICA; pNew->maxRetry = TSDB_MAX_REPLICA;
...@@ -1803,6 +1804,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1803,6 +1804,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
} }
pNew->fp = fp; pNew->fp = fp;
pNew->fetchFp = fp;
pNew->param = param; pNew->param = param;
pNew->maxRetry = TSDB_MAX_REPLICA; pNew->maxRetry = TSDB_MAX_REPLICA;
...@@ -2005,7 +2008,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { ...@@ -2005,7 +2008,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups; int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
while (++pTableMetaInfo->vgroupIndex < totalVgroups) { if (++pTableMetaInfo->vgroupIndex < totalVgroups) {
tscDebug("%p results from vgroup index:%d completed, try next:%d. total vgroups:%d. current numOfRes:%" PRId64, pSql, tscDebug("%p results from vgroup index:%d completed, try next:%d. total vgroups:%d. current numOfRes:%" PRId64, pSql,
pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pRes->numOfClauseTotal); pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pRes->numOfClauseTotal);
...@@ -2041,11 +2044,9 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { ...@@ -2041,11 +2044,9 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
// set the callback function // set the callback function
pSql->fp = fp; pSql->fp = fp;
int32_t ret = tscProcessSql(pSql); tscProcessSql(pSql);
if (ret == TSDB_CODE_SUCCESS) { } else {
return; tscDebug("%p try all %d vnodes, query complete. current numOfRes:%" PRId64, pSql, totalVgroups, pRes->numOfClauseTotal);
} else {// todo check for failure
}
} }
} }
......
...@@ -121,6 +121,7 @@ typedef struct SQueryCostInfo { ...@@ -121,6 +121,7 @@ typedef struct SQueryCostInfo {
uint32_t loadBlockStatis; uint32_t loadBlockStatis;
uint32_t discardBlocks; uint32_t discardBlocks;
uint64_t elapsedTime; uint64_t elapsedTime;
uint64_t ioTime;
uint64_t computTime; uint64_t computTime;
} SQueryCostInfo; } SQueryCostInfo;
......
...@@ -125,7 +125,8 @@ typedef struct SArithmeticSupport { ...@@ -125,7 +125,8 @@ typedef struct SArithmeticSupport {
} SArithmeticSupport; } SArithmeticSupport;
typedef struct SQLPreAggVal { typedef struct SQLPreAggVal {
bool isSet; bool isSet; // statistics info set or not
bool dataBlockLoaded; // data block is loaded or not
SDataStatis statis; SDataStatis statis;
} SQLPreAggVal; } SQLPreAggVal;
...@@ -224,25 +225,14 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -224,25 +225,14 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
#define IS_SINGLEOUTPUT(x) (((x)&TSDB_FUNCSTATE_SO) != 0) #define IS_SINGLEOUTPUT(x) (((x)&TSDB_FUNCSTATE_SO) != 0)
#define IS_OUTER_FORWARD(x) (((x)&TSDB_FUNCSTATE_OF) != 0) #define IS_OUTER_FORWARD(x) (((x)&TSDB_FUNCSTATE_OF) != 0)
/*
* the status of one block, used in metric query. all blocks are mixed together,
* we need the status to decide if one block is a first/end/inter block of one meter
*/
enum {
BLK_FILE_BLOCK = 0x1,
BLK_BLOCK_LOADED = 0x2,
BLK_CACHE_BLOCK = 0x4, // in case of cache block, block must be loaded
};
/* determine the real data need to calculated the result */ /* determine the real data need to calculated the result */
enum { enum {
BLK_DATA_NO_NEEDED = 0x0, BLK_DATA_NO_NEEDED = 0x0,
BLK_DATA_STATIS_NEEDED = 0x1, BLK_DATA_STATIS_NEEDED = 0x1,
BLK_DATA_ALL_NEEDED = 0x3, BLK_DATA_ALL_NEEDED = 0x3,
BLK_DATA_DISCARD = 0x4, // discard current data block since it is not qualified for filter
}; };
#define SET_DATA_BLOCK_NOT_LOADED(x) ((x) &= (~BLK_BLOCK_LOADED));
typedef struct STwaInfo { typedef struct STwaInfo {
TSKEY lastKey; TSKEY lastKey;
int8_t hasResult; // flag to denote has value int8_t hasResult; // flag to denote has value
...@@ -264,12 +254,9 @@ typedef struct STwaInfo { ...@@ -264,12 +254,9 @@ typedef struct STwaInfo {
/* global sql function array */ /* global sql function array */
extern struct SQLAggFuncElem aAggs[]; extern struct SQLAggFuncElem aAggs[];
/* compatible check array list */ extern int32_t functionCompatList[]; // compatible check array list
extern int32_t funcCompatDefList[];
bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *minval, char *maxval);
bool stableQueryFunctChanged(int32_t funcId); bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval);
void resetResultInfo(SResultInfo *pResInfo); void resetResultInfo(SResultInfo *pResInfo);
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf); void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf);
......
...@@ -1358,6 +1358,8 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY ...@@ -1358,6 +1358,8 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
pCtx->preAggVals.isSet = false; pCtx->preAggVals.isSet = false;
} }
pCtx->preAggVals.dataBlockLoaded = (inputData != NULL);
// limit/offset query will affect this value // limit/offset query will affect this value
pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos:0; pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos:0;
pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->rows - pQuery->pos : pQuery->pos + 1; pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->rows - pQuery->pos : pQuery->pos + 1;
...@@ -1928,73 +1930,46 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi ...@@ -1928,73 +1930,46 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi
pQuery->pSelectExpr[columnIndex].bytes * realRowId; pQuery->pSelectExpr[columnIndex].bytes * realRowId;
} }
/** #define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
* decrease the refcount for each table involved in this query
* @param pQInfo
*/
UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
if (pQInfo != NULL) {
// assert(taosHashGetSize(pQInfo->tableqinfoGroupInfo) >= 1);
}
#if 0 static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx,
if (pQInfo == NULL || pQInfo->tableqinfoGroupInfo.numOfTables == 1) { int32_t numOfRows) {
atomic_fetch_sub_32(&pQInfo->pObj->numOfQueries, 1); SQuery* pQuery = pRuntimeEnv->pQuery;
qDebug("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode, if (pDataStatis == NULL || (pQuery->numOfFilterCols == 0 && (!pRuntimeEnv->topBotQuery))) {
pQInfo->pObj->sid, pQInfo->pObj->meterId, pQInfo->pObj->numOfQueries);
} else {
int32_t num = 0;
for (int32_t i = 0; i < pQInfo->tableqinfoGroupInfo.numOfTables; ++i) {
SMeterObj *pMeter = getMeterObj(pQInfo->tableqinfoGroupInfo, pQInfo->pSidSet->pTableIdList[i]->sid);
atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
if (pMeter->numOfQueries > 0) {
qDebug("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pMeter->vnode, pMeter->sid,
pMeter->meterId, pMeter->numOfQueries);
num++;
}
}
/*
* in order to reduce log output, for all meters of which numOfQueries count are 0,
* we do not output corresponding information
*/
num = pQInfo->tableqinfoGroupInfo.numOfTables - num;
qDebug("QInfo:%p metric query is over, dec query ref for %d meters, numOfQueries on %d meters are 0", pQInfo,
pQInfo->tableqinfoGroupInfo.numOfTables, num);
}
#endif
}
static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx,
int32_t numOfTotalPoints) {
if (pDataStatis == NULL) {
return true; return true;
} }
#if 0
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
int32_t colIndex = pFilterInfo->info.colIndex;
// this column not valid in current data block int32_t index = -1;
if (colIndex < 0 || pDataStatis[colIndex].colId != pFilterInfo->info.data.colId) { for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
continue; if (pDataStatis[i].colId == pFilterInfo->info.colId) {
index = i;
break;
}
}
// no statistics data
if (index == -1) {
return true;
} }
// not support pre-filter operation on binary/nchar data type // not support pre-filter operation on binary/nchar data type
if (!vnodeSupportPrefilter(pFilterInfo->info.data.type)) { if (!IS_PREFILTER_TYPE(pFilterInfo->info.type)) {
continue; return true;
} }
// all points in current column are NULL, no need to check its boundary value // all points in current column are NULL, no need to check its boundary value
if (pDataStatis[colIndex].numOfNull == numOfTotalPoints) { if (pDataStatis[index].numOfNull == numOfRows) {
continue; continue;
} }
if (pFilterInfo->info.info.type == TSDB_DATA_TYPE_FLOAT) { SDataStatis* pDataBlockst = &pDataStatis[index];
float minval = *(double *)(&pDataStatis[colIndex].min);
float maxval = *(double *)(&pDataStatis[colIndex].max); if (pFilterInfo->info.type == TSDB_DATA_TYPE_FLOAT) {
float minval = *(double *)(&pDataBlockst->min);
float maxval = *(double *)(&pDataBlockst->max);
for (int32_t i = 0; i < pFilterInfo->numOfFilters; ++i) { for (int32_t i = 0; i < pFilterInfo->numOfFilters; ++i) {
if (pFilterInfo->pFilters[i].fp(&pFilterInfo->pFilters[i], (char *)&minval, (char *)&maxval)) { if (pFilterInfo->pFilters[i].fp(&pFilterInfo->pFilters[i], (char *)&minval, (char *)&maxval)) {
...@@ -2003,53 +1978,50 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun ...@@ -2003,53 +1978,50 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
} }
} else { } else {
for (int32_t i = 0; i < pFilterInfo->numOfFilters; ++i) { for (int32_t i = 0; i < pFilterInfo->numOfFilters; ++i) {
if (pFilterInfo->pFilters[i].fp(&pFilterInfo->pFilters[i], (char *)&pDataStatis[colIndex].min, if (pFilterInfo->pFilters[i].fp(&pFilterInfo->pFilters[i], (char *)&pDataBlockst->min, (char *)&pDataBlockst->max)) {
(char *)&pDataStatis[colIndex].max)) {
return true; return true;
} }
} }
} }
} }
// todo disable this opt code block temporarily if (pRuntimeEnv->topBotQuery) {
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t functId = pQuery->pSelectExpr[i].base.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
// if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
// return top_bot_datablock_filter(&pCtx[i], functId, (char *)&pField[i].min, (char *)&pField[i].max); return topbot_datablock_filter(&pCtx[i], functionId, (char *)&pDataStatis[i].min, (char *)&pDataStatis[i].max);
// } }
// } }
}
#endif return false;
return true;
} }
SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis) { int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
uint32_t r = 0; uint32_t status = 0;
SArray * pDataBlock = NULL;
if (pQuery->numOfFilterCols > 0) { if (pQuery->numOfFilterCols > 0) {
r = BLK_DATA_ALL_NEEDED; status = BLK_DATA_ALL_NEEDED;
} else { } else { // check if this data block is required to load
// check if this data block is required to load
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base; SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base;
int32_t functionId = pSqlFunc->functionId; int32_t functionId = pSqlFunc->functionId;
int32_t colId = pSqlFunc->colInfo.colId; int32_t colId = pSqlFunc->colInfo.colId;
r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pQuery->window.skey, pQuery->window.ekey, colId); status |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId);
} }
if (pRuntimeEnv->pTSBuf > 0 || QUERY_IS_INTERVAL_QUERY(pQuery)) { if (pRuntimeEnv->pTSBuf > 0 || QUERY_IS_INTERVAL_QUERY(pQuery)) {
r |= BLK_DATA_ALL_NEEDED; status |= BLK_DATA_ALL_NEEDED;
} }
} }
if (r == BLK_DATA_NO_NEEDED) { if (status == BLK_DATA_NO_NEEDED) {
qDebug("QInfo:%p data block discard, rows:%d", GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->rows); qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pRuntimeEnv->summary.discardBlocks += 1; pRuntimeEnv->summary.discardBlocks += 1;
} else if (r == BLK_DATA_STATIS_NEEDED) { } else if (status == BLK_DATA_STATIS_NEEDED) {
if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) { if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) {
// return DISK_DATA_LOAD_FAILED; // return DISK_DATA_LOAD_FAILED;
} }
...@@ -2057,32 +2029,34 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, ...@@ -2057,32 +2029,34 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
pRuntimeEnv->summary.loadBlockStatis += 1; pRuntimeEnv->summary.loadBlockStatis += 1;
if (*pStatis == NULL) { // data block statistics does not exist, load data block if (*pStatis == NULL) { // data block statistics does not exist, load data block
pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL); *pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows; pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows;
} }
} else { } else {
assert(r == BLK_DATA_ALL_NEEDED); assert(status == BLK_DATA_ALL_NEEDED);
// load the data block statistics to perform further filter // load the data block statistics to perform further filter
pRuntimeEnv->summary.loadBlockStatis +=1; pRuntimeEnv->summary.loadBlockStatis += 1;
if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) { if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) {
} }
if (!needToLoadDataBlock(pQuery,*pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) { if (!needToLoadDataBlock(pRuntimeEnv, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) {
#if defined(_DEBUG_VIEW) #if defined(_DEBUG_VIEW)
qDebug("QInfo:%p block discarded by per-filter", GET_QINFO_ADDR(pRuntimeEnv)); qDebug("QInfo:%p block discarded by per-filter", GET_QINFO_ADDR(pRuntimeEnv));
#endif #endif
// current block has been discard due to filter applied // current block has been discard due to filter applied
pRuntimeEnv->summary.discardBlocks += 1; pRuntimeEnv->summary.discardBlocks += 1;
// return DISK_DATA_DISCARDED; qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
return BLK_DATA_DISCARD;
} }
pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows; pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows;
pRuntimeEnv->summary.loadBlocks += 1; pRuntimeEnv->summary.loadBlocks += 1;
pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL); *pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
} }
return pDataBlock; return TSDB_CODE_SUCCESS;
} }
int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
...@@ -2225,13 +2199,13 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2225,13 +2199,13 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery->order.order); pQuery->order.order);
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle; TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (tsdbNextDataBlock(pQueryHandle)) { while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1; summary->totalBlocks += 1;
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
...@@ -2259,7 +2233,11 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2259,7 +2233,11 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
ensureOutputBuffer(pRuntimeEnv, &blockInfo); ensureOutputBuffer(pRuntimeEnv, &blockInfo);
SDataStatis *pStatis = NULL; SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); SArray *pDataBlock = NULL;
if (loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock) == BLK_DATA_DISCARD) {
pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step:blockInfo.window.skey + step;
continue;
}
// query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition // query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1;
...@@ -2282,8 +2260,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2282,8 +2260,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
if (QUERY_IS_INTERVAL_QUERY(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) { if (QUERY_IS_INTERVAL_QUERY(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
// int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP;
closeAllTimeWindow(&pRuntimeEnv->windowResInfo); closeAllTimeWindow(&pRuntimeEnv->windowResInfo);
// removeRedundantWindow(&pRuntimeEnv->windowResInfo, pTableQueryInfo->lastKey - step, step); // removeRedundantWindow(&pRuntimeEnv->windowResInfo, pTableQueryInfo->lastKey - step, step);
pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window
...@@ -3393,12 +3369,10 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) ...@@ -3393,12 +3369,10 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo, numOfCols); cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo, numOfCols);
} }
#define SET_CURRENT_QUERY_TABLE_INFO(_runtime, _tableInfo) \ #define CHECK_QUERY_TIME_RANGE(_q, _tableInfo) \
do { \ do { \
SQuery *_query = (_runtime)->pQuery; \ assert((((_tableInfo)->lastKey >= (_tableInfo)->win.skey) && QUERY_IS_ASC_QUERY(_q)) || \
_query->current = _tableInfo; \ (((_tableInfo)->lastKey <= (_tableInfo)->win.skey) && !QUERY_IS_ASC_QUERY(_q))); \
assert((((_tableInfo)->lastKey >= (_tableInfo)->win.skey) && QUERY_IS_ASC_QUERY(_query)) || \
(((_tableInfo)->lastKey <= (_tableInfo)->win.skey) && !QUERY_IS_ASC_QUERY(_query))); \
} while (0) } while (0)
/** /**
...@@ -3710,7 +3684,7 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3710,7 +3684,7 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) {
} }
} }
void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis,
SArray *pDataBlock, __block_search_fn_t searchFn) { SArray *pDataBlock, __block_search_fn_t searchFn) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current; STableQueryInfo* pTableQueryInfo = pQuery->current;
...@@ -3869,9 +3843,10 @@ static void queryCostStatis(SQInfo *pQInfo) { ...@@ -3869,9 +3843,10 @@ static void queryCostStatis(SQInfo *pQInfo) {
// pQInfo, pSummary->readDiskBlocks, pSummary->totalBlockSize, pSummary->loadBlocksUs / 1000.0, // pQInfo, pSummary->readDiskBlocks, pSummary->totalBlockSize, pSummary->loadBlocksUs / 1000.0,
// pSummary->skippedFileBlocks, pSummary->totalGenData); // pSummary->skippedFileBlocks, pSummary->totalGenData);
qDebug("QInfo:%p :cost summary: elpased time:%"PRId64" us, total blocks:%d, use block statis:%d, use block data:%d, " qDebug("QInfo:%p :cost summary: elapsed time:%"PRId64" us, io time:%"PRId64" us, total blocks:%d, load block statis:%d,"
"total rows:%"PRId64 ", check rows:%"PRId64, pQInfo, pSummary->elapsedTime, pSummary->totalBlocks, " load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows); pQInfo, pSummary->elapsedTime, pSummary->ioTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
// qDebug("QInfo:%p cost: temp file:%d Bytes", pQInfo, pSummary->tmpBufferInDisk); // qDebug("QInfo:%p cost: temp file:%d Bytes", pQInfo, pSummary->tmpBufferInDisk);
// //
...@@ -4253,6 +4228,23 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -4253,6 +4228,23 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
} }
} }
static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTableQueryInfo, SDataBlockInfo* pBlockInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
setExecutionContext(pQInfo, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step);
} else { // interval query
TSKEY nextKey = pBlockInfo->window.skey;
setIntervalQueryRange(pQInfo, nextKey);
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTSBuf != NULL) {
setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
}
}
}
static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
...@@ -4263,10 +4255,12 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { ...@@ -4263,10 +4255,12 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle; TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
while (tsdbNextDataBlock(pQueryHandle)) { while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1; summary->totalBlocks += 1;
if (IS_QUERY_KILLED(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
...@@ -4276,24 +4270,19 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { ...@@ -4276,24 +4270,19 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
break; break;
} }
assert(*pTableQueryInfo != NULL); pQuery->current = *pTableQueryInfo;
SET_CURRENT_QUERY_TABLE_INFO(pRuntimeEnv, *pTableQueryInfo); CHECK_QUERY_TIME_RANGE(pQuery, *pTableQueryInfo);
SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
if (!pRuntimeEnv->groupbyNormalCol) { if (!pRuntimeEnv->groupbyNormalCol) {
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) { setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo);
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
setExecutionContext(pQInfo, (*pTableQueryInfo)->groupIndex, blockInfo.window.ekey + step);
} else { // interval query
TSKEY nextKey = blockInfo.window.skey;
setIntervalQueryRange(pQInfo, nextKey);
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTSBuf != NULL) {
setAdditionalInfo(pQInfo, (*pTableQueryInfo)->pTable, *pTableQueryInfo);
}
} }
SDataStatis *pStatis = NULL;
SArray *pDataBlock = NULL;
if (loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock) == BLK_DATA_DISCARD) {
pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step:blockInfo.window.skey + step;
continue;
} }
summary->totalRows += blockInfo.rows; summary->totalRows += blockInfo.rows;
...@@ -4553,7 +4542,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4553,7 +4542,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) { while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) {
if (IS_QUERY_KILLED(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
...@@ -5051,6 +5039,7 @@ static void stableQueryImpl(SQInfo *pQInfo) { ...@@ -5051,6 +5039,7 @@ static void stableQueryImpl(SQInfo *pQInfo) {
isFirstLastRowQuery(pQuery) || pRuntimeEnv->groupbyNormalCol); isFirstLastRowQuery(pQuery) || pRuntimeEnv->groupbyNormalCol);
sequentialTableProcess(pQInfo); sequentialTableProcess(pQInfo);
} }
// record the total elapsed time // record the total elapsed time
...@@ -6176,11 +6165,6 @@ _over: ...@@ -6176,11 +6165,6 @@ _over:
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null; //pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
*pQInfo = NULL; *pQInfo = NULL;
} else {
// SQInfo* pq = (SQInfo*) (*pQInfo);
// T_REF_INC(pq);
// T_REF_INC(pq);
} }
// if failed to add ref for all meters in this query, abort current query // if failed to add ref for all meters in this query, abort current query
......
...@@ -1802,6 +1802,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta ...@@ -1802,6 +1802,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL); tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL);
// todo opt perf
size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle); size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SDataStatis* st = &pHandle->statis[i]; SDataStatis* st = &pHandle->statis[i];
...@@ -1820,6 +1821,13 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta ...@@ -1820,6 +1821,13 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows; pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
} }
// todo opt perf
SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i);
if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) {
pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst;
pHandle->statis[i].max = pBlockInfo->compBlock->keyLast;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -413,15 +413,47 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -413,15 +413,47 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
*data = NULL; *data = NULL;
// note: extend lifespan before dec ref count // note: extend lifespan before dec ref count
if (pCacheObj->extendLifespan) { bool inTrashCan = pNode->inTrashCan;
if (pCacheObj->extendLifespan && (!inTrashCan)) {
atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs()); atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
uDebug("cache:%s data:%p extend life time to %"PRId64 " before release", pCacheObj->name, pNode->data, pNode->expireTime); uDebug("cache:%s data:%p extend life time to %"PRId64 " before release", pCacheObj->name, pNode->data, pNode->expireTime);
} }
bool inTrashCan = pNode->inTrashCan; if (_remove) {
__cache_wr_lock(pCacheObj);
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
int32_t ref = T_REF_DEC(pNode);
uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref);
/*
* If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users
* releasing this resources.
*
* NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
* that tries to do the same thing.
*/
if (pNode->inTrashCan) {
if (ref == 0) {
assert(pNode->pTNodeHeader->pData == pNode);
taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader);
}
} else {
if (ref > 0) {
assert(pNode->pTNodeHeader == NULL);
taosCacheMoveToTrash(pCacheObj, pNode);
} else {
taosCacheReleaseNode(pCacheObj, pNode);
}
}
__cache_unlock(pCacheObj);
} else {
uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, T_REF_VAL_GET(pNode) - 1); uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, T_REF_VAL_GET(pNode) - 1);
// NOTE: once refcount is decrease, pNode may be free by other thread immediately. // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
int32_t ref = T_REF_DEC(pNode); int32_t ref = T_REF_DEC(pNode);
if (inTrashCan) { if (inTrashCan) {
...@@ -435,35 +467,36 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -435,35 +467,36 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
} }
} else {
assert(pNode->pTNodeHeader == NULL);
if (_remove) { // not in trash can, but need to remove it
__cache_wr_lock(pCacheObj);
/*
* If not referenced by other users. Otherwise move this node to trashcan wait for all users
* releasing this resources.
*
* NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
* that tries to do the same thing.
*/
if (ref == 0) {
if (T_REF_VAL_GET(pNode) == 0) {
taosCacheReleaseNode(pCacheObj, pNode);
} else {
taosCacheMoveToTrash(pCacheObj, pNode);
} }
} }
__cache_unlock(pCacheObj); // else {
// } else { // extend its life time // if (_remove) { // not in trash can, but need to remove it
// if (pCacheObj->extendLifespan) { // __cache_wr_lock(pCacheObj);
// atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs()); //
// uDebug("cache:%s data:%p extend life time to %"PRId64 " after release", pCacheObj->name, pNode->data, pNode->expireTime); // /*
// * If not referenced by other users. Otherwise move this node to trashcan wait for all users
// * releasing this resources.
// *
// * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
// * that tries to do the same thing.
// */
// if (ref == 0) {
// if (T_REF_VAL_GET(pNode) == 0) {
// taosCacheReleaseNode(pCacheObj, pNode);
// } else {
// taosCacheMoveToTrash(pCacheObj, pNode);
// }
// } else if (ref > 0) {
// if (!pNode->inTrashCan) {
// assert(pNode->pTNodeHeader == NULL);
// taosCacheMoveToTrash(pCacheObj, pNode);
// }
// }
//
// __cache_unlock(pCacheObj);
// }
// } // }
}
}
} }
void taosCacheEmpty(SCacheObj *pCacheObj) { void taosCacheEmpty(SCacheObj *pCacheObj) {
......
...@@ -110,6 +110,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -110,6 +110,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (handle == NULL) { // failed to register qhandle if (handle == NULL) { // failed to register qhandle
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE; pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
qDestroyQueryInfo(pQInfo); // destroy it directly qDestroyQueryInfo(pQInfo); // destroy it directly
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void*) pQInfo, tstrerror(pRsp->code));
} else { } else {
assert(*handle == pQInfo); assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t) pQInfo); pRsp->qhandle = htobe64((uint64_t) pQInfo);
...@@ -125,12 +126,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -125,12 +126,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
} else { } else {
assert(pQInfo == NULL); assert(pQInfo == NULL);
} }
if (handle != NULL) { if (handle != NULL) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle);
dnodePutItemIntoReadQueue(pVnode, *handle); dnodePutItemIntoReadQueue(pVnode, *handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
} }
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo);
} else { } else {
assert(pCont != NULL); assert(pCont != NULL);
handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont); handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont);
...@@ -138,12 +141,13 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -138,12 +141,13 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle); vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle);
code = TSDB_CODE_QRY_INVALID_QHANDLE; code = TSDB_CODE_QRY_INVALID_QHANDLE;
} else { } else {
vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, (void*) pCont); vDebug("vgId:%d, QInfo:%p, dnode continue exec query", pVnode->vgId, (void*) pCont);
code = TSDB_CODE_VND_ACTION_IN_PROGRESS; code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
qTableQuery(*handle); // do execute query qTableQuery(*handle); // do execute query
} }
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
} }
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册