diff --git a/src/client/inc/tscSecondaryMerge.h b/src/client/inc/tscSecondaryMerge.h index 5370d0ec5259c3e47e065d4fe8b37884bdf6a050..f6c75167547442c59a2b74623a6f480734768327 100644 --- a/src/client/inc/tscSecondaryMerge.h +++ b/src/client/inc/tscSecondaryMerge.h @@ -120,7 +120,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd void tscDestroyLocalReducer(SSqlObj *pSql); -int32_t tscDoLocalreduce(SSqlObj *pSql); +int32_t tscDoLocalMerge(SSqlObj *pSql); #ifdef __cplusplus } diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 22322ec219ea3d864fd4242f871dd332578b8a70..9067be63255d02d623c8917b5e4deedf665cfd58 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2981,11 +2981,28 @@ static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { */ static void tag_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, 1, 1); - tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType); + + char* output = pCtx->aOutputBuf; + + // todo refactor to dump length presented string(var string) + if (pCtx->tag.nType == TSDB_DATA_TYPE_BINARY || pCtx->tag.nType == TSDB_DATA_TYPE_NCHAR) { + *(int16_t*) output = pCtx->tag.nLen; + output += VARSTR_HEADER_SIZE; + } + + tVariantDump(&pCtx->tag, output, pCtx->tag.nType); } static void tag_function_f(SQLFunctionCtx *pCtx, int32_t index) { SET_VAL(pCtx, 1, 1); + + char* output = pCtx->aOutputBuf; + + // todo refactor to dump length presented string(var string) + if (pCtx->tag.nType == TSDB_DATA_TYPE_BINARY || pCtx->tag.nType == TSDB_DATA_TYPE_NCHAR) { + *(int16_t*) output = pCtx->tag.nLen; + output += VARSTR_HEADER_SIZE; + } tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType); } diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index d9dc9a9f41d989515e1241550cbefcf3ce8793cb..1abefdfd5affe49a0ab0232736c9b4eada8a2029 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -1422,7 +1422,7 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) { doExecuteSecondaryMerge(pCmd, pLocalReducer, true); } -int32_t tscDoLocalreduce(SSqlObj *pSql) { +int32_t tscDoLocalMerge(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index f8db232afa89908a9b7dc0fb2206398450254901..aa4eb01eae3a257ecf7d6cdb148cbea3c570eb18 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1470,7 +1470,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - pRes->code = tscDoLocalreduce(pSql); + pRes->code = tscDoLocalMerge(pSql); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) { diff --git a/src/common/src/ttypes.c b/src/common/src/ttypes.c index d99e916c73974b81ac8e8f08a305445ef76e31e0..533f992151f6d829e7f941d466c38e0f3ef47512 100644 --- a/src/common/src/ttypes.c +++ b/src/common/src/ttypes.c @@ -198,7 +198,7 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) { break; }; case TSDB_DATA_TYPE_BINARY: { - strncpy(val, src, len); + varDataCopy(val, src); break; }; case TSDB_DATA_TYPE_NCHAR: { diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index bdd6a964bae9c759e86596be2485ece4d5c761ad..65c3efd0227759f22f83a5f30f4fe1bc81c46d45 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -35,9 +35,11 @@ extern "C" { // ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR typedef int32_t VarDataOffsetT; typedef int16_t VarDataLenT; -#define varDataLen(v) ((VarDataLenT *)(v))[0] -#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v)) -#define varDataVal(v) ((void *)((char *)v + sizeof(VarDataLenT))) + +#define varDataLen(v) ((VarDataLenT *)(v))[0] +#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v)) +#define varDataVal(v) ((void *)((char *)v + sizeof(VarDataLenT))) +#define varDataCopy(dst, v) memcpy((dst), (void*) (v), varDataTLen(v)) // this data type is internally used only in 'in' query to hold the values #define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1) diff --git a/src/query/src/qextbuffer.c b/src/query/src/qextbuffer.c index 2b403eb92112fc70c8388addd93d0adbbe3d8e5e..adf15d1de0d90e287131bf7b076fa089e4156020 100644 --- a/src/query/src/qextbuffer.c +++ b/src/query/src/qextbuffer.c @@ -409,13 +409,21 @@ static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, i return (first < second) ? -1 : 1; }; case TSDB_DATA_TYPE_BINARY: { - int32_t ret = strncmp(f1, f2, bytes); - if (ret == 0) { - return 0; + int32_t len1 = varDataLen(f1); + int32_t len2 = varDataLen(f2); + + if (len1 != len2) { + return len1 > len2? 1:-1; + } else { + int32_t ret = strncmp(varDataVal(f1), varDataVal(f2), len1); + if (ret == 0) { + return 0; + } + return (ret < 0) ? -1 : 1; } - return (ret < 0) ? -1 : 1; + }; - case TSDB_DATA_TYPE_NCHAR: { + case TSDB_DATA_TYPE_NCHAR: { // todo handle the var string compare int32_t ret = tasoUcs4Compare(f1, f2, bytes); if (ret == 0) { return 0; diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index b8f7504102b59fa9fd20230c85b82235b0c56f9c..f0f96c15d5e96676182b2df2cfed29a66b9db293 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -2555,7 +2555,7 @@ static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColI if (tagColId == TSDB_TBNAME_COLUMN_INDEX) { tsdbGetTableName(tsdb, pTableId, &val); - bytes = TSDB_TABLE_NAME_LEN; + bytes = strnlen(val, TSDB_TABLE_NAME_LEN); type = TSDB_DATA_TYPE_BINARY; } else { tsdbGetTableTagVal(tsdb, pTableId, tagColId, &type, &bytes, &val); @@ -4232,7 +4232,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool // normal query setup the queryhandle here - if (isFirstLastRowQuery(pQuery)) { // in case of last_row query, invoke a different API. + if (isFirstLastRowQuery(pQuery) && !isSTableQuery) { // in case of last_row query, invoke a different API. pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo); } else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) { pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo); @@ -4478,22 +4478,16 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { return true; } -static UNUSED_FUNC int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start) { +static UNUSED_FUNC int64_t doCheckTables(SQInfo *pQInfo, SArray* pTableList) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - if (!multiTableMultioutputHelper(pQInfo, index)) { + if (!multiTableMultioutputHelper(pQInfo, 0)) { return 0; } SPointInterpoSupporter pointInterpSupporter = {0}; pointInterpSupporterInit(pQuery, &pointInterpSupporter); - assert(0); - - // if (!normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter, NULL)) { - // pointInterpSupporterDestroy(&pointInterpSupporter); - // return 0; - // } /* * here we set the value for before and after the specified time into the @@ -4537,62 +4531,51 @@ static void sequentialTableProcess(SQInfo *pQInfo) { resetCtxOutputBuf(pRuntimeEnv); assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); -#if 0 while (pQInfo->groupIndex < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); - size_t numOfTable = taosArrayGetSize(group); if (isFirstLastRowQuery(pQuery)) { - qTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet, - pQInfo->groupIndex); - - TSKEY key = -1; - int32_t index = -1; - - // choose the last key for one group - pQInfo->tableIndex = 0; + qTrace("QInfo:%p last_row query on group:%d, total group:%d, current group:%d", pQInfo, pQInfo->groupIndex, + numOfGroups); + + STsdbQueryCond cond = { + .twindow = pQuery->window, + .colList = pQuery->colList, + .order = pQuery->order.order, + .numOfCols = pQuery->numOfCols, + }; + + SArray *g1 = taosArrayInit(1, POINTER_BYTES); + SArray *tx = taosArrayClone(group); + taosArrayPush(g1, &tx); - for (int32_t k = 0; k < numOfTable; ++k, pQInfo->tableIndex++) { - if (isQueryKilled(pQInfo)) { - return; - } + STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1}; + + // include only current table + if (pRuntimeEnv->pQueryHandle != NULL) { + tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); + pRuntimeEnv->pQueryHandle = NULL; } - pQuery->window.skey = key; - pQuery->window.ekey = key; + pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(pQInfo->tsdb, &cond, &gp); -// int64_t num = doCheckMetersInGroup(pQInfo, index, start); -// assert(num >= 0); - } else { - qTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet, - pQInfo->groupIndex); + initCtxOutputBuf(pRuntimeEnv); + setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(tx, 0), pQInfo->tsdb); + scanAllDataBlocks(pRuntimeEnv); - for (int32_t k = start; k <= end; ++k) { - if (isQueryKilled(pQInfo)) { - setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); - return; - } - - pQuery->skey = pSupporter->rawSKey; - pQuery->ekey = pSupporter->rawEKey; - - int64_t num = doCheckMetersInGroup(pQInfo, k, start); - if (num == 1) { - break; - } + int64_t numOfRes = getNumOfResult(pRuntimeEnv); + if (numOfRes > 0) { + pQuery->rec.rows += numOfRes; + forwardCtxOutputBuf(pRuntimeEnv, numOfRes); } - } - - pSupporter->groupIndex++; - - // output buffer is full, return to client - if (pQuery->size >= pQuery->pointsToRead) { - break; + + skipResults(pRuntimeEnv); + pQInfo->groupIndex += 1; + + // enable execution for next table, when handling the projection query + enableExecutionForNextTable(pRuntimeEnv); } } -#endif - } else { createTableQueryInfo(pQInfo); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index bd6b6b3f1890be245efdc3c652bb62610b739f4b..e5744ab2ee8a7e0822952d70a38e38946e3db4f1 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -122,7 +122,7 @@ typedef struct STsdbQueryHandle { SRWHelper rhelper; } STsdbQueryHandle; -static void changeQueryHandleForQuery(TsdbQueryHandleT pqHandle); +static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { pBlockLoadInfo->slot = -1; @@ -207,7 +207,7 @@ TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable pQueryHandle->type = TSDB_QUERY_TYPE_LAST_ROW; pQueryHandle->order = TSDB_ORDER_DESC; - changeQueryHandleForQuery(pQueryHandle); + changeQueryHandleForLastrowQuery(pQueryHandle); return pQueryHandle; } @@ -957,7 +957,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pqHandle) { } } -void changeQueryHandleForQuery(TsdbQueryHandleT pqHandle) { +void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle; assert(!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order));