diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index b3674a7bf5bf94806dfd576b4e81af95413bb5bd..0b2d4fd115850aa21d76ef274e62ebaaac04bc4c 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -29,15 +29,16 @@ extern "C" { #include "tsched.h" #include "tsclient.h" -#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \ +#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE)) + #define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE)) - -#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\ - (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo))) -#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ +#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \ + (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo))) + +#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE)) #pragma pack(push,1) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 6df724881e145a5698a73b228761dc68187465dd..46292ceb91d290ec0d1af8c8648dee2eb0b1fb9e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5482,14 +5482,19 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) { pQueryInfo->order.order = TSDB_ORDER_ASC; if (isTopBottomQuery(pQueryInfo)) { pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; - } else { // in case of select tbname from super_table, the defualt order column can not be the primary ts column - pQueryInfo->order.orderColId = INT32_MIN; + } else { // in case of select tbname from super_table, the default order column can not be the primary ts column + pQueryInfo->order.orderColId = INT32_MIN; // todo define a macro } /* for super table query, set default ascending order for group output */ if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC; } + + if (pQueryInfo->distinct) { + pQueryInfo->order.order = TSDB_ORDER_ASC; + pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; + } } int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema) { @@ -5501,17 +5506,12 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq setDefaultOrderInfo(pQueryInfo); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - if (pQueryInfo->distinct == true) { - pQueryInfo->order.order = TSDB_ORDER_ASC; - pQueryInfo->order.orderColId = 0; - return TSDB_CODE_SUCCESS; - } - if (pSqlNode->pSortOrder == NULL) { + if (pQueryInfo->distinct || pSqlNode->pSortOrder == NULL) { return TSDB_CODE_SUCCESS; } - SArray* pSortorder = pSqlNode->pSortOrder; + char* pMsgBuf = tscGetErrorMsgPayload(pCmd); + SArray* pSortOrder = pSqlNode->pSortOrder; /* * for table query, there is only one or none order option is allowed, which is the @@ -5519,19 +5519,19 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq * * for super table query, the order option must be less than 3. */ - size_t size = taosArrayGetSize(pSortorder); - if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) { + size_t size = taosArrayGetSize(pSortOrder); + if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_TMP_TABLE(pTableMetaInfo)) { if (size > 1) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg0); + return invalidOperationMsg(pMsgBuf, msg0); } } else { if (size > 2) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); + return invalidOperationMsg(pMsgBuf, msg3); } } // handle the first part of order by - tVariant* pVar = taosArrayGet(pSortorder, 0); + tVariant* pVar = taosArrayGet(pSortOrder, 0); // e.g., order by 1 asc, return directly with out further check. if (pVar->nType >= TSDB_DATA_TYPE_TINYINT && pVar->nType <= TSDB_DATA_TYPE_BIGINT) { @@ -5543,7 +5543,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // super table query if (getColumnIndexByName(&columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); + return invalidOperationMsg(pMsgBuf, msg1); } bool orderByTags = false; @@ -5555,7 +5555,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq // it is a tag column if (pQueryInfo->groupbyExpr.columnInfo == NULL) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + return invalidOperationMsg(pMsgBuf, msg2); } SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0); if (relTagIndex == pColIndex->colIndex) { @@ -5576,13 +5576,14 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq orderByGroupbyCol = true; } } + if (!(orderByTags || orderByTS || orderByGroupbyCol) && !isTopBottomQuery(pQueryInfo)) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); + return invalidOperationMsg(pMsgBuf, msg3); } else { // order by top/bottom result value column is not supported in case of interval query. assert(!(orderByTags && orderByTS && orderByGroupbyCol)); } - size_t s = taosArrayGetSize(pSortorder); + size_t s = taosArrayGetSize(pSortOrder); if (s == 1) { if (orderByTags) { pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); @@ -5601,7 +5602,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq pExpr = tscExprGet(pQueryInfo, 1); if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + return invalidOperationMsg(pMsgBuf, msg2); } tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); @@ -5619,9 +5620,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq addPrimaryTsColIntoResult(pQueryInfo, pCmd); } } - } - - if (s == 2) { + } else { tVariantListItem *pItem = taosArrayGet(pSqlNode->pSortOrder, 0); if (orderByTags) { pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); @@ -5638,22 +5637,23 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq tVariant* pVar2 = &pItem->pVar; SStrToken cname = {pVar2->nLen, pVar2->nType, pVar2->pz}; if (getColumnIndexByName(&cname, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); + return invalidOperationMsg(pMsgBuf, msg1); } if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + return invalidOperationMsg(pMsgBuf, msg2); } else { - tVariantListItem* p1 = taosArrayGet(pSortorder, 1); + tVariantListItem* p1 = taosArrayGet(pSortOrder, 1); pQueryInfo->order.order = p1->sortOrder; pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; } } - } else { // meter query - if (getColumnIndexByName(&columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); + } else if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) { // check order by clause for normal table & temp table + if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) { + return invalidOperationMsg(pMsgBuf, msg1); } + if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX && !isTopBottomQuery(pQueryInfo)) { bool validOrder = false; SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo; @@ -5661,13 +5661,14 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq SColIndex* pColIndex = taosArrayGet(columnInfo, 0); validOrder = (pColIndex->colIndex == index.columnIndex); } + if (!validOrder) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + return invalidOperationMsg(pMsgBuf, msg2); } + tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId; pQueryInfo->groupbyExpr.orderType = p1->sortOrder; - } if (isTopBottomQuery(pQueryInfo)) { @@ -5683,13 +5684,14 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq pExpr = tscExprGet(pQueryInfo, 1); if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + return invalidOperationMsg(pMsgBuf, msg2); } + validOrder = true; } if (!validOrder) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + return invalidOperationMsg(pMsgBuf, msg2); } tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0); @@ -5699,6 +5701,18 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq return TSDB_CODE_SUCCESS; } + tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0); + pQueryInfo->order.order = pItem->sortOrder; + pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; + } else { + // handle the temp table order by clause. You can order by any single column in case of the temp table, created by + // inner subquery. + assert(UTIL_TABLE_IS_TMP_TABLE(pTableMetaInfo) && taosArrayGetSize(pSqlNode->pSortOrder) == 1); + + if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) { + return invalidOperationMsg(pMsgBuf, msg1); + } + tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0); pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; @@ -8458,8 +8472,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf const char* msg8 = "condition missing for join query"; const char* msg9 = "not support 3 level select"; - int32_t code = TSDB_CODE_SUCCESS; - + int32_t code = TSDB_CODE_SUCCESS; SSqlCmd* pCmd = &pSql->cmd; STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -8754,8 +8767,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf pQueryInfo->simpleAgg = isSimpleAggregateRv(pQueryInfo); pQueryInfo->onlyTagQuery = onlyTagPrjFunction(pQueryInfo); pQueryInfo->groupbyColumn = tscGroupbyColumn(pQueryInfo); - //pQueryInfo->globalMerge = tscIsTwoStageSTableQuery(pQueryInfo, 0); - pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo); pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 3e8dfac1da0cb192282989b9ab1137427365b6d0..e809fe3137f24203f5ad9a3f864e6da8c9195824 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -880,16 +880,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; SQueryAttr query = {{0}}; tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql); + query.vgId = pTableMeta->vgId; SArray* tableScanOperator = createTableScanPlan(&query); SArray* queryOperator = createExecOperatorPlan(&query); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; - SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version)); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 3c35795b0dda8e4c147ed36d4931410dbe7c13a7..b393459c52e5958076b9e5a791f6a4683c8f4fe3 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1206,7 +1206,6 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue createInputDataFilterInfo(px, numOfCol1, &numOfFilterCols, &pFilterInfo); SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilterInfo, numOfFilterCols); - pOutput->precision = pSqlObjList[0]->res.precision; SSchema* schema = NULL; diff --git a/src/connector/go b/src/connector/go index b8f76da4a708d158ec3cc4b844571dc4414e36b4..050667e5b4d0eafa5387e4283e713559b421203f 160000 --- a/src/connector/go +++ b/src/connector/go @@ -1 +1 @@ -Subproject commit b8f76da4a708d158ec3cc4b844571dc4414e36b4 +Subproject commit 050667e5b4d0eafa5387e4283e713559b421203f diff --git a/src/connector/grafanaplugin b/src/connector/grafanaplugin index a44ec1ca493ad01b2bf825b6418f69e11f548206..32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df 160000 --- a/src/connector/grafanaplugin +++ b/src/connector/grafanaplugin @@ -1 +1 @@ -Subproject commit a44ec1ca493ad01b2bf825b6418f69e11f548206 +Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df diff --git a/src/connector/hivemq-tdengine-extension b/src/connector/hivemq-tdengine-extension index ce5201014136503d34fecbd56494b67b4961056c..b62a26ecc164a310104df57691691b237e091c89 160000 --- a/src/connector/hivemq-tdengine-extension +++ b/src/connector/hivemq-tdengine-extension @@ -1 +1 @@ -Subproject commit ce5201014136503d34fecbd56494b67b4961056c +Subproject commit b62a26ecc164a310104df57691691b237e091c89 diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 5fe68e456fa11c7dd1fe0a2cdb5bb615dbc1bf8b..d30971ab47f7fd172809bd9333107a9c593c8f17 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -333,6 +333,7 @@ enum OPERATOR_TYPE_E { OP_StateWindow = 22, OP_AllTimeWindow = 23, OP_AllMultiTableTimeInterval = 24, + OP_Order = 25, }; typedef struct SOperatorInfo { @@ -417,7 +418,6 @@ typedef struct STableScanInfo { int32_t *rowCellInfoOffset; SExprInfo *pExpr; SSDataBlock block; - bool loadExternalRows; // load external rows (prev & next rows) int32_t numOfOutput; int64_t elapsedTime; @@ -541,6 +541,13 @@ typedef struct SMultiwayMergeInfo { SArray *udfInfo; } SMultiwayMergeInfo; +// todo support the disk-based sort +typedef struct SOrderOperatorInfo { + int32_t colIndex; + int32_t order; + SSDataBlock *pDataBlock; +} SOrderOperatorInfo; + void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); @@ -570,6 +577,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); +SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); diff --git a/src/query/inc/qExtbuffer.h b/src/query/inc/qExtbuffer.h index cf0e8ce31af91ad9d94feed2045265862a57545a..c06fae298c0a22a6d33e4230fa2ce160545b038a 100644 --- a/src/query/inc/qExtbuffer.h +++ b/src/query/inc/qExtbuffer.h @@ -220,6 +220,8 @@ tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrder void tOrderDescDestroy(tOrderDescriptor *pDesc); +void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn); + void tColModelAppend(SColumnModel *dstModel, tFilePage *dstPage, void *srcData, int32_t srcStartRows, int32_t numOfRowsToWrite, int32_t srcCapacity); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e31792398d8123e27e06b929070fd1f9aa1860b6..7088d5883052386696833ac081a815a3a85b2501 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -224,6 +224,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput); static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput); static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput); +static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); @@ -1622,12 +1623,12 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe } startPos = pSDataBlock->info.rows - 1; - + // window start(end) key interpolation doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep); doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); } - + break; } setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); @@ -2213,6 +2214,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } break; } + case OP_StateWindow: { pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; @@ -2229,24 +2231,20 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_Filter: { // todo refactor int32_t numOfFilterCols = 0; -// if (pQueryAttr->numOfFilterCols > 0) { -// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, -// pQueryAttr->numOfOutput, pQueryAttr->tableCols, pQueryAttr->numOfFilterCols); -// } else { - if (pQueryAttr->stableQuery) { - SColumnInfo* pColInfo = - extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols); - pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols); - freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3); - } else { - SColumnInfo* pColInfo = - extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols); - pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, - pQueryAttr->numOfOutput, pColInfo, numOfFilterCols); - freeColumnInfo(pColInfo, pQueryAttr->numOfOutput); - } -// } + if (pQueryAttr->stableQuery) { + SColumnInfo* pColInfo = + extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols); + pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, + pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols); + freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3); + } else { + SColumnInfo* pColInfo = + extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols); + pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, + pQueryAttr->numOfOutput, pColInfo, numOfFilterCols); + freeColumnInfo(pColInfo, pQueryAttr->numOfOutput); + } + break; } @@ -2258,11 +2256,12 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_MultiwayMergeSort: { bool groupMix = true; - if(pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) { + if (pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) { groupMix = false; } + pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, - 4096, merger, groupMix); // TODO hack it + 4096, merger, groupMix); // TODO hack it break; } @@ -2283,6 +2282,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf break; } + case OP_Order: { + pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order); + break; + } + default: { assert(0); } @@ -3092,7 +3096,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa // check if this data block is required to load if ((*status) != BLK_DATA_ALL_NEEDED) { bool needFilter = true; - + // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { @@ -5404,6 +5408,114 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx return pOperator; } +static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) { + assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols); + + int32_t numOfCols = pSrc->info.numOfCols; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); + SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); + + int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes; + char* tmp = realloc(pCol2->pData, newSize); + if (tmp != NULL) { + pCol2->pData = tmp; + int32_t offset = pCol2->info.bytes * pDest->info.rows; + memcpy(pCol2->pData + offset, pCol1->pData, pSrc->info.rows * pCol2->info.bytes); + } else { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + } + + pDest->info.rows += pSrc->info.rows; + + return TSDB_CODE_SUCCESS; +} + +static SSDataBlock* doSort(void* param, bool* newgroup) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SOrderOperatorInfo* pInfo = pOperator->info; + + SSDataBlock* pBlock = NULL; + while(1) { + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); + pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + + // start to flush data into disk and try do multiway merge sort + if (pBlock == NULL) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + break; + } + + int32_t code = doMergeSDatablock(pInfo->pDataBlock, pBlock); + if (code != TSDB_CODE_SUCCESS) { + // todo handle error + } + } + + int32_t numOfCols = pInfo->pDataBlock->info.numOfCols; + void** pCols = calloc(numOfCols, POINTER_BYTES); + SSchema* pSchema = calloc(numOfCols, sizeof(SSchema)); + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p1 = taosArrayGet(pInfo->pDataBlock->pDataBlock, i); + pCols[i] = p1->pData; + pSchema[i].colId = p1->info.colId; + pSchema[i].bytes = p1->info.bytes; + pSchema[i].type = (uint8_t) p1->info.type; + } + + __compar_fn_t comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order); + taoscQSort(pCols, pSchema, numOfCols, pInfo->pDataBlock->info.rows, pInfo->colIndex, comp); + + tfree(pCols); + tfree(pSchema); + return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; +} + +SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) { + SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); + + { + SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock)); + pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); + for(int32_t i = 0; i < numOfOutput; ++i) { + SColumnInfoData col = {{0}}; + col.info.colId = pExpr[i].base.colInfo.colId; + col.info.bytes = pExpr[i].base.colBytes; + col.info.type = pExpr[i].base.colType; + taosArrayPush(pDataBlock->pDataBlock, &col); + + if (col.info.colId == pOrderVal->orderColId) { + pInfo->colIndex = i; + } + } + + pDataBlock->info.numOfCols = numOfOutput; + pInfo->order = pOrderVal->order; + pInfo->pDataBlock = pDataBlock; + } + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + pOperator->name = "InMemoryOrder"; + pOperator->operatorType = OP_Order; + pOperator->blockingOptr = true; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->exec = doSort; + pOperator->cleanup = destroyOrderOperatorInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; + + appendUpstream(pOperator, upstream); + return pOperator; +} + static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; } @@ -6404,6 +6516,11 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = destroyOutputBuf(pInfo->pRes); } +static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { + SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; + pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock); +} + static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { SFilterOperatorInfo* pInfo = (SFilterOperatorInfo*) param; doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols); @@ -6752,7 +6869,6 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doFill; pOperator->cleanup = destroySFillOperatorInfo; diff --git a/src/query/src/qExtbuffer.c b/src/query/src/qExtbuffer.c index cc47cc824bcf59f0839bc5a439d4d15b89e030ea..91004fe707ebecac4d73fcf1ea03638fbe07f073 100644 --- a/src/query/src/qExtbuffer.c +++ b/src/query/src/qExtbuffer.c @@ -1102,3 +1102,57 @@ void tOrderDescDestroy(tOrderDescriptor *pDesc) { destroyColumnModel(pDesc->pColumnModel); tfree(pDesc); } + +void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn) { + assert(numOfRows > 0 && numOfCols > 0 && index >= 0 && index < numOfCols); + + int32_t bytes = pSchema[index].bytes; + int32_t size = bytes + sizeof(int32_t); + + char* buf = calloc(1, size * numOfRows); + + for(int32_t i = 0; i < numOfRows; ++i) { + char* dest = buf + size * i; + memcpy(dest, pCols[index] + bytes * i, bytes); + *(int32_t*)(dest+bytes) = i; + } + + qsort(buf, numOfRows, size, compareFn); + + int32_t prevLength = 0; + char* p = NULL; + + for(int32_t i = 0; i < numOfCols; ++i) { + int32_t bytes1 = pSchema[i].bytes; + + if (i == index) { + for(int32_t j = 0; j < numOfRows; ++j){ + char* src = buf + (j * size); + char* dest = pCols[i] + (j * bytes1); + memcpy(dest, src, bytes1); + } + } else { + // make sure memory buffer is enough + if (prevLength < bytes1) { + char *tmp = realloc(p, bytes1 * numOfRows); + assert(tmp); + + p = tmp; + prevLength = bytes1; + } + + memcpy(p, pCols[i], bytes1 * numOfRows); + + for(int32_t j = 0; j < numOfRows; ++j){ + char* dest = pCols[i] + bytes1 * j; + + int32_t newPos = *(int32_t*)(buf + (j * size) + bytes); + char* src = p + (newPos * bytes1); + memcpy(dest, src, bytes1); + } + } + } + + tfree(buf); + tfree(p); +} \ No newline at end of file diff --git a/src/query/src/qPercentile.c b/src/query/src/qPercentile.c index e3326cc26bc4216d131211473e807a9845d49133..e9022db503f005ae6713e66e47bbde440bb4aaf7 100644 --- a/src/query/src/qPercentile.c +++ b/src/query/src/qPercentile.c @@ -237,7 +237,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, } pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(tFilePage))/pBucket->bytes; - pBucket->comparFn = getKeyComparFunc(pBucket->type); + pBucket->comparFn = getKeyComparFunc(pBucket->type, TSDB_ORDER_ASC); pBucket->hashFunc = getHashFunc(pBucket->type); if (pBucket->hashFunc == NULL) { diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index b8a5ee7699b34fed82ad67a592a6ca9148cc92cb..f72f70c91179eac860a831cf02821f88651da2ac 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -557,10 +557,9 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { int32_t op = 0; if (onlyQueryTags(pQueryAttr)) { // do nothing for tags query - if (onlyQueryTags(pQueryAttr)) { - op = OP_TagScan; - taosArrayPush(plan, &op); - } + op = OP_TagScan; + taosArrayPush(plan, &op); + if (pQueryAttr->distinct) { op = OP_Distinct; taosArrayPush(plan, &op); @@ -651,8 +650,14 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { taosArrayPush(plan, &op); } } + + // outer query order by support + int32_t orderColId = pQueryAttr->order.orderColId; + if (pQueryAttr->vgId == 0 && orderColId != PRIMARYKEY_TIMESTAMP_COL_INDEX && orderColId != INT32_MIN) { + op = OP_Order; + taosArrayPush(plan, &op); + } } - if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) { op = OP_Limit; diff --git a/src/util/inc/tcompare.h b/src/util/inc/tcompare.h index 612ce7ede0976764843a16002c7770cfa2a6bbb7..4861779acdafe90a348348245afd512fcee67f7d 100644 --- a/src/util/inc/tcompare.h +++ b/src/util/inc/tcompare.h @@ -47,7 +47,7 @@ int WCSPatternMatch(const wchar_t *pattern, const wchar_t *str, size_t size, con int32_t doCompare(const char* a, const char* b, int32_t type, size_t size); -__compar_fn_t getKeyComparFunc(int32_t keyType); +__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order); __compar_fn_t getComparFunc(int32_t type, int32_t optr); diff --git a/src/util/src/tcompare.c b/src/util/src/tcompare.c index e953f4c4649b6f906b8517a7165600d246da2176..6e135878c163b0a6b3b19890301a2a75f6470de5 100644 --- a/src/util/src/tcompare.c +++ b/src/util/src/tcompare.c @@ -16,58 +16,101 @@ #include "os.h" #include "ttype.h" #include "tcompare.h" -#include "tarray.h" #include "hash.h" -int32_t compareInt32Val(const void *pLeft, const void *pRight) { - int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight); +int32_t setCompareBytes1(const void *pLeft, const void *pRight) { + return NULL != taosHashGet((SHashObj *)pRight, pLeft, 1) ? 1 : 0; +} + +int32_t setCompareBytes2(const void *pLeft, const void *pRight) { + return NULL != taosHashGet((SHashObj *)pRight, pLeft, 2) ? 1 : 0; +} + +int32_t setCompareBytes4(const void *pLeft, const void *pRight) { + return NULL != taosHashGet((SHashObj *)pRight, pLeft, 4) ? 1 : 0; +} + +int32_t setCompareBytes8(const void *pLeft, const void *pRight) { + return NULL != taosHashGet((SHashObj *)pRight, pLeft, 8) ? 1 : 0; +} + +int32_t compareInt8Val(const void *pLeft, const void *pRight) { + int8_t left = GET_INT8_VAL(pLeft), right = GET_INT8_VAL(pRight); if (left > right) return 1; if (left < right) return -1; return 0; } -int32_t compareInt64Val(const void *pLeft, const void *pRight) { - int64_t left = GET_INT64_VAL(pLeft), right = GET_INT64_VAL(pRight); +int32_t compareInt8ValDesc(const void *pLeft, const void *pRight) { + return compareInt8Val(pRight, pLeft); +} + +int32_t compareInt16Val(const void *pLeft, const void *pRight) { + int16_t left = GET_INT16_VAL(pLeft), right = GET_INT16_VAL(pRight); if (left > right) return 1; if (left < right) return -1; return 0; } -int32_t compareInt16Val(const void *pLeft, const void *pRight) { - int16_t left = GET_INT16_VAL(pLeft), right = GET_INT16_VAL(pRight); +int32_t compareInt16ValDesc(const void* pLeft, const void* pRight) { + return compareInt16Val(pRight, pLeft); +} + +int32_t compareInt32Val(const void *pLeft, const void *pRight) { + int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight); if (left > right) return 1; if (left < right) return -1; return 0; } -int32_t compareInt8Val(const void *pLeft, const void *pRight) { - int8_t left = GET_INT8_VAL(pLeft), right = GET_INT8_VAL(pRight); +int32_t compareInt32ValDesc(const void* pLeft, const void* pRight) { + return compareInt32Val(pRight, pLeft); +} + +int32_t compareInt64Val(const void *pLeft, const void *pRight) { + int64_t left = GET_INT64_VAL(pLeft), right = GET_INT64_VAL(pRight); if (left > right) return 1; if (left < right) return -1; return 0; } +int32_t compareInt64ValDesc(const void* pLeft, const void* pRight) { + return compareInt64Val(pRight, pLeft); +} + int32_t compareUint32Val(const void *pLeft, const void *pRight) { - int32_t left = GET_UINT32_VAL(pLeft), right = GET_UINT32_VAL(pRight); + uint32_t left = GET_UINT32_VAL(pLeft), right = GET_UINT32_VAL(pRight); if (left > right) return 1; if (left < right) return -1; return 0; } +int32_t compareUint32ValDesc(const void* pLeft, const void* pRight) { + return compareUint32Val(pRight, pLeft); +} + int32_t compareUint64Val(const void *pLeft, const void *pRight) { - int64_t left = GET_UINT64_VAL(pLeft), right = GET_UINT64_VAL(pRight); + uint64_t left = GET_UINT64_VAL(pLeft), right = GET_UINT64_VAL(pRight); if (left > right) return 1; if (left < right) return -1; return 0; } +int32_t compareUint64ValDesc(const void* pLeft, const void* pRight) { + return compareUint64Val(pRight, pLeft); +} + int32_t compareUint16Val(const void *pLeft, const void *pRight) { - int16_t left = GET_UINT16_VAL(pLeft), right = GET_UINT16_VAL(pRight); + uint16_t left = GET_UINT16_VAL(pLeft), right = GET_UINT16_VAL(pRight); if (left > right) return 1; if (left < right) return -1; return 0; } +int32_t compareUint16ValDesc(const void* pLeft, const void* pRight) { + return compareUint16Val(pRight, pLeft); +} + int32_t compareUint8Val(const void* pLeft, const void* pRight) { uint8_t left = GET_UINT8_VAL(pLeft), right = GET_UINT8_VAL(pRight); if (left > right) return 1; @@ -75,6 +118,10 @@ int32_t compareUint8Val(const void* pLeft, const void* pRight) { return 0; } +int32_t compareUint8ValDesc(const void* pLeft, const void* pRight) { + return compareUint8Val(pRight, pLeft); +} + int32_t compareFloatVal(const void *pLeft, const void *pRight) { float p1 = GET_FLOAT_VAL(pLeft); float p2 = GET_FLOAT_VAL(pRight); @@ -92,8 +139,12 @@ int32_t compareFloatVal(const void *pLeft, const void *pRight) { } if (FLT_EQUAL(p1, p2)) { return 0; - } - return FLT_GREATER(p1, p2) ? 1: -1; + } + return FLT_GREATER(p1, p2) ? 1: -1; +} + +int32_t compareFloatValDesc(const void* pLeft, const void* pRight) { + return compareFloatVal(pRight, pLeft); } int32_t compareDoubleVal(const void *pLeft, const void *pRight) { @@ -113,14 +164,18 @@ int32_t compareDoubleVal(const void *pLeft, const void *pRight) { } if (FLT_EQUAL(p1, p2)) { return 0; - } - return FLT_GREATER(p1, p2) ? 1: -1; + } + return FLT_GREATER(p1, p2) ? 1: -1; +} + +int32_t compareDoubleValDesc(const void* pLeft, const void* pRight) { + return compareDoubleVal(pRight, pLeft); } int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight) { int32_t len1 = varDataLen(pLeft); int32_t len2 = varDataLen(pRight); - + if (len1 != len2) { return len1 > len2? 1:-1; } else { @@ -133,10 +188,14 @@ int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight) { } } +int32_t compareLenPrefixedStrDesc(const void* pLeft, const void* pRight) { + return compareLenPrefixedStr(pRight, pLeft); +} + int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) { int32_t len1 = varDataLen(pLeft); int32_t len2 = varDataLen(pRight); - + if (len1 != len2) { return len1 > len2? 1:-1; } else { @@ -149,6 +208,10 @@ int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) { } } +int32_t compareLenPrefixedWStrDesc(const void* pLeft, const void* pRight) { + return compareLenPrefixedWStr(pRight, pLeft); +} + /* * Compare two strings * TSDB_MATCH: Match @@ -161,33 +224,33 @@ int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) { */ int patternMatch(const char *patterStr, const char *str, size_t size, const SPatternCompareInfo *pInfo) { char c, c1; - + int32_t i = 0; int32_t j = 0; - + while ((c = patterStr[i++]) != 0) { if (c == pInfo->matchAll) { /* Match "*" */ - + while ((c = patterStr[i++]) == pInfo->matchAll || c == pInfo->matchOne) { if (c == pInfo->matchOne && (j > size || str[j++] == 0)) { // empty string, return not match return TSDB_PATTERN_NOWILDCARDMATCH; } } - + if (c == 0) { return TSDB_PATTERN_MATCH; /* "*" at the end of the pattern matches */ } - + char next[3] = {toupper(c), tolower(c), 0}; while (1) { size_t n = strcspn(str, next); str += n; - + if (str[0] == 0 || (n >= size)) { break; } - + int32_t ret = patternMatch(&patterStr[i], ++str, size - n - 1, pInfo); if (ret != TSDB_PATTERN_NOMATCH) { return ret; @@ -195,18 +258,18 @@ int patternMatch(const char *patterStr, const char *str, size_t size, const SPat } return TSDB_PATTERN_NOWILDCARDMATCH; } - + c1 = str[j++]; - + if (j <= size) { if (c == c1 || tolower(c) == tolower(c1) || (c == pInfo->matchOne && c1 != 0)) { continue; } } - + return TSDB_PATTERN_NOMATCH; } - + return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH; } @@ -214,13 +277,13 @@ int WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, c wchar_t c, c1; wchar_t matchOne = L'_'; // "_" wchar_t matchAll = L'%'; // "%" - + int32_t i = 0; int32_t j = 0; - + while ((c = patterStr[i++]) != 0) { if (c == matchAll) { /* Match "%" */ - + while ((c = patterStr[i++]) == matchAll || c == matchOne) { if (c == matchOne && (j > size || str[j++] == 0)) { return TSDB_PATTERN_NOWILDCARDMATCH; @@ -229,40 +292,40 @@ int WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, c if (c == 0) { return TSDB_PATTERN_MATCH; } - + wchar_t accept[3] = {towupper(c), towlower(c), 0}; while (1) { size_t n = wcscspn(str, accept); - + str += n; if (str[0] == 0 || (n >= size)) { break; } - + int32_t ret = WCSPatternMatch(&patterStr[i], ++str, size - n - 1, pInfo); if (ret != TSDB_PATTERN_NOMATCH) { return ret; } } - + return TSDB_PATTERN_NOWILDCARDMATCH; } - + c1 = str[j++]; - + if (j <= size) { if (c == c1 || towlower(c) == towlower(c1) || (c == matchOne && c1 != 0)) { continue; } } - + return TSDB_PATTERN_NOMATCH; } return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH; } -static int32_t compareStrPatternComp(const void* pLeft, const void* pRight) { +int32_t compareStrPatternComp(const void* pLeft, const void* pRight) { SPatternCompareInfo pInfo = {'%', '_'}; char pattern[128] = {0}; @@ -270,8 +333,8 @@ static int32_t compareStrPatternComp(const void* pLeft, const void* pRight) { assert(varDataLen(pRight) < 128); size_t sz = varDataLen(pLeft); - char *buf = malloc(sz + 1); - memcpy(buf, varDataVal(pLeft), sz); + char *buf = malloc(sz + 1); + memcpy(buf, varDataVal(pLeft), sz); buf[sz] = 0; int32_t ret = patternMatch(pattern, buf, sz, &pInfo); @@ -282,19 +345,15 @@ static int32_t compareStrPatternComp(const void* pLeft, const void* pRight) { int32_t taosArrayCompareString(const void* a, const void* b) { const char* x = *(const char**)a; const char* y = *(const char**)b; - + return compareLenPrefixedStr(x, y); } -//static int32_t compareFindStrInArray(const void* pLeft, const void* pRight) { -// const SArray* arr = (const SArray*) pRight; -// return taosArraySearchString(arr, pLeft, taosArrayCompareString, TD_EQ) == NULL ? 0 : 1; -//} -static int32_t compareFindItemInSet(const void *pLeft, const void* pRight) { - return NULL != taosHashGet((SHashObj *)pRight, varDataVal(pLeft), varDataLen(pLeft)) ? 1 : 0; +int32_t compareFindItemInSet(const void *pLeft, const void* pRight) { + return NULL != taosHashGet((SHashObj *)pRight, varDataVal(pLeft), varDataLen(pLeft)) ? 1 : 0; } -static int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) { +int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) { SPatternCompareInfo pInfo = {'%', '_'}; wchar_t pattern[128] = {0}; @@ -302,14 +361,37 @@ static int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) { memcpy(pattern, varDataVal(pRight), varDataLen(pRight)); assert(varDataLen(pRight) < 128); - + int32_t ret = WCSPatternMatch(pattern, varDataVal(pLeft), varDataLen(pLeft)/TSDB_NCHAR_SIZE, &pInfo); return (ret == TSDB_PATTERN_MATCH) ? 0 : 1; } __compar_fn_t getComparFunc(int32_t type, int32_t optr) { __compar_fn_t comparFn = NULL; - + + if (optr == TSDB_RELATION_IN && (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR)) { + switch (type) { + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: + return setCompareBytes1; + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: + return setCompareBytes2; + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: + case TSDB_DATA_TYPE_FLOAT: + return setCompareBytes4; + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + case TSDB_DATA_TYPE_DOUBLE: + case TSDB_DATA_TYPE_TIMESTAMP: + return setCompareBytes8; + default: + assert(0); + } + } + switch (type) { case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_TINYINT: comparFn = compareInt8Val; break; @@ -327,13 +409,15 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) { } else { /* normal relational comparFn */ comparFn = compareLenPrefixedStr; } - + break; } - + case TSDB_DATA_TYPE_NCHAR: { if (optr == TSDB_RELATION_LIKE) { comparFn = compareWStrPatternComp; + } else if (optr == TSDB_RELATION_IN) { + comparFn = compareFindItemInSet; } else { comparFn = compareLenPrefixedWStr; } @@ -349,57 +433,57 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) { comparFn = compareInt32Val; break; } - + return comparFn; } -__compar_fn_t getKeyComparFunc(int32_t keyType) { +__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order) { __compar_fn_t comparFn = NULL; - + switch (keyType) { case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_BOOL: - comparFn = compareInt8Val; + comparFn = (order == TSDB_ORDER_ASC)? compareInt8Val:compareInt8ValDesc; break; case TSDB_DATA_TYPE_SMALLINT: - comparFn = compareInt16Val; + comparFn = (order == TSDB_ORDER_ASC)? compareInt16Val:compareInt16ValDesc; break; case TSDB_DATA_TYPE_INT: - comparFn = compareInt32Val; + comparFn = (order == TSDB_ORDER_ASC)? compareInt32Val:compareInt32ValDesc; break; case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_TIMESTAMP: - comparFn = compareInt64Val; + comparFn = (order == TSDB_ORDER_ASC)? compareInt64Val:compareInt64ValDesc; break; case TSDB_DATA_TYPE_FLOAT: - comparFn = compareFloatVal; + comparFn = (order == TSDB_ORDER_ASC)? compareFloatVal:compareFloatValDesc; break; case TSDB_DATA_TYPE_DOUBLE: - comparFn = compareDoubleVal; + comparFn = (order == TSDB_ORDER_ASC)? compareDoubleVal:compareDoubleValDesc; break; case TSDB_DATA_TYPE_UTINYINT: - comparFn = compareUint8Val; + comparFn = (order == TSDB_ORDER_ASC)? compareUint8Val:compareUint8ValDesc; break; case TSDB_DATA_TYPE_USMALLINT: - comparFn = compareUint16Val; + comparFn = (order == TSDB_ORDER_ASC)? compareUint16Val:compareUint16ValDesc; break; case TSDB_DATA_TYPE_UINT: - comparFn = compareUint32Val; + comparFn = (order == TSDB_ORDER_ASC)? compareUint32Val:compareUint32ValDesc; break; case TSDB_DATA_TYPE_UBIGINT: - comparFn = compareUint64Val; + comparFn = (order == TSDB_ORDER_ASC)? compareUint64Val:compareUint64ValDesc; break; case TSDB_DATA_TYPE_BINARY: - comparFn = compareLenPrefixedStr; + comparFn = (order == TSDB_ORDER_ASC)? compareLenPrefixedStr:compareLenPrefixedStrDesc; break; case TSDB_DATA_TYPE_NCHAR: - comparFn = compareLenPrefixedWStr; + comparFn = (order == TSDB_ORDER_ASC)? compareLenPrefixedWStr:compareLenPrefixedWStrDesc; break; default: - comparFn = compareInt32Val; + comparFn = (order == TSDB_ORDER_ASC)? compareInt32Val:compareInt32ValDesc; break; } - + return comparFn; } @@ -433,7 +517,7 @@ int32_t doCompare(const char* f1, const char* f2, int32_t type, size_t size) { default: { // todo refactor tstr* t1 = (tstr*) f1; tstr* t2 = (tstr*) f2; - + if (t1->len != t2->len) { return t1->len > t2->len? 1:-1; } else { diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index b464519ba66776ba13ce2964070d19a2a4430bfb..98fd9c094cba3e779c9f203fdacc548a3bda5ef4 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -54,7 +54,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _ pSkipList->keyFn = fn; pSkipList->seed = rand(); if (comparFn == NULL) { - pSkipList->comparFn = getKeyComparFunc(keyType); + pSkipList->comparFn = getKeyComparFunc(keyType, TSDB_ORDER_ASC); } else { pSkipList->comparFn = comparFn; } diff --git a/src/util/tests/skiplistTest.cpp b/src/util/tests/skiplistTest.cpp index dfbe0f67167ad12cedc0239fc310614ef747b080..df4c5af5e2ab62efab287f3dd00650fc29805c98 100644 --- a/src/util/tests/skiplistTest.cpp +++ b/src/util/tests/skiplistTest.cpp @@ -70,7 +70,7 @@ void doubleSkipListTest() { } void randKeyTest() { - SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_INT, sizeof(int32_t), getKeyComparFunc(TSDB_DATA_TYPE_INT), + SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_INT, sizeof(int32_t), getKeyComparFunc(TSDB_DATA_TYPE_INT, TSDB_ORDER_ASC), false, getkey); int32_t size = 200000;