diff --git a/docs/en/14-reference/14-taosKeeper.md b/docs/en/14-reference/14-taosKeeper.md index 9c4a2da92153f2a553932382b5aa27f279550a6f..cfc25543873f56f5511abcf3b6c20d556b3da4fc 100644 --- a/docs/en/14-reference/14-taosKeeper.md +++ b/docs/en/14-reference/14-taosKeeper.md @@ -223,3 +223,29 @@ Content-Length: 19 {"version":"1.0.0"} ``` + +### taoskeeper with Prometheus + +There is `/metrics` api in taoskeeper provide TDengine metric data for Prometheus. + +#### scrape config + +Scrape config in Prometheus specifies a set of targets and parameters describing how to scrape metric data from endpoint. For more information, please reference to [Prometheus documents](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config). + +``` +# A scrape configuration containing exactly one endpoint to scrape: +# Here it's Prometheus itself. +scrape_configs: + - job_name: "taoskeeper" + # metrics_path defaults to '/metrics' + # scheme defaults to 'http'. + static_configs: + - targets: ["localhost:6043"] +``` + +#### Dashboard + +There is a dashboard named `TaosKeeper Prometheus Dashboard for 3.x`, which provides a monitoring dashboard similar to TInsight. + +In Grafana, click the Dashboard menu and click `import`, enter the dashboard ID `18587` and click the `Load` button. Then finished importing `TaosKeeper Prometheus Dashboard for 3.x` dashboard. + diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index 12e466f3491b779275d29bcfa79c3203aee01056..9bbb551c8781f7a8fd17f9d91cd820b1e3863550 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -213,3 +213,29 @@ DELETE_MARK time ``` DELETE_MARK用于删除缓存的窗口状态,也就是删除流计算的中间结果。如果不设置,默认值是10年 T = 最新事件时间 - DELETE_MARK + +## 流式计算支持的函数 + +1. 所有的单行函数均可用于流计算中。 +2. 以下 19 个聚合函数不能在创建流计算的 SQL 语句中使用 +``` +leastsquares +percentile +top +bottom +elapsed +interp +derivative +irate +twa +histogram +diff +statecount +stateduration +csum +mavg +sample +tail +unique +mode +``` diff --git a/docs/zh/14-reference/14-taosKeeper.md b/docs/zh/14-reference/14-taosKeeper.md index 03ca30781fc9aec4f6ada66be76bd221da286f6d..b4d35fb2400be9407e32182c3d632a52538b95bd 100644 --- a/docs/zh/14-reference/14-taosKeeper.md +++ b/docs/zh/14-reference/14-taosKeeper.md @@ -226,3 +226,29 @@ Content-Length: 19 {"version":"1.0.0"} ``` + +### 集成 Prometheus + +taoskeeper 提供了 `/metrics` 接口,返回了 Prometheus 格式的监控数据,Prometheus 可以从 taoskeeper 抽取监控数据,实现通过 Prometheus 监控 TDengine 的目的。 + +#### 抽取配置 + +Prometheus 提供了 `scrape_configs` 配置如何从 endpoint 抽取监控数据,通常只需要修改 `static_configs` 中的 targets 配置为 taoskeeper 的 endpoint 地址,更多配置信息请参考 [Prometheus 配置文档](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config)。 + +``` +# A scrape configuration containing exactly one endpoint to scrape: +# Here it's Prometheus itself. +scrape_configs: + - job_name: "taoskeeper" + # metrics_path defaults to '/metrics' + # scheme defaults to 'http'. + static_configs: + - targets: ["localhost:6043"] +``` + +#### Dashboard + +我们提供了 `TaosKeeper Prometheus Dashboard for 3.x` dashboard,提供了和 TDinsight 类似的监控 dashboard。 + +在 Grafana Dashboard 菜单点击 `import`,dashboard ID 填写 `18587`,点击 `Load` 按钮即可导入 `TaosKeeper Prometheus Dashboard for 3.x` dashboard。 + diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 83b40c4f20a332b94abe45dd5c4fb7afd820861a..94545dfaad943a3ec5e35ae6859213835d8c4540 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -20,7 +20,8 @@ #include #include "taos.h" -static int running = 1; +static int running = 1; +const char* topic_name = "topicname"; static int32_t msg_process(TAOS_RES* msg) { char buf[1024]; @@ -243,7 +244,7 @@ _end: tmq_list_t* build_topic_list() { tmq_list_t* topicList = tmq_list_new(); - int32_t code = tmq_list_append(topicList, "topicname"); + int32_t code = tmq_list_append(topicList, topic_name); if (code) { tmq_list_destroy(topicList); return NULL; @@ -269,6 +270,31 @@ void basic_consume_loop(tmq_t* tmq) { fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); } +void consume_repeatly(tmq_t* tmq) { + int32_t numOfAssignment = 0; + tmq_topic_assignment* pAssign = NULL; + + int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment); + if (code != 0) { + fprintf(stderr, "failed to get assignment, reason:%s", tmq_err2str(code)); + } + + // seek to the earliest offset + for(int32_t i = 0; i < numOfAssignment; ++i) { + tmq_topic_assignment* p = &pAssign[i]; + + code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin); + if (code != 0) { + fprintf(stderr, "failed to seek to %ld, reason:%s", p->begin, tmq_err2str(code)); + } + } + + free(pAssign); + + // let's do it again + basic_consume_loop(tmq); +} + int main(int argc, char* argv[]) { int32_t code; @@ -294,10 +320,13 @@ int main(int argc, char* argv[]) { if ((code = tmq_subscribe(tmq, topic_list))) { fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code)); } + tmq_list_destroy(topic_list); basic_consume_loop(tmq); + consume_repeatly(tmq); + code = tmq_consumer_close(tmq); if (code) { fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code)); diff --git a/include/client/taos.h b/include/client/taos.h index 05f3c5d11de38e742beb6520cd825ca69a9c394b..44703c2797d3e39bbc05f817a0cfd65acd6af2cf 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -101,6 +101,7 @@ typedef struct TAOS_FIELD_E { #endif typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *res, int code); +typedef void (*__taos_notify_fn_t)(void *param, void *ext, int type); typedef struct TAOS_MULTI_BIND { int buffer_type; @@ -121,6 +122,10 @@ typedef enum { SET_CONF_RET_ERR_TOO_LONG = -6 } SET_CONF_RET_CODE; +typedef enum { + TAOS_NOTIFY_PASSVER = 0, +} TAOS_NOTIFY_TYPE; + #define RET_MSG_LENGTH 1024 typedef struct setConfRet { SET_CONF_RET_CODE retCode; @@ -225,6 +230,8 @@ DLL_EXPORT int taos_get_tables_vgId(TAOS *taos, const char *db, const char *tabl DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); +DLL_EXPORT int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type); + /* --------------------------schemaless INTERFACE------------------------------- */ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision); @@ -263,7 +270,7 @@ DLL_EXPORT const char *tmq_err2str(int32_t code); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */ typedef struct tmq_topic_assignment { - int32_t vgroupHandle; + int32_t vgId; int64_t currentOffset; int64_t begin; int64_t end; @@ -277,7 +284,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char* pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment); -DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char* pTopicName, int32_t vgroupHandle, int64_t offset); +DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char* pTopicName, int32_t vgId, int64_t offset); /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f07d31192d19711443dddd779ccbe33a903c46c7..f377ad0d63e140a8e5e4e7c46130b473d8bd684e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -106,6 +106,7 @@ enum { HEARTBEAT_KEY_DBINFO, HEARTBEAT_KEY_STBINFO, HEARTBEAT_KEY_TMQ, + HEARTBEAT_KEY_USER_PASSINFO, }; typedef enum _mgmt_table { @@ -634,6 +635,7 @@ typedef struct { int8_t connType; SEpSet epSet; int32_t svrTimestamp; + int32_t passVer; char sVer[TSDB_VERSION_LEN]; char sDetailVer[128]; } SConnectRsp; @@ -716,6 +718,14 @@ int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pR int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp); void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp); +typedef struct SUserPassVersion { + char user[TSDB_USER_LEN]; + int32_t version; +} SUserPassVersion; + +typedef SGetUserAuthReq SGetUserPassReq; +typedef SUserPassVersion SGetUserPassRsp; + /* * for client side struct, only column id, type, bytes are necessary * But for data in vnode side, we need all the following information. @@ -1046,6 +1056,14 @@ int32_t tSerializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp int32_t tDeserializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp); void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp); +typedef struct { + SArray* pArray; // Array of SGetUserPassRsp +} SUserPassBatchRsp; + +int32_t tSerializeSUserPassBatchRsp(void* buf, int32_t bufLen, SUserPassBatchRsp* pRsp); +int32_t tDeserializeSUserPassBatchRsp(void* buf, int32_t bufLen, SUserPassBatchRsp* pRsp); +void tFreeSUserPassBatchRsp(SUserPassBatchRsp* pRsp); + typedef struct { char db[TSDB_DB_FNAME_LEN]; STimeWindow timeRange; diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 07a0ca952a1dbebbe43af966c183d8ea0981fc1c..6c3c7497b1f5b123143e99283657f55a49f2eb63 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -20,6 +20,7 @@ #include "tmsg.h" #include "tmsgcb.h" #include "trpc.h" +#include "sync.h" #ifdef __cplusplus extern "C" { @@ -73,6 +74,7 @@ int32_t mndStart(SMnode *pMnode); void mndStop(SMnode *pMnode); int32_t mndIsCatchUp(SMnode *pMnode); +ESyncRole mndGetRole(SMnode *pMnode); /** * @brief Get mnode monitor info. diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 33cef538d2a7c02e19eac2143965a83e32bcdf27..e86a4f96905512dd08af6acc4b9563206b93f985 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -250,6 +250,7 @@ void syncPreStop(int64_t rid); void syncPostStop(int64_t rid); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq); int32_t syncIsCatchUp(int64_t rid); +ESyncRole syncGetRole(int64_t rid); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 0d9292cc6bef98fec88cf6989d9e9a0e202e3da4..b22ce30894e93bbda986c829c8567138c61d31ea 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -403,6 +403,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x040F) #define TSDB_CODE_SNODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0410) #define TSDB_CODE_SNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0411) +#define TSDB_CODE_MNODE_NOT_CATCH_UP TAOS_DEF_ERROR_CODE(0, 0x0412) // internal +#define TSDB_CODE_MNODE_ALREADY_IS_VOTER TAOS_DEF_ERROR_CODE(0, 0x0413) // internal // vnode // #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) // 2.x @@ -437,6 +439,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529) #define TSDB_CODE_VND_DUP_REQUEST TAOS_DEF_ERROR_CODE(0, 0x0530) #define TSDB_CODE_VND_QUERY_BUSY TAOS_DEF_ERROR_CODE(0, 0x0531) +#define TSDB_CODE_VND_NOT_CATCH_UP TAOS_DEF_ERROR_CODE(0, 0x0532) // internal +#define TSDB_CODE_VND_ALREADY_IS_VOTER TAOS_DEF_ERROR_CODE(0, 0x0533) // internal // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) @@ -712,15 +716,13 @@ int32_t* taosGetErrno(); #define TSDB_CODE_UDF_STOPPING TAOS_DEF_ERROR_CODE(0, 0x2901) #define TSDB_CODE_UDF_PIPE_READ_ERR TAOS_DEF_ERROR_CODE(0, 0x2902) #define TSDB_CODE_UDF_PIPE_CONNECT_ERR TAOS_DEF_ERROR_CODE(0, 0x2903) -#define TSDB_CODE_UDF_PIPE_NO_PIPE TAOS_DEF_ERROR_CODE(0, 0x2904) +#define TSDB_CODE_UDF_PIPE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x2904) #define TSDB_CODE_UDF_LOAD_UDF_FAILURE TAOS_DEF_ERROR_CODE(0, 0x2905) -#define TSDB_CODE_UDF_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x2906) -#define TSDB_CODE_UDF_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2907) -#define TSDB_CODE_UDF_NO_FUNC_HANDLE TAOS_DEF_ERROR_CODE(0, 0x2908) -#define TSDB_CODE_UDF_INVALID_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x2909) -#define TSDB_CODE_UDF_INVALID_OUTPUT_TYPE TAOS_DEF_ERROR_CODE(0, 0x290A) -#define TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x290B) -#define TSDB_CODE_UDF_FUNC_EXEC_FAILURE TAOS_DEF_ERROR_CODE(0, 0x290C) +#define TSDB_CODE_UDF_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2906) +#define TSDB_CODE_UDF_INVALID_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x2907) +#define TSDB_CODE_UDF_INVALID_OUTPUT_TYPE TAOS_DEF_ERROR_CODE(0, 0x2908) +#define TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x2909) +#define TSDB_CODE_UDF_FUNC_EXEC_FAILURE TAOS_DEF_ERROR_CODE(0, 0x290A) // sml #define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 41f87379a9c7766348c342b4f97273cb4704ab1a..8e20d7d2752d143d59766ac6eca7887e90f08333 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -63,6 +63,7 @@ typedef struct { // statistics int32_t reportCnt; int32_t connKeyCnt; + int32_t passKeyCnt; // with passVer call back int64_t reportBytes; // not implemented int64_t startTime; // ctl @@ -125,6 +126,12 @@ typedef struct SAppInfo { TdThreadMutex mutex; } SAppInfo; +typedef struct { + int32_t ver; + void* param; + __taos_notify_fn_t fp; +} SPassInfo; + typedef struct STscObj { char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; @@ -140,6 +147,7 @@ typedef struct STscObj { int32_t numOfReqs; // number of sqlObj bound to this connection SAppInstInfo* pAppInfo; SHashObj* pRequests; + SPassInfo passInfo; } STscObj; typedef struct STscDbg { @@ -353,7 +361,7 @@ void stopAllRequests(SHashObj* pRequests); // conn level int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); -void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); +void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* param); typedef struct SSqlCallbackWrapper { SParseContext* pParseCtx; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 418103f2a6f97cab51eea3d83955b1224b8428b7..99569fdb574bd2860e8163c9a1ba0e2d30eb5750 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -239,7 +239,7 @@ void destroyTscObj(void *pObj) { tscTrace("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; - hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); + hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, pTscObj->passInfo.fp); destroyAllRequests(pTscObj->pRequests); taosHashCleanup(pTscObj->pRequests); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index c9c2e7a5f82453035100f2e4a67d806c70a7ff57..79435da89fb5f85fa0fdace11e573e7c85c716b9 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -19,6 +19,15 @@ #include "scheduler.h" #include "trpc.h" +typedef struct { + union { + struct { + int64_t clusterId; + int32_t passKeyCnt; + }; + }; +} SHbParam; + static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); @@ -49,6 +58,52 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC return TSDB_CODE_SUCCESS; } +static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHbKey *connKey, SAppHbMgr *pAppHbMgr) { + int32_t code = 0; + int32_t numOfBatchs = 0; + SUserPassBatchRsp batchRsp = {0}; + if (tDeserializeSUserPassBatchRsp(value, valueLen, &batchRsp) != 0) { + code = TSDB_CODE_INVALID_MSG; + return code; + } + + numOfBatchs = taosArrayGetSize(batchRsp.pArray); + + SClientHbReq *pReq = NULL; + while ((pReq = taosHashIterate(pAppHbMgr->activeInfo, pReq))) { + STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid); + if (!pTscObj) { + continue; + } + SPassInfo *passInfo = &pTscObj->passInfo; + if (!passInfo->fp) { + releaseTscObj(pReq->connKey.tscRid); + continue; + } + + for (int32_t i = 0; i < numOfBatchs; ++i) { + SGetUserPassRsp *rsp = taosArrayGet(batchRsp.pArray, i); + if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) { + int32_t oldVer = atomic_load_32(&passInfo->ver); + if (oldVer < rsp->version) { + atomic_store_32(&passInfo->ver, rsp->version); + if (passInfo->fp) { + (*passInfo->fp)(passInfo->param, &passInfo->ver, TAOS_NOTIFY_PASSVER); + } + tscDebug("update passVer of user %s from %d to %d, tscRid:%" PRIi64, rsp->user, oldVer, + atomic_load_32(&passInfo->ver), pTscObj->id); + } + break; + } + } + releaseTscObj(pReq->connKey.tscRid); + } + + taosArrayDestroy(batchRsp.pArray); + + return code; +} + static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) { int32_t code = 0; SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo)); @@ -291,6 +346,15 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog); break; } + case HEARTBEAT_KEY_USER_PASSINFO: { + if (kv->valueLen <= 0 || NULL == kv->value) { + tscError("invalid hb user pass info, len:%d, value:%p", kv->valueLen, kv->value); + break; + } + + hbProcessUserPassInfoRsp(kv->value, kv->valueLen, &pRsp->connKey, pAppHbMgr); + break; + } default: tscError("invalid hb key type:%d", kv->key); break; @@ -472,6 +536,49 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { return TSDB_CODE_SUCCESS; } +static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { + STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); + if (!pTscObj) { + tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); + return TSDB_CODE_APP_ERROR; + } + + int32_t code = 0; + SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion)); + if (!user) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return; + } + strncpy(user->user, pTscObj->user, TSDB_USER_LEN); + user->version = htonl(pTscObj->passInfo.ver); + + SKv kv = { + .key = HEARTBEAT_KEY_USER_PASSINFO, + .valueLen = sizeof(SUserPassVersion), + .value = user, + }; + + tscDebug("hb got user basic info, valueLen:%d, user:%s, passVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user, + pTscObj->passInfo.ver, connKey->tscRid); + + if (!req->info) { + req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + } + + if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) < 0) { + code = terrno ? terrno : TSDB_CODE_APP_ERROR; + goto _return; + } + +_return: + releaseTscObj(connKey->tscRid); + if (code) { + tscError("hb got user basic info failed since %s", terrstr(code)); + } + + return code; +} + int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { SUserAuthVersion *users = NULL; uint32_t userNum = 0; @@ -607,19 +714,23 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { } int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { - int64_t *clusterId = (int64_t *)param; + SHbParam *hbParam = (SHbParam *)param; struct SCatalog *pCatalog = NULL; - int32_t code = catalogGetHandle(*clusterId, &pCatalog); + int32_t code = catalogGetHandle(hbParam->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); return code; } - hbGetAppInfo(*clusterId, req); + hbGetAppInfo(hbParam->clusterId, req); hbGetQueryBasicInfo(connKey, req); + if (hbParam->passKeyCnt > 0) { + hbGetUserBasicInfo(connKey, req); + } + code = hbGetExpiredUserInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { return code; @@ -673,7 +784,26 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { while (pIter != NULL) { pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq); - code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &pOneReq->clusterId, pOneReq); + SHbParam param; + switch (pOneReq->connKey.connType) { + case CONN_TYPE__QUERY: { + param.clusterId = pOneReq->clusterId; + param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt); + break; + } + default: + break; + } + if (clientHbMgr.reqHandle[pOneReq->connKey.connType]) { + code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, ¶m, pOneReq); + if (code) { + tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code), + pOneReq->connKey.tscRid, pOneReq->connKey.connType); + } + } + break; + +#if 0 if (code) { pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pOneReq = pIter; @@ -682,6 +812,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pOneReq = pIter; +#endif } releaseTscObj(rid); @@ -1023,7 +1154,7 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in } } -void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { +void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *param) { SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); if (pReq) { tFreeClientHbReq(pReq); @@ -1036,4 +1167,7 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { } atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); + if (param) { + atomic_sub_fetch_32(&pAppHbMgr->passKeyCnt, 1); + } } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 60c7b44b3d078e4102c6a6a5920f4fce61f8afbc..aed11b4fb153983e02d05d2d2d59f5d4a0b7a94d 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -119,6 +119,39 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha return NULL; } +int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type) { + if (taos == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + STscObj *pObj = acquireTscObj(*(int64_t *)taos); + if (NULL == pObj) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + tscError("invalid parameter for %s", __func__); + return terrno; + } + + switch (type) { + case TAOS_NOTIFY_PASSVER: { + pObj->passInfo.fp = fp; + pObj->passInfo.param = param; + if (fp) { + atomic_add_fetch_32(&pObj->pAppInfo->pAppHbMgr->passKeyCnt, 1); + } + break; + } + default: { + terrno = TSDB_CODE_INVALID_PARA; + releaseTscObj(*(int64_t *)taos); + return terrno; + } + } + + releaseTscObj(*(int64_t *)taos); + return 0; +} + void taos_close_internal(void *taos) { if (taos == NULL) { return; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index a0146cfa39fee78fee7287526a3a1c7ea7a8c2da..52c5fc79408a05cf499d33059c6e7c07d176b615 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -130,6 +130,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { lastClusterId = connectRsp.clusterId; pTscObj->connType = connectRsp.connType; + pTscObj->passInfo.ver = connectRsp.passVer; hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 87c58ab32f4c8ef1dbb36ecfc8ead95bdce639fc..76fd1d84d0bec43ddea33172aecdcbdfa3ab68a3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2357,7 +2357,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { tmq_topic_assignment assignment = {.begin = pHead->walsver, .end = pHead->walever, .currentOffset = rsp.rspOffset.version, - .vgroupHandle = pParam->vgId}; + .vgId = pParam->vgId}; taosThreadMutexLock(&pCommon->mutex); taosArrayPush(pCommon->pList, &assignment); @@ -2422,7 +2422,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->end = pClientVg->offsetInfo.walVerEnd; - pAssignment->vgroupHandle = pClientVg->vgId; + pAssignment->vgId = pClientVg->vgId; } if (needFetch) { @@ -2524,7 +2524,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } } -int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle, int64_t offset) { +int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) { if (tmq == NULL) { tscError("invalid tmq handle, null"); return TSDB_CODE_INVALID_PARA; @@ -2544,14 +2544,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); for (int32_t i = 0; i < numOfVgs; ++i) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - if (pClientVg->vgId == vgroupHandle) { + if (pClientVg->vgId == vgId) { pVg = pClientVg; break; } } if (pVg == NULL) { - tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgroupHandle); + tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); return TSDB_CODE_INVALID_PARA; } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 08d62054e3b043f9bdbae3dcbd5f61f63758629d..f15a93cb2c300587bfffe880b8a45bf0aef60175 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1144,7 +1144,7 @@ TEST(clientCase, sub_tb_test) { taos_free_result(pRes); } - tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgroupHandle, pAssign[0].begin); + tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgId, pAssign[0].begin); } tmq_consumer_close(tmq); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 79c12df96bd58d2fdf9313aa76a6309b581421b4..4139d6c7d449ec4b5c529cd2b0b6814258a9e380 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2936,6 +2936,59 @@ void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp *pRsp) { taosArrayDestroy(pRsp->pArray); } +int32_t tSerializeSUserPassBatchRsp(void *buf, int32_t bufLen, SUserPassBatchRsp *pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + + int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); + if (tEncodeI32(&encoder, numOfBatch) < 0) return -1; + for (int32_t i = 0; i < numOfBatch; ++i) { + SGetUserPassRsp *pUserPassRsp = taosArrayGet(pRsp->pArray, i); + if (tEncodeCStr(&encoder, pUserPassRsp->user) < 0) return -1; + if (tEncodeI32(&encoder, pUserPassRsp->version) < 0) return -1; + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSUserPassBatchRsp(void *buf, int32_t bufLen, SUserPassBatchRsp *pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); + if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1; + + pRsp->pArray = taosArrayInit(numOfBatch, sizeof(SGetUserPassRsp)); + if (pRsp->pArray == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + for (int32_t i = 0; i < numOfBatch; ++i) { + SGetUserPassRsp rsp = {0}; + if (tDecodeCStrTo(&decoder, rsp.user) < 0) return -1; + if (tDecodeI32(&decoder, &rsp.version) < 0) return -1; + taosArrayPush(pRsp->pArray, &rsp); + } + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +void tFreeSUserPassBatchRsp(SUserPassBatchRsp *pRsp) { + if(pRsp) { + taosArrayDestroy(pRsp->pArray); + } +} + int32_t tSerializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -3976,6 +4029,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tEncodeI32(&encoder, pRsp->svrTimestamp) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->passVer) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -3999,6 +4053,13 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tDecodeI32(&decoder, &pRsp->svrTimestamp) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->sVer) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->sDetailVer) < 0) return -1; + + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI32(&decoder, &pRsp->passVer) < 0) return -1; + } else { + pRsp->passVer = 0; + } + tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 05b59b98652ec2a33335ebfc3d3be351c7075672..7840528db9329f7b5c10449bee0d8a213e87ebe6 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -159,6 +159,10 @@ static int32_t mmSyncIsCatchUp(SMnodeMgmt *pMgmt) { return mndIsCatchUp(pMgmt->pMnode); } +static ESyncRole mmSyncGetRole(SMnodeMgmt *pMgmt) { + return mndGetRole(pMgmt->pMnode); +} + SMgmtFunc mmGetMgmtFunc() { SMgmtFunc mgmtFunc = {0}; mgmtFunc.openFp = mmOpen; @@ -170,6 +174,7 @@ SMgmtFunc mmGetMgmtFunc() { mgmtFunc.requiredFp = mmRequire; mgmtFunc.getHandlesFp = mmGetMsgHandles; mgmtFunc.isCatchUpFp = (NodeIsCatchUpFp)mmSyncIsCatchUp; + mgmtFunc.nodeRoleFp = (NodeRole)mmSyncGetRole; return mgmtFunc; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 98029304183fa250546cdd4e5b89fc29b71083e1..9dbc12cf622af70028629cc152c9f60593c21004 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -336,13 +336,23 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); if (pVnode == NULL) { - dError("vgId:%d, failed to alter hashrange since %s", req.vgId, terrstr()); + dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr()); terrno = TSDB_CODE_VND_NOT_EXIST; return -1; } + ESyncRole role = vnodeGetRole(pVnode->pImpl); + dInfo("vgId:%d, checking node role:%d", req.vgId, role); + if(role == TAOS_SYNC_ROLE_VOTER){ + terrno = TSDB_CODE_VND_ALREADY_IS_VOTER; + vmReleaseVnode(pMgmt, pVnode); + return -1; + } + dInfo("vgId:%d, checking node catch up", req.vgId); - if(vnodeIsCatchUp(pVnode->pImpl) != 0){ + if(vnodeIsCatchUp(pVnode->pImpl) != 1){ + terrno = TSDB_CODE_VND_NOT_CATCH_UP; + vmReleaseVnode(pMgmt, pVnode); return -1; } @@ -365,6 +375,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { req.selfIndex >= req.replica || req.learnerSelfIndex >= req.learnerReplica) { terrno = TSDB_CODE_INVALID_MSG; dError("vgId:%d, failed to alter replica since invalid msg", vgId); + vmReleaseVnode(pMgmt, pVnode); return -1; } @@ -381,6 +392,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { terrno = TSDB_CODE_INVALID_MSG; dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn, pReplica->port); + vmReleaseVnode(pMgmt, pVnode); return -1; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index c53c21cd3096ec061e0e88b2b457ca62c4f302a1..3459af1a3a99693f6667728ffe4ab6f2b3923c91 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -214,9 +214,19 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) { pWrapper = &pDnode->wrappers[ntype]; + if(pWrapper->func.nodeRoleFp != NULL){ + ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt); + dInfo("node:%s, checking node role:%d", pWrapper->name, role); + if(role == TAOS_SYNC_ROLE_VOTER){ + terrno = TSDB_CODE_MNODE_ALREADY_IS_VOTER; + return -1; + } + } + if(pWrapper->func.isCatchUpFp != NULL){ dInfo("node:%s, checking node catch up", pWrapper->name); - if(!(*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) == 0){ + if((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1){ + terrno = TSDB_CODE_MNODE_NOT_CATCH_UP; return -1; } } diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 000ce8120797e7246c9a85dc80a2ccd0d40c7ad1..98ef8cd95ba9c201ec23ff9fd8515948016eec4f 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -135,6 +135,7 @@ typedef int32_t (*NodeDropFp)(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); typedef int32_t (*NodeRequireFp)(const SMgmtInputOpt *pInput, bool *required); typedef SArray *(*NodeGetHandlesFp)(); // array of SMgmtHandle typedef bool (*NodeIsCatchUpFp)(void *pMgmt); +typedef bool (*NodeRole)(void *pMgmt); typedef struct { NodeOpenFp openFp; @@ -146,6 +147,7 @@ typedef struct { NodeRequireFp requiredFp; NodeGetHandlesFp getHandlesFp; NodeIsCatchUpFp isCatchUpFp; + NodeRole nodeRoleFp; } SMgmtFunc; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index cc295f0d24d185a21d876b76bc00d62682a045b9..f547ce025d67d5e04441699b2e6b48eb3efd52e2 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -280,6 +280,7 @@ typedef struct { int8_t reserve; int32_t acctId; int32_t authVersion; + int32_t passVersion; SHashObj* readDbs; SHashObj* writeDbs; SHashObj* topics; diff --git a/source/dnode/mnode/impl/inc/mndUser.h b/source/dnode/mnode/impl/inc/mndUser.h index 95d15f6e5a608ebd218581d78f6f67f6b5e5fdc6..aa7f97f0870dfd83ba4c3296ba1293e091b6fdba 100644 --- a/source/dnode/mnode/impl/inc/mndUser.h +++ b/source/dnode/mnode/impl/inc/mndUser.h @@ -35,6 +35,8 @@ SHashObj *mndDupTableHash(SHashObj *pOld); SHashObj *mndDupTopicHash(SHashObj *pOld); int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp, int32_t *pRspLen); +int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_t numOfUses, void **ppRsp, + int32_t *pRspLen); int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db); int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic); diff --git a/source/dnode/mnode/impl/src/mndDump.c b/source/dnode/mnode/impl/src/mndDump.c index 44a7d49fff672e33af43bb892350ee0b5e02debd..a991bddda8a997c6074ee21e811f3829e0370fe8 100644 --- a/source/dnode/mnode/impl/src/mndDump.c +++ b/source/dnode/mnode/impl/src/mndDump.c @@ -421,6 +421,7 @@ void dumpUser(SSdb *pSdb, SJson *json) { tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime)); tjsonAddStringToObject(item, "superUser", i642str(pObj->superUser)); tjsonAddStringToObject(item, "authVersion", i642str(pObj->authVersion)); + tjsonAddStringToObject(item, "passVersion", i642str(pObj->passVersion)); tjsonAddStringToObject(item, "numOfReadDbs", i642str(taosHashGetSize(pObj->readDbs))); tjsonAddStringToObject(item, "numOfWriteDbs", i642str(taosHashGetSize(pObj->writeDbs))); sdbRelease(pSdb, pObj); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 13ae4a11d5c61938c15e26a12b09c4c9fd057231..a9f52128a685215fed934eaddf684cc77ea1ff30 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -590,6 +590,11 @@ int32_t mndIsCatchUp(SMnode *pMnode) { return syncIsCatchUp(rid); } +ESyncRole mndGetRole(SMnode *pMnode){ + int64_t rid = pMnode->syncMgmt.sync; + return syncGetRole(rid); +} + void mndStop(SMnode *pMnode) { mndSetStop(pMnode); mndSyncStop(pMnode); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 53baf843de7d0a83a26fa1129ada5329f7862238..5e3476859a446340212ed9d687f84a87dd16c3ea 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -322,7 +322,8 @@ static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, .pCont = pReq, .contLen = contLen, .msgType = TDMT_DND_ALTER_MNODE_TYPE, - .acceptableCode = TSDB_CODE_MNODE_ALREADY_DEPLOYED, + .retryCode = TSDB_CODE_MNODE_NOT_CATCH_UP, + .acceptableCode = TSDB_CODE_MNODE_ALREADY_IS_VOTER, }; if (mndTransAppendRedoAction(pTrans, &action) != 0) { diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 50e502f4ab9b240e8518bdb4fe531929149e974d..2ebb5aeb9904cbabcc95086846b9ded2cdd2f1ef 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -283,6 +283,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { connectRsp.connType = connReq.connType; connectRsp.dnodeNum = mndGetDnodeSize(pMnode); connectRsp.svrTimestamp = taosGetTimestampSec(); + connectRsp.passVer = pUser->passVersion; strcpy(connectRsp.sVer, version); snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, @@ -547,6 +548,16 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb } break; } + case HEARTBEAT_KEY_USER_PASSINFO: { + void *rspMsg = NULL; + int32_t rspLen = 0; + mndValidateUserPassInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserPassVersion), &rspMsg, &rspLen); + if (rspMsg && rspLen > 0) { + SKv kv1 = {.key = HEARTBEAT_KEY_USER_PASSINFO, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp.info, &kv1); + } + break; + } default: mError("invalid kv key:%d", kv->key); hbRsp.status = TSDB_CODE_APP_ERROR; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index d08227927aae2d0d6fd2443df36650e8445a08f8..f6e5895cda2ee31b838f1203c861a04f3c49ac17 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -23,7 +23,7 @@ #include "mndTrans.h" #include "tbase64.h" -#define USER_VER_NUMBER 3 +#define USER_VER_NUMBER 4 #define USER_RESERVE_SIZE 64 static int32_t mndCreateDefaultUsers(SMnode *pMnode); @@ -174,6 +174,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) { SDB_SET_INT8(pRaw, dataPos, pUser->enable, _OVER) SDB_SET_INT8(pRaw, dataPos, pUser->reserve, _OVER) SDB_SET_INT32(pRaw, dataPos, pUser->authVersion, _OVER) + SDB_SET_INT32(pRaw, dataPos, pUser->passVersion, _OVER) SDB_SET_INT32(pRaw, dataPos, numOfReadDbs, _OVER) SDB_SET_INT32(pRaw, dataPos, numOfWriteDbs, _OVER) SDB_SET_INT32(pRaw, dataPos, numOfTopics, _OVER) @@ -263,7 +264,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; - if (sver != 1 && sver != 2 && sver != 3) { + if (sver < 1 || sver > USER_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; goto _OVER; } @@ -285,6 +286,9 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { SDB_GET_INT8(pRaw, dataPos, &pUser->enable, _OVER) SDB_GET_INT8(pRaw, dataPos, &pUser->reserve, _OVER) SDB_GET_INT32(pRaw, dataPos, &pUser->authVersion, _OVER) + if (sver >= 4) { + SDB_GET_INT32(pRaw, dataPos, &pUser->passVersion, _OVER) + } int32_t numOfReadDbs = 0; int32_t numOfWriteDbs = 0; @@ -529,6 +533,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) { taosWLockLatch(&pOld->lock); pOld->updateTime = pNew->updateTime; pOld->authVersion = pNew->authVersion; + pOld->passVersion = pNew->passVersion; pOld->sysInfo = pNew->sysInfo; pOld->enable = pNew->enable; memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN); @@ -818,10 +823,14 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { if (mndUserDupObj(pUser, &newUser) != 0) goto _OVER; + newUser.passVersion = pUser->passVersion; if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) { char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)alterReq.pass, strlen(alterReq.pass), pass); memcpy(newUser.pass, pass, TSDB_PASSWORD_LEN); + if (0 != strncmp(pUser->pass, pass, TSDB_PASSWORD_LEN)) { + ++newUser.passVersion; + } } if (alterReq.alterType == TSDB_ALTER_USER_SUPERUSER) { @@ -1420,6 +1429,69 @@ _OVER: return code; } +int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_t numOfUses, void **ppRsp, + int32_t *pRspLen) { + int32_t code = 0; + SUserPassBatchRsp batchRsp = {0}; + + for (int32_t i = 0; i < numOfUses; ++i) { + SUserObj *pUser = mndAcquireUser(pMnode, pUsers[i].user); + if (pUser == NULL) { + mError("user:%s, failed to validate user pass since %s", pUsers[i].user, terrstr()); + continue; + } + + pUsers[i].version = ntohl(pUsers[i].version); + if (pUser->passVersion <= pUsers[i].version) { + mTrace("user:%s, not update since mnd passVer %d <= client passVer %d", pUsers[i].user, pUser->passVersion, + pUsers[i].version); + mndReleaseUser(pMnode, pUser); + continue; + } + + SGetUserPassRsp rsp = {0}; + memcpy(rsp.user, pUser->user, TSDB_USER_LEN); + rsp.version = pUser->passVersion; + + if (!batchRsp.pArray && !(batchRsp.pArray = taosArrayInit(numOfUses, sizeof(SGetUserPassRsp)))) { + code = TSDB_CODE_OUT_OF_MEMORY; + mndReleaseUser(pMnode, pUser); + goto _OVER; + } + + taosArrayPush(batchRsp.pArray, &rsp); + mndReleaseUser(pMnode, pUser); + } + + if (taosArrayGetSize(batchRsp.pArray) <= 0) { + goto _OVER; + } + + int32_t rspLen = tSerializeSUserPassBatchRsp(NULL, 0, &batchRsp); + if (rspLen < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + void *pRsp = taosMemoryMalloc(rspLen); + if (pRsp == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + tSerializeSUserPassBatchRsp(pRsp, rspLen, &batchRsp); + + *ppRsp = pRsp; + *pRspLen = rspLen; + +_OVER: + if (code) { + *ppRsp = NULL; + *pRspLen = 0; + } + + tFreeSUserPassBatchRsp(&batchRsp); + return code; +} + int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) { int32_t code = 0; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index d674db5b4bdc2571104f14c0843440594e267d12..f0bece6e5eb8d77538e2d02eeb4787e2f3c55049 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1263,6 +1263,8 @@ int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, action.pCont = pReq; action.contLen = contLen; action.msgType = TDMT_DND_ALTER_VNODE_TYPE; + action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER; + action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 7dfaf1508df1c6c3d930789f4193d919a63c4cf7..828a173108f8c3a97529cf31be09f668d47b98aa 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -69,6 +69,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list); int32_t vnodeIsCatchUp(SVnode *pVnode); +ESyncRole vnodeGetRole(SVnode *pVnode); int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list); int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index deeb0af42a1433f73ad9be639dcb716dc4b7f4e2..7d41edfdd95913eb45dd7581e8f105252b949845 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -431,6 +431,10 @@ int32_t vnodeIsCatchUp(SVnode *pVnode){ return syncIsCatchUp(pVnode->sync); } +ESyncRole vnodeGetRole(SVnode *pVnode){ + return syncGetRole(pVnode->sync); +} + void vnodeStop(SVnode *pVnode) {} int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; } diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 8c8b99a6f804b2b7200e0641ca3d5bdb2660292c..6b70422ac8fe20bf8089eb8efba11ecc3c5e6ae5 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1580,7 +1580,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { case UV_TASK_REQ_RSP: { uv_pipe_t *pipe = uvTask->pipe; if (pipe == NULL) { - code = TSDB_CODE_UDF_PIPE_NO_PIPE; + code = TSDB_CODE_UDF_PIPE_NOT_EXIST; } else { uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t)); write->data = pipe->data; @@ -1598,7 +1598,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { case UV_TASK_DISCONNECT: { uv_pipe_t *pipe = uvTask->pipe; if (pipe == NULL) { - code = TSDB_CODE_UDF_PIPE_NO_PIPE; + code = TSDB_CODE_UDF_PIPE_NOT_EXIST; } else { SClientUvConn *conn = pipe->data; QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); @@ -1759,9 +1759,6 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { } int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { - if (gUdfcProxy.udfcState != UDFC_STATE_READY) { - return TSDB_CODE_UDF_INVALID_STATE; - } SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); task->errCode = 0; task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession)); @@ -1804,7 +1801,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf SUdfcUvSession *session = (SUdfcUvSession *)handle; if (session->udfUvPipe == NULL) { fnError("No pipe to udfd"); - return TSDB_CODE_UDF_PIPE_NO_PIPE; + return TSDB_CODE_UDF_PIPE_NOT_EXIST; } SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); task->errCode = 0; @@ -1928,7 +1925,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { if (session->udfUvPipe == NULL) { fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName); taosMemoryFree(session); - return TSDB_CODE_UDF_PIPE_NO_PIPE; + return TSDB_CODE_UDF_PIPE_NOT_EXIST; } SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e3342d76ee8d986d92b5c88a47da7e5a518f80ae..cb1919d16ea1db667d841b58302363b17c6c05a0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -580,25 +580,37 @@ int32_t syncIsCatchUp(int64_t rid) { return -1; } - while(1){ - if(pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 || - pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex || - pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP){ - sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, - pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, - pSyncNode->pLogBuf->matchIndex); - taosSsleep(1); - } - else{ - sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, - pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, - pSyncNode->pLogBuf->matchIndex); - break; - } + int32_t isCatchUp = 0; + if(pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 || + pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex || + pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP){ + sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, + pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, + pSyncNode->pLogBuf->matchIndex); + isCatchUp = 0; + } + else{ + sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, + pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, + pSyncNode->pLogBuf->matchIndex); + isCatchUp = 1; } syncNodeRelease(pSyncNode); - return 0; + return isCatchUp; +} + +ESyncRole syncGetRole(int64_t rid) { + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode == NULL) { + sError("sync Node Acquire error since %d", errno); + return -1; + } + + ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole; + + syncNodeRelease(pSyncNode); + return role; } int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) { @@ -2144,7 +2156,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid); if (pData == NULL) { - sError("hb timer get pData NULL, rid:%" PRId64 " addr:%" PRId64, hbDataRid, pData->destId.addr); + sError("hb timer get pData NULL, %" PRId64, hbDataRid); return; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 34b09761c816bfab88f53403ff328085ed0df7fb..dbdb40b93d39851c422e9fe2f4f1246d183ad25b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -581,11 +581,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_DUP_TIMESTAMP, "Duplicate timestamps TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_READ_ERR, "udf pipe read error") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_CONNECT_ERR, "udf pipe connect error") -TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe") +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NOT_EXIST, "udf pipe not exist") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure") -TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid function input") -TAOS_DEFINE_ERROR(TSDB_CODE_UDF_NO_FUNC_HANDLE, "udf no function handle") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_BUFSIZE, "udf invalid bufsize") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_OUTPUT_TYPE, "udf invalid output type") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED, "udf program language not supported") diff --git a/tests/script/api/makefile b/tests/script/api/makefile index 6739794cc8e09025ec18ee92100453edce082f77..6d55d8a75f7efe4383c81781165b2246e7a893a0 100644 --- a/tests/script/api/makefile +++ b/tests/script/api/makefile @@ -15,8 +15,11 @@ exe: gcc $(CFLAGS) ./stopquery.c -o $(ROOT)stopquery $(LFLAGS) gcc $(CFLAGS) ./dbTableRoute.c -o $(ROOT)dbTableRoute $(LFLAGS) gcc $(CFLAGS) ./insertSameTs.c -o $(ROOT)insertSameTs $(LFLAGS) + gcc $(CFLAGS) ./passwdTest.c -o $(ROOT)passwdTest $(LFLAGS) clean: rm $(ROOT)batchprepare rm $(ROOT)stopquery rm $(ROOT)dbTableRoute + rm $(ROOT)insertSameTs + rm $(ROOT)passwdTest diff --git a/tests/script/api/passwdTest.c b/tests/script/api/passwdTest.c new file mode 100644 index 0000000000000000000000000000000000000000..8a2b0a0390dad3acef1fca93abc89095164bb401 --- /dev/null +++ b/tests/script/api/passwdTest.c @@ -0,0 +1,194 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// TAOS standard API example. The same syntax as MySQL, but only a subset +// to compile: gcc -o demo demo.c -ltaos + +/** + * passwdTest.c + * - Run the test case in clear TDengine environment with default root passwd 'taosdata' + */ + +#include +#include +#include +#include +#include +#include "taos.h" // TAOS header file + +#define nDup 1 +#define nRoot 10 +#define nUser 10 +#define USER_LEN 24 + +void Test(TAOS *taos, char *qstr); +void createUers(TAOS *taos, const char *host, char *qstr); +void passVerTestMulti(const char *host, char *qstr); + +int nPassVerNotified = 0; +TAOS *taosu[nRoot] = {0}; +char users[nUser][USER_LEN] = {0}; + +void __taos_notify_cb(void *param, void *ext, int type) { + switch (type) { + case TAOS_NOTIFY_PASSVER: { + ++nPassVerNotified; + printf("%s:%d type:%d user:%s ver:%d\n", __func__, __LINE__, type, param ? (char *)param : "NULL", *(int *)ext); + break; + } + default: + printf("%s:%d unknown type:%d\n", __func__, __LINE__, type); + break; + } +} + +static void queryDB(TAOS *taos, char *command) { + int i; + TAOS_RES *pSql = NULL; + int32_t code = -1; + + for (i = 0; i < nDup; ++i) { + if (NULL != pSql) { + taos_free_result(pSql); + pSql = NULL; + } + + pSql = taos_query(taos, command); + code = taos_errno(pSql); + if (0 == code) { + break; + } + } + + if (code != 0) { + fprintf(stderr, "failed to run: %s, reason: %s\n", command, taos_errstr(pSql)); + taos_free_result(pSql); + taos_close(taos); + exit(EXIT_FAILURE); + } else { + fprintf(stderr, "success to run: %s\n", command); + } + + taos_free_result(pSql); +} + +int main(int argc, char *argv[]) { + char qstr[1024]; + + // connect to server + if (argc < 2) { + printf("please input server-ip \n"); + return 0; + } + + TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0); + if (taos == NULL) { + printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/); + exit(1); + } + createUers(taos, argv[1], qstr); + passVerTestMulti(argv[1], qstr); + + taos_close(taos); + taos_cleanup(); +} + +void createUers(TAOS *taos, const char *host, char *qstr) { + // users + for (int i = 0; i < nUser; ++i) { + sprintf(users[i], "user%d", i); + sprintf(qstr, "CREATE USER %s PASS 'taosdata'", users[i]); + queryDB(taos, qstr); + + taosu[i] = taos_connect(host, users[i], "taosdata", NULL, 0); + if (taosu[i] == NULL) { + printf("failed to connect to server, user:%s, reason:%s\n", users[i], "null taos" /*taos_errstr(taos)*/); + exit(1); + } + + int code = taos_set_notify_cb(taosu[i], __taos_notify_cb, users[i], TAOS_NOTIFY_PASSVER); + + if (code != 0) { + fprintf(stderr, "failed to run: taos_set_notify_cb for user:%s since %d\n", users[i], code); + } else { + fprintf(stderr, "success to run: taos_set_notify_cb for user:%s\n", users[i]); + } + + // alter pass for users + sprintf(qstr, "alter user %s pass 'taos'", users[i]); + queryDB(taos, qstr); + } +} + +void passVerTestMulti(const char *host, char *qstr) { + // root + TAOS *taos[nRoot] = {0}; + char userName[USER_LEN] = "root"; + + for (int i = 0; i < nRoot; ++i) { + taos[i] = taos_connect(host, "root", "taosdata", NULL, 0); + if (taos[i] == NULL) { + printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/); + exit(1); + } + + int code = taos_set_notify_cb(taos[i], __taos_notify_cb, userName, TAOS_NOTIFY_PASSVER); + + if (code != 0) { + fprintf(stderr, "failed to run: taos_set_notify_cb since %d\n", code); + } else { + fprintf(stderr, "success to run: taos_set_notify_cb\n"); + } + } + + queryDB(taos[0], "create database if not exists demo1 vgroups 1 minrows 10"); + queryDB(taos[0], "create database if not exists demo2 vgroups 1 minrows 10"); + queryDB(taos[0], "create database if not exists demo3 vgroups 1 minrows 10"); + + queryDB(taos[0], "create table demo1.stb (ts timestamp, c1 int) tags(t1 int)"); + queryDB(taos[0], "create table demo2.stb (ts timestamp, c1 int) tags(t1 int)"); + queryDB(taos[0], "create table demo3.stb (ts timestamp, c1 int) tags(t1 int)"); + + strcpy(qstr, "alter user root pass 'taos'"); + queryDB(taos[0], qstr); + + // calculate the nPassVerNotified for root and users + int nConn = nRoot + nUser; + + for (int i = 0; i < 15; ++i) { + if (nPassVerNotified >= nConn) break; + sleep(1); + } + + // close the taos_conn + for (int i = 0; i < nRoot; ++i) { + taos_close(taos[i]); + printf("%s:%d close taos[%d]\n", __func__, __LINE__, i); + sleep(1); + } + + for (int i = 0; i < nUser; ++i) { + taos_close(taosu[i]); + printf("%s:%d close taosu[%d]\n", __func__, __LINE__, i); + sleep(1); + } + + if (nPassVerNotified >= nConn) { + fprintf(stderr, "succeed to get passVer notification since nNotify %d >= nConn %d\n", nPassVerNotified, nConn); + } else { + fprintf(stderr, "failed to get passVer notification since nNotify %d < nConn %d\n", nPassVerNotified, nConn); + } + // sleep(300); +} \ No newline at end of file