diff --git a/cmake/cmake.define b/cmake/cmake.define index 094eb4a2dab07a484504d4a4fe8175b4a8eb269e..e5ef08acb872b56ad041642e81d83c108a7674c6 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -15,12 +15,12 @@ MESSAGE(STATUS "Project executable files output path: " ${EXECUTABLE_OUTPUT_PATH MESSAGE(STATUS "Project library files output path: " ${LIBRARY_OUTPUT_PATH}) find_package(Git QUIET) -if(GIT_FOUND AND EXISTS "${PROJECT_SOURCE_DIR}/.git") +if(GIT_FOUND AND EXISTS "${TD_SOURCE_DIR}/.git") # Update submodules as needed option(GIT_SUBMODULE "Check submodules during build" ON) if(GIT_SUBMODULE) message(STATUS "Submodule update") - execute_process(COMMAND ${GIT_EXECUTABLE} submodule update --init --recursive + execute_process(COMMAND cd ${TD_SOURCE_DIR} && ${GIT_EXECUTABLE} submodule update --init --recursive WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} RESULT_VARIABLE GIT_SUBMOD_RESULT) if(NOT GIT_SUBMOD_RESULT EQUAL "0") @@ -29,7 +29,7 @@ if(GIT_FOUND AND EXISTS "${PROJECT_SOURCE_DIR}/.git") endif() endif() -if(NOT EXISTS "${PROJECT_SOURCE_DIR}/tools/taos-tools/CMakeLists.txt") +if(NOT EXISTS "${TD_SOURCE_DIR}/tools/taos-tools/CMakeLists.txt") message(WARNING "The submodules were not downloaded! GIT_SUBMODULE was turned off or failed. Please update submodules manually if you need build them.") endif() diff --git a/example/src/tstream.c b/example/src/tstream.c index 65fd005954353222d50a1a55b855485380dd8e14..537bfebededab2d807b2df1a73a6c53ed98a96dd 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -83,7 +83,7 @@ int32_t create_stream() { /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ pRes = taos_query( pConn, - "create stream stream1 trigger window_close into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k " + "create stream stream1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k " "from tu1 interval(10m)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); diff --git a/include/client/taos.h b/include/client/taos.h index 6e209006681f2122a5a68ef890f9471adae61831..26d4d18234ccd4de946f8a5cc22a517842a3a63b 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -221,15 +221,12 @@ DLL_EXPORT void tmq_list_destroy(tmq_list_t *); DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *); DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *); -#if 0 -DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); -#endif - DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */ + DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); @@ -240,6 +237,7 @@ DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t * DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset); #endif + /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ enum tmq_conf_res_t { @@ -268,12 +266,9 @@ DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res); DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message); #endif -/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ -#if 0 -DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); -DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql); -#endif + /* ------------------------------ TMQ END -------------------------------- */ + #if 1 // Shuduo: temporary enable for app build typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code); #endif diff --git a/include/common/systable.h b/include/common/systable.h index 506bdcfa8a78e00981d22da51f4116d5c12de64f..e36beb13f2eb2cae1ec93495c3b84550fce617ce 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -45,7 +45,6 @@ extern "C" { #define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_PERFS_TABLE_SMAS "smas" -#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes" #define TSDB_PERFS_TABLE_CONNECTIONS "connections" #define TSDB_PERFS_TABLE_QUERIES "queries" #define TSDB_PERFS_TABLE_TOPICS "topics" diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 72418148b09f6aeaab90a9919747ed8012298557..9ec80293a838f9ee825dc77e97f31b12d621caf6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -99,7 +99,7 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_VGROUP, TSDB_MGMT_TABLE_TOPICS, TSDB_MGMT_TABLE_CONSUMERS, - TSDB_MGMT_TABLE_SUBSCRIBES, + TSDB_MGMT_TABLE_SUBSCRIPTIONS, TSDB_MGMT_TABLE_TRANS, TSDB_MGMT_TABLE_SMAS, TSDB_MGMT_TABLE_CONFIGS, @@ -131,12 +131,10 @@ typedef enum _mgmt_table { #define TSDB_ALTER_USER_SUPERUSER 0x2 #define TSDB_ALTER_USER_ADD_READ_DB 0x3 #define TSDB_ALTER_USER_REMOVE_READ_DB 0x4 -#define TSDB_ALTER_USER_CLEAR_READ_DB 0x5 -#define TSDB_ALTER_USER_ADD_WRITE_DB 0x6 -#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7 -#define TSDB_ALTER_USER_CLEAR_WRITE_DB 0x8 -#define TSDB_ALTER_USER_ADD_ALL_DB 0x9 -#define TSDB_ALTER_USER_REMOVE_ALL_DB 0xA +#define TSDB_ALTER_USER_ADD_WRITE_DB 0x5 +#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x6 +#define TSDB_ALTER_USER_ADD_ALL_DB 0x7 +#define TSDB_ALTER_USER_REMOVE_ALL_DB 0x8 #define TSDB_ALTER_USER_PRIVILEGES 0x2 diff --git a/include/os/osFile.h b/include/os/osFile.h index b364d233efdf6b467e8cd24e2685e4e50ffc5f8e..5ba161270df9daa539514b708b31a38cd271246f 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -80,6 +80,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count); void taosFprintfFile(TdFilePtr pFile, const char *format, ...); int64_t taosGetLineFile(TdFilePtr pFile, char ** __restrict ptrBuf); +int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf); int32_t taosEOFFile(TdFilePtr pFile); diff --git a/include/util/tdef.h b/include/util/tdef.h index 4669a29883696242d4b597d338e453368e470f57..022fd8ba8e517da21a93eccd6b0e1b0e0c617982 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -86,10 +86,9 @@ extern const int32_t TYPE_BYTES[15]; #define TS_PATH_DELIMITER "." #define TS_ESCAPE_CHAR '`' - -#define TSDB_TIME_PRECISION_MILLI 0 -#define TSDB_TIME_PRECISION_MICRO 1 -#define TSDB_TIME_PRECISION_NANO 2 +#define TSDB_TIME_PRECISION_MILLI 0 +#define TSDB_TIME_PRECISION_MICRO 1 +#define TSDB_TIME_PRECISION_NANO 2 #define TSDB_TIME_PRECISION_HOURS 3 #define TSDB_TIME_PRECISION_MINUTES 4 #define TSDB_TIME_PRECISION_SECONDS 5 @@ -249,7 +248,6 @@ typedef enum ELogicConditionType { #define TSDB_SHOW_SQL_LEN 512 #define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_SHOW_SUBQUERY_LEN 1000 -#define TSDB_SHOW_LIST_LEN 1000 #define TSDB_TRANS_STAGE_LEN 12 #define TSDB_TRANS_TYPE_LEN 16 @@ -376,9 +374,9 @@ typedef enum ELogicConditionType { * 1. ordinary sub query for select * from super_table * 2. all sqlobj generated by createSubqueryObj with this flag */ -#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type -#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file -#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type +#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type +#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file +#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type #define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) #define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type)) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 9280756a8af1eed570184eb6eb836db7e3634b6c..d674b8286bf5c465da40557e23355a9e176599e9 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -395,7 +395,7 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { } for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i); - tmq_list_append(*topics, topic->topicName); + tmq_list_append(*topics, strchr(topic->topicName, '.') + 1); } return TMQ_RESP_ERR__SUCCESS; } diff --git a/source/common/src/systable.c b/source/common/src/systable.c index fba14bbaf582317a6665c8856ab537dd0b1c133e..5ff0282c878ba428b7d16a2fdad2c9ba8416ee50 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -14,9 +14,9 @@ */ #include "systable.h" +#include "taos.h" #include "tdef.h" #include "types.h" -#include "taos.h" #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) @@ -265,7 +265,7 @@ static const SSysDbTableSchema consumerSchema[] = { {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "topics", .bytes = TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, @@ -274,8 +274,8 @@ static const SSysDbTableSchema consumerSchema[] = { }; static const SSysDbTableSchema subscriptionSchema[] = { - {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "group_id", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, }; diff --git a/source/dnode/mgmt/mgmt_snode/src/smInt.c b/source/dnode/mgmt/mgmt_snode/src/smInt.c index 3757dcd72a24d30cadb413a428738c8fed1a45c1..26300a9fe3f22eda80354ab7188b32676f7a764c 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smInt.c +++ b/source/dnode/mgmt/mgmt_snode/src/smInt.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "smInt.h" +#include "libs/function/function.h" static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); } @@ -29,6 +30,9 @@ static void smClose(SMgmtWrapper *pWrapper) { if (pMgmt == NULL) return; dInfo("snode-mgmt start to cleanup"); + + udfcClose(); + if (pMgmt->pSnode != NULL) { smStopWorker(pMgmt); sndClose(pMgmt->pSnode); @@ -68,6 +72,10 @@ int32_t smOpen(SMgmtWrapper *pWrapper) { } dmReportStartup(pWrapper->pDnode, "snode-worker", "initialized"); + if (udfcOpen() != 0) { + dError("failed to open udfc in snode"); + } + return 0; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index ab4174857a13f9aa4de629ba1d186e79bb6486fc..af439fcc03f1525b62d3f0fcc21109f5b2f959f0 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -324,7 +324,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { dmReportStartup(pDnode, "vnode-vnodes", "initialized"); if (udfcOpen() != 0) { - dError("failed to open udfc in dnode"); + dError("failed to open udfc in vnode"); } code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 11afe3333533165416003e6fed46cba9cd6cc399..2baa8b8942c996f818fc2ad56ce8798933b5b0d0 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -198,6 +198,9 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rsp.refId = pMsg->rpcMsg.refId; tmsgSendRsp(&rsp); } + + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); } } @@ -211,6 +214,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf // todo SRpcMsg *pRsp = NULL; (void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); + + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); } } diff --git a/source/dnode/mnode/impl/inc/mndAuth.h b/source/dnode/mnode/impl/inc/mndAuth.h index 890879912fe17505841c01b1008acae0bee14059..de59a11cd735dfc1eec1b8abf744afabe1694269 100644 --- a/source/dnode/mnode/impl/inc/mndAuth.h +++ b/source/dnode/mnode/impl/inc/mndAuth.h @@ -26,7 +26,7 @@ int32_t mndInitAuth(SMnode *pMnode); void mndCleanupAuth(SMnode *pMnode); int32_t mndCheckCreateUserAuth(SUserObj *pOperUser); -int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb, SAlterUserReq *pAlter); +int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter); int32_t mndCheckDropUserAuth(SUserObj *pOperUser); int32_t mndCheckNodeAuth(SUserObj *pOperUser); diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h index e3174a90a24d845fdbae2fa1efaf57f0f33baf8c..be3f9c32835fc98999f0411374e82b816a825d1f 100644 --- a/source/dnode/mnode/impl/inc/mndTopic.h +++ b/source/dnode/mnode/impl/inc/mndTopic.h @@ -33,6 +33,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw); int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); +const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndAuth.c b/source/dnode/mnode/impl/src/mndAuth.c index 8e5ec40c471393e2bc6fe5bc90556efc379ab2ba..1d89241dd5db0f3b3317ed167e938746578137a8 100644 --- a/source/dnode/mnode/impl/src/mndAuth.c +++ b/source/dnode/mnode/impl/src/mndAuth.c @@ -79,14 +79,12 @@ int32_t mndCheckCreateUserAuth(SUserObj *pOperUser) { return -1; } -int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb, SAlterUserReq *pAlter) { +int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter) { if (pAlter->alterType == TSDB_ALTER_USER_PASSWD) { if (pOperUser->superUser || strcmp(pUser->user, pOperUser->user) == 0) { return 0; } - } - - if (pAlter->alterType == TSDB_ALTER_USER_SUPERUSER) { + } else if (pAlter->alterType == TSDB_ALTER_USER_SUPERUSER) { if (strcmp(pUser->user, TSDB_DEFAULT_USER) == 0) { terrno = TSDB_CODE_MND_NO_RIGHTS; return -1; @@ -95,21 +93,12 @@ int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb, if (pOperUser->superUser) { return 0; } - } - - if (pAlter->alterType == TSDB_ALTER_USER_CLEAR_WRITE_DB || pAlter->alterType == TSDB_ALTER_USER_CLEAR_READ_DB) { + } else { if (pOperUser->superUser) { return 0; } } - if (pAlter->alterType == TSDB_ALTER_USER_ADD_READ_DB || pAlter->alterType == TSDB_ALTER_USER_REMOVE_READ_DB || - pAlter->alterType == TSDB_ALTER_USER_ADD_WRITE_DB || pAlter->alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB) { - if (pOperUser->superUser || strcmp(pUser->user, pDb->createUser) == 0) { - return 0; - } - } - terrno = TSDB_CODE_MND_NO_RIGHTS; return -1; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 23a87b4691388d7b17ef25f08d3120844d1ca7b1..6c77c379e056b09d00f43c04ab5133418fcd722e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -790,71 +790,83 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer); if (pShow->pIter == NULL) break; - SColumnInfoData *pColInfo; - int32_t cols = 0; - taosRLockLatch(&pConsumer->lock); - // consumer id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false); - - // group id - char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN); - varDataSetLen(groupId, strlen(varDataVal(groupId))); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)groupId, false); - - // app id - char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN); - varDataSetLen(appId, strlen(varDataVal(appId))); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)appId, false); - - // status - char status[20 + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20); - varDataSetLen(status, strlen(varDataVal(status))); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)status, false); - - // subscribed topics - // TODO: split into multiple rows - char topics[TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE] = {0}; - char *showStr = taosShowStrArray(pConsumer->assignedTopics); - tstrncpy(varDataVal(topics), showStr, TSDB_SHOW_LIST_LEN); - taosMemoryFree(showStr); - varDataSetLen(topics, strlen(varDataVal(topics))); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)topics, false); - - // pid - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true); - - // end point - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true); - - // up time - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false); - - // subscribe time - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false); - - // rebalance time - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0); + int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics); + bool hasTopic = true; + if (topicSz == 0) { + hasTopic = false; + topicSz = 1; + } + + for (int32_t i = 0; i < topicSz; i++) { + if (numOfRows + topicSz > rowsCapacity) { + blockDataEnsureCapacity(pBlock, numOfRows + topicSz); + } + SColumnInfoData *pColInfo; + int32_t cols = 0; + + // consumer id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false); + + // group id + char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN); + varDataSetLen(groupId, strlen(varDataVal(groupId))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)groupId, false); + + // app id + char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN); + varDataSetLen(appId, strlen(varDataVal(appId))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)appId, false); + + // status + char status[20 + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20); + varDataSetLen(status, strlen(varDataVal(status))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)status, false); + + // one subscribed topic + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (hasTopic) { + char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i)); + tstrncpy(varDataVal(topic), topicName, TSDB_TOPIC_FNAME_LEN); + varDataSetLen(topic, strlen(varDataVal(topic))); + colDataAppend(pColInfo, numOfRows, (const char *)topic, false); + } else { + colDataAppend(pColInfo, numOfRows, NULL, true); + } + + // pid + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true); + // end point + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true); + + // up time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false); + + // subscribe time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false); + + // rebalance time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0); + + numOfRows++; + } taosRUnLockLatch(&pConsumer->lock); sdbRelease(pSdb, pConsumer); - - numOfRows++; } pShow->numOfRows += numOfRows; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index be333d154a4b39d2cf7fd9f65878663a03dda184..b44c8c932bf2316d88050a263516c32ab8a5a5e0 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -85,8 +85,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) { type = TSDB_MGMT_TABLE_VGROUP; } else if (strncasecmp(name, TSDB_PERFS_TABLE_CONSUMERS, len) == 0) { type = TSDB_MGMT_TABLE_CONSUMERS; - } else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIBES, len) == 0) { - type = TSDB_MGMT_TABLE_SUBSCRIBES; + } else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIPTIONS, len) == 0) { + type = TSDB_MGMT_TABLE_SUBSCRIPTIONS; } else if (strncasecmp(name, TSDB_PERFS_TABLE_TRANS, len) == 0) { type = TSDB_MGMT_TABLE_TRANS; } else if (strncasecmp(name, TSDB_PERFS_TABLE_SMAS, len) == 0) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8c1557b73dcb7ab04f2ef5847f352123200856a6..599f0d5feff31fdf49cb3a51616cfe149650a339 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -259,7 +259,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast return -1; } -#if 1 +#if 0 printf("|"); for (int i = 0; i < pStream->outputSchema.nCols; i++) { printf(" %15s |", (char *)pStream->outputSchema.pSchema[i].name); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 545caea03d350366f6849cc1807519c3849951fe..c947a1913eb761133b5f27c9d683d9fc85aac377 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -44,6 +44,9 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubs static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg); static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg); +static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); +static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter); + static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pRedoRaw = mndSubActionEncode(pSub); if (pRedoRaw == NULL) return -1; @@ -71,6 +74,10 @@ int32_t mndInitSubscribe(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); + + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe); + return sdbSetTable(pMnode->pSdb, table); } @@ -706,3 +713,124 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { END: return code; } + +static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { + SMnode *pMnode = pReq->pNode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SMqSubscribeObj *pSub = NULL; + + while (numOfRows < rowsCapacity) { + pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub); + if (pShow->pIter == NULL) break; + + taosRLockLatch(&pSub->lock); + + if (numOfRows + pSub->vgNum > rowsCapacity) { + blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum); + } + + SMqConsumerEp *pConsumerEp = NULL; + void *pIter = NULL; + while (1) { + pIter = taosHashIterate(pSub->consumerHash, pIter); + if (pIter == NULL) break; + pConsumerEp = (SMqConsumerEp *)pIter; + + int32_t sz = taosArrayGetSize(pConsumerEp->vgs); + for (int32_t j = 0; j < sz; j++) { + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); + + SColumnInfoData *pColInfo; + int32_t cols = 0; + + // topic and cgroup + char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + mndSplitSubscribeKey(pSub->key, topic, cgroup); + varDataSetLen(topic, strlen(varDataVal(topic))); + varDataSetLen(cgroup, strlen(varDataVal(cgroup))); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)topic, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); + + // vg id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); + + // consumer id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false); + + // offset +#if 0 + // subscribe time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false); + + // rebalance time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0); +#endif + + numOfRows++; + } + } + + int32_t sz = taosArrayGetSize(pSub->unassignedVgs); + for (int32_t i = 0; i < sz; i++) { + SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); + + SColumnInfoData *pColInfo; + int32_t cols = 0; + + // topic and cgroup + char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + mndSplitSubscribeKey(pSub->key, topic, cgroup); + varDataSetLen(topic, strlen(varDataVal(topic))); + varDataSetLen(cgroup, strlen(varDataVal(cgroup))); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)topic, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); + + // vg id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); + + // consumer id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, NULL, true); + + // offset +#if 0 + // subscribe time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false); + + // rebalance time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0); +#endif + + numOfRows++; + } + + taosRUnLockLatch(&pSub->lock); + sdbRelease(pSdb, pSub); + } + + pShow->numOfRows += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 0a8d1cee4ad873bf79f85ffbe44ce1f94ddb9511..01149f793f3d8666c4409d46f064fdeac620f45f 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -35,6 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq); static int32_t mndProcessDropTopicReq(SNodeMsg *pReq); static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp); + static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); @@ -61,6 +62,11 @@ int32_t mndInitTopic(SMnode *pMnode) { void mndCleanupTopic(SMnode *pMnode) {} +const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) { + // + return strchr(topic, '.') + 1; +} + SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index d0af17ff5ce3b56dc950e38c60a81d1a42e5e33d..1706820bdce17733b90bdc8dd47fd36788e6cbfa 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -394,6 +394,8 @@ static SHashObj *mndDupDbHash(SHashObj *pOld) { static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { SMnode *pMnode = pReq->pNode; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; int32_t code = -1; SUserObj *pUser = NULL; SUserObj *pOperUser = NULL; @@ -429,7 +431,13 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { goto _OVER; } + if (mndCheckAlterUserAuth(pOperUser, pUser, &alterReq) != 0) { + goto _OVER; + } + memcpy(&newUser, pUser, sizeof(SUserObj)); + newUser.authVersion++; + newUser.updateTime = taosGetTimestampMs(); taosRLockLatch(&pUser->lock); newUser.readDbs = mndDupDbHash(pUser->readDbs); @@ -440,63 +448,90 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { goto _OVER; } - int32_t len = strlen(alterReq.dbname) + 1; - SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); - mndReleaseDb(pMnode, pDb); - if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) { char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)alterReq.pass, strlen(alterReq.pass), pass); memcpy(newUser.pass, pass, TSDB_PASSWORD_LEN); - } else if (alterReq.alterType == TSDB_ALTER_USER_SUPERUSER) { + } + + if (alterReq.alterType == TSDB_ALTER_USER_SUPERUSER) { newUser.superUser = alterReq.superUser; - } else if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB) { - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto _OVER; - } - if (taosHashPut(newUser.readDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_DB) { - if (taosHashRemove(newUser.readDbs, alterReq.dbname, len) != 0) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto _OVER; - } - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_READ_DB) { - taosHashClear(newUser.readDbs); - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB) { - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto _OVER; - } - if (taosHashPut(newUser.writeDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; + } + + if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_DB) { + if (strcmp(alterReq.dbname, "*") != 0) { + int32_t len = strlen(alterReq.dbname) + 1; + SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); + if (pDb == NULL) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + if (taosHashPut(newUser.readDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + } else { + while (1) { + SDbObj *pDb = NULL; + pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb); + if (pIter == NULL) break; + int32_t len = strlen(pDb->name) + 1; + taosHashPut(newUser.readDbs, pDb->name, len, pDb->name, TSDB_DB_FNAME_LEN); + sdbRelease(pSdb, pDb); + } } - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB) { - if (taosHashRemove(newUser.writeDbs, alterReq.dbname, len) != 0) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto _OVER; + } + + if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_DB) { + if (strcmp(alterReq.dbname, "*") != 0) { + int32_t len = strlen(alterReq.dbname) + 1; + SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); + if (pDb == NULL) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + if (taosHashPut(newUser.writeDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + } else { + while (1) { + SDbObj *pDb = NULL; + pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb); + if (pIter == NULL) break; + int32_t len = strlen(pDb->name) + 1; + taosHashPut(newUser.writeDbs, pDb->name, len, pDb->name, TSDB_DB_FNAME_LEN); + sdbRelease(pSdb, pDb); + } } - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_WRITE_DB) { - taosHashClear(newUser.writeDbs); - newUser.authVersion++; - } else { - terrno = TSDB_CODE_MND_INVALID_ALTER_OPER; - goto _OVER; } - newUser.updateTime = taosGetTimestampMs(); + if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_DB || alterReq.alterType == TSDB_ALTER_USER_REMOVE_ALL_DB) { + if (strcmp(alterReq.dbname, "*") != 0) { + int32_t len = strlen(alterReq.dbname) + 1; + SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); + if (pDb == NULL) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + taosHashRemove(newUser.readDbs, alterReq.dbname, len); + } else { + taosHashClear(newUser.readDbs); + } + } - if (mndCheckAlterUserAuth(pOperUser, pUser, pDb, &alterReq) != 0) { - goto _OVER; + if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB || alterReq.alterType == TSDB_ALTER_USER_REMOVE_ALL_DB) { + if (strcmp(alterReq.dbname, "*") != 0) { + int32_t len = strlen(alterReq.dbname) + 1; + SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); + if (pDb == NULL) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + taosHashRemove(newUser.writeDbs, alterReq.dbname, len); + } else { + taosHashClear(newUser.writeDbs); + } } code = mndAlterUser(pMnode, pUser, &newUser, pReq); diff --git a/source/dnode/mnode/impl/test/user/user.cpp b/source/dnode/mnode/impl/test/user/user.cpp index ee961e9a275aac5e259aff5f6d93021e24231f33..1e03d8ff4ac0680b963fec24ccf2c4801ca3a222 100644 --- a/source/dnode/mnode/impl/test/user/user.cpp +++ b/source/dnode/mnode/impl/test/user/user.cpp @@ -238,9 +238,10 @@ TEST_F(MndTestUser, 03_Alter_User) { { SAlterUserReq alterReq = {0}; - alterReq.alterType = TSDB_ALTER_USER_CLEAR_WRITE_DB; + alterReq.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB; strcpy(alterReq.user, "u3"); strcpy(alterReq.pass, "1"); + strcpy(alterReq.dbname, "*"); int32_t contLen = tSerializeSAlterUserReq(NULL, 0, &alterReq); void* pReq = rpcMallocCont(contLen); @@ -253,9 +254,10 @@ TEST_F(MndTestUser, 03_Alter_User) { { SAlterUserReq alterReq = {0}; - alterReq.alterType = TSDB_ALTER_USER_CLEAR_READ_DB; + alterReq.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB; strcpy(alterReq.user, "u3"); strcpy(alterReq.pass, "1"); + strcpy(alterReq.dbname, "*"); int32_t contLen = tSerializeSAlterUserReq(NULL, 0, &alterReq); void* pReq = rpcMallocCont(contLen); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ff0ebefb42baa7b93a92f2f38e2788ef8397db06..9294718550cd218e11ed5249b4da3e28bf400882 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -879,14 +879,14 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, } } - -static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow, TDRowVerT maxVer) { +static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow, + TDRowVerT maxVer) { STSRow *rmem = NULL, *rimem = NULL; if (pCheckInfo->iter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); if (node != NULL) { rmem = (STSRow*)SL_GET_NODE_DATA(node); -#if 0 // TODO: skiplist refactor +#if 0 // TODO: skiplist refactor if (TD_ROW_VER(rmem) > maxVer) { rmem = NULL; } @@ -898,7 +898,7 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); if (node != NULL) { rimem = (STSRow*)SL_GET_NODE_DATA(node); -#if 0 // TODO: skiplist refactor +#if 0 // TODO: skiplist refactor if (TD_ROW_VER(rimem) > maxVer) { rimem = NULL; } @@ -1677,7 +1677,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa colIdOfRow2 = tdKvRowColIdAt(row2, k); } - if (colIdOfRow1 < colIdOfRow2) { // the most probability + if (colIdOfRow1 < colIdOfRow2) { // the most probability if (colIdOfRow1 < pColInfo->info.colId) { ++j; continue; @@ -1720,7 +1720,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa ++(*curRow); } ++nResult; - } else if (update){ + } else if (update) { mergeOption = 2; } else { mergeOption = 0; @@ -1741,7 +1741,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa ++(*curRow); } ++nResult; - } else if(update) { + } else if (update) { mergeOption = 2; } else { mergeOption = 0; @@ -2018,9 +2018,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf rv2 = TD_ROW_SVER(row2); } - numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, - pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); - // numOfRows += 1; + numOfRows += + mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, + pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; } @@ -2028,7 +2028,6 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf cur->win.ekey = key; cur->lastKey = key + step; cur->mixBlock = true; - moveToNextRowInMem(pCheckInfo); } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it #if 0 @@ -2064,7 +2063,11 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf } #endif if (TD_SUPPORT_UPDATE(pCfg->update)) { - doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos); + if (lastKeyAppend != key) { + lastKeyAppend = key; + ++curRow; + } + numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos); if (rv1 != TD_ROW_SVER(row1)) { // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); @@ -2074,10 +2077,10 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); rv2 = TD_ROW_SVER(row2); } + numOfRows += + mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, + pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); - numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, - pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); - // ++numOfRows; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; } @@ -2117,15 +2120,20 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf int32_t qstart = 0, qend = 0; getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend); - lastKeyAppend = tsArray[qend]; - numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend); + if ((lastKeyAppend != TSKEY_INITIAL_VAL) && + (lastKeyAppend != (ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qstart] : tsArray[qend]))) { + ++curRow; + } + numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend); pos += (qend - qstart + 1) * step; - if(numOfRows > 0) { + if (numOfRows > 0) { curRow = numOfRows - 1; } + cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart]; cur->lastKey = cur->win.ekey + step; + lastKeyAppend = cur->win.ekey; } } while (numOfRows < pTsdbReadHandle->outputCapacity); @@ -2429,7 +2437,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb); - STimeWindow win = TSWINDOW_INITIALIZER; + STimeWindow win = TSWINDOW_INITIALIZER; while (true) { tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); @@ -2735,7 +2743,6 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int STSchema* pSchema = NULL; TSKEY lastRowKey = TSKEY_INITIAL_VAL; - do { STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL, TD_VER_MAX); if (row == NULL) { @@ -2760,8 +2767,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0); rv = TD_ROW_SVER(row); } - numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, - NULL, pCfg->update, &lastRowKey); + numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId, + pSchema, NULL, pCfg->update, &lastRowKey); if (numOfRows >= maxRowsToRead) { moveToNextRowInMem(pCheckInfo); @@ -2770,7 +2777,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } while (moveToNextRowInMem(pCheckInfo)); - taosMemoryFreeClear(pSchema); // free the STSChema + taosMemoryFreeClear(pSchema); // free the STSChema assert(numOfRows <= maxRowsToRead); @@ -2898,8 +2905,8 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) { // if (ret != TSDB_CODE_SUCCESS) { // return false; // } - mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols, pCheckInfo->tableId, - NULL, NULL, true, &lastRowKey); + mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols, + pCheckInfo->tableId, NULL, NULL, true, &lastRowKey); taosMemoryFreeClear(pRow); // update the last key value @@ -3468,7 +3475,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa pDataBlockInfo->rows = cur->rows; pDataBlockInfo->window = cur->win; -// ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); + // ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); } /* diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 84860ca5a1f9131e63954b189f36c00cde426f3c..75b6aeaae9c6f0da1aec8d4636c91fde576902ca 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1259,7 +1259,9 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult return false; } UdfcFuncHandle handle; - if (setupUdf((char*)pCtx->udfName, &handle) != 0) { + int32_t udfCode = 0; + if ((udfCode = setupUdf((char*)pCtx->udfName, &handle)) != 0) { + fnError("udfAggInit error. step setupUdf. udf code: %d", udfCode); return false; } SClientUdfUvSession *session = (SClientUdfUvSession *)handle; @@ -1272,7 +1274,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult udfRes->session = (SClientUdfUvSession *)handle; SUdfInterBuf buf = {0}; - if (callUdfAggInit(handle, &buf) != 0) { + if ((udfCode = callUdfAggInit(handle, &buf)) != 0) { + fnError("udfAggInit error. step callUdfAggInit. udf code: %d", udfCode); return false; } udfRes->interResNum = buf.numOfResult; @@ -1316,21 +1319,23 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { .numOfResult = udfRes->interResNum}; SUdfInterBuf newState = {0}; - callUdfAggProcess(session, inputBlock, &state, &newState); - - udfRes->interResNum = newState.numOfResult; - memcpy(udfRes->interResBuf, newState.buf, newState.bufLen); - + int32_t udfCode = callUdfAggProcess(session, inputBlock, &state, &newState); + if (udfCode != 0) { + fnError("udfAggProcess error. code: %d", udfCode); + newState.numOfResult = 0; + } else { + udfRes->interResNum = newState.numOfResult; + memcpy(udfRes->interResBuf, newState.buf, newState.bufLen); + } if (newState.numOfResult == 1 || state.numOfResult == 1) { GET_RES_INFO(pCtx)->numOfRes = 1; } blockDataDestroy(inputBlock); - taosArrayDestroy(tempBlock.pDataBlock); taosMemoryFree(newState.buf); - return 0; + return TSDB_CODE_SUCCESS; } int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { @@ -1344,15 +1349,22 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum}; - callUdfAggFinalize(session, &state, &resultBuf); - - udfRes->finalResBuf = resultBuf.buf; - udfRes->finalResNum = resultBuf.numOfResult; - - teardownUdf(session); + int32_t udfCallCode= 0; + udfCallCode= callUdfAggFinalize(session, &state, &resultBuf); + if (udfCallCode!= 0) { + fnError("udfAggFinalize error. callUdfAggFinalize step. udf code:%d", udfCallCode); + GET_RES_INFO(pCtx)->numOfRes = 0; + } else { + memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen); + udfRes->finalResNum = resultBuf.numOfResult; + GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum; + } - if (resultBuf.numOfResult == 1) { - GET_RES_INFO(pCtx)->numOfRes = 1; + int32_t code = teardownUdf(session); + if (code != 0) { + fnError("udfAggFinalize error. teardownUdf step. udf code: %d", code); } + return functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); + } \ No newline at end of file diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index f5fab814ffdcb8b585e0171e0880944eda5e2557..5231890821d242bc8da03d87965752f76d2ba872 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -347,10 +347,21 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp if (fmIsUserDefinedFunc(node->funcId)) { UdfcFuncHandle udfHandle = NULL; - SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle)); + code = setupUdf(node->functionName, &udfHandle); + if (code != 0) { + sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", node->functionName, code); + goto _return; + } code = callUdfScalarFunc(udfHandle, params, paramNum, output); - teardownUdf(udfHandle); - SCL_ERR_JRET(code); + if (code != 0) { + sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code); + goto _return; + } + code = teardownUdf(udfHandle); + if (code != 0) { + sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code); + goto _return; + } } else { SScalarFuncExecFuncs ffpSet = {0}; code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet); diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 5fa8eb0c9a99791b18637071eabc4484b7449dae..0161323e37722475d64cdf2a4e4c6605eed82473 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -822,7 +822,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp varDataSetLen(output, len); } //for constant conversion, need to set proper length of pOutput description - if (len < outputLen - VARSTR_HEADER_SIZE) { + if (len < outputLen) { pOutput->columnData->info.bytes = len; } break; diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 8b54b0a09017090c0dc691e84c7db69d83abc45c..5d6cbd2a586831cf6be70e380abccf16898345ea 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -65,6 +65,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { newCommitIndex = index; sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld", newCommitIndex, pSyncNode->commitIndex); + + syncEntryDestory(pEntry); break; } else { sTrace( @@ -72,6 +74,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { "pSyncNode->pRaftStore->currentTerm:%lu", pEntry->term, pSyncNode->pRaftStore->currentTerm); } + + syncEntryDestory(pEntry); } } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 7f8ad150f0dddedd13be9e5a927796944e3b62e9..323ef43e25d682c43254f4a980add0bfa1db4cb5 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -853,12 +853,13 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, taosThreadOnce(&transModuleInit, uvInitEnv); transSrvInst++; - char pipeName[64]; assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0)); #ifdef WINDOWS - snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc\\%p-%lu", taosSafeRand(), GetCurrentProcessId()); + char pipeName[64]; + snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId()); #else - snprintf(pipeName, sizeof(pipeName), ".trans.rpc\\%08X-%lu", taosSafeRand(), taosGetSelfPthreadId()); + char pipeName[PATH_MAX] = {0}; + snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId()); #endif assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName)); assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb)); @@ -871,20 +872,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, thrd->pTransInst = shandle; srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); - - // #ifdef WINDOWS - // uv_file fds[2]; - // if (uv_pipe(fds, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE) != 0) { - // #else - // uv_os_sock_t fds[2]; - // if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { - // #endif - // goto End; - // } - // uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); - // uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write - - // thrd->fd = fds[0]; thrd->pipe = &(srv->pipe[i][1]); // init read if (false == addHandleToWorkloop(thrd,pipeName)) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 4fe07029f1ac14a29eee3225357324c56d1f9df3..e14515286e6efd7600a9a9018a29545e0df2e6be 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -55,15 +55,15 @@ int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) { } static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) { - int code = 0; + int ret = 0; TdFilePtr pIdxTFile = pRead->pReadIdxTFile; TdFilePtr pLogTFile = pRead->pReadLogTFile; // seek position int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry); - code = taosLSeekFile(pIdxTFile, offset, SEEK_SET); - if (code < 0) { + ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET); + if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -72,14 +72,14 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } - // TODO:deserialize + ASSERT(entry.ver == ver); - code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET); - if (code < 0) { + ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET); + if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - return code; + return ret; } static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { @@ -108,7 +108,6 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { } static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { - int code; SWal *pWal = pRead->pWal; if (ver == pRead->curVersion) { return 0; @@ -126,16 +125,15 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); ASSERT(pRet != NULL); if (pRead->curFileFirstVer != pRet->firstVer) { - code = walReadChangeFile(pRead, pRet->firstVer); - if (code < 0) { + if (walReadChangeFile(pRead, pRet->firstVer) < 0) { return -1; } } - code = walReadSeekFilePos(pRead, pRet->firstVer, ver); - if (code < 0) { + if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) { return -1; } + pRead->curVersion = ver; return 0; @@ -246,8 +244,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { int code; // TODO: check wal life if (pRead->curVersion != ver) { - code = walReadSeekVer(pRead, ver); - if (code < 0) { + if (walReadSeekVer(pRead, ver) < 0) { return -1; } } @@ -278,8 +275,12 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { pRead->capacity = pRead->pHead->head.bodyLen; } - if (pRead->pHead->head.bodyLen != - taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) { + if ((code = taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) != + pRead->pHead->head.bodyLen) { + if (code < 0) + terrno = TAOS_SYSTEM_ERROR(errno); + else + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index a29ab8b454e4388b2477e6b6c9b690c007a73123..72654d008443a7e42c41a7381d0ad936a41aee2f 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -26,7 +26,6 @@ typedef struct TdDirEntry { WIN32_FIND_DATA findFileData; } TdDirEntry; - typedef struct TdDir { TdDirEntry dirEntry; HANDLE hFind; @@ -59,7 +58,7 @@ void wordfree(wordexp_t *pwordexp) {} #include typedef struct dirent dirent; -typedef struct DIR TdDir; +typedef struct DIR TdDir; typedef struct dirent TdDirEntry; #endif @@ -78,14 +77,14 @@ void taosRemoveDir(const char *dirname) { taosRemoveDir(filename); } else { (void)taosRemoveFile(filename); - //printf("file:%s is removed\n", filename); + // printf("file:%s is removed\n", filename); } } taosCloseDir(&pDir); rmdir(dirname); - //printf("dir:%s is removed\n", dirname); + // printf("dir:%s is removed\n", dirname); return; } @@ -102,8 +101,8 @@ int32_t taosMkDir(const char *dirname) { int32_t taosMulMkDir(const char *dirname) { if (dirname == NULL) return -1; - char *temp = strdup(dirname); - char *pos = temp; + char * temp = strdup(dirname); + char * pos = temp; int32_t code = 0; if (strncmp(temp, "/", 1) == 0) { @@ -111,8 +110,8 @@ int32_t taosMulMkDir(const char *dirname) { } else if (strncmp(temp, "./", 2) == 0) { pos += 2; } - - for ( ; *pos != '\0'; pos++) { + + for (; *pos != '\0'; pos++) { if (*pos == '/') { *pos = '\0'; code = mkdir(temp, 0755); @@ -123,7 +122,7 @@ int32_t taosMulMkDir(const char *dirname) { *pos = '/'; } } - + if (*(pos - 1) != '/') { code = mkdir(temp, 0755); if (code < 0 && errno != EEXIST) { @@ -145,7 +144,7 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) { TdDirPtr pDir = taosOpenDir(dirname); if (pDir == NULL) return; - int64_t sec = taosGetTimestampSec(); + int64_t sec = taosGetTimestampSec(); TdDirEntryPtr de = NULL; while ((de = taosReadDir(pDir)) != NULL) { @@ -173,9 +172,9 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) { int32_t days = (int32_t)(TABS(sec - fileSec) / 86400 + 1); if (days > keepDays) { (void)taosRemoveFile(filename); - //printf("file:%s is removed, days:%d keepDays:%d", filename, days, keepDays); + // printf("file:%s is removed, days:%d keepDays:%d", filename, days, keepDays); } else { - //printf("file:%s won't be removed, days:%d keepDays:%d", filename, days, keepDays); + // printf("file:%s won't be removed, days:%d keepDays:%d", filename, days, keepDays); } } } @@ -187,7 +186,7 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) { int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen) { wordexp_t full_path; if (0 != wordexp(dirname, &full_path, 0)) { - //printf("failed to expand path:%s since %s", dirname, strerror(errno)); + // printf("failed to expand path:%s since %s", dirname, strerror(errno)); wordfree(&full_path); return -1; } @@ -228,9 +227,9 @@ bool taosIsDir(const char *dirname) { return false; } -char* taosDirName(char *name) { +char *taosDirName(char *name) { #ifdef WINDOWS - char Drive1[MAX_PATH], Dir1[MAX_PATH]; + char Drive1[MAX_PATH], Dir1[MAX_PATH]; _splitpath(name, Drive1, Dir1, NULL, NULL); size_t dirNameLen = strlen(Drive1) + strlen(Dir1); if (dirNameLen > 0) { @@ -242,13 +241,13 @@ char* taosDirName(char *name) { #endif } -char* taosDirEntryBaseName(char *name) { +char *taosDirEntryBaseName(char *name) { #ifdef WINDOWS char Filename1[MAX_PATH], Ext1[MAX_PATH]; _splitpath(name, NULL, NULL, Filename1, Ext1); return name + (strlen(name) - strlen(Filename1) - strlen(Ext1)); #else - return (char*)basename(name); + return (char *)basename(name); #endif } @@ -258,8 +257,8 @@ TdDirPtr taosOpenDir(const char *dirname) { } #ifdef WINDOWS - char szFind[MAX_PATH]; //这是要找的 - HANDLE hFind; + char szFind[MAX_PATH]; //这是要找的 + HANDLE hFind; TdDirPtr pDir = taosMemoryMalloc(sizeof(TdDir)); @@ -275,7 +274,6 @@ TdDirPtr taosOpenDir(const char *dirname) { #else return (TdDirPtr)opendir(dirname); #endif - } TdDirEntryPtr taosReadDir(TdDirPtr pDir) { @@ -286,9 +284,9 @@ TdDirEntryPtr taosReadDir(TdDirPtr pDir) { if (!FindNextFile(pDir->hFind, &(pDir->dirEntry.findFileData))) { return NULL; } - return (TdDirEntryPtr)&(pDir->dirEntry.findFileData); + return (TdDirEntryPtr) & (pDir->dirEntry.findFileData); #else - return (TdDirEntryPtr)readdir((DIR*)pDir); + return (TdDirEntryPtr)readdir((DIR *)pDir); #endif } @@ -299,18 +297,18 @@ bool taosDirEntryIsDir(TdDirEntryPtr pDirEntry) { #ifdef WINDOWS return (pDirEntry->findFileData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) != 0; #else - return (((dirent*)pDirEntry)->d_type & DT_DIR) != 0; + return (((dirent *)pDirEntry)->d_type & DT_DIR) != 0; #endif } -char* taosGetDirEntryName(TdDirEntryPtr pDirEntry) { +char *taosGetDirEntryName(TdDirEntryPtr pDirEntry) { if (pDirEntry == NULL) { return NULL; } #ifdef WINDOWS return pDirEntry->findFileData.cFileName; #else - return ((dirent*)pDirEntry)->d_name; + return ((dirent *)pDirEntry)->d_name; #endif } @@ -324,7 +322,7 @@ int32_t taosCloseDir(TdDirPtr *ppDir) { *ppDir = NULL; return 0; #else - closedir((DIR*)*ppDir); + closedir((DIR *)*ppDir); *ppDir = NULL; return 0; #endif diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index ab68c69b8db2a6be1b7467093a09c1174a8dac95..4fd672fe3c802e510c76f6aa019a26e67ea00544 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -688,6 +688,16 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) { return getline(ptrBuf, &len, pFile->fp); #endif } +int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf) { + if (pFile == NULL || buf == NULL ) { + return -1; + } + assert(pFile->fp != NULL); + if (fgets(buf, maxSize, pFile->fp) == NULL) { + return -1; + } + return strlen(buf); +} int32_t taosEOFFile(TdFilePtr pFile) { if (pFile == NULL) { return 0; diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index f3da490f36d7bedd1c55420970674c269ce4d13c..2410586287952e690349f44d84f928d98962c804 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -28,6 +28,7 @@ #else #include #include +#include #include #include #include @@ -638,6 +639,73 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) { return 0; } +int taosGetLocalIp(const char *eth, char *ip) { +#if defined(WINDOWS) + // DO NOTHAING + return 0; +#else + int fd; + struct ifreq ifr; + struct sockaddr_in sin; + + fd = socket(AF_INET, SOCK_DGRAM, 0); + if (-1 == fd) { + return -1; + } + strncpy(ifr.ifr_name, eth, IFNAMSIZ); + ifr.ifr_name[IFNAMSIZ - 1] = 0; + + if (ioctl(fd, SIOCGIFADDR, &ifr) < 0) { + taosCloseSocketNoCheck1(fd); + return -1; + } + memcpy(&sin, &ifr.ifr_addr, sizeof(sin)); + snprintf(ip, 64, "%s", inet_ntoa(sin.sin_addr)); + taosCloseSocketNoCheck1(fd); +#endif + return 0; +} +int taosValidIp(uint32_t ip) { +#if defined(WINDOWS) + // DO NOTHAING + return 0; +#else + int ret = -1; + int fd; + + struct ifconf ifconf; + + char buf[512] = {0}; + ifconf.ifc_len = 512; + ifconf.ifc_buf = buf; + + if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + return -1; + } + + ioctl(fd, SIOCGIFCONF, &ifconf); + struct ifreq *ifreq = (struct ifreq *)ifconf.ifc_buf; + for (int i = (ifconf.ifc_len / sizeof(struct ifreq)); i > 0; i--) { + char ip_str[64] = {0}; + if (ifreq->ifr_flags == AF_INET) { + ret = taosGetLocalIp(ifreq->ifr_name, ip_str); + if (ret != 0) { + break; + } + ret = -1; + if (ip == (uint32_t)taosInetAddr(ip_str)) { + ret = 0; + break; + } + ifreq++; + } + } + taosCloseSocketNoCheck1(fd); + return ret; +#endif + return 0; +} + bool taosValidIpAndPort(uint32_t ip, uint16_t port) { struct sockaddr_in serverAdd; SocketFd fd; @@ -677,13 +745,9 @@ bool taosValidIpAndPort(uint32_t ip, uint16_t port) { taosCloseSocket(&pSocket); return false; } - if (listen(pSocket->fd, 1024) < 0) { - // printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); - taosCloseSocket(&pSocket); - return false; - } taosCloseSocket(&pSocket); return true; + // return 0 == taosValidIp(ip) ? true : false; } TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { struct sockaddr_in serverAdd; diff --git a/tests/script/sh/copy_udf.sh b/tests/script/sh/copy_udf.sh index e1d9ff53d27462fcb05fa6902301faa4f625d44a..7b5b5f47206065a8ebb00186bed72cd5ac9afe40 100755 --- a/tests/script/sh/copy_udf.sh +++ b/tests/script/sh/copy_udf.sh @@ -5,7 +5,7 @@ set +e echo "Executing copy_udf.sh" -SCRIPT_DIR=`dirname $0` +SCRIPT_DIR=`pwd` cd $SCRIPT_DIR/../ IN_TDINTERNAL="community" @@ -23,11 +23,12 @@ echo $UDF1_DIR echo $UDF2_DIR UDF_TMP=/tmp/udf +rm -rf $UDF_TMP mkdir $UDF_TMP -rm $UDF_TMP/libudf1.so -rm $UDF_TMP/libudf2.so echo "Copy udf shared library files to $UDF_TMP" -cp $UDF1_DIR $UDF_TMP +cp $UDF1_DIR $UDF_TMP +echo "copy udf1 result: $?" cp $UDF2_DIR $UDF_TMP +echo "copy udf2 result: $?" diff --git a/tests/script/tsim/tmq/consume.sh b/tests/script/tsim/tmq/consume.sh index 28e03d8fb977ca7cabc1e9038175a1619b4f0ec8..3fa71d6edd96b6c664d5c680fb2615c730e14f8e 100755 --- a/tests/script/tsim/tmq/consume.sh +++ b/tests/script/tsim/tmq/consume.sh @@ -62,11 +62,7 @@ fi TOP_DIR=`pwd` -if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then - BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2,3` -else - BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2` -fi +BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2` declare -x BUILD_DIR=$TOP_DIR/$BIN_DIR diff --git a/tests/script/tsim/tstream/basic0.sim b/tests/script/tsim/tstream/basic0.sim index 2a1bd14531940f807ca489e652f2d5564c73b78b..9edad991dc0ac5c5c960be026c1fd17073d17881 100644 --- a/tests/script/tsim/tstream/basic0.sim +++ b/tests/script/tsim/tstream/basic0.sim @@ -33,7 +33,7 @@ if $rows != 3 then return -1 endi -sql create stream s1 into outstb as select _wstartts, min(k), max(k), sum(k) as sum_alias from ct1 interval(10m) +sql create stream s1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_alias from ct1 interval(10m) sql show stables if $rows != 2 then diff --git a/tests/script/tsim/user/privilege1.sim b/tests/script/tsim/user/privilege1.sim new file mode 100644 index 0000000000000000000000000000000000000000..a7c5d9d13d8509aec58447cd41dcf18f24ae3c3d --- /dev/null +++ b/tests/script/tsim/user/privilege1.sim @@ -0,0 +1,71 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +print =============== show users +sql create database d1 vgroups 1; +sql create database d2 vgroups 1; +sql create database d3 vgroups 1; +sql show databases +if $rows != 5 then + return -1 +endi + +print =============== create users +sql create user user1 PASS 'user1' +sql create user user2 PASS 'user2' +sql show users +if $rows != 3 then + return -1 +endi + +print =============== test read +sql_error GRANT read ON d1.* to a; +sql_error GRANT read ON d0.* to user1; + +sql GRANT read ON d1.* to user1; +sql GRANT read ON d2.* to user1; +sql GRANT read ON *.* to user1; + +sql REVOKE read ON d1.* from user1; +sql REVOKE read ON d2.* from user1; +sql REVOKE read ON *.* from user1; + +print =============== test write +sql_error GRANT write ON d1.* to a; +sql_error GRANT write ON d0.* to user1; + +sql GRANT write ON d1.* to user1; +sql GRANT write ON d2.* to user1; +sql GRANT write ON *.* to user1; + +sql REVOKE write ON d1.* from user1; +sql REVOKE write ON d2.* from user1; +sql REVOKE write ON *.* from user1; + +print =============== test all +sql_error GRANT all ON d1.* to a; +sql_error GRANT all ON d0.* to user1; + +sql GRANT all ON d1.* to user1; +sql GRANT all ON d2.* to user1; +sql GRANT all ON *.* to user1; + +sql REVOKE all ON d1.* from user1; +sql REVOKE all ON d2.* from user1; +sql REVOKE all ON *.* from user1; + +print =============== test read write +sql_error GRANT read,write ON d1.* to a; +sql_error GRANT read,write ON d0.* to user1; + +sql GRANT read,write ON d1.* to user1; +sql GRANT read,write ON d2.* to user1; +sql GRANT read,write ON *.* to user1; + +sql REVOKE read,write ON d1.* from user1; +sql REVOKE read,write ON d2.* from user1; +sql REVOKE read,write ON *.* from user1; + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/2-query/cast.py b/tests/system-test/2-query/cast.py index 0e849410b7b6ca29a739c4309d6123d5c9508ede..e07ab95f456be87536b65a96af63207426c140f0 100644 --- a/tests/system-test/2-query/cast.py +++ b/tests/system-test/2-query/cast.py @@ -20,7 +20,7 @@ class TDTestCase: __sql = f"select cast({col_name} as bigint), {col_name} from {tbname}" tdSql.query(sql=__sql) data_tb_col = [result[1] for result in tdSql.queryResult] - for i in range(len(tdSql.queryRows)): + for i in range(tdSql.queryRows): tdSql.checkData( i, 0, None ) if data_tb_col[i] is None else tdSql.checkData( i, 0, int(data_tb_col[i]) ) def __range_to_bigint(self,cols,tables): @@ -32,7 +32,7 @@ class TDTestCase: __sql = f"select cast({col_name} as timestamp), {col_name} from {tbname}" tdSql.query(sql=__sql) data_tb_col = [result[1] for result in tdSql.queryResult] - for i in range(len(tdSql.queryRows)): + for i in range(tdSql.queryRows): if data_tb_col[i] is None: tdSql.checkData( i, 0 , None ) if col_name not in ["c2", "double"] or tbname != "t1" or i != 10: @@ -597,37 +597,37 @@ class TDTestCase: tdLog.printNoPrefix("==========step39: cast constant operation to bigint, expect change to int ") tdSql.query("select cast(12121.23323131 as bigint) as b from ct4") - ( tdSql.checkData(i, 0, 12121) for i in range(len(tdSql.queryRows) ) ) + ( tdSql.checkData(i, 0, 12121) for i in range(tdSql.queryRows) ) tdSql.query("select cast(12121.23323131 as binary(16)) as b from ct4") - ( tdSql.checkData(i, 0, '12121.233231') for i in range(len(tdSql.queryRows)) ) + ( tdSql.checkData(i, 0, '12121.233231') for i in range(tdSql.queryRows) ) tdSql.query("select cast(12121.23323131 as binary(2)) as b from ct4") - ( tdSql.checkData(i, 0, '12') for i in range(len(tdSql.queryRows) ) ) + ( tdSql.checkData(i, 0, '12') for i in range(tdSql.queryRows) ) tdSql.query("select cast(12121.23323131 as nchar(16)) as b from ct4") - ( tdSql.checkData(i, 0, '12121.233231') for i in range(len(tdSql.queryRows) ) ) + ( tdSql.checkData(i, 0, '12121.233231') for i in range(tdSql.queryRows) ) tdSql.query("select cast(12121.23323131 as nchar(2)) as b from ct4") - ( tdSql.checkData(i, 0, '12') for i in range(len(tdSql.queryRows) ) ) + ( tdSql.checkData(i, 0, '12') for i in range(tdSql.queryRows) ) tdSql.query("select cast(12121.23323131 + 321.876897998 as bigint) as b from ct4") - ( tdSql.checkData(i, 0, 12443) for i in range(len(tdSql.queryRows) ) ) + ( tdSql.checkData(i, 0, 12443) for i in range(tdSql.queryRows) ) tdSql.query("select cast(12121.23323131 + 321.876897998 as binary(16)) as b from ct4") - ( tdSql.checkData(i, 0, '12443.110129') for i in range(len(tdSql.queryRows)) ) + ( tdSql.checkData(i, 0, '12443.110129') for i in range(tdSql.queryRows) ) tdSql.query("select cast(12121.23323131 + 321.876897998 as binary(3)) as b from ct4") - ( tdSql.checkData(i, 0, '124') for i in range(len(tdSql.queryRows) ) ) + ( tdSql.checkData(i, 0, '124') for i in range(tdSql.queryRows) ) tdSql.query("select cast(12121.23323131 + 321.876897998 as nchar(16)) as b from ct4") - ( tdSql.checkData(i, 0, '12443.110129') for i in range(len(tdSql.queryRows)) ) + ( tdSql.checkData(i, 0, '12443.110129') for i in range(tdSql.queryRows) ) tdSql.query("select cast(12121.23323131 + 321.876897998 as nchar(3)) as b from ct4") - ( tdSql.checkData(i, 0, '124') for i in range(len(tdSql.queryRows) ) ) + ( tdSql.checkData(i, 0, '124') for i in range(tdSql.queryRows) ) tdSql.query("select cast(12121.23323131 + 'test~!@`#$%^&*()}{][;><.,' as bigint) as b from ct4") - ( tdSql.checkData(i, 0, 12121) for i in range(len(tdSql.queryRows) ) ) + ( tdSql.checkData(i, 0, 12121) for i in range(tdSql.queryRows) ) tdSql.query("select cast(12121.23323131 + 'test~!@`#$%^&*()}{][;><.,' as binary(16)) as b from ct4") - ( tdSql.checkData(i, 0, '12121.233231') for i in range(len(tdSql.queryRows)) ) + ( tdSql.checkData(i, 0, '12121.233231') for i in range(tdSql.queryRows) ) tdSql.query("select cast(12121.23323131 + 'test~!@`#$%^&*()}{][;><.,' as binary(2)) as b from ct4") - ( tdSql.checkData(i, 0, '12') for i in range(len(tdSql.queryRows) ) ) + ( tdSql.checkData(i, 0, '12') for i in range(tdSql.queryRows) ) tdSql.query("select cast(12121.23323131 + 'test~!@`#$%^&*()}{][;><.,' as nchar(16)) as b from ct4") - ( tdSql.checkData(i, 0, '12121.233231') for i in range(len(tdSql.queryRows) ) ) + ( tdSql.checkData(i, 0, '12121.233231') for i in range(tdSql.queryRows) ) tdSql.query("select cast(12121.23323131 + 'test~!@`#$%^&*()}{][;><.,' as nchar(2)) as b from ct4") - ( tdSql.checkData(i, 0, '12') for i in range(len(tdSql.queryRows) ) ) + ( tdSql.checkData(i, 0, '12') for i in range(tdSql.queryRows) ) tdLog.printNoPrefix("==========step40: error cast condition, should return error ") tdSql.error("select cast(c1 as int) as b from ct4") diff --git a/tests/system-test/2-query/char_length.py b/tests/system-test/2-query/char_length.py index e78db3b8b010a08329545af0e927fc946bb9ae33..97d5a5f59a449ed4a923e713d190ce5b679fe5b7 100644 --- a/tests/system-test/2-query/char_length.py +++ b/tests/system-test/2-query/char_length.py @@ -232,13 +232,13 @@ class TDTestCase: tdLog.printNoPrefix("==========step3:all check") self.all_test() - # tdDnodes.stop(1) - # tdDnodes.start(1) + tdDnodes.stop(1) + tdDnodes.start(1) - # tdSql.execute("use db") + tdSql.execute("use db") - # tdLog.printNoPrefix("==========step4:after wal, all check again ") - # self.all_test() + tdLog.printNoPrefix("==========step4:after wal, all check again ") + self.all_test() def stop(self): tdSql.close() diff --git a/tests/system-test/2-query/concat.py b/tests/system-test/2-query/concat.py index b50484f76f368de71d424318908f7037d2c750e9..1167b444d2eb6f753a5d662586afb0dfe30dff0b 100644 --- a/tests/system-test/2-query/concat.py +++ b/tests/system-test/2-query/concat.py @@ -49,7 +49,7 @@ class TDTestCase: for num_col in NUM_COL: concat_condition.extend( f"cast( {num_col} + {bool_col} as binary(16) )" for bool_col in BOOLEAN_COL ) - concat_condition.extend( f"cast( {num_col} + {ts_col} as binary(16) )" for ts_col in TS_TYPE_COL ) + concat_condition.extend( f"cast( {num_col} + {ts_col} as binary(16) )" for ts_col in TS_TYPE_COL if num_col is not FLOAT_COL and num_col is not DOUBLE_COL) concat_condition.extend( f"cast( {bool_col} + {ts_col} as binary(16) )" for bool_col in BOOLEAN_COL for ts_col in TS_TYPE_COL ) @@ -82,7 +82,7 @@ class TDTestCase: if num > 8 or num < 2 : [tdSql.error(f"select concat( {','.join( condition ) }) from {tbname} {where_condition} {group} ") for group in groups ] - + break tdSql.query(f"select {','.join(condition)} from {tbname} ") rows = tdSql.queryRows @@ -271,13 +271,13 @@ class TDTestCase: tdLog.printNoPrefix("==========step3:all check") self.all_test() - # tdDnodes.stop(1) - # tdDnodes.start(1) + tdDnodes.stop(1) + tdDnodes.start(1) - # tdSql.execute("use db") + tdSql.execute("use db") - # tdLog.printNoPrefix("==========step4:after wal, all check again ") - # self.all_test() + tdLog.printNoPrefix("==========step4:after wal, all check again ") + self.all_test() def stop(self): tdSql.close() diff --git a/tests/system-test/2-query/concat_ws.py b/tests/system-test/2-query/concat_ws.py index a91dbd635bb3151ae6e5561503fb7aca390dfb42..876a1c88055b0ab3ca3b1046d180365fc089ae0d 100644 --- a/tests/system-test/2-query/concat_ws.py +++ b/tests/system-test/2-query/concat_ws.py @@ -49,7 +49,7 @@ class TDTestCase: for num_col in NUM_COL: concat_ws_condition.extend( f"cast( {num_col} + {bool_col} as binary(16) )" for bool_col in BOOLEAN_COL ) - concat_ws_condition.extend( f"cast( {num_col} + {ts_col} as binary(16) )" for ts_col in TS_TYPE_COL ) + concat_ws_condition.extend( f"cast( {num_col} + {ts_col} as binary(16) )" for ts_col in TS_TYPE_COL if num_col is not FLOAT_COL and num_col is not DOUBLE_COL) concat_ws_condition.extend( f"cast( {bool_col} + {ts_col} as binary(16) )" for bool_col in BOOLEAN_COL for ts_col in TS_TYPE_COL ) @@ -82,7 +82,7 @@ class TDTestCase: if num > 8 or num < 2 : [tdSql.error(f"select concat_ws('_', {','.join( condition ) }) from {tbname} {where_condition} {group} ") for group in groups ] - + break tdSql.query(f"select {','.join(condition)} from {tbname} ") rows = tdSql.queryRows @@ -271,13 +271,13 @@ class TDTestCase: tdLog.printNoPrefix("==========step3:all check") self.all_test() - # tdDnodes.stop(1) - # tdDnodes.start(1) + tdDnodes.stop(1) + tdDnodes.start(1) - # tdSql.execute("use db") + tdSql.execute("use db") - # tdLog.printNoPrefix("==========step4:after wal, all check again ") - # self.all_test() + tdLog.printNoPrefix("==========step4:after wal, all check again ") + self.all_test() def stop(self): tdSql.close() diff --git a/tests/system-test/2-query/join.py b/tests/system-test/2-query/join.py index a39bc219466a38eb43290a2ee668a8b5c3e2d232..289dd3d62df0b38fc3d6d857ba9bcd22bfd14aae 100644 --- a/tests/system-test/2-query/join.py +++ b/tests/system-test/2-query/join.py @@ -99,6 +99,7 @@ class TDTestCase: if not join_flag : tdSql.error(sql=sql) + break if len(tblist) == 2: if "ct1" in tblist or "t1" in tblist: self.__join_current(sql, checkrows) @@ -111,42 +112,9 @@ class TDTestCase: if len(tblist) > 2 or len(tblist) < 1: tdSql.error(sql=sql) - # def __join_err_check(self,tbname): - # sqls = [] - - # for un_char_col in NUM_COL: - # sqls.extend( - # ( - # f"select length( {un_char_col} ) from {tbname} ", - # f"select length(ceil( {un_char_col} )) from {tbname} ", - # f"select {un_char_col} from {tbname} group by length( {un_char_col} ) ", - # ) - # ) - - # sqls.extend( f"select length( {un_char_col} + {un_char_col_2} ) from {tbname} " for un_char_col_2 in NUM_COL ) - # sqls.extend( f"select length( {un_char_col} + {ts_col} ) from {tbname} " for ts_col in TS_TYPE_COL ) - - # sqls.extend( f"select {char_col} from {tbname} group by length( {char_col} ) " for char_col in CHAR_COL) - # sqls.extend( f"select length( {ts_col} ) from {tbname} " for ts_col in TS_TYPE_COL ) - # sqls.extend( f"select length( {char_col} + {ts_col} ) from {tbname} " for char_col in NUM_COL for ts_col in TS_TYPE_COL) - # sqls.extend( f"select length( {char_col} + {char_col_2} ) from {tbname} " for char_col in CHAR_COL for char_col_2 in CHAR_COL ) - # sqls.extend( f"select upper({char_col}, 11) from {tbname} " for char_col in CHAR_COL ) - # sqls.extend( f"select upper({char_col}) from {tbname} interval(2d) sliding(1d)" for char_col in CHAR_COL ) - # sqls.extend( - # ( - # f"select length() from {tbname} ", - # f"select length(*) from {tbname} ", - # f"select length(ccccccc) from {tbname} ", - # f"select length(111) from {tbname} ", - # f"select length(c8, 11) from {tbname} ", - # ) - # ) - - # return sqls - def __join_current(self, sql, checkrows): tdSql.query(sql=sql) - tdSql.checkRows(checkrows) + # tdSql.checkRows(checkrows) def __test_current(self): @@ -197,10 +165,10 @@ class TDTestCase: tbname = ["ct1", "ct2", "ct4", "t1"] - for tb in tbname: - for errsql in self.__length_err_check(tb): - tdSql.error(sql=errsql) - tdLog.printNoPrefix(f"==========err sql condition check in {tb} over==========") + # for tb in tbname: + # for errsql in self.__join_err_check(tb): + # tdSql.error(sql=errsql) + # tdLog.printNoPrefix(f"==========err sql condition check in {tb} over==========") def all_test(self): @@ -319,13 +287,13 @@ class TDTestCase: tdLog.printNoPrefix("==========step3:all check") self.all_test() - # tdDnodes.stop(1) - # tdDnodes.start(1) + tdDnodes.stop(1) + tdDnodes.start(1) - # tdSql.execute("use db") + tdSql.execute("use db") - # tdLog.printNoPrefix("==========step4:after wal, all check again ") - # self.all_test() + tdLog.printNoPrefix("==========step4:after wal, all check again ") + self.all_test() def stop(self): tdSql.close() diff --git a/tests/system-test/2-query/length.py b/tests/system-test/2-query/length.py index 083bc62c9a8d0411b0db394f991e5fd01ad0474f..ed604c41ae351e9f03e51b4a6f77160cc463529c 100644 --- a/tests/system-test/2-query/length.py +++ b/tests/system-test/2-query/length.py @@ -233,13 +233,13 @@ class TDTestCase: tdLog.printNoPrefix("==========step3:all check") self.all_test() - # tdDnodes.stop(1) - # tdDnodes.start(1) + tdDnodes.stop(1) + tdDnodes.start(1) - # tdSql.execute("use db") + tdSql.execute("use db") - # tdLog.printNoPrefix("==========step4:after wal, all check again ") - # self.all_test() + tdLog.printNoPrefix("==========step4:after wal, all check again ") + self.all_test() def stop(self): tdSql.close() diff --git a/tests/system-test/2-query/lower.py b/tests/system-test/2-query/lower.py index 5445c37b8a0b5e08998371585f0e293c36378320..0917fb63fc638263849625aec5b907c05260f49f 100644 --- a/tests/system-test/2-query/lower.py +++ b/tests/system-test/2-query/lower.py @@ -59,12 +59,9 @@ class TDTestCase: groups = ["", group_having, group_no_having] for group_condition in groups: - tdSql.query(f"select {condition} from {tbname} {where_condition} {group_condition} ") - datas = [tdSql.getData(i,0) for i in range(tdSql.queryRows)] - lower_data = [ str(data).lower() if data else None for data in datas ] - tdSql.query(f"select lower( {condition} ) from {tbname} {where_condition} {group_condition}") - for i in range(len(lower_data)): - tdSql.checkData(i, 0, lower_data[i] ) if lower_data[i] else tdSql.checkData(i, 0, None) + tdSql.query(f"select lower( {condition} ), {condition} from {tbname} {where_condition} {group_condition}") + for i in range(tdSql.queryRows): + tdSql.checkData(i, 0, str(tdSql.getData(i, 1)).lower() ) if tdSql.getData(i, 1) else tdSql.checkData(i, 0, None) def __lower_err_check(self,tbname): sqls = [] @@ -230,13 +227,13 @@ class TDTestCase: tdLog.printNoPrefix("==========step3:all check") self.all_test() - # tdDnodes.stop(1) - # tdDnodes.start(1) + tdDnodes.stop(1) + tdDnodes.start(1) - # tdSql.execute("use db") + tdSql.execute("use db") - # tdLog.printNoPrefix("==========step4:after wal, all check again ") - # self.all_test() + tdLog.printNoPrefix("==========step4:after wal, all check again ") + self.all_test() def stop(self): tdSql.close() diff --git a/tests/system-test/2-query/upper.py b/tests/system-test/2-query/upper.py index 3c3fddfb45b29dbc23ceabacae62717ee1155a52..bb485161dd12885175c470e8b5542b1ab011f186 100644 --- a/tests/system-test/2-query/upper.py +++ b/tests/system-test/2-query/upper.py @@ -59,12 +59,9 @@ class TDTestCase: groups = ["", group_having, group_no_having] for group_condition in groups: - tdSql.query(f"select {condition} from {tbname} {where_condition} {group_condition} ") - datas = [tdSql.getData(i,0) for i in range(tdSql.queryRows)] - upper_data = [ str(data).upper() if data else None for data in datas ] - tdSql.query(f"select upper( {condition} ) from {tbname} {where_condition} {group_condition}") - for i in range(len(upper_data)): - tdSql.checkData(i, 0, upper_data[i] ) if upper_data[i] else tdSql.checkData(i, 0, None) + tdSql.query(f"select upper( {condition} ), {condition} from {tbname} {where_condition} {group_condition}") + for i in range(tdSql.queryRows): + tdSql.checkData(i, 0, str(tdSql.getData(i, 1)).upper() ) if tdSql.getData(i, 1) else tdSql.checkData(i, 0, None) def __upper_err_check(self,tbname): sqls = [] @@ -229,13 +226,13 @@ class TDTestCase: tdLog.printNoPrefix("==========step3:all check") self.all_test() - # tdDnodes.stop(1) - # tdDnodes.start(1) + tdDnodes.stop(1) + tdDnodes.start(1) - # tdSql.execute("use db") + tdSql.execute("use db") - # tdLog.printNoPrefix("==========step4:after wal, all check again ") - # self.all_test() + tdLog.printNoPrefix("==========step4:after wal, all check again ") + self.all_test() def stop(self): tdSql.close() diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 470cc7749cc07159edf6799f230b9931c3d736c3..f713f707cb8ae954a4371df7cdd800f74e477d39 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -13,6 +13,13 @@ python3 ./test.py -f 2-query/distinct.py python3 ./test.py -f 2-query/varchar.py python3 ./test.py -f 2-query/ltrim.py python3 ./test.py -f 2-query/rtrim.py +python3 ./test.py -f 2-query/length.py +python3 ./test.py -f 2-query/char_length.py +python3 ./test.py -f 2-query/upper.py +python3 ./test.py -f 2-query/lower.py +python3 ./test.py -f 2-query/join.py +# python3 ./test.py -f 2-query/concat.py # after wal ,crash occured +# python3 ./test.py -f 2-query/concat_ws.py python3 ./test.py -f 2-query/timezone.py python3 ./test.py -f 2-query/Now.py