diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index f131bb9aa14bc0393eae3055128dcc3254f14ae0..7cd5354240410bf498e4b912c1357f187c6ad779 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -63,6 +63,7 @@ typedef struct { // statistics int32_t reportCnt; int32_t connKeyCnt; + int32_t nPassVerCb; int64_t reportBytes; // not implemented int64_t startTime; // ctl @@ -73,7 +74,7 @@ typedef struct { typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp); -typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req); +typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req, int32_t cb); typedef struct { int8_t inited; @@ -360,7 +361,7 @@ void stopAllRequests(SHashObj* pRequests); // conn level int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); -void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); +void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* cb); typedef struct SSqlCallbackWrapper { SParseContext* pParseCtx; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 418103f2a6f97cab51eea3d83955b1224b8428b7..99569fdb574bd2860e8163c9a1ba0e2d30eb5750 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -239,7 +239,7 @@ void destroyTscObj(void *pObj) { tscTrace("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; - hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); + hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, pTscObj->passInfo.fp); destroyAllRequests(pTscObj->pRequests); taosHashCleanup(pTscObj->pRequests); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 7f6356284ad3083ba740e6120c93461e5ae40389..49fa2d6a7dc126a2b87ec93fd9967da98cfbfd83 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -24,7 +24,7 @@ static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); -static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } +static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req, int32_t cb) { return 0; } static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } @@ -49,38 +49,48 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC return TSDB_CODE_SUCCESS; } -static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHbKey *connKey) { - int32_t code = 0; - STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); - if (NULL == pTscObj) { - tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); - return code; - } - +static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHbKey *connKey, SAppHbMgr *pAppHbMgr) { + int32_t code = 0; + int32_t numOfBatchs = 0; SUserPassBatchRsp batchRsp = {0}; if (tDeserializeSUserPassBatchRsp(value, valueLen, &batchRsp) != 0) { code = TSDB_CODE_INVALID_MSG; - releaseTscObj(connKey->tscRid); return code; } - SPassInfo *passInfo = &pTscObj->passInfo; - int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray); - for (int32_t i = 0; i < numOfBatchs; ++i) { - SGetUserPassRsp *rsp = taosArrayGet(batchRsp.pArray, i); - if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) { - tscDebug("update passVer of user %s from %d to %d", rsp->user, passInfo->ver, rsp->version); - if (atomic_load_32(&passInfo->ver) < rsp->version) { - atomic_store_32(&passInfo->ver, rsp->version); - if (passInfo->fp) { - (*passInfo->fp)(passInfo->param, &passInfo->ver, TAOS_NOTIFY_PASSVER); + numOfBatchs = taosArrayGetSize(batchRsp.pArray); + + SClientHbReq *pReq = NULL; + while ((pReq = taosHashIterate(pAppHbMgr->activeInfo, pReq))) { + STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid); + if (!pTscObj) { + continue; + } + SPassInfo *passInfo = &pTscObj->passInfo; + if (!passInfo->fp) { + continue; + } + + for (int32_t i = 0; i < numOfBatchs; ++i) { + SGetUserPassRsp *rsp = taosArrayGet(batchRsp.pArray, i); + if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) { + int32_t oldVer = atomic_load_32(&passInfo->ver); + if (oldVer < rsp->version) { + atomic_store_32(&passInfo->ver, rsp->version); + tscDebug("update passVer of user %s from %d to %d, tscRid:%" PRIi64 "\n", rsp->user, oldVer, + atomic_load_32(&passInfo->ver), pTscObj->id); + if (passInfo->fp) { + (*passInfo->fp)(passInfo->param, &passInfo->ver, TAOS_NOTIFY_PASSVER); + } } + break; } } + releaseTscObj(pReq->connKey.tscRid); } taosArrayDestroy(batchRsp.pArray); - releaseTscObj(connKey->tscRid); + return code; } @@ -91,7 +101,7 @@ static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) { code = TSDB_CODE_OUT_OF_MEMORY; return code; } - + vgInfo->vgVersion = rsp->vgVersion; vgInfo->stateTs = rsp->stateTs; vgInfo->hashMethod = rsp->hashMethod; @@ -104,7 +114,7 @@ static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) { code = TSDB_CODE_OUT_OF_MEMORY; goto _return; } - + for (int32_t j = 0; j < rsp->vgNum; ++j) { SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j); if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) { @@ -332,7 +342,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { break; } - hbProcessUserPassInfoRsp(kv->value, kv->valueLen, &pRsp->connKey); + hbProcessUserPassInfoRsp(kv->value, kv->valueLen, &pRsp->connKey, pAppHbMgr); break; } default: @@ -523,12 +533,7 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { return TSDB_CODE_APP_ERROR; } - int32_t code = 0; - - if (!pTscObj->passInfo.fp) { - goto _return; - } - + int32_t code = 0; SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion)); if (!user) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -543,7 +548,8 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { .value = user, }; - tscDebug("hb got user basic info, valueLen:%d", kv.valueLen); + tscDebug("hb got user basic info, valueLen:%d, user:%s, passVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user, + pTscObj->passInfo.ver, connKey->tscRid); if (!req->info) { req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); @@ -697,7 +703,7 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { return TSDB_CODE_SUCCESS; } -int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { +int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req, int32_t cb) { int64_t *clusterId = (int64_t *)param; struct SCatalog *pCatalog = NULL; @@ -711,7 +717,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req hbGetQueryBasicInfo(connKey, req); - hbGetUserBasicInfo(connKey, req); + if (cb) hbGetUserBasicInfo(connKey, req); code = hbGetExpiredUserInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { @@ -764,9 +770,14 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { return NULL; } + int32_t nPassVerCb = atomic_load_32(&pAppHbMgr->nPassVerCb); while (pIter != NULL) { pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq); - code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &pOneReq->clusterId, pOneReq); + code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &pOneReq->clusterId, pOneReq, + nPassVerCb); + break; + +#if 0 if (code) { pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pOneReq = pIter; @@ -775,6 +786,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pOneReq = pIter; +#endif } releaseTscObj(rid); @@ -1116,7 +1128,7 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in } } -void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { +void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *cb) { SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); if (pReq) { tFreeClientHbReq(pReq); @@ -1129,4 +1141,7 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { } atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); + if (cb) { + atomic_sub_fetch_32(&pAppHbMgr->nPassVerCb, 1); + } } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index b3f7c844e9199c3ee083f066b55cf912dfc24417..3f29249e52354fd53d681d631b23644fd96eb5d1 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -136,6 +136,9 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type) case TAOS_NOTIFY_PASSVER: { pObj->passInfo.fp = fp; pObj->passInfo.param = param; + if (fp) { + atomic_add_fetch_32(&pObj->pAppInfo->pAppHbMgr->nPassVerCb, 1); + } break; } default: { diff --git a/tests/script/api/passwdTest.c b/tests/script/api/passwdTest.c index f3161047e88ac286bc6cce9d5660b573f5d217e9..c7888510447ddf01bc3e73016846a4fd10879232 100644 --- a/tests/script/api/passwdTest.c +++ b/tests/script/api/passwdTest.c @@ -16,6 +16,11 @@ // TAOS standard API example. The same syntax as MySQL, but only a subset // to compile: gcc -o demo demo.c -ltaos +/** + * passwdTest.c + * - Run the test case in clear TDengine environment with default root passwd 'taosdata' + */ + #include #include #include @@ -23,18 +28,23 @@ #include #include "taos.h" // TAOS header file -#define nRepetition 1 -#define nTaos 10 +#define nDup 1 +#define nRoot 10 +#define nUser 10 +#define USER_LEN 30 void Test(TAOS *taos, char *qstr); +void createUers(TAOS *taos, const char *host, char *qstr); void passVerTestMulti(const char *host, char *qstr); -int nPassVerNotifiedMulti = 0; +int nPassVerNotified = 0; +TAOS *taosu[nRoot] = {0}; +char users[nUser][USER_LEN] = {0}; void __taos_notify_cb(void *param, void *ext, int type) { switch (type) { case TAOS_NOTIFY_PASSVER: { - ++nPassVerNotifiedMulti; + ++nPassVerNotified; printf("%s:%d type:%d user:%s ver:%d\n", __func__, __LINE__, type, param ? (char *)param : "NULL", *(int *)ext); break; } @@ -49,7 +59,7 @@ static void queryDB(TAOS *taos, char *command) { TAOS_RES *pSql = NULL; int32_t code = -1; - for (i = 0; i < nRepetition; ++i) { + for (i = 0; i < nDup; ++i) { if (NULL != pSql) { taos_free_result(pSql); pSql = NULL; @@ -88,19 +98,46 @@ int main(int argc, char *argv[]) { printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/); exit(1); } - + createUers(taos, argv[1], qstr); passVerTestMulti(argv[1], qstr); taos_close(taos); taos_cleanup(); } +void createUers(TAOS *taos, const char *host, char *qstr) { + // users + for (int i = 0; i < nUser; ++i) { + sprintf(users[i], "user%d", i); + sprintf(qstr, "CREATE USER %s PASS 'taosdata'", users[i]); + queryDB(taos, qstr); + + taosu[i] = taos_connect(host, users[i], "taosdata", NULL, 0); + if (taosu[i] == NULL) { + printf("failed to connect to server, user:%s, reason:%s\n", users[i], "null taos" /*taos_errstr(taos)*/); + exit(1); + } + + int code = taos_set_notify_cb(taosu[i], __taos_notify_cb, users[i], TAOS_NOTIFY_PASSVER); + + if (code != 0) { + fprintf(stderr, "failed to run: taos_set_notify_cb for user:%s since %d\n", users[i], code); + } else { + fprintf(stderr, "success to run: taos_set_notify_cb for user:%s\n", users[i]); + } + + // alter pass for users + sprintf(qstr, "alter user %s pass 'taos'", users[i]); + queryDB(taos, qstr); + } +} + void passVerTestMulti(const char *host, char *qstr) { - TAOS *taos[nTaos] = {0}; - char *userName = calloc(1, 24); - strcpy(userName, "root"); + // root + TAOS *taos[nRoot] = {0}; + char userName[USER_LEN] = "root"; - for (int i = 0; i < nTaos; ++i) { + for (int i = 0; i < nRoot; ++i) { taos[i] = taos_connect(host, "root", "taosdata", NULL, 0); if (taos[i] == NULL) { printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/); @@ -127,18 +164,31 @@ void passVerTestMulti(const char *host, char *qstr) { strcpy(qstr, "alter user root pass 'taos'"); queryDB(taos[0], qstr); - for (int i = 0; i < 10; ++i) { - if (nPassVerNotifiedMulti >= nTaos) break; + // calculate the nPassVerNotified for root and users + int nConn = nRoot + nUser; + + for (int i = 0; i < 15; ++i) { + if (nPassVerNotified >= nConn) break; sleep(1); } - if (nPassVerNotifiedMulti >= nTaos) { - fprintf(stderr, "success to get passVer notification\n"); - } else { - fprintf(stderr, "failed to get passVer notification\n"); + // close the taos_conn + for (int i = 0; i < nRoot; ++i) { + taos_close(taos[i]); + printf("%s:%d close taos[%d]\n", __func__, __LINE__, i); + sleep(1); } - // sleep(1000); + for (int i = 0; i < nUser; ++i) { + taos_close(taosu[i]); + printf("%s:%d close taosu[%d]\n", __func__, __LINE__, i); + sleep(1); + } - free(userName); + if (nPassVerNotified >= nConn) { + fprintf(stderr, "succeed to get passVer notification since nNotify %d >= nConn %d\n", nPassVerNotified, nConn); + } else { + fprintf(stderr, "failed to get passVer notification since nNotify %d < nConn %d\n", nPassVerNotified, nConn); + } + // sleep(300); } \ No newline at end of file