diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 715d76e072cae10cc266ec9182b9fda806962e83..e4857dd18d90c4326d5ab7bb6333707445a09d3e 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -175,7 +175,7 @@ SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIn SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, int16_t size); -int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo); +size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo); SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index); void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy); diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 40806f3fe8bcf4f819945302a47005423797eec8..a6310ea0073de85086bcf1f8ff19e5f16fe2d203 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -1848,13 +1848,14 @@ static void last_row_function(SQLFunctionCtx *pCtx) { pResInfo->hasResult = DATA_SET_FLAG; SLastrowInfo *pInfo = (SLastrowInfo *)pResInfo->interResultBuf; - pInfo->ts = pCtx->param[0].i64Key; + pInfo->ts = pCtx->ptsList[0]; + pInfo->hasResult = DATA_SET_FLAG; // set the result to final result buffer if (pResInfo->superTableQ) { SLastrowInfo *pInfo1 = (SLastrowInfo *)(pCtx->aOutputBuf + pCtx->inputBytes); - pInfo1->ts = pCtx->param[0].i64Key; + pInfo1->ts = pCtx->ptsList[0]; pInfo1->hasResult = DATA_SET_FLAG; DO_UPDATE_TAG_COLUMNS(pCtx, pInfo1->ts); @@ -1904,13 +1905,12 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6 memcpy(dst->pTags, pTags, (size_t)pTagInfo->tagsLen); } else { // the tags are dumped from the ctx tag fields for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) { - SQLFunctionCtx* __ctx = pTagInfo->pTagCtxList[i]; - if (__ctx->functionId == TSDB_FUNC_TS_DUMMY) { - __ctx->tag = (tVariant) {.nType = TSDB_DATA_TYPE_BIGINT, .i64Key = tsKey}; + SQLFunctionCtx* ctx = pTagInfo->pTagCtxList[i]; + if (ctx->functionId == TSDB_FUNC_TS_DUMMY) { + ctx->tag = (tVariant) {.nType = TSDB_DATA_TYPE_BIGINT, .i64Key = tsKey}; } - //todo? error ?? - tVariantDump(&pTagInfo->pTagCtxList[i]->tag, dst->pTags + size, pTagInfo->pTagCtxList[i]->tag.nType, false); + tVariantDump(&ctx->tag, dst->pTags + size, ctx->tag.nType, true); size += pTagInfo->pTagCtxList[i]->outputBytes; } } @@ -2227,7 +2227,6 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) { static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) { char *tmp = (char *)pTopBotInfo + sizeof(STopBotInfo); pTopBotInfo->res = (tValuePair**) tmp; - tmp += POINTER_BYTES * pCtx->param[0].i64Key; size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen; @@ -3823,115 +3822,11 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) { } /** - * param[1]: default value/previous value of specified timestamp - * param[2]: next value of specified timestamp - * param[3]: denotes if the result is a precious result or interpolation results - * - * param[1]: denote the specified timestamp to generated the interp result - * param[2]: fill policy * * @param pCtx */ static void interp_function(SQLFunctionCtx *pCtx) { // at this point, the value is existed, return directly -#if 0 - if (pCtx->param[3].i64Key == 1) { - char *pData = GET_INPUT_CHAR(pCtx); - assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType); - } else { - /* - * use interpolation to generate the result. - * Note: the result of primary timestamp column uses the timestamp specified by user in the query sql - */ - assert(pCtx->param[3].i64Key == 2); - - SInterpInfo interpInfo = *(SInterpInfo *)pCtx->aOutputBuf; - SInterpInfoDetail *pInfoDetail = interpInfo.pInterpDetail; - - /* set no output result */ - if (pInfoDetail->type == TSDB_FILL_NONE) { - pCtx->param[3].i64Key = 0; - } else if (pInfoDetail->primaryCol == 1) { - *(TSKEY *)pCtx->aOutputBuf = pInfoDetail->ts; - } else { - if (pInfoDetail->type == TSDB_FILL_NULL) { - if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) { - setVardataNull(pCtx->aOutputBuf, pCtx->outputType); - } else { - setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); - } - } else if (pInfoDetail->type == TSDB_FILL_SET_VALUE) { - tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType); - } else if (pInfoDetail->type == TSDB_FILL_PREV) { - char *data = pCtx->param[1].pz; - char *pVal = data + TSDB_KEYSIZE; - - if (pCtx->outputType == TSDB_DATA_TYPE_FLOAT) { - float v = GET_DOUBLE_VAL(pVal); - assignVal(pCtx->aOutputBuf, (const char*) &v, pCtx->outputBytes, pCtx->outputType); - } else { - assignVal(pCtx->aOutputBuf, pVal, pCtx->outputBytes, pCtx->outputType); - } - - } else if (pInfoDetail->type == TSDB_FILL_LINEAR) { - char *data1 = pCtx->param[1].pz; - char *data2 = pCtx->param[2].pz; - - char *pVal1 = data1 + TSDB_KEYSIZE; - char *pVal2 = data2 + TSDB_KEYSIZE; - - SPoint point1 = {.key = *(TSKEY *)data1, .val = &pCtx->param[1].i64Key}; - SPoint point2 = {.key = *(TSKEY *)data2, .val = &pCtx->param[2].i64Key}; - - SPoint point = {.key = pInfoDetail->ts, .val = pCtx->aOutputBuf}; - - int32_t srcType = pCtx->inputType; - if ((srcType >= TSDB_DATA_TYPE_TINYINT && srcType <= TSDB_DATA_TYPE_BIGINT) || - srcType == TSDB_DATA_TYPE_TIMESTAMP || srcType == TSDB_DATA_TYPE_DOUBLE) { - point1.val = pVal1; - - point2.val = pVal2; - - if (isNull(pVal1, srcType) || isNull(pVal2, srcType)) { - setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes); - } else { - taosDoLinearInterpolation(pCtx->outputType, &point1, &point2, &point); - } - } else if (srcType == TSDB_DATA_TYPE_FLOAT) { - float v1 = GET_DOUBLE_VAL(pVal1); - float v2 = GET_DOUBLE_VAL(pVal2); - - point1.val = &v1; - point2.val = &v2; - - if (isNull(pVal1, srcType) || isNull(pVal2, srcType)) { - setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes); - } else { - taosDoLinearInterpolation(pCtx->outputType, &point1, &point2, &point); - } - - } else { - if (srcType == TSDB_DATA_TYPE_BINARY || srcType == TSDB_DATA_TYPE_NCHAR) { - setVardataNull(pCtx->aOutputBuf, pCtx->inputBytes); - } else { - setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes); - } - } - } - } - - free(interpInfo.pInterpDetail); - } - - pCtx->size = pCtx->param[3].i64Key; - - tVariantDestroy(&pCtx->param[1]); - tVariantDestroy(&pCtx->param[2]); - - // data in the check operation are all null, not output - SET_VAL(pCtx, pCtx->size, 1); -#endif - SResultInfo *pResInfo = GET_RES_INFO(pCtx); SInterpInfoDetail* pInfo = pResInfo->interResultBuf; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 2759ecb836b7c98c1adda5ebf2f7c777e12091ca..c24c97e286f34bc3e2dbb1838ac8eaa7116bfe08 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1403,7 +1403,6 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE}; strcpy(colSchema.name, TSQL_TBNAME_L); - pQueryInfo->type = TSDB_QUERY_TYPE_STABLE_QUERY; tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, true); } else { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); @@ -4166,6 +4165,10 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { int32_t relTagIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); + // it is a tag column + if (pQueryInfo->groupbyExpr.columnInfo == NULL) { + return invalidSqlErrMsg(pQueryInfo->msg, msg2); + } SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0); if (relTagIndex == pColIndex->colIndex) { orderByTags = true; @@ -4678,7 +4681,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* const char* msg0 = "soffset/offset can not be less than 0"; const char* msg1 = "slimit/soffset only available for STable query"; - const char* msg2 = "function not supported on table"; + const char* msg2 = "functions mixed up in table query"; const char* msg3 = "slimit/soffset can not apply to projection query"; // handle the limit offset value, validate the limit @@ -4761,14 +4764,22 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* } size_t size = taosArrayGetSize(pQueryInfo->exprList); - + + bool hasTags = false; + bool hasOtherFunc = false; // filter the query functions operating on "tbname" column that are not supported by normal columns. for (int32_t i = 0; i < size; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); - if (pExpr->colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) { - return invalidSqlErrMsg(pQueryInfo->msg, msg2); + if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { + hasTags = true; + } else { + hasOtherFunc = true; } } + + if (hasTags && hasOtherFunc) { + return invalidSqlErrMsg(pQueryInfo->msg, msg2); + } } return TSDB_CODE_SUCCESS; @@ -5831,7 +5842,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000; } } else { // set the time rang - pQueryInfo->window.skey = 0; + pQueryInfo->window.skey = TSKEY_INITIAL_VAL; pQueryInfo->window.ekey = INT64_MAX; } diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 7617621e5f6dce34e48f142adb5bba3a7a277d67..219819a2b0a28201fe9f28d979a57131f34e667a 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -1049,7 +1049,14 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, int32_t functionId = pExpr->functionId; if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS_DUMMY) { tVariantDestroy(&pCtx->tag); - tVariantCreateFromBinary(&pCtx->tag, pCtx->aInputElemBuf, pCtx->inputBytes, pCtx->inputType); + char* input = pCtx->aInputElemBuf; + + if (pCtx->inputType == TSDB_DATA_TYPE_BINARY || pCtx->inputType == TSDB_DATA_TYPE_NCHAR) { + assert(varDataLen(input) <= pCtx->inputBytes); + tVariantCreateFromBinary(&pCtx->tag, varDataVal(input), varDataLen(input), pCtx->inputType); + } else { + tVariantCreateFromBinary(&pCtx->tag, input, pCtx->inputBytes, pCtx->inputType); + } } pCtx->currentStage = SECONDARY_STAGE_MERGE; @@ -1309,7 +1316,7 @@ static bool isAllSourcesCompleted(SLocalReducer *pLocalReducer) { return (pLocalReducer->numOfBuffer == pLocalReducer->numOfCompleted); } -static bool doInterpolationForCurrentGroup(SSqlObj *pSql) { +static bool doBuildFilledResultForGroup(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -1347,8 +1354,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - SLocalReducer * pLocalReducer = pRes->pLocalReducer; - SFillInfo *pFillInfo = pLocalReducer->pFillInfo; + SLocalReducer *pLocalReducer = pRes->pLocalReducer; + SFillInfo *pFillInfo = pLocalReducer->pFillInfo; bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow; @@ -1445,7 +1452,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } - if (doInterpolationForCurrentGroup(pSql)) { + if (doBuildFilledResultForGroup(pSql)) { pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result. return TSDB_CODE_SUCCESS; } @@ -1464,8 +1471,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { #ifdef _DEBUG_VIEW printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index); #endif - assert((pTree->pNode[0].index < pLocalReducer->numOfBuffer) && (pTree->pNode[0].index >= 0) && - tmpBuffer->num == 0); + assert((pTree->pNode[0].index < pLocalReducer->numOfBuffer) && (pTree->pNode[0].index >= 0) && tmpBuffer->num == 0); // chosen from loser tree SLocalDataSource *pOneDataSrc = pLocalReducer->pLocalDataSrc[pTree->pNode[0].index]; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 44cfa1de96f6c2254a12caacac2214d682b3f3c0..9b449cbd5c05edc80deb32e1b4c3dff54ad3c2c4 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1032,7 +1032,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi return pExpr; } -int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) { +size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) { return taosArrayGetSize(pQueryInfo->exprList); } @@ -1351,7 +1351,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) { return false; } - if (colId == -1 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { + if (colId == TSDB_TBNAME_COLUMN_INDEX) { return true; } @@ -2122,7 +2122,7 @@ void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t column int32_t type = pInfo->pSqlExpr->resType; int32_t bytes = pInfo->pSqlExpr->resBytes; - char* pData = ((char*) pRes->data) + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row; + char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row; if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) { int32_t realLen = varDataLen(pData); @@ -2135,7 +2135,7 @@ void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t column } if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor - *(char*) (pData + realLen + VARSTR_HEADER_SIZE) = 0; + *(pData + realLen + VARSTR_HEADER_SIZE) = 0; } pRes->length[columnIndex] = realLen; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 17b975c193d20b9c3ce1b129a507588c20f0a634..0ea96949bf917714d05182de9d4ad8ddc119a709 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -620,13 +620,6 @@ typedef struct { SCMVgroupInfo vgroups[]; } SVgroupsInfo; -//typedef struct { -// int32_t numOfTables; -// int32_t join; -// int32_t joinCondLen; // for join condition -// int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM]; -//} SSuperTableMetaMsg; - typedef struct STableMetaMsg { int32_t contLen; char tableId[TSDB_TABLE_ID_LEN + 1]; // table id diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 66275236cdb67168bf6d98281ac28fb961210d7d..5149d9be61bebf7585f4e57c76d066ab8ed86849 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -198,6 +198,8 @@ TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable */ TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo); +SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle); + TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList); /** diff --git a/src/query/inc/qextbuffer.h b/src/query/inc/qextbuffer.h index 0d608f1f1bbc3515db461f5ee5c98bfe324a073f..9721687110f6eaa58635b84e7cac3ac1edc54130 100644 --- a/src/query/inc/qextbuffer.h +++ b/src/query/inc/qextbuffer.h @@ -28,8 +28,7 @@ extern "C" { #include "tdataformat.h" #include "talgo.h" -#define DEFAULT_PAGE_SIZE 16384 // 16k larger than the SHistoInfo -#define MIN_BUFFER_SIZE (1 << 19) +#define DEFAULT_PAGE_SIZE (1024L*56) // 16k larger than the SHistoInfo #define MAX_TMPFILE_PATH_LENGTH PATH_MAX #define INITIAL_ALLOCATION_BUFFER_SIZE 64 diff --git a/src/query/inc/qresultBuf.h b/src/query/inc/qresultBuf.h index 2e813dbd98417eda0411d469653c8a1573b4be38..1375594210d070ca23c6ac2ca58ab52dd6b32c3c 100644 --- a/src/query/inc/qresultBuf.h +++ b/src/query/inc/qresultBuf.h @@ -44,6 +44,8 @@ typedef struct SDiskbasedResultBuf { SIDList* list; // for each id, there is a page id list } SDiskbasedResultBuf; +#define DEFAULT_INTERN_BUF_PAGE_SIZE (8192L*5) + /** * create disk-based result buffer * @param pResultBuf diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index 62b551f2f54830637f0056efd1d6b3ceabcfbb49..646ce4dfe69e8e74e09edf518728215dadddb71a 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -161,26 +161,24 @@ typedef struct SExtTagsInfo { // sql function runtime context typedef struct SQLFunctionCtx { - int32_t startOffset; - int32_t size; // number of rows - uint32_t order; // asc|desc - uint32_t scanFlag; // TODO merge with currentStage - - int16_t inputType; - int16_t inputBytes; - - int16_t outputType; - int16_t outputBytes; // size of results, determined by function and input column data type - bool hasNull; // null value exist in current block - int16_t functionId; // function id - void * aInputElemBuf; - char * aOutputBuf; // final result output buffer, point to sdata->data - uint8_t currentStage; // record current running step, default: 0 - int64_t nStartQueryTimestamp; // timestamp range of current query when function is executed on a specific data block - int32_t numOfParams; - tVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param */ - int64_t *ptsList; // corresponding timestamp array list - void * ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ + int32_t startOffset; + int32_t size; // number of rows + uint32_t order; // asc|desc + int16_t inputType; + int16_t inputBytes; + + int16_t outputType; + int16_t outputBytes; // size of results, determined by function and input column data type + bool hasNull; // null value exist in current block + int16_t functionId; // function id + void * aInputElemBuf; + char * aOutputBuf; // final result output buffer, point to sdata->data + uint8_t currentStage; // record current running step, default: 0 + int64_t nStartQueryTimestamp; // timestamp range of current query when function is executed on a specific data block + int32_t numOfParams; + tVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param */ + int64_t * ptsList; // corresponding timestamp array list + void * ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ SQLPreAggVal preAggVals; tVariant tag; SResultInfo *resultInfo; diff --git a/src/query/inc/tvariant.h b/src/query/inc/tvariant.h index 5911db73a7e04b46d0f71aebb31def4ac2221f50..4fd6ea554191638f0220f8c46a91710474591a7a 100644 --- a/src/query/inc/tvariant.h +++ b/src/query/inc/tvariant.h @@ -48,7 +48,7 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc); int32_t tVariantToString(tVariant *pVar, char *dst); -int32_t tVariantDump(tVariant *pVariant, char *payload, char type, bool includeLengthPrefix); +int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool includeLengthPrefix); int32_t tVariantTypeSetType(tVariant *pVariant, char type); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 2196cc29f66dcae90e79c2f67d41c337826c346e..cc7eb6e8e69a5223c352bee7f3cf1ed5766a69c1 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -12,7 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include +#include "qfill.h" #include "os.h" #include "hash.h" @@ -30,8 +30,6 @@ #include "tscompression.h" #include "ttime.h" -#define DEFAULT_INTERN_BUF_SIZE 16384L - /** * check if the primary column is load by default, otherwise, the program will * forced to load primary column explicitly. @@ -821,7 +819,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas * * @param pRuntimeEnv * @param forwardStep - * @param primaryKeyCol + * @param tsCols * @param pFields * @param isDiskFileBlock * @return the incremental number of output value, so it maybe 0 for fixed number of query, @@ -831,25 +829,25 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo, __block_search_fn_t searchFn, SArray *pDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - SQuery * pQuery = pRuntimeEnv->pQuery; - - TSKEY *primaryKeyCol = NULL; + + SQuery *pQuery = pRuntimeEnv->pQuery; + TSKEY *tsCols = NULL; if (pDataBlock != NULL) { SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, 0); - primaryKeyCol = (TSKEY *)(pColInfo->pData); + tsCols = (TSKEY *)(pColInfo->pData); } SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); - setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo, pStatis, &sasArray[k], k); + setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k); } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); if (isIntervalQuery(pQuery)) { int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); - TSKEY ts = primaryKeyCol[offset]; + TSKEY ts = tsCols[offset]; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win) != TSDB_CODE_SUCCESS) { @@ -858,16 +856,16 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * TSKEY ekey = reviseWindowEkey(pQuery, &win); int32_t forwardStep = - getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, true); + getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true); SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); - doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep, primaryKeyCol, pDataBlockInfo->rows); + doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep, tsCols, pDataBlockInfo->rows); int32_t index = pWindowResInfo->curIndex; STimeWindow nextWin = win; while (1) { - int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pDataBlockInfo, primaryKeyCol, searchFn); + int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pDataBlockInfo, tsCols, searchFn); if (startPos < 0) { break; } @@ -878,10 +876,10 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } ekey = reviseWindowEkey(pQuery, &nextWin); - forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, primaryKeyCol, startPos, ekey, searchFn, true); + forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true); pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); - doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep, primaryKeyCol, pDataBlockInfo->rows); + doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows); } pWindowResInfo->curIndex = index; @@ -1043,7 +1041,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo* item = pQuery->current; - TSKEY *primaryKeyCol = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData; + TSKEY *tsCols = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData; bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr); SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); @@ -1057,7 +1055,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); - setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo, pStatis, &sasArray[k], k); + setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k); } // set the input column data @@ -1101,7 +1099,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS // interval window query if (isIntervalQuery(pQuery)) { // decide the time window according to the primary timestamp - int64_t ts = primaryKeyCol[offset]; + int64_t ts = tsCols[offset]; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win); @@ -1165,7 +1163,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } } - item->lastKey = primaryKeyCol[offset] + step; + item->lastKey = tsCols[offset] + step; // todo refactor: extract method for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { @@ -1310,14 +1308,15 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY // set the output buffer for the selectivity + tag query static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) { if (isSelectivityWithTagsQuery(pQuery)) { - int32_t num = 0; - SQLFunctionCtx *p = NULL; - + int32_t num = 0; int16_t tagLen = 0; - + + SQLFunctionCtx *p = NULL; SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutput, POINTER_BYTES); + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base; + if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) { tagLen += pCtx[i].outputBytes; pTagCtx[num++] = &pCtx[i]; @@ -1340,6 +1339,8 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) { static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + assert(pQuery->pSelectExpr[i].interBytes <= DEFAULT_INTERN_BUF_PAGE_SIZE); + setResultInfoBuf(&pResultInfo[i], pQuery->pSelectExpr[i].interBytes, isStableQuery); } } @@ -1371,11 +1372,13 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order pCtx->inputBytes = pQuery->tagColList[index].bytes; pCtx->inputType = pQuery->tagColList[index].type; } + } else { pCtx->inputBytes = pQuery->colList[index].bytes; pCtx->inputType = pQuery->colList[index].type; } - + + assert(isValidDataType(pCtx->inputType, pCtx->inputBytes)); pCtx->ptsOutputBuf = NULL; pCtx->outputBytes = pQuery->pSelectExpr[i].bytes; @@ -1524,7 +1527,7 @@ static bool isFixedOutputQuery(SQuery *pQuery) { static bool isPointInterpoQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionID = pQuery->pSelectExpr[i].base.functionId; - if (functionID == TSDB_FUNC_INTERP/* || functionID == TSDB_FUNC_LAST_ROW*/) { + if (functionID == TSDB_FUNC_INTERP) { return true; } } @@ -1855,7 +1858,7 @@ static int32_t getRowParamForMultiRowsOutput(SQuery *pQuery, bool isSTableQuery) static int32_t getNumOfRowsInResultPage(SQuery *pQuery, bool isSTableQuery) { int32_t rowSize = pQuery->rowSize * getRowParamForMultiRowsOutput(pQuery, isSTableQuery); - return (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / rowSize; + return (DEFAULT_INTERN_BUF_PAGE_SIZE - sizeof(tFilePage)) / rowSize; } char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult) { @@ -2276,14 +2279,18 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes pCtx[i].hasNull = true; pCtx[i].nStartQueryTimestamp = timestamp; pCtx[i].aInputElemBuf = getPosInResultPage(pRuntimeEnv, i, pWindowRes); - // pCtx[i].aInputElemBuf = ((char *)inputSrc->data) + - // ((int32_t)pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) + - // pCtx[i].outputBytes * inputIdx; // in case of tag column, the tag information should be extracted from input buffer if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG) { tVariantDestroy(&pCtx[i].tag); - tVariantCreateFromBinary(&pCtx[i].tag, pCtx[i].aInputElemBuf, pCtx[i].inputBytes, pCtx[i].inputType); + + int32_t type = pCtx[i].outputType; + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + tVariantCreateFromBinary(&pCtx[i].tag, varDataVal(pCtx[i].aInputElemBuf), varDataLen(pCtx[i].aInputElemBuf), type); + } else { + tVariantCreateFromBinary(&pCtx[i].tag, pCtx[i].aInputElemBuf, pCtx[i].inputBytes, pCtx[i].inputType); + } + } } @@ -2561,7 +2568,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { size_t size = taosArrayGetSize(pGroup); - tFilePage **buffer = (tFilePage **)pQuery->sdata; + tFilePage **buffer = pQuery->sdata; int32_t * posList = calloc(size, sizeof(int32_t)); STableQueryInfo **pTableList = malloc(POINTER_BYTES * size); @@ -2691,7 +2698,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) { SQuery * pQuery = pRuntimeEnv->pQuery; SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - int32_t capacity = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize; + int32_t capacity = (DEFAULT_INTERN_BUF_PAGE_SIZE - sizeof(tFilePage)) / pQuery->rowSize; // the base value for group result, since the maximum number of table for each vnode will not exceed 100,000. int32_t pageId = -1; @@ -3275,7 +3282,6 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult); - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; @@ -4233,7 +4239,15 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } initCtxOutputBuf(pRuntimeEnv); - setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(tx, 0), pQInfo->tsdb); + + SArray* s = tsdbGetQueriedTableIdList(pRuntimeEnv->pQueryHandle); + assert(taosArrayGetSize(s) >= 1); + + setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(s, 0), pQInfo->tsdb); + + if (isFirstLastRowQuery(pQuery)) { + assert(taosArrayGetSize(s) == 1); + } // here we simply set the first table as current table pQuery->current = ((SGroupItem*) taosArrayGet(group, 0))->info; @@ -4802,11 +4816,12 @@ static void stableQueryImpl(SQInfo *pQInfo) { int64_t st = taosGetTimestampUs(); if (isIntervalQuery(pQuery) || - (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr))) { + (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && + !isFirstLastRowQuery(pQuery))) { multiTableQueryProcess(pQInfo); } else { assert((pQuery->checkBuffer == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) || - isGroupbyNormalCol(pQuery->pGroupbyExpr)); + isFirstLastRowQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)); sequentialTableProcess(pQInfo); } @@ -5130,9 +5145,9 @@ static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable return TSDB_CODE_SUCCESS; } -static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo **pSqlFuncExpr, - SSqlFuncMsg **pExprMsg, SColumnInfo* pTagCols) { - *pSqlFuncExpr = NULL; +static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, + SColumnInfo* pTagCols) { + *pExprInfo = NULL; int32_t code = TSDB_CODE_SUCCESS; SExprInfo *pExprs = (SExprInfo *)calloc(1, sizeof(SExprInfo) * pQueryMsg->numOfOutput); @@ -5186,8 +5201,6 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo assert(isValidDataType(pExprs[i].type, pExprs[i].bytes)); } - // get the correct result size for top/bottom query, according to the number of tags columns in selection clause - // TODO refactor for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { pExprs[i].base = *pExprMsg[i]; @@ -5207,7 +5220,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo } tfree(pExprMsg); - *pSqlFuncExpr = pExprs; + *pExprInfo = pExprs; return TSDB_CODE_SUCCESS; } @@ -5326,25 +5339,32 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { SSqlFuncMsg *pSqlExprMsg = &pQuery->pSelectExpr[k].base; - if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM || pSqlExprMsg->colInfo.flag == TSDB_COL_TAG) { + if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM) { continue; } + // todo opt performance SColIndex *pColIndex = &pSqlExprMsg->colInfo; if (!TSDB_COL_IS_TAG(pColIndex->flag)) { - for (int32_t f = 0; f < pQuery->numOfCols; ++f) { + int32_t f = 0; + for (f = 0; f < pQuery->numOfCols; ++f) { if (pColIndex->colId == pQuery->colList[f].colId) { pColIndex->colIndex = f; break; } } + + assert (f < pQuery->numOfCols); } else { - for (int32_t f = 0; f < pQuery->numOfTags; ++f) { + int32_t f = 0; + for (f = 0; f < pQuery->numOfTags; ++f) { if (pColIndex->colId == pQuery->tagColList[f].colId) { pColIndex->colIndex = f; break; } } + + assert(f < pQuery->numOfTags || pColIndex->colId == TSDB_TBNAME_COLUMN_INDEX); } } } @@ -5382,7 +5402,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, pQuery->intervalTime = pQueryMsg->intervalTime; pQuery->slidingTime = pQueryMsg->slidingTime; pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit; - pQuery->fillType = pQueryMsg->fillType; + pQuery->fillType = pQueryMsg->fillType; pQuery->numOfTags = pQueryMsg->numOfTags; // todo do not allocate ?? @@ -5464,7 +5484,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, STableId id = *(STableId*) taosArrayGet(pa, j); SGroupItem item = { .id = id }; // NOTE: compare STableIdInfo with STableId - // not a problem at present because we only use their 1st int64_t field STableIdInfo* pTableId = taosArraySearch( pTableIdList, &id, compareTableIdInfo); if (pTableId != NULL ) { window.skey = pTableId->key; @@ -5482,7 +5501,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo)); pQuery->pos = -1; - pQuery->window = pQueryMsg->window; if (sem_init(&pQInfo->dataReady, 0, 0) != 0) { @@ -5796,14 +5814,7 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi // todo handle the error /*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex, numOfGroupByCols); -// if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query -// code = TSDB_CODE_SUCCESS; -// qTrace("qmsg:%p no results to produce by tag filters, return directly", pQueryMsg); - -// goto _over; -// } } else { -// groupInfo.numOfTables = taosArrayGetSize(pTableIdList); SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); SArray* sa = taosArrayInit(groupInfo.numOfTables, sizeof(STableId)); @@ -6014,7 +6025,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) { tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, &type, &bytes, &data); assert(bytes <= pExprInfo[j].bytes && type == pExprInfo[j].type); - char* dst = pQuery->sdata[j]->data + i * bytes; + char* dst = pQuery->sdata[j]->data + i * pExprInfo[j].bytes; if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { memcpy(dst, data, varDataTLen(data)); } else { diff --git a/src/query/src/qresultBuf.c b/src/query/src/qresultBuf.c index bdf700c83f3fa7d69ab83dcb273939b001524cd6..8910d84830679a282c8a1599d42bbbac5edd8d89 100644 --- a/src/query/src/qresultBuf.c +++ b/src/query/src/qresultBuf.c @@ -5,14 +5,12 @@ #include "tsqlfunction.h" #include "queryLog.h" -#define DEFAULT_INTERN_BUF_SIZE 16384L - int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle) { SDiskbasedResultBuf* pResBuf = calloc(1, sizeof(SDiskbasedResultBuf)); - pResBuf->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / rowSize; + pResBuf->numOfRowsPerPage = (DEFAULT_INTERN_BUF_PAGE_SIZE - sizeof(tFilePage)) / rowSize; pResBuf->numOfPages = size; - pResBuf->totalBufSize = pResBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE; + pResBuf->totalBufSize = pResBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE; pResBuf->incStep = 4; // init id hash table @@ -33,7 +31,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si return TSDB_CODE_CLI_NO_DISKSPACE; } - int32_t ret = ftruncate(pResBuf->fd, pResBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE); + int32_t ret = ftruncate(pResBuf->fd, pResBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE); if (ret != TSDB_CODE_SUCCESS) { qError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno)); return TSDB_CODE_CLI_NO_DISKSPACE; @@ -55,7 +53,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si tFilePage* getResultBufferPageById(SDiskbasedResultBuf* pResultBuf, int32_t id) { assert(id < pResultBuf->numOfPages && id >= 0); - return (tFilePage*)(pResultBuf->pBuf + DEFAULT_INTERN_BUF_SIZE * id); + return (tFilePage*)(pResultBuf->pBuf + DEFAULT_INTERN_BUF_PAGE_SIZE * id); } int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); } @@ -63,7 +61,7 @@ int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosH int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; } static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOfPages) { - assert(pResultBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE == pResultBuf->totalBufSize); + assert(pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE == pResultBuf->totalBufSize); int32_t ret = munmap(pResultBuf->pBuf, pResultBuf->totalBufSize); pResultBuf->numOfPages += numOfPages; @@ -72,14 +70,14 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf * disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may * be insufficient */ - ret = ftruncate(pResultBuf->fd, pResultBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE); + ret = ftruncate(pResultBuf->fd, pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE); if (ret != 0) { // dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, // strerror(errno)); return -TSDB_CODE_SERV_NO_DISKSPACE; } - pResultBuf->totalBufSize = pResultBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE; + pResultBuf->totalBufSize = pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE; pResultBuf->pBuf = mmap(NULL, pResultBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0); if (pResultBuf->pBuf == MAP_FAILED) { @@ -174,7 +172,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 tFilePage* page = getResultBufferPageById(pResultBuf, *pageId); // clear memory for the new page - memset(page, 0, DEFAULT_INTERN_BUF_SIZE); + memset(page, 0, DEFAULT_INTERN_BUF_PAGE_SIZE); return page; } diff --git a/src/query/src/tvariant.c b/src/query/src/tvariant.c index 6f058f79ef534478dab23561d4b621a0a23d667e..2cf60d3e91d30c55c3c61b546cc1f792ea96afd7 100644 --- a/src/query/src/tvariant.c +++ b/src/query/src/tvariant.c @@ -596,7 +596,7 @@ static int32_t convertToBool(tVariant *pVariant, int64_t *pDest) { * * todo handle the return value */ -int32_t tVariantDump(tVariant *pVariant, char *payload, char type, bool includeLengthPrefix) { +int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool includeLengthPrefix) { if (pVariant == NULL || (pVariant->nType != 0 && !isValidDataType(pVariant->nType, pVariant->nLen))) { return -1; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index b280a694174bbabf0477b5399f782067b2de6c8c..dd5e4a2ee416d32a06d3f239fdfc8e33f4849d4a 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -211,6 +211,22 @@ TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable return pQueryHandle; } +SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle) { + assert(pHandle != NULL); + + STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) pHandle; + + size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); + SArray* res = taosArrayInit(size, sizeof(STableId)); + + for(int32_t i = 0; i < size; ++i) { + STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); + taosArrayPush(res, &pCheckInfo->tableId); + } + + return res; +} + TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList) { STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList); @@ -1461,6 +1477,9 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { for(int32_t i = 0; i < numOfTables; ++i) { STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); + if (pCheckInfo->pTableObj->tableId.uid == 12094628167747) { + printf("abc\n"); + } if (pCheckInfo->pTableObj->lastKey > key) { key = pCheckInfo->pTableObj->lastKey; index = i; diff --git a/tests/script/general/parser/where.sim b/tests/script/general/parser/where.sim index 171c03b357371a062cc8544b6eeebafc601d39eb..4d86b50f38e709bb0874883b42e3a714db2e37d0 100644 --- a/tests/script/general/parser/where.sim +++ b/tests/script/general/parser/where.sim @@ -1,10 +1,10 @@ -#system sh/stop_dnodes.sh -# -#system sh/deploy.sh -n dnode1 -i 1 -#system sh/cfg.sh -n dnode1 -c walLevel -v 0 -#system sh/exec.sh -n dnode1 -s start -# -#sleep 3000 +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c walLevel -v 0 +system sh/exec.sh -n dnode1 -s start + +sleep 3000 sql connect $dbPrefix = wh_db @@ -13,39 +13,39 @@ $mtPrefix = wh_mt $tbNum = 10 $rowNum = 10000 $totalNum = $tbNum * $rowNum -# -#print =============== where.sim + +print =============== where.sim $i = 0 $db = $dbPrefix . $i $mt = $mtPrefix . $i -# -#sql drop database if exits $db -x step1 -#step1: + +sql drop database if exits $db -x step1 +step1: sql create database if not exists $db maxTables 4 sql use $db -#sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int) -# -#$i = 0 -#while $i < $tbNum -# $tb = $tbPrefix . $i -# sql create table $tb using $mt tags( $i ) -# -# $x = 0 -# while $x < $rowNum -# $ms = $x . m -# $c = $x / 100 -# $c = $c * 100 -# $c = $x - $c -# $binary = 'binary . $c -# $binary = $binary . ' -# $nchar = 'nchar . $c -# $nchar = $nchar . ' -# sql insert into $tb values (now + $ms , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) -# $x = $x + 1 -# endw -# -# $i = $i + 1 -#endw +sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int) + +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + sql create table $tb using $mt tags( $i ) + + $x = 0 + while $x < $rowNum + $ms = $x . m + $c = $x / 100 + $c = $c * 100 + $c = $x - $c + $binary = 'binary . $c + $binary = $binary . ' + $nchar = 'nchar . $c + $nchar = $nchar . ' + sql insert into $tb values (now + $ms , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) + $x = $x + 1 + endw + + $i = $i + 1 +endw sleep 100