diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 4af929bf4146c14b180b7055b6e58a55aeb64863..6a5c2d0099f9b2d3e8640511460933bc925351ad 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -263,9 +263,11 @@ int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid); void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex); bool hasMoreVnodesToTry(SSqlObj *pSql); +bool hasMoreClauseToTry(SSqlObj* pSql); + void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); -void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()); +void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); int tscSetMgmtIpListFromCfg(const char *first, const char *second); void* malloc_throw(size_t size); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index be82eb64a881460d2f8dc751ce99d793dc6cb0db..1b30c4ffcaeaf04bb1eaff8b0d74895f65e91647 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -422,7 +422,6 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField int32_t bytes = pInfo->pSqlExpr->resBytes; char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row; - if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) { int32_t realLen = varDataLen(pData); assert(realLen <= bytes - VARSTR_HEADER_SIZE); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 6ab4eeaa8a3dd54eb61f14d75d97e34e64a2390d..9dd33e03cb21d8dfc807a8d8e1f91042621c7633 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -169,7 +169,11 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } - tscProcessSql(pSql); + if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { + tscFetchDatablockFromSubquery(pSql); + } else { + tscProcessSql(pSql); + } } /* @@ -474,33 +478,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { // in case of insert, redo parsing the sql string and build new submit data block for two reasons: // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly. // 2. vnode may need the schema information along with submit block to update its local table schema. - if (pCmd->command == TSDB_SQL_INSERT) { - tscDebug("%p redo parse sql string to build submit block", pSql); - - pCmd->parseFinished = false; - tscResetSqlCmdObj(pCmd); - - code = tsParseSql(pSql, true); - - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - return; - } else if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - /* - * Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks, - * and send the required submit block according to index value in supporter to server. - */ - pSql->fp = pSql->fetchFp; // restore the fp - tscHandleInsertRetry(pSql); - } else if (pCmd->command == TSDB_SQL_SELECT) { // in case of other query type, continue + if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) { tscDebug("%p redo parse sql string and proceed", pSql); - //tscDebug("before %p fp:%p, fetchFp:%p", pSql, pSql->fp, pSql->fetchFp); pCmd->parseFinished = false; tscResetSqlCmdObj(pCmd); - //tscDebug("after %p fp:%p, fetchFp:%p", pSql, pSql->fp, pSql->fetchFp); code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { @@ -509,8 +491,17 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { goto _error; } - tscProcessSql(pSql); - } else { // in all other cases, simple retry + if (pCmd->command == TSDB_SQL_INSERT) { + /* + * Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks, + * and send the required submit block according to index value in supporter to server. + */ + pSql->fp = pSql->fetchFp; // restore the fp + tscHandleInsertRetry(pSql); + } else if (pCmd->command == TSDB_SQL_SELECT) { // in case of other query type, continue + tscProcessSql(pSql); + } + }else { // in all other cases, simple retry tscProcessSql(pSql); } diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 1ec84f023ad74cc8c29654bf14f55781485b691d..262b7ab3f6fcc1d1b931371fbad4a860e3b231b5 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -1481,7 +1481,7 @@ static bool first_last_function_setup(SQLFunctionCtx *pCtx) { // todo opt for null block static void first_function(SQLFunctionCtx *pCtx) { - if (pCtx->order == TSDB_ORDER_DESC) { + if (pCtx->order == TSDB_ORDER_DESC || pCtx->preAggVals.dataBlockLoaded == false) { return; } @@ -1550,28 +1550,17 @@ static void first_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t in * to decide if the value is earlier than current intermediate result */ static void first_dist_function(SQLFunctionCtx *pCtx) { - assert(pCtx->size > 0); - - if (pCtx->size == 0) { - return; - } - /* * do not to check data in the following cases: * 1. data block that are not loaded * 2. scan data files in desc order */ - if (pCtx->order == TSDB_ORDER_DESC) { + if (pCtx->order == TSDB_ORDER_DESC || pCtx->preAggVals.dataBlockLoaded == false) { return; } int32_t notNullElems = 0; - // data block is discard, not loaded, do not need to check it - if (!pCtx->preAggVals.dataBlockLoaded) { - return; - } - // find the first not null value for (int32_t i = 0; i < pCtx->size; ++i) { char *data = GET_INPUT_CHAR_INDEX(pCtx, i); @@ -1655,7 +1644,7 @@ static void first_dist_func_second_merge(SQLFunctionCtx *pCtx) { * least one data in this block that is not null.(TODO opt for this case) */ static void last_function(SQLFunctionCtx *pCtx) { - if (pCtx->order != pCtx->param[0].i64Key) { + if (pCtx->order != pCtx->param[0].i64Key || pCtx->preAggVals.dataBlockLoaded == false) { return; } @@ -2303,8 +2292,9 @@ static void top_func_second_merge(SQLFunctionCtx *pCtx) { // the intermediate result is binary, we only use the output data type for (int32_t i = 0; i < pInput->num; ++i) { + int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->outputType; do_top_function_add(pOutput, pCtx->param[0].i64Key, &pInput->res[i]->v.i64Key, pInput->res[i]->timestamp, - pCtx->outputType, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); + type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); } SET_VAL(pCtx, pInput->num, pOutput->num); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index f76a1341d24beec8295011f01fe4bd5e57e3b0e8..f9b12f3f6d27b53cbbe832739cd5c2dd0326b8e4 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -485,7 +485,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } case TSDB_SQL_SELECT: { - assert(pCmd->numOfClause == 1); const char* msg1 = "columns in select clause not identical"; for (int32_t i = pCmd->numOfClause; i < pInfo->subclauseInfo.numOfClause; ++i) { @@ -496,16 +495,19 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } assert(pCmd->numOfClause == pInfo->subclauseInfo.numOfClause); - for (int32_t i = 0; i < pInfo->subclauseInfo.numOfClause; ++i) { + for (int32_t i = pCmd->clauseIndex; i < pInfo->subclauseInfo.numOfClause; ++i) { SQuerySQL* pQuerySql = pInfo->subclauseInfo.pClause[i]; - + tscTrace("%p start to parse %dth subclause, total:%d", pSql, i, pInfo->subclauseInfo.numOfClause); if ((code = doCheckForQuery(pSql, pQuerySql, i)) != TSDB_CODE_SUCCESS) { return code; } tscPrintSelectClause(pSql, i); + pCmd->clauseIndex += 1; } + // restore the clause index + pCmd->clauseIndex = 0; // set the command/global limit parameters from the first subclause to the sqlcmd object SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pCmd, 0); pCmd->command = pQueryInfo1->command; @@ -1385,6 +1387,11 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum return numOfTotalColumns; } +static void tscInsertPrimaryTSSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex* pIndex) { + SColumnIndex tsCol = {.tableIndex = pIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tscColumnListInsert(pQueryInfo->colList, &tsCol); +} + int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExprItem* pItem) { const char* msg0 = "invalid column name"; const char* msg1 = "tag for normal table query is not allowed"; @@ -1427,6 +1434,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t addProjectQueryCol(pQueryInfo, startPos, &index, pItem); } + + tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); } else { return TSDB_CODE_TSC_INVALID_SQL; } @@ -1499,8 +1508,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col switch (optr) { case TK_COUNT: { - if (pItem->pNode->pParam != NULL && pItem->pNode->pParam->nExpr != 1) { /* more than one parameter for count() function */ + if (pItem->pNode->pParam != NULL && pItem->pNode->pParam->nExpr != 1) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } @@ -1551,11 +1560,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } } else { // count(*) is equalled to count(primary_timestamp_key) index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false); } - + + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); + memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName)); getColumnName(pItem, pExpr->aliasName, sizeof(pExpr->aliasName) - 1); @@ -1570,9 +1580,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } // the time stamp may be always needed - if (index.tableIndex > 0 && index.tableIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { - SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscColumnListInsert(pQueryInfo->colList, &tsCol); + if (index.tableIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { + tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); } return TSDB_CODE_SUCCESS; @@ -1682,10 +1691,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i])); } } - - SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscColumnListInsert(pQueryInfo->colList, &tsCol); - + + tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); return TSDB_CODE_SUCCESS; } case TK_FIRST: @@ -5862,6 +5869,8 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo); } + assert(pCmd->clauseIndex == index); + // too many result columns not support order by in query if (pQuerySql->pSelection->nExpr > TSDB_MAX_COLUMNS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg8); @@ -5975,12 +5984,11 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000; } } else { // set the time rang - pQueryInfo->window.skey = TSKEY_INITIAL_VAL; - pQueryInfo->window.ekey = INT64_MAX; + pQueryInfo->window = TSWINDOW_INITIALIZER; } // user does not specified the query time window, twa is not allowed in such case. - if ((pQueryInfo->window.skey == 0 || pQueryInfo->window.ekey == INT64_MAX || + if ((pQueryInfo->window.skey == INT64_MIN || pQueryInfo->window.ekey == INT64_MAX || (pQueryInfo->window.ekey == INT64_MAX / 1000 && tinfo.precision == TSDB_TIME_PRECISION_MILLI)) && tscIsTWAQuery(pQueryInfo)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg9); } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d8af6d5c877018254af1aab9943d3d3fe0431b18..99ac2249e04868277404a962d6655e9a6d51e66f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -475,7 +475,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload; pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); pRetrieveMsg->free = htons(pQueryInfo->type); // todo valid the vgroupId at the client side diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 0677463d8d01df899a377c7390a22743aa9b22ed..d5160cd3c6d45403e41123c5946b1f53e6ca7b2e 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -424,7 +424,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { } // current data set are exhausted, fetch more data from node - if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql)) && + if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) && (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE || diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index dd4ed991ed9845b5fdabc3a0e07727bfcd1756ef..faa4a2488a36a9daa3e06dd6160d8c72b0683bca 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -848,13 +848,14 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR SSqlRes* pRes = &pSql->res; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - - // TODO put to async res? if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { assert(numOfRows == taos_errno(pSql)); pParentSql->res.code = numOfRows; tscError("%p retrieve failed, index:%d, code:%s", pSql, pSupporter->subqueryIndex, tstrerror(numOfRows)); + + tscQueueAsyncRes(pParentSql); + return; } if (numOfRows >= 0) { @@ -941,31 +942,22 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { SSqlRes *pRes = &pSub->res; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0); -// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - -// if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { -// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vgroupIndex < pTableMetaInfo->vgroupList->numOfVgroups && -// (!tscHasReachLimitation(pQueryInfo, pRes)) && !pRes->completed) { -// numOfFetch++; -// } -// } else { - if (!tscHasReachLimitation(pQueryInfo, pRes)) { - if (pRes->row >= pRes->numOfRows) { - hasData = false; - if (!pRes->completed) { - numOfFetch++; - } - } - } else { // has reach the limitation, no data anymore - if (pRes->row >= pRes->numOfRows) { - hasData = false; - break; + if (!tscHasReachLimitation(pQueryInfo, pRes)) { + if (pRes->row >= pRes->numOfRows) { + hasData = false; + + if (!pRes->completed) { + numOfFetch++; } } - + } else { // has reach the limitation, no data anymore + if (pRes->row >= pRes->numOfRows) { + hasData = false; + break; + } } -// } + } // has data remains in client side, and continue to return data to app if (hasData) { @@ -1026,7 +1018,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; - tscDebug("%p all subquery response, retrieve data", pSql); + tscDebug("%p all subquery response, retrieve data for subclause:%d", pSql, pCmd->clauseIndex); // the column transfer support struct has been built if (pRes->pColumnIndex != NULL) { @@ -1195,8 +1187,11 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter pNew->cmd.numOfCols = 0; pNewQueryInfo->intervalTime = 0; - memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal)); - + pSupporter->limit = pNewQueryInfo->limit; + + pNewQueryInfo->limit.limit = -1; + pNewQueryInfo->limit.offset = 0; + // backup the data and clear it in the sqlcmd object pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr; memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr)); @@ -1307,7 +1302,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { break; } } - + pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_TABLE_JOIN_RETRIEVE; return TSDB_CODE_SUCCESS; @@ -1982,88 +1977,119 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } +static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) { + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + SFieldSupInfo* pInfo = (SFieldSupInfo*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.pSupportInfo, columnIndex); + assert(pInfo->pSqlExpr != NULL); + + *bytes = pInfo->pSqlExpr->resBytes; + char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows; + + return pData; +} + +static void doBuildResFromSubqueries(SSqlObj* pSql) { + SSqlRes* pRes = &pSql->res; + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + + int32_t numOfRes = INT32_MAX; + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + if (pSql->pSubs[i] == NULL) { + continue; + } + + numOfRes = MIN(numOfRes, pSql->pSubs[i]->res.numOfRows); + } + + int32_t totalSize = tscGetResRowLength(pQueryInfo->exprList); + pRes->pRsp = realloc(pRes->pRsp, numOfRes * totalSize); + pRes->data = pRes->pRsp; + + char* data = pRes->data; + int16_t bytes = 0; + + size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); + for(int32_t i = 0; i < numOfExprs; ++i) { + SColumnIndex* pIndex = &pRes->pColumnIndex[i]; + SSqlRes *pRes1 = &pSql->pSubs[pIndex->tableIndex]->res; + SSqlCmd *pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd; + + char* pData = getResultBlockPosition(pCmd1, pRes1, pIndex->columnIndex, &bytes); + memcpy(data, pData, bytes * numOfRes); + + data += bytes * numOfRes; + pRes1->row = numOfRes; + } + + pRes->numOfRows = numOfRes; + pRes->numOfClauseTotal += numOfRes; +} + void tscBuildResFromSubqueries(SSqlObj *pSql) { - SSqlRes *pRes = &pSql->res; - + SSqlRes* pRes = &pSql->res; + if (pRes->code != TSDB_CODE_SUCCESS) { tscQueueAsyncRes(pSql); return; } - - while (1) { - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + + if (pRes->tsrow == NULL) { + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); - - if (pRes->tsrow == NULL) { - pRes->tsrow = calloc(numOfExprs, POINTER_BYTES); - pRes->length = calloc(numOfExprs, sizeof(int32_t)); - } - - bool success = false; - - int32_t numOfTableHasRes = 0; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - if (pSql->pSubs[i] != NULL) { - numOfTableHasRes++; - } - } - - if (numOfTableHasRes >= 2) { // do merge result - success = (doSetResultRowData(pSql->pSubs[0], false) != NULL) && (doSetResultRowData(pSql->pSubs[1], false) != NULL); - } else { // only one subquery - SSqlObj *pSub = pSql->pSubs[0]; - if (pSub == NULL) { - pSub = pSql->pSubs[1]; - } - - success = (doSetResultRowData(pSub, false) != NULL); + pRes->tsrow = calloc(numOfExprs, POINTER_BYTES); + pRes->buffer = calloc(numOfExprs, POINTER_BYTES); + pRes->length = calloc(numOfExprs, sizeof(int32_t)); + + tscRestoreSQLFuncForSTableQuery(pQueryInfo); + } + + while (1) { + if (pRes->row < pRes->numOfRows) { + assert(0); } - - if (success) { // current row of final output has been built, return to app - for (int32_t i = 0; i < numOfExprs; ++i) { - SColumnIndex* pIndex = &pRes->pColumnIndex[i]; - SSqlRes *pRes1 = &pSql->pSubs[pIndex->tableIndex]->res; - pRes->tsrow[i] = pRes1->tsrow[pIndex->columnIndex]; - pRes->length[i] = pRes1->length[pIndex->columnIndex]; - } - - pRes->numOfClauseTotal++; - break; - } else { // continue retrieve data from vnode - if (!tscHasRemainDataInSubqueryResultSet(pSql)) { - tscDebug("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1); - SSubqueryState *pState = NULL; - - // free all sub sqlobj - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SSqlObj *pChildObj = pSql->pSubs[i]; - if (pChildObj == NULL) { - continue; - } - - SJoinSupporter *pSupporter = (SJoinSupporter *)pChildObj->param; - pState = pSupporter->pState; - - tscDestroyJoinSupporter(pChildObj->param); - taos_free_result(pChildObj); - } - - free(pState); - - pRes->completed = true; // set query completed - sem_post(&pSql->rspSem); - return; - } - - tscFetchDatablockFromSubquery(pSql); - if (pRes->code != TSDB_CODE_SUCCESS) { - return; - } + + doBuildResFromSubqueries(pSql); + sem_post(&pSql->rspSem); + + return; + + // continue retrieve data from vnode +// if (!tscHasRemainDataInSubqueryResultSet(pSql)) { +// tscDebug("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1); +// SSubqueryState* pState = NULL; +// +// // free all sub sqlobj +// for (int32_t i = 0; i < pSql->numOfSubs; ++i) { +// SSqlObj* pChildObj = pSql->pSubs[i]; +// if (pChildObj == NULL) { +// continue; +// } +// +// SJoinSupporter* pSupporter = (SJoinSupporter*)pChildObj->param; +// pState = pSupporter->pState; +// +// tscDestroyJoinSupporter(pChildObj->param); +// taos_free_result(pChildObj); +// } +// +// free(pState); +// +// pRes->completed = true; // set query completed +// sem_post(&pSql->rspSem); +// return; +// } + + tscFetchDatablockFromSubquery(pSql); + if (pRes->code != TSDB_CODE_SUCCESS) { + return; } } - + if (pSql->res.code == TSDB_CODE_SUCCESS) { - (*pSql->fp)(pSql->param, pSql, 0); + (*pSql->fp)(pSql->param, pSql, pRes->numOfRows); } else { tscQueueAsyncRes(pSql); } @@ -2117,14 +2143,6 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) { assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows); - if(pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { - if (pRes->completed) { - tfree(pRes->tsrow); - } - - return pRes->tsrow; - } - if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker tfree(pRes->tsrow); return pRes->tsrow; @@ -2182,7 +2200,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) { return pRes->tsrow; } -static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { +static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { bool hasData = true; SSqlCmd *pCmd = &pSql->cmd; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 3e0fe0b4bec2c392c48fcbec84508ec5b1d3303a..49d15f6499c945a84e230504668731743071e27b 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1994,6 +1994,10 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1); } +bool hasMoreClauseToTry(SSqlObj* pSql) { + return pSql->cmd.clauseIndex < pSql->cmd.numOfClause - 1; +} + void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; @@ -2050,7 +2054,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { } } -void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) { +void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; @@ -2070,17 +2074,13 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) { tfree(pSql->pSubs); pSql->numOfSubs = 0; - - if (pSql->fp != NULL) { - pSql->fp = queryFp; - assert(queryFp != NULL); - } + pSql->fp = fp; tscDebug("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause); if (pCmd->command > TSDB_SQL_LOCAL) { tscProcessLocalCmd(pSql); } else { - tscProcessSql(pSql); + tscDoQuery(pSql); } } diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index f77036ef9d75d8297534a3522a1988d685de144e..fb8750323f9a8f868516ef43b6d3ff96d08c646c 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -121,7 +121,6 @@ typedef struct SQueryCostInfo { uint32_t loadBlockStatis; uint32_t discardBlocks; uint64_t elapsedTime; - uint64_t ioTime; uint64_t computTime; } SQueryCostInfo; @@ -201,7 +200,7 @@ typedef struct SQInfo { */ int32_t tableIndex; int32_t numOfGroupResultPages; - void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; + void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; } SQInfo; diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 8fe3c0f4953fe02e5de6c4bced125d164141f086..78ae7be03046cbe2636a8c456708b2b5ff4fc089 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -23,7 +23,7 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* dst, con int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t size, int32_t threshold, int16_t type); -void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo, int32_t numOfCols); +void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo); void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo); void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num); @@ -32,14 +32,29 @@ int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo); void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); void closeAllTimeWindow(SWindowResInfo* pWindowResInfo); void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order); -SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot); + +static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) { + assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); + return &pWindowResInfo->pResult[slot]; +} #define curTimeWindow(_winres) ((_winres)->curIndex) +#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1) + bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot); -int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo, size_t interBufSize); +int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, size_t interBufSize); + +static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult) { + assert(pResult != NULL && pRuntimeEnv != NULL); + + SQuery *pQuery = pRuntimeEnv->pQuery; + tFilePage *page = GET_RES_BUF_PAGE_BY_ID(pRuntimeEnv->pResultBuf, pResult->pos.pageId); + int32_t realRowId = pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery); -char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult); + return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage + + pQuery->pSelectExpr[columnIndex].bytes * realRowId; +} __filter_func_t *getRangeFilterFuncArray(int32_t type); __filter_func_t *getValueFilterFuncArray(int32_t type); diff --git a/src/query/inc/qresultBuf.h b/src/query/inc/qresultBuf.h index ad01555c2867b02b8365b50335930b81debd7130..a323d530a27ada5eabf9c4316281ee01d782c090 100644 --- a/src/query/inc/qresultBuf.h +++ b/src/query/inc/qresultBuf.h @@ -22,26 +22,22 @@ extern "C" { #include "os.h" #include "qextbuffer.h" +#include "hash.h" -typedef struct SIDList { - uint32_t alloc; - int32_t size; - int32_t* pData; -} SIDList; +typedef struct SArray* SIDList; typedef struct SDiskbasedResultBuf { - int32_t numOfRowsPerPage; - int32_t numOfPages; - int64_t totalBufSize; - int32_t fd; // data file fd - int32_t allocateId; // allocated page id - int32_t incStep; // minimum allocated pages - char* pBuf; // mmap buffer pointer - char* path; // file path + int32_t numOfRowsPerPage; + int32_t numOfPages; + int64_t totalBufSize; + int32_t fd; // data file fd + int32_t allocateId; // allocated page id + int32_t incStep; // minimum allocated pages + char* pBuf; // mmap buffer pointer + char* path; // file path - uint32_t numOfAllocGroupIds; // number of allocated id list - void* idsTable; // id hash table - SIDList* list; // for each id, there is a page id list + SHashObj* idsTable; // id hash table + SIDList list; // for each id, there is a page id list } SDiskbasedResultBuf; #define DEFAULT_INTERN_BUF_PAGE_SIZE (8192L*5) @@ -112,7 +108,7 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle); * @param pList * @return */ -int32_t getLastPageId(SIDList *pList); +int32_t getLastPageId(SIDList pList); #ifdef __cplusplus } diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index e57cb2645666af6887dc3253a76ce603314afbdf..c687f01cbcfa0a0b81209afcf71ec44f8c8c43af 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -132,13 +132,10 @@ typedef struct SQLPreAggVal { typedef struct SInterpInfoDetail { TSKEY ts; // interp specified timestamp - int8_t hasResult; int8_t type; int8_t primaryCol; } SInterpInfoDetail; -typedef struct SInterpInfo { SInterpInfoDetail *pInterpDetail; } SInterpInfo; - typedef struct SResultInfo { int8_t hasResult; // result generated, not NULL value bool initialized; // output buffer has been initialized @@ -146,7 +143,7 @@ typedef struct SResultInfo { bool superTableQ; // is super table query int32_t numOfRes; // num of output result in current buffer int32_t bufLen; // buffer size - void * interResultBuf; // output result buffer + void* interResultBuf; // output result buffer } SResultInfo; struct SQLFunctionCtx; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 77e7321d3d96ee77e455ede94b9ee8057b6f4312..52154c1ca1e0e088540d94ebc6f8bdf2dcad3e18 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -26,7 +26,6 @@ #include "query.h" #include "queryLog.h" #include "qast.h" -#include "tfile.h" #include "tlosertree.h" #include "tscompression.h" #include "ttime.h" @@ -35,8 +34,8 @@ * check if the primary column is load by default, otherwise, the program will * forced to load primary column explicitly. */ -#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0) -#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0) +#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0) +#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0) #define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP) #define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN) @@ -144,7 +143,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, SDataStatis *pStatis, void *param, int32_t colIndex); static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); -static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols); +static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo); static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static bool hasMainOutput(SQuery *pQuery); static void buildTagQueryResult(SQInfo *pQInfo); @@ -361,17 +360,17 @@ static bool hasTagValOutput(SQuery* pQuery) { * @return */ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis **pColStatis) { - if (TSDB_COL_IS_TAG(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - return false; - } - - if (pStatis != NULL) { + if (pStatis != NULL && !TSDB_COL_IS_TAG(pColIndex->flag)) { *pColStatis = &pStatis[pColIndex->colIndex]; assert((*pColStatis)->colId == pColIndex->colId); } else { *pColStatis = NULL; } + if (TSDB_COL_IS_TAG(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + return false; + } + if ((*pColStatis) != NULL && (*pColStatis)->numOfNull == 0) { return false; } @@ -387,31 +386,33 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin if (p1 != NULL) { pWindowResInfo->curIndex = *p1; } else { - if (masterscan) { // more than the capacity, reallocate the resources - if (pWindowResInfo->size >= pWindowResInfo->capacity) { - int64_t newCap = pWindowResInfo->capacity * 2; - - char *t = realloc(pWindowResInfo->pResult, newCap * sizeof(SWindowResult)); - if (t != NULL) { - pWindowResInfo->pResult = (SWindowResult *)t; - memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, sizeof(SWindowResult) * pWindowResInfo->capacity); - } else { - // todo - } + if (!masterscan) { // not master scan, do not add new timewindow + return NULL; + } - for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) { - SPosInfo pos = {-1, -1}; - createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos, pRuntimeEnv->interBufSize); - } - pWindowResInfo->capacity = newCap; + // more than the capacity, reallocate the resources + if (pWindowResInfo->size >= pWindowResInfo->capacity) { + int64_t newCap = pWindowResInfo->capacity * 1.5; + char *t = realloc(pWindowResInfo->pResult, newCap * sizeof(SWindowResult)); + if (t != NULL) { + pWindowResInfo->pResult = (SWindowResult *)t; + + int32_t inc = newCap - pWindowResInfo->capacity; + memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, sizeof(SWindowResult) * inc); + } else { + // todo } - // add a new result set for a new group - pWindowResInfo->curIndex = pWindowResInfo->size++; - taosHashPut(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); - } else { - return NULL; + for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) { + createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize); + } + + pWindowResInfo->capacity = newCap; } + + // add a new result set for a new group + pWindowResInfo->curIndex = pWindowResInfo->size++; + taosHashPut(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); } return getWindowResult(pWindowResInfo, pWindowResInfo->curIndex); @@ -470,10 +471,10 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult int32_t pageId = -1; SIDList list = getDataBufPagesIdList(pResultBuf, sid); - if (list.size == 0) { + if (taosArrayGetSize(list) == 0) { pData = getNewDataBuf(pResultBuf, sid, &pageId); } else { - pageId = getLastPageId(&list); + pageId = getLastPageId(list); pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, pageId); if (pData->num >= numOfRowsPerPage) { @@ -511,10 +512,11 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes } *newWind = true; + // not assign result buffer yet, add new result buffer if (pWindowRes->pos.pageId == -1) { int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, sid, pRuntimeEnv->numOfRowsPerPage); - if (ret != 0) { + if (ret != TSDB_CODE_SUCCESS) { return -1; } } @@ -531,7 +533,7 @@ static SWindowStatus *getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int return &pWindowResInfo->pResult[slot].status; } -static int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos, +static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos, int16_t order, int64_t *pData) { int32_t forwardStep = 0; @@ -647,12 +649,8 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo if (QUERY_IS_ASC_QUERY(pQuery)) { if (ekey < pDataBlockInfo->window.ekey) { num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn); - if (num == 0) { // no qualified data in current block, do not update the lastKey value - assert(ekey < pPrimaryColumn[startPos]); - } else { - if (updateLastKey) { - item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step; - } + if (updateLastKey) { // update the last key + item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step; } } else { num = pDataBlockInfo->rows - startPos; @@ -663,12 +661,8 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo } else { // desc if (ekey > pDataBlockInfo->window.skey) { num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn); - if (num == 0) { // no qualified data in current block, do not update the lastKey value - assert(ekey > pPrimaryColumn[startPos]); - } else { - if (updateLastKey) { - item->lastKey = pPrimaryColumn[startPos - (num - 1)] + step; - } + if (updateLastKey) { // update the last key + item->lastKey = pPrimaryColumn[startPos - (num - 1)] + step; } } else { num = startPos + 1; @@ -912,13 +906,20 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - if (QUERY_IS_INTERVAL_QUERY(pQuery) && tsCols != NULL) { - int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); - TSKEY ts = tsCols[offset]; + if (QUERY_IS_INTERVAL_QUERY(pQuery)/* && tsCols != NULL*/) { + TSKEY ts = TSKEY_INITIAL_VAL; - bool hasTimeWindow = false; + if (tsCols == NULL) { + ts = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.skey:pDataBlockInfo->window.ekey; + } else { + int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); + ts = tsCols[offset]; + } + + bool hasTimeWindow = false; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win, masterScan, &hasTimeWindow) != + TSDB_CODE_SUCCESS) { tfree(sasArray); return; } @@ -927,7 +928,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * int32_t startPos = pQuery->pos; if (hasTimeWindow) { - TSKEY ekey = reviseWindowEkey(pQuery, &win); + TSKEY ekey = reviseWindowEkey(pQuery, &win); forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true); SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); @@ -946,7 +947,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * // null data, failed to allocate more memory buffer hasTimeWindow = false; - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin, masterScan, + &hasTimeWindow) != TSDB_CODE_SUCCESS) { break; } @@ -957,7 +959,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * TSKEY ekey = reviseWindowEkey(pQuery, &nextWin); forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true); - SWindowStatus* pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); + SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows); } @@ -1478,7 +1480,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); SQuery *pQuery = pRuntimeEnv->pQuery; - pRuntimeEnv->resultInfo = calloc(pQuery->numOfOutput, sizeof(SResultInfo)); + size_t size = pRuntimeEnv->interBufSize + pQuery->numOfOutput * sizeof(SResultInfo); + + pRuntimeEnv->resultInfo = calloc(1, size); pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx)); if (pRuntimeEnv->resultInfo == NULL || pRuntimeEnv->pCtx == NULL) { @@ -1549,7 +1553,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order } } - char* buf = calloc(1, pRuntimeEnv->interBufSize); + char* buf = (char*) pRuntimeEnv->resultInfo + sizeof(SResultInfo) * pQuery->numOfOutput; // set the intermediate result output buffer setWindowResultInfo(pRuntimeEnv->resultInfo, pQuery, pRuntimeEnv->stableQuery, buf); @@ -1578,7 +1582,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); qDebug("QInfo:%p teardown runtime env", pQInfo); - cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pQuery->numOfOutput); + cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo); if (pRuntimeEnv->pCtx != NULL) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { @@ -1592,7 +1596,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pCtx->tagInfo.pTagCtxList); } - tfree(pRuntimeEnv->resultInfo[0].interResultBuf); tfree(pRuntimeEnv->resultInfo); tfree(pRuntimeEnv->pCtx); } @@ -1608,7 +1611,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) -static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; } +static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;} static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) { SQuery* pQuery = pRuntimeEnv->pQuery; @@ -1912,24 +1915,11 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) { return num; } -#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1) - static FORCE_INLINE int32_t getNumOfRowsInResultPage(SQuery *pQuery, bool topBotQuery, bool isSTableQuery) { int32_t rowSize = pQuery->rowSize * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, topBotQuery, isSTableQuery); return (DEFAULT_INTERN_BUF_PAGE_SIZE - sizeof(tFilePage)) / rowSize; } -char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult) { - assert(pResult != NULL && pRuntimeEnv != NULL); - - SQuery *pQuery = pRuntimeEnv->pQuery; - tFilePage *page = GET_RES_BUF_PAGE_BY_ID(pRuntimeEnv->pResultBuf, pResult->pos.pageId); - int32_t realRowId = pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery); - - return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage + - pQuery->pSelectExpr[columnIndex].bytes * realRowId; -} - #define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR) static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, @@ -1997,23 +1987,80 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat return false; } +#define PT_IN_WINDOW(_p, _w) ((_p) > (_w).skey && (_p) < (_w).ekey) + +static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) { + STimeWindow w = {0}; + + TSKEY sk = MIN(pQuery->window.skey, pQuery->window.ekey); + TSKEY ek = MAX(pQuery->window.skey, pQuery->window.ekey); + + + if (QUERY_IS_ASC_QUERY(pQuery)) { + getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, sk, ek, &w); + + if (PT_IN_WINDOW(w.ekey, pBlockInfo->window)) { + return true; + } + + while(1) { + GET_NEXT_TIMEWINDOW(pQuery, &w); + if (w.skey > pBlockInfo->window.skey) { + break; + } + + if (PT_IN_WINDOW(w.skey, pBlockInfo->window) || PT_IN_WINDOW(w.ekey, pBlockInfo->window)) { + return true; + } + } + } else { + getAlignQueryTimeWindow(pQuery, pBlockInfo->window.ekey, sk, ek, &w); + if (PT_IN_WINDOW(w.skey, pBlockInfo->window)) { + return true; + } + + while(1) { + GET_NEXT_TIMEWINDOW(pQuery, &w); + if (w.ekey < pBlockInfo->window.skey) { + break; + } + + if (PT_IN_WINDOW(w.skey, pBlockInfo->window) || PT_IN_WINDOW(w.ekey, pBlockInfo->window)) { + return true; + } + } + } + + return false; +} + int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock) { SQuery *pQuery = pRuntimeEnv->pQuery; uint32_t status = 0; - if (pQuery->numOfFilterCols > 0) { + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf > 0) { status = BLK_DATA_ALL_NEEDED; } else { // check if this data block is required to load - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base; - int32_t functionId = pSqlFunc->functionId; - int32_t colId = pSqlFunc->colInfo.colId; - status |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId); + // Calculate all time windows that are overlapping or contain current data block. + // If current data block is contained by all possible time window, loading current + // data block is not needed. + if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, pBlockInfo)) { + status = BLK_DATA_ALL_NEEDED; } - if (pRuntimeEnv->pTSBuf > 0 || QUERY_IS_INTERVAL_QUERY(pQuery)) { - status |= BLK_DATA_ALL_NEEDED; + if (status != BLK_DATA_ALL_NEEDED) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base; + + int32_t functionId = pSqlFunc->functionId; + int32_t colId = pSqlFunc->colInfo.colId; + + status |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId); + if ((status & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { + break; + } + } } } @@ -2490,11 +2537,7 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim switch (pQuery->pSelectExpr[i].type) { case TSDB_DATA_TYPE_BINARY: { -// int32_t colIndex = pQuery->pSelectExpr[i].base.colInfo.colIndex; int32_t type = pQuery->pSelectExpr[i].type; -// } else { -// type = pMeterObj->schema[colIndex].type; -// } printBinaryData(pQuery->pSelectExpr[i].base.functionId, pdata[i]->data + pQuery->pSelectExpr[i].bytes * j, type); break; @@ -2617,16 +2660,19 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { SIDList list = getDataBufPagesIdList(pResultBuf, pQInfo->offset + id); int32_t total = 0; - for (int32_t i = 0; i < list.size; ++i) { - tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, list.pData[i]); + int32_t size = taosArrayGetSize(list); + for (int32_t i = 0; i < size; ++i) { + int32_t* pgId = taosArrayGet(list, i); + tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pgId); total += pData->num; } int32_t rows = total; int32_t offset = 0; - for (int32_t num = 0; num < list.size; ++num) { - tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, list.pData[num]); + for (int32_t j = 0; j < size; ++j) { + int32_t* pgId = taosArrayGet(list, j); + tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pgId); for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; @@ -2692,7 +2738,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { STableQueryInfo *item = taosArrayGetP(pGroup, i); SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid); - if (list.size > 0 && item->windowResInfo.size > 0) { + if (taosArrayGetSize(list) > 0 && item->windowResInfo.size > 0) { pTableList[numOfTables] = item; numOfTables += 1; } @@ -2960,19 +3006,18 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { } } -int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo, size_t interBufSize) { +int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, size_t interBufSize) { int32_t numOfCols = pQuery->numOfOutput; - pResultRow->resultInfo = calloc((size_t)numOfCols, sizeof(SResultInfo)); + size_t size = numOfCols * sizeof(SResultInfo) + interBufSize; + pResultRow->resultInfo = calloc(1, size); if (pResultRow->resultInfo == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - pResultRow->pos = *posInfo; - char* buf = calloc(1, interBufSize); - if (buf == NULL) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } + pResultRow->pos = (SPosInfo) {-1, -1}; + + char* buf = (char*) pResultRow->resultInfo + numOfCols * sizeof(SResultInfo); // set the intermediate result output buffer setWindowResultInfo(pResultRow->resultInfo, pQuery, isSTableQuery, buf); @@ -3373,12 +3418,12 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void return pTableQueryInfo; } -void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) { +void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo) { if (pTableQueryInfo == NULL) { return; } - cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo, numOfCols); + cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo); } #define CHECK_QUERY_TIME_RANGE(_q, _tableInfo) \ @@ -3837,45 +3882,11 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int static void queryCostStatis(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryCostInfo *pSummary = &pRuntimeEnv->summary; -// if (pRuntimeEnv->pResultBuf == NULL) { -//// pSummary->tmpBufferInDisk = 0; -// } else { -//// pSummary->tmpBufferInDisk = getResBufSize(pRuntimeEnv->pResultBuf); -// } -// -// qDebug("QInfo:%p cost: comp blocks:%d, size:%d Bytes, elapsed time:%.2f ms", pQInfo, pSummary->readCompInfo, -// pSummary->totalCompInfoSize, pSummary->loadCompInfoUs / 1000.0); -// -// qDebug("QInfo:%p cost: field info: %d, size:%d Bytes, avg size:%.2f Bytes, elapsed time:%.2f ms", pQInfo, -// pSummary->readField, pSummary->totalFieldSize, (double)pSummary->totalFieldSize / pSummary->readField, -// pSummary->loadFieldUs / 1000.0); -// -// qDebug( -// "QInfo:%p cost: file blocks:%d, size:%d Bytes, elapsed time:%.2f ms, skipped:%d, in-memory gen null:%d Bytes", -// pQInfo, pSummary->readDiskBlocks, pSummary->totalBlockSize, pSummary->loadBlocksUs / 1000.0, -// pSummary->skippedFileBlocks, pSummary->totalGenData); - - qDebug("QInfo:%p :cost summary: elapsed time:%"PRId64" us, io time:%"PRId64" us, total blocks:%d, load block statis:%d," + + qDebug("QInfo:%p :cost summary: elapsed time:%"PRId64" us, total blocks:%d, load block statis:%d," " load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64, - pQInfo, pSummary->elapsedTime, pSummary->ioTime, pSummary->totalBlocks, pSummary->loadBlockStatis, + pQInfo, pSummary->elapsedTime, pSummary->totalBlocks, pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows); - -// qDebug("QInfo:%p cost: temp file:%d Bytes", pQInfo, pSummary->tmpBufferInDisk); -// -// qDebug("QInfo:%p cost: file:%d, table:%d", pQInfo, pSummary->numOfFiles, pSummary->numOfTables); -// qDebug("QInfo:%p cost: seek ops:%d", pQInfo, pSummary->numOfSeek); -// -// double total = pSummary->fileTimeUs + pSummary->cacheTimeUs; -// double io = pSummary->loadCompInfoUs + pSummary->loadBlocksUs + pSummary->loadFieldUs; - -// double computing = total - io; -// -// qDebug( -// "QInfo:%p cost: total elapsed time:%.2f ms, file:%.2f ms(%.2f%), cache:%.2f ms(%.2f%). io:%.2f ms(%.2f%)," -// "comput:%.2fms(%.2f%)", -// pQInfo, total / 1000.0, pSummary->fileTimeUs / 1000.0, pSummary->fileTimeUs * 100 / total, -// pSummary->cacheTimeUs / 1000.0, pSummary->cacheTimeUs * 100 / total, io / 1000.0, io * 100 / total, -// computing / 1000.0, computing * 100 / total); } static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) { @@ -4189,14 +4200,14 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, pRuntimeEnv->topBotQuery, isSTableQuery); - if (isSTableQuery) { + if (isSTableQuery && !onlyQueryTags(pRuntimeEnv->pQuery)) { int32_t rows = getInitialPageNum(pQInfo); code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo); if (code != TSDB_CODE_SUCCESS) { return code; } - if (pQuery->intervalTime == 0) { + if (!QUERY_IS_INTERVAL_QUERY(pQuery)) { int16_t type = TSDB_DATA_TYPE_NULL; if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags; @@ -4317,7 +4328,6 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { SDataStatis *pStatis = NULL; SArray *pDataBlock = NULL; - if (loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock) == BLK_DATA_DISCARD) { pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step:blockInfo.window.skey + step; continue; @@ -4452,7 +4462,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { if (pRuntimeEnv->pQueryHandle == NULL) { longjmp(pRuntimeEnv->env, terrno); } - + initCtxOutputBuf(pRuntimeEnv); SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); @@ -4603,7 +4613,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { // TODO handle the limit offset problem if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) { - // skipBlocks(pRuntimeEnv); if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { pQInfo->tableIndex++; continue; @@ -4717,7 +4726,7 @@ static void doSaveContext(SQInfo *pQInfo) { if (pRuntimeEnv->pSecQueryHandle == NULL) { longjmp(pRuntimeEnv->env, terrno); } - + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); disableFuncInReverseScan(pQInfo); @@ -4740,8 +4749,6 @@ static void doRestoreContext(SQInfo *pQInfo) { static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; -// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - if (QUERY_IS_INTERVAL_QUERY(pQuery)) { size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo); for (int32_t i = 0; i < numOfGroup; ++i) { @@ -4751,7 +4758,6 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) { for (int32_t j = 0; j < num; ++j) { STableQueryInfo* item = taosArrayGetP(group, j); closeAllTimeWindow(&item->windowResInfo); -// removeRedundantWindow(&item->windowResInfo, item->lastKey - step, step); } } } else { // close results for group result @@ -4844,7 +4850,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery; - if (!isTopBottomQuery(pQuery) && pQuery->limit.offset > 0) { // no need to execute, since the output will be ignore. + if (!pRuntimeEnv->topBotQuery && pQuery->limit.offset > 0) { // no need to execute, since the output will be ignore. return; } @@ -5684,6 +5690,20 @@ static int compareTableIdInfo(const void* a, const void* b) { static void freeQInfo(SQInfo *pQInfo); +static void calResultBufSize(SQuery* pQuery) { + const int32_t RESULT_MSG_MIN_SIZE = 1024 * (1024 + 512); // bytes + const int32_t RESULT_MSG_MIN_ROWS = 8192; + const float RESULT_THRESHOLD_RATIO = 0.85; + + int32_t numOfRes = RESULT_MSG_MIN_SIZE / pQuery->rowSize; + if (numOfRes < RESULT_MSG_MIN_ROWS) { + numOfRes = RESULT_MSG_MIN_ROWS; + } + + pQuery->rec.capacity = numOfRes; + pQuery->rec.threshold = numOfRes * RESULT_THRESHOLD_RATIO; +} + static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols) { int16_t numOfCols = pQueryMsg->numOfCols; @@ -5717,8 +5737,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, pQuery->fillType = pQueryMsg->fillType; pQuery->numOfTags = pQueryMsg->numOfTags; pQuery->tagColList = pTagCols; - - // todo do not allocate ?? + pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); if (pQuery->colList == NULL) { goto _cleanup; @@ -5748,9 +5767,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, goto _cleanup; } - // set the output buffer capacity - pQuery->rec.capacity = 4096; - pQuery->rec.threshold = 4000; + calResultBufSize(pQuery); for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { assert(pExprs[col].interBytes >= pExprs[col].bytes); @@ -5799,7 +5816,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, if (p1 == NULL) { goto _cleanup; } - taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1); for(int32_t j = 0; j < s; ++j) { void* pTable = taosArrayGetP(pa, j); @@ -5822,6 +5838,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES); index += 1; } + + taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1); } pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo)); @@ -5981,9 +5999,7 @@ static void freeQInfo(SQInfo *pQInfo) { size_t num = taosArrayGetSize(p); for(int32_t j = 0; j < num; ++j) { STableQueryInfo* item = taosArrayGetP(p, j); - if (item != NULL) { - destroyTableQueryInfo(item, pQuery->numOfOutput); - } + destroyTableQueryInfo(item); } taosArrayDestroy(p); @@ -6013,7 +6029,6 @@ static void freeQInfo(SQInfo *pQInfo) { } tfree(pQuery->sdata); - tfree(pQuery); qDebug("QInfo:%p QInfo is freed", pQInfo); @@ -6168,9 +6183,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo code = tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &tableGroupInfo, pGroupColIndex, numOfGroupByCols); if (code != TSDB_CODE_SUCCESS) { - if (code == TSDB_CODE_QRY_EXCEED_TAGS_LIMIT) { - qError("qmsg:%p failed to QueryStable, reason: %s", pQueryMsg, tstrerror(code)); - } + qError("qmsg:%p failed to QueryStable, reason: %s", pQueryMsg, tstrerror(code)); goto _over; } } else { @@ -6330,7 +6343,6 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { } if (ret) { -// T_REF_INC(pQInfo); qDebug("QInfo:%p has more results waits for client retrieve", pQInfo); } diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index acdc46fcc1dc867a2b391326da8fa814722c9282..4e71de830f31e4a517a78f86ddbcc71d21e2922c 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -53,9 +53,9 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun if (pWindowResInfo->pResult == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } + for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) { - SPosInfo posInfo = {-1, -1}; - int32_t code = createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &posInfo, pRuntimeEnv->interBufSize); + int32_t code = createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -64,16 +64,15 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun return TSDB_CODE_SUCCESS; } -void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) { +void destroyTimeWindowRes(SWindowResult *pWindowRes) { if (pWindowRes == NULL) { return; } - free(pWindowRes->resultInfo[0].interResultBuf); free(pWindowRes->resultInfo); } -void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) { +void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) { if (pWindowResInfo == NULL) { return; } @@ -84,8 +83,7 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) { if (pWindowResInfo->pResult != NULL) { for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) { - SWindowResult *pResult = &pWindowResInfo->pResult[i]; - destroyTimeWindowRes(pResult, numOfCols); + destroyTimeWindowRes(&pWindowResInfo->pResult[i]); } } @@ -225,11 +223,6 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_ } } -SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) { - assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); - return &pWindowResInfo->pResult[slot]; -} - bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) { return (getWindowResult(pWindowResInfo, slot)->status.closed == true); } diff --git a/src/query/src/qast.c b/src/query/src/qast.c index c5f4a9ce6cc082a168266be1bed8f397fe0c7cf8..da4eb8f3baf67639615ec86b1f0b7b44269219e6 100644 --- a/src/query/src/qast.c +++ b/src/query/src/qast.c @@ -1064,10 +1064,9 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { if (*e == TS_PATH_DELIMITER[0]) { cond = e + 1; } else if (*e == ',') { - size_t len = e - cond + VARSTR_HEADER_SIZE; - char* p = exception_malloc(len); - varDataSetLen(p, len - VARSTR_HEADER_SIZE); - memcpy(varDataVal(p), cond, len); + size_t len = e - cond; + char* p = exception_malloc(len + VARSTR_HEADER_SIZE); + STR_WITH_SIZE_TO_VARSTR(p, cond, len); cond += len; taosArrayPush(pVal->arr, &p); } diff --git a/src/query/src/qresultBuf.c b/src/query/src/qresultBuf.c index f63f072b0b24254ba19a8624b88f05e4fa061e6f..ae1a95179bfd6478dfcb1109edf7d3f36aac97de 100644 --- a/src/query/src/qresultBuf.c +++ b/src/query/src/qresultBuf.c @@ -2,7 +2,6 @@ #include "hash.h" #include "qextbuffer.h" #include "taoserror.h" -#include "tsqlfunction.h" #include "queryLog.h" int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle) { @@ -20,35 +19,31 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si // init id hash table pResBuf->idsTable = taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); - pResBuf->list = calloc(size, sizeof(SIDList)); - pResBuf->numOfAllocGroupIds = size; + pResBuf->list = taosArrayInit(size, POINTER_BYTES); char path[4096] = {0}; - getTmpfilePath("tsdb_q_buf", path); + getTmpfilePath("tsdb_qbuf", path); pResBuf->path = strdup(path); pResBuf->fd = open(pResBuf->path, O_CREAT | O_RDWR, 0666); - - memset(path, 0, tListLen(path)); - if (!FD_VALID(pResBuf->fd)) { qError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno)); - return TSDB_CODE_QRY_NO_DISKSPACE; + return TAOS_SYSTEM_ERROR(errno); } int32_t ret = ftruncate(pResBuf->fd, pResBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE); if (ret != TSDB_CODE_SUCCESS) { qError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno)); - return TSDB_CODE_QRY_NO_DISKSPACE; + return TAOS_SYSTEM_ERROR(errno); } pResBuf->pBuf = mmap(NULL, pResBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResBuf->fd, 0); if (pResBuf->pBuf == MAP_FAILED) { qError("QInfo:%p failed to map temp file: %s. %s", handle, pResBuf->path, strerror(errno)); - return TSDB_CODE_QRY_OUT_OF_MEMORY; // todo change error code + return TAOS_SYSTEM_ERROR(errno); } - qDebug("QInfo:%p create tmp file for output result, %s, %" PRId64 "bytes", handle, pResBuf->path, + qDebug("QInfo:%p create tmp file for output result:%s, %" PRId64 "bytes", handle, pResBuf->path, pResBuf->totalBufSize); return TSDB_CODE_SUCCESS; @@ -86,11 +81,11 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf return TSDB_CODE_SUCCESS; } -static bool noMoreAvailablePages(SDiskbasedResultBuf* pResultBuf) { +static FORCE_INLINE bool noMoreAvailablePages(SDiskbasedResultBuf* pResultBuf) { return (pResultBuf->allocateId == pResultBuf->numOfPages - 1); } -static int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { +static FORCE_INLINE int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { assert(pResultBuf != NULL); char* p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t)); @@ -99,51 +94,20 @@ static int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { } int32_t slot = GET_INT32_VAL(p); - assert(slot >= 0 && slot < pResultBuf->numOfAllocGroupIds); + assert(slot >= 0 && slot < taosHashGetSize(pResultBuf->idsTable)); return slot; } static int32_t addNewGroupId(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { int32_t num = getNumOfResultBufGroupId(pResultBuf); // the num is the newest allocated group id slot - - if (pResultBuf->numOfAllocGroupIds <= num) { - size_t n = pResultBuf->numOfAllocGroupIds << 1u; - - SIDList* p = (SIDList*)realloc(pResultBuf->list, sizeof(SIDList) * n); - assert(p != NULL); - - memset(&p[pResultBuf->numOfAllocGroupIds], 0, sizeof(SIDList) * pResultBuf->numOfAllocGroupIds); - - pResultBuf->list = p; - pResultBuf->numOfAllocGroupIds = n; - } - taosHashPut(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &num, sizeof(int32_t)); - return num; -} -static int32_t doRegisterId(SIDList* pList, int32_t id) { - if (pList->size >= pList->alloc) { - int32_t s = 0; - if (pList->alloc == 0) { - s = 4; - assert(pList->pData == NULL); - } else { - s = pList->alloc << 1u; - } - - int32_t* c = realloc(pList->pData, s * sizeof(int32_t)); - assert(c); - - memset(&c[pList->alloc], 0, sizeof(int32_t) * pList->alloc); - - pList->pData = c; - pList->alloc = s; - } + SArray* pa = taosArrayInit(1, sizeof(int32_t)); + taosArrayPush(pResultBuf->list, &pa); - pList->pData[pList->size++] = id; - return 0; + assert(taosArrayGetSize(pResultBuf->list) == taosHashGetSize(pResultBuf->idsTable)); + return num; } static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) { @@ -152,8 +116,8 @@ static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int slot = addNewGroupId(pResultBuf, groupId); } - SIDList* pList = &pResultBuf->list[slot]; - doRegisterId(pList, pageId); + SIDList pList = taosArrayGetP(pResultBuf->list, slot); + taosArrayPush(pList, &pageId); } tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) { @@ -178,12 +142,11 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; } SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { - SIDList list = {0}; int32_t slot = getGroupIndex(pResultBuf, groupId); if (slot < 0) { - return list; + return taosArrayInit(1, sizeof(int32_t)); } else { - return pResultBuf->list[slot]; + return taosArrayGetP(pResultBuf->list, slot); } } @@ -202,22 +165,20 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) { tfree(pResultBuf->path); - for (int32_t i = 0; i < pResultBuf->numOfAllocGroupIds; ++i) { - SIDList* pList = &pResultBuf->list[i]; - tfree(pList->pData); + size_t size = taosArrayGetSize(pResultBuf->list); + for (int32_t i = 0; i < size; ++i) { + SArray* pa = taosArrayGetP(pResultBuf->list, i); + taosArrayDestroy(pa); } - tfree(pResultBuf->list); + taosArrayDestroy(pResultBuf->list); taosHashCleanup(pResultBuf->idsTable); tfree(pResultBuf); } -int32_t getLastPageId(SIDList *pList) { - if (pList == NULL || pList->size <= 0) { - return -1; - } - - return pList->pData[pList->size - 1]; +int32_t getLastPageId(SIDList pList) { + size_t size = taosArrayGetSize(pList); + return *(int32_t*) taosArrayGet(pList, size - 1); } diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index def1fd93830edd55a867711882b89d1a30c8bbc1..5cb45a6d264b45191fdca7794ffc645c088f94f3 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -374,7 +374,7 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand if (chandle == NULL) return -1; - return (int)send(pFdObj->fd, data, (size_t)len, 0); + return taosWriteMsg(pFdObj->fd, data, len); } static void taosReportBrokenLink(SFdObj *pFdObj) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index c43d2636c22ce7950fdd81c445728cdf1b7bc8b7..d63b6525bc3aaed0af547ea02a61c5ef785cfe93 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -90,6 +90,12 @@ typedef struct SBlockOrderSupporter { int32_t* numOfBlocksPerTable; } SBlockOrderSupporter; +typedef struct SIOCostSummary { + int64_t blockLoadTime; + int64_t statisInfoLoadTime; + int64_t checkForNextTime; +} SIOCostSummary; + typedef struct STsdbQueryHandle { STsdbRepo* pTsdb; SQueryFilePos cur; // current position @@ -116,6 +122,8 @@ typedef struct STsdbQueryHandle { SArray* defaultLoadColumn;// default load column SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ + + SIOCostSummary cost; } STsdbQueryHandle; static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); @@ -384,7 +392,7 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) { return NULL; } -bool moveToNextRow(STableCheckInfo* pCheckInfo) { +static bool moveToNextRow(STableCheckInfo* pCheckInfo) { bool hasNext = false; if (pCheckInfo->chosen == 0) { if (pCheckInfo->iter != NULL) { @@ -507,7 +515,7 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK return midSlot; } -static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) { +static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks) { SFileGroup* fileGroup = pQueryHandle->pFileGroup; assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0); @@ -524,52 +532,61 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo for (int32_t i = 0; i < numOfTables; ++i) { STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); + pCheckInfo->numOfBlocks = 0; SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid]; - if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || - compIndex->uid != pCheckInfo->tableId.uid) { // no data block in this file, try next file - pCheckInfo->numOfBlocks = 0; - continue; // no data blocks in the file belongs to pCheckInfo->pTable + + // no data block in this file, try next file + if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) { + continue; // no data blocks in the file belongs to pCheckInfo->pTable + } + + if (pCheckInfo->compSize < compIndex->len) { + assert(compIndex->len > 0); + + char* t = realloc(pCheckInfo->pCompInfo, compIndex->len); + assert(t != NULL); + + pCheckInfo->pCompInfo = (SCompInfo*) t; + pCheckInfo->compSize = compIndex->len; + } + + tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb); + + tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo)); + SCompInfo* pCompInfo = pCheckInfo->pCompInfo; + + TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL; + + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + assert(pCheckInfo->lastKey <= pQueryHandle->window.ekey && pQueryHandle->window.skey <= pQueryHandle->window.ekey); } else { - if (pCheckInfo->compSize < compIndex->len) { - assert(compIndex->len > 0); - - char* t = realloc(pCheckInfo->pCompInfo, compIndex->len); - assert(t != NULL); - - pCheckInfo->pCompInfo = (SCompInfo*) t; - pCheckInfo->compSize = compIndex->len; - } - - tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb); + assert(pCheckInfo->lastKey >= pQueryHandle->window.ekey && pQueryHandle->window.skey >= pQueryHandle->window.ekey); + } - tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo)); - SCompInfo* pCompInfo = pCheckInfo->pCompInfo; - - TSKEY s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey); - TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey); - - // discard the unqualified data block based on the query time window - int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC); - int32_t end = start; - - if (s > pCompInfo->blocks[start].keyLast) { - continue; - } + s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey); + e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey); - // todo speedup the procedure of located end block - while (end < compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) { - end += 1; - } + // discard the unqualified data block based on the query time window + int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC); + int32_t end = start; - pCheckInfo->numOfBlocks = (end - start); - - if (start > 0) { - memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SCompBlock)); - } + if (s > pCompInfo->blocks[start].keyLast) { + continue; + } + + // todo speedup the procedure of located end block + while (end < compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) { + end += 1; + } - (*numOfBlocks) += pCheckInfo->numOfBlocks; + pCheckInfo->numOfBlocks = (end - start); + + if (start > 0) { + memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SCompBlock)); } + + (*numOfBlocks) += pCheckInfo->numOfBlocks; } return TSDB_CODE_SUCCESS; @@ -583,22 +600,13 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo .uid = (_checkInfo)->tableId.uid}) - static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) { STsdbRepo *pRepo = pQueryHandle->pTsdb; - - // TODO refactor - SCompData* data = calloc(1, sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols); - - data->numOfCols = pBlock->numOfCols; - data->uid = pCheckInfo->pTableObj->tableId.uid; - - bool blockLoaded = false; - int64_t st = taosGetTimestampUs(); + bool blockLoaded = false; + int64_t st = taosGetTimestampUs(); if (pCheckInfo->pDataCols == NULL) { STsdbMeta* pMeta = tsdbGetMeta(pRepo); - // TODO pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); } @@ -607,7 +615,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema); tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema); - if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo) == 0) { + int16_t* colIds = pQueryHandle->defaultLoadColumn->pData; + int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, QH_GET_NUM_OF_COLS(pQueryHandle)); + if (ret == TSDB_CODE_SUCCESS) { SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup; @@ -621,10 +631,10 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows); pBlock->numOfRows = pCols->numOfRows; - tfree(data); - int64_t et = taosGetTimestampUs() - st; - tsdbDebug("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, et); + int64_t elapsedTime = (taosGetTimestampUs() - st); + pQueryHandle->cost.blockLoadTime += elapsedTime; + tsdbDebug("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, elapsedTime); return blockLoaded; } @@ -1246,7 +1256,7 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) { tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset); } -#endif +#endif return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1; } @@ -1367,7 +1377,6 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO return TSDB_CODE_SUCCESS; } -// todo opt for only one table case static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* exists) { pQueryHandle->numOfBlocks = 0; SQueryFilePos* cur = &pQueryHandle->cur; @@ -1378,8 +1387,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) { - int32_t type = ASCENDING_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL; - if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks, type)) != TSDB_CODE_SUCCESS) { + if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) { break; } @@ -1487,7 +1495,10 @@ static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) { // handle data in cache situation bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle; - + + int64_t stime = taosGetTimestampUs(); + int64_t elapsedTime = stime; + size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); assert(numOfTables > 0); @@ -1539,7 +1550,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {{0}, 0}; SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); @@ -1547,7 +1558,6 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { colInfo.info = pCol->info; colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes); taosArrayPush(pSecQueryHandle->pColumns, &colInfo); - pSecQueryHandle->statis[i].colId = colInfo.info.colId; } size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo); @@ -1569,7 +1579,8 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo); - + pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn); + bool ret = tsdbNextDataBlock((void*) pSecQueryHandle); assert(ret); @@ -1607,6 +1618,8 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { } if (exists) { + elapsedTime = taosGetTimestampUs() - stime; + pQueryHandle->cost.checkForNextTime += elapsedTime; return exists; } @@ -1617,6 +1630,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { // TODO: opt by consider the scan order bool ret = doHasDataInBuffer(pQueryHandle); terrno = TSDB_CODE_SUCCESS; + + elapsedTime = taosGetTimestampUs() - stime; + pQueryHandle->cost.checkForNextTime += elapsedTime; return ret; } @@ -1794,41 +1810,44 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* p int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataStatis** pBlockStatis) { STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; - SQueryFilePos* cur = &pHandle->cur; - if (cur->mixBlock) { + SQueryFilePos* c = &pHandle->cur; + if (c->mixBlock) { *pBlockStatis = NULL; return TSDB_CODE_SUCCESS; } - assert((cur->slot >= 0 && cur->slot < pHandle->numOfBlocks) || - ((cur->slot == pHandle->numOfBlocks) && (cur->slot == 0))); - - STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot]; - - // file block with subblocks has no statistics data + STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot]; + assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0))); + + // file block with sub-blocks has no statistics data if (pBlockInfo->compBlock->numOfSubBlocks > 1) { *pBlockStatis = NULL; return TSDB_CODE_SUCCESS; } - + + int64_t stime = taosGetTimestampUs(); tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL); - // todo opt perf + int16_t* colIds = pHandle->defaultLoadColumn->pData; + size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle); + memset(pHandle->statis, 0, numOfCols * sizeof(SDataStatis)); for(int32_t i = 0; i < numOfCols; ++i) { - SDataStatis* st = &pHandle->statis[i]; - int32_t colId = st->colId; - - memset(st, 0, sizeof(SDataStatis)); - st->colId = colId; + pHandle->statis[i].colId = colIds[i]; } tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols); - - *pBlockStatis = pHandle->statis; - + + // always load the first primary timestamp column data + SDataStatis* pPrimaryColStatis = &pHandle->statis[0]; + assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX); + + pPrimaryColStatis->numOfNull = 0; + pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst; + pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast; + //update the number of NULL data rows - for(int32_t i = 0; i < numOfCols; ++i) { + for(int32_t i = 1; i < numOfCols; ++i) { if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows; } @@ -1840,7 +1859,11 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta pHandle->statis[i].max = pBlockInfo->compBlock->keyLast; } } - + + int64_t elapsed = taosGetTimestampUs() - stime; + pHandle->cost.statisInfoLoadTime += elapsed; + + *pBlockStatis = pHandle->statis; return TSDB_CODE_SUCCESS; } @@ -1893,8 +1916,6 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { } } -SArray* tsdbRetrieveDataRow(TsdbQueryHandleT* pQueryHandle, SArray* pIdList, SQueryRowCond* pCond) { return NULL; } - static int32_t getAllTableList(STable* pSuperTable, SArray* list) { SSkipListIterator* iter = tSkipListCreateIter(pSuperTable->pIndex); while (tSkipListIterNext(iter)) { @@ -1923,6 +1944,7 @@ static void destroyHelper(void* param) { free(param); } +#define TAG_INVALID_COLUMN_INDEX -2 static int32_t getTagColumnIndex(STSchema* pTSchema, SSchema* pSchema) { // filter on table name(TBNAME) if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) { @@ -1954,9 +1976,8 @@ void filterPrepare(void* expr, void* param) { tVariant* pCond = pExpr->_node.pRight->pVal; SSchema* pSchema = pExpr->_node.pLeft->pSchema; - // todo : if current super table does not change schema yet, this function may fail to get correct schema, test case int32_t index = getTagColumnIndex(pTSSchema, pSchema); - assert((index >= 0 && i < TSDB_MAX_TAGS) || (index == TSDB_TBNAME_COLUMN_INDEX)); + assert((index >= 0 && i < TSDB_MAX_TAGS) || (index == TSDB_TBNAME_COLUMN_INDEX) || index == TAG_INVALID_COLUMN_INDEX); pInfo->sch = *pSchema; pInfo->colIndex = index; @@ -2362,6 +2383,11 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->imem); tsdbDestroyHelper(&pQueryHandle->rhelper); + + SIOCostSummary* pCost = &pQueryHandle->cost; + tsdbDebug(":io-cost summary: statis-info:%"PRId64"us, datablock:%" PRId64"us, check data:%"PRId64"us, %p", + pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pQueryHandle->qinfo); + tfree(pQueryHandle); } diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 072e9939de3c672a80cebe93b2075a347991411a..92d4b2caacb061bf144920900548875b06904a21 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -225,7 +225,7 @@ static void doCleanupDataCache(SCacheObj *pCacheObj); * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime * @param handle Cache object handle */ -static void* taosCacheTimedRefresh(void *pCacheObj); +static void* taosCacheTimedRefresh(void *handle); SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) { if (refreshTimeInSeconds <= 0) { @@ -455,51 +455,11 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { __cache_unlock(pCacheObj); } else { - uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, T_REF_VAL_GET(pNode) - 1); - - __cache_wr_lock(pCacheObj); - // NOTE: once refcount is decrease, pNode may be freed by other thread immediately. int32_t ref = T_REF_DEC(pNode); - - if (inTrashCan && (ref == 0)) { - // Remove it if the ref count is 0. - // The ref count does not need to load and check again after lock acquired, since ref count can not be increased when - // the node is in trashcan. - assert(pNode->pTNodeHeader->pData == pNode); - taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader); - } - - __cache_unlock(pCacheObj); + uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, pNode->key, pNode->data, ref, + inTrashCan); } - -// else { -// if (_remove) { // not in trash can, but need to remove it -// __cache_wr_lock(pCacheObj); -// -// /* -// * If not referenced by other users. Otherwise move this node to trashcan wait for all users -// * releasing this resources. -// * -// * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread -// * that tries to do the same thing. -// */ -// if (ref == 0) { -// if (T_REF_VAL_GET(pNode) == 0) { -// taosCacheReleaseNode(pCacheObj, pNode); -// } else { -// taosCacheMoveToTrash(pCacheObj, pNode); -// } -// } else if (ref > 0) { -// if (!pNode->inTrashCan) { -// assert(pNode->pTNodeHeader == NULL); -// taosCacheMoveToTrash(pCacheObj, pNode); -// } -// } -// -// __cache_unlock(pCacheObj); -// } -// } } void taosCacheEmpty(SCacheObj *pCacheObj) { diff --git a/tests/script/general/parser/testSuite.sim b/tests/script/general/parser/testSuite.sim index 26a6a39815b75712465fbdc045e96cd8cefa933a..c6981d29022dca56198d6655379472c905984285 100644 --- a/tests/script/general/parser/testSuite.sim +++ b/tests/script/general/parser/testSuite.sim @@ -26,7 +26,7 @@ sleep 2000 run general/parser/fill.sim sleep 2000 run general/parser/fill_stb.sim -sleep 2000 +sleep 2000 #run general/parser/fill_us.sim # sleep 2000 run general/parser/first_last.sim @@ -91,13 +91,11 @@ run general/parser/select_with_tags.sim sleep 2000 run general/parser/groupby.sim sleep 2000 +run general/parser/tags_filter.sim +sleep 2000 run general/parser/union.sim sleep 2000 run general/parser/sliding.sim -sleep 2000 -run general/parser/fill_us.sim -sleep 2000 -run general/parser/tags_filter.sim #sleep 2000 #run general/parser/repeatStream.sim diff --git a/tests/script/general/parser/union.sim b/tests/script/general/parser/union.sim index 14b6c97b7c8bb8a3497cb68ca426d6560320d8dc..358bcb8a40d7400e31d97091b182a69378d8dc26 100644 --- a/tests/script/general/parser/union.sim +++ b/tests/script/general/parser/union.sim @@ -1,10 +1,10 @@ -system sh/stop_dnodes.sh - -system sh/deploy.sh -n dnode1 -i 1 -system sh/cfg.sh -n dnode1 -c walLevel -v 0 -system sh/cfg.sh -n dnode1 -c debugFlag -v 135 -system sh/cfg.sh -n dnode1 -c rpcDebugFlag -v 135 -system sh/exec.sh -n dnode1 -s start +#system sh/stop_dnodes.sh +# +#system sh/deploy.sh -n dnode1 -i 1 +#system sh/cfg.sh -n dnode1 -c walLevel -v 0 +#system sh/cfg.sh -n dnode1 -c debugFlag -v 135 +#system sh/cfg.sh -n dnode1 -c rpcDebugFlag -v 135 +#system sh/exec.sh -n dnode1 -s start sleep 1000 sql connect @@ -24,77 +24,77 @@ $mt = $mtPrefix . $i $j = 1 $mt1 = $mtPrefix . $j - -sql drop database if exits $db -x step1 -step1: -sql create database if not exists $db maxtables 4 +# +#sql drop database if exits $db -x step1 +#step1: +#sql create database if not exists $db maxtables 4 sql use $db -sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int) - -$i = 0 -$t = 1578203484000 - -while $i < $tbNum - $tb = $tbPrefix . $i - sql create table $tb using $mt tags( $i ) - - $x = 0 - while $x < $rowNum - $ms = $x * 1000 - $ms = $ms * 60 - - $c = $x / 100 - $c = $c * 100 - $c = $x - $c - $binary = 'binary . $c - $binary = $binary . ' - $nchar = 'nchar . $c - $nchar = $nchar . ' - - $t1 = $t + $ms - sql insert into $tb values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) - $x = $x + 1 - endw - - $i = $i + 1 -endw - -sql create table $mt1 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int) - -$j = 0 -$t = 1578203484000 -$rowNum = 1000 -$tbNum = 5 -$i = 0 - -while $i < $tbNum - $tb1 = $tbPrefix1 . $j - sql create table $tb1 using $mt1 tags( $i ) - - $x = 0 - while $x < $rowNum - $ms = $x * 1000 - $ms = $ms * 60 - - $c = $x / 100 - $c = $c * 100 - $c = $x - $c - $binary = 'binary . $c - $binary = $binary . ' - $nchar = 'nchar . $c - $nchar = $nchar . ' - - $t1 = $t + $ms - sql insert into $tb1 values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) - $x = $x + 1 - endw - - $i = $i + 1 - $j = $j + 1 -endw - -print sleep 1sec. -sleep 1000 +#sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int) +# +#$i = 0 +#$t = 1578203484000 +# +#while $i < $tbNum +# $tb = $tbPrefix . $i +# sql create table $tb using $mt tags( $i ) +# +# $x = 0 +# while $x < $rowNum +# $ms = $x * 1000 +# $ms = $ms * 60 +# +# $c = $x / 100 +# $c = $c * 100 +# $c = $x - $c +# $binary = 'binary . $c +# $binary = $binary . ' +# $nchar = 'nchar . $c +# $nchar = $nchar . ' +# +# $t1 = $t + $ms +# sql insert into $tb values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) +# $x = $x + 1 +# endw +# +# $i = $i + 1 +#endw +# +#sql create table $mt1 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int) +# +#$j = 0 +#$t = 1578203484000 +#$rowNum = 1000 +#$tbNum = 5 +#$i = 0 +# +#while $i < $tbNum +# $tb1 = $tbPrefix1 . $j +# sql create table $tb1 using $mt1 tags( $i ) +# +# $x = 0 +# while $x < $rowNum +# $ms = $x * 1000 +# $ms = $ms * 60 +# +# $c = $x / 100 +# $c = $c * 100 +# $c = $x - $c +# $binary = 'binary . $c +# $binary = $binary . ' +# $nchar = 'nchar . $c +# $nchar = $nchar . ' +# +# $t1 = $t + $ms +# sql insert into $tb1 values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) +# $x = $x + 1 +# endw +# +# $i = $i + 1 +# $j = $j + 1 +#endw +# +#print sleep 1sec. +#sleep 1000 $i = 1 $tb = $tbPrefix . $i @@ -222,7 +222,7 @@ endi print ===========================================tags union # two super table tag union, limit is not active during retrieve tags query -sql select t1 from union_mt0 union all select t1 from union_mt0 limit 1 +sql select t1 from union_mt0 union all select t1 from union_mt0 if $rows != 20 then return -1 endi @@ -235,6 +235,10 @@ if $data90 != 9 then return -1 endi +#sql select t1 from union_mt0 union all select t1 from union_mt0 limit 1 +#if $row != 11 then +# return -1 +#endi #========================================== two super table join subclause print ================two super table join subclause sql select avg(union_mt0.c1) as c from union_mt0 interval(1h) limit 10 union all select union_mt1.ts, union_mt1.c1/1.0 as c from union_mt0, union_mt1 where union_mt1.ts=union_mt0.ts and union_mt1.t1=union_mt0.t1 limit 5;