提交 0256b44b 编写于 作者: K kailixu

chore: code optimization

上级 3b3732c3
...@@ -63,6 +63,7 @@ typedef struct { ...@@ -63,6 +63,7 @@ typedef struct {
// statistics // statistics
int32_t reportCnt; int32_t reportCnt;
int32_t connKeyCnt; int32_t connKeyCnt;
int32_t nPassVerCb;
int64_t reportBytes; // not implemented int64_t reportBytes; // not implemented
int64_t startTime; int64_t startTime;
// ctl // ctl
...@@ -73,7 +74,7 @@ typedef struct { ...@@ -73,7 +74,7 @@ typedef struct {
typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp); typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req); typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req, int32_t cb);
typedef struct { typedef struct {
int8_t inited; int8_t inited;
...@@ -360,7 +361,7 @@ void stopAllRequests(SHashObj* pRequests); ...@@ -360,7 +361,7 @@ void stopAllRequests(SHashObj* pRequests);
// conn level // conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* cb);
typedef struct SSqlCallbackWrapper { typedef struct SSqlCallbackWrapper {
SParseContext* pParseCtx; SParseContext* pParseCtx;
......
...@@ -239,7 +239,7 @@ void destroyTscObj(void *pObj) { ...@@ -239,7 +239,7 @@ void destroyTscObj(void *pObj) {
tscTrace("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); tscTrace("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, pTscObj->passInfo.fp);
destroyAllRequests(pTscObj->pRequests); destroyAllRequests(pTscObj->pRequests);
taosHashCleanup(pTscObj->pRequests); taosHashCleanup(pTscObj->pRequests);
......
...@@ -24,7 +24,7 @@ static SClientHbMgr clientHbMgr = {0}; ...@@ -24,7 +24,7 @@ static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread(); static int32_t hbCreateThread();
static void hbStopThread(); static void hbStopThread();
static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req, int32_t cb) { return 0; }
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
...@@ -49,38 +49,48 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC ...@@ -49,38 +49,48 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHbKey *connKey) { static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHbKey *connKey, SAppHbMgr *pAppHbMgr) {
int32_t code = 0; int32_t code = 0;
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); int32_t numOfBatchs = 0;
if (NULL == pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
return code;
}
SUserPassBatchRsp batchRsp = {0}; SUserPassBatchRsp batchRsp = {0};
if (tDeserializeSUserPassBatchRsp(value, valueLen, &batchRsp) != 0) { if (tDeserializeSUserPassBatchRsp(value, valueLen, &batchRsp) != 0) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
releaseTscObj(connKey->tscRid);
return code; return code;
} }
SPassInfo *passInfo = &pTscObj->passInfo; numOfBatchs = taosArrayGetSize(batchRsp.pArray);
int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) { SClientHbReq *pReq = NULL;
SGetUserPassRsp *rsp = taosArrayGet(batchRsp.pArray, i); while ((pReq = taosHashIterate(pAppHbMgr->activeInfo, pReq))) {
if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) { STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
tscDebug("update passVer of user %s from %d to %d", rsp->user, passInfo->ver, rsp->version); if (!pTscObj) {
if (atomic_load_32(&passInfo->ver) < rsp->version) { continue;
atomic_store_32(&passInfo->ver, rsp->version); }
if (passInfo->fp) { SPassInfo *passInfo = &pTscObj->passInfo;
(*passInfo->fp)(passInfo->param, &passInfo->ver, TAOS_NOTIFY_PASSVER); if (!passInfo->fp) {
continue;
}
for (int32_t i = 0; i < numOfBatchs; ++i) {
SGetUserPassRsp *rsp = taosArrayGet(batchRsp.pArray, i);
if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
int32_t oldVer = atomic_load_32(&passInfo->ver);
if (oldVer < rsp->version) {
atomic_store_32(&passInfo->ver, rsp->version);
tscDebug("update passVer of user %s from %d to %d, tscRid:%" PRIi64 "\n", rsp->user, oldVer,
atomic_load_32(&passInfo->ver), pTscObj->id);
if (passInfo->fp) {
(*passInfo->fp)(passInfo->param, &passInfo->ver, TAOS_NOTIFY_PASSVER);
}
} }
break;
} }
} }
releaseTscObj(pReq->connKey.tscRid);
} }
taosArrayDestroy(batchRsp.pArray); taosArrayDestroy(batchRsp.pArray);
releaseTscObj(connKey->tscRid);
return code; return code;
} }
...@@ -91,7 +101,7 @@ static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) { ...@@ -91,7 +101,7 @@ static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
return code; return code;
} }
vgInfo->vgVersion = rsp->vgVersion; vgInfo->vgVersion = rsp->vgVersion;
vgInfo->stateTs = rsp->stateTs; vgInfo->stateTs = rsp->stateTs;
vgInfo->hashMethod = rsp->hashMethod; vgInfo->hashMethod = rsp->hashMethod;
...@@ -104,7 +114,7 @@ static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) { ...@@ -104,7 +114,7 @@ static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _return; goto _return;
} }
for (int32_t j = 0; j < rsp->vgNum; ++j) { for (int32_t j = 0; j < rsp->vgNum; ++j) {
SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j); SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) { if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
...@@ -332,7 +342,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { ...@@ -332,7 +342,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
break; break;
} }
hbProcessUserPassInfoRsp(kv->value, kv->valueLen, &pRsp->connKey); hbProcessUserPassInfoRsp(kv->value, kv->valueLen, &pRsp->connKey, pAppHbMgr);
break; break;
} }
default: default:
...@@ -523,12 +533,7 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { ...@@ -523,12 +533,7 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
int32_t code = 0; int32_t code = 0;
if (!pTscObj->passInfo.fp) {
goto _return;
}
SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion)); SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion));
if (!user) { if (!user) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -543,7 +548,8 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { ...@@ -543,7 +548,8 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
.value = user, .value = user,
}; };
tscDebug("hb got user basic info, valueLen:%d", kv.valueLen); tscDebug("hb got user basic info, valueLen:%d, user:%s, passVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user,
pTscObj->passInfo.ver, connKey->tscRid);
if (!req->info) { if (!req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
...@@ -697,7 +703,7 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { ...@@ -697,7 +703,7 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req, int32_t cb) {
int64_t *clusterId = (int64_t *)param; int64_t *clusterId = (int64_t *)param;
struct SCatalog *pCatalog = NULL; struct SCatalog *pCatalog = NULL;
...@@ -711,7 +717,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req ...@@ -711,7 +717,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
hbGetQueryBasicInfo(connKey, req); hbGetQueryBasicInfo(connKey, req);
hbGetUserBasicInfo(connKey, req); if (cb) hbGetUserBasicInfo(connKey, req);
code = hbGetExpiredUserInfo(connKey, pCatalog, req); code = hbGetExpiredUserInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
...@@ -764,9 +770,14 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -764,9 +770,14 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
return NULL; return NULL;
} }
int32_t nPassVerCb = atomic_load_32(&pAppHbMgr->nPassVerCb);
while (pIter != NULL) { while (pIter != NULL) {
pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq); pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);
code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &pOneReq->clusterId, pOneReq); code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &pOneReq->clusterId, pOneReq,
nPassVerCb);
break;
#if 0
if (code) { if (code) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
pOneReq = pIter; pOneReq = pIter;
...@@ -775,6 +786,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -775,6 +786,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
pOneReq = pIter; pOneReq = pIter;
#endif
} }
releaseTscObj(rid); releaseTscObj(rid);
...@@ -1116,7 +1128,7 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in ...@@ -1116,7 +1128,7 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in
} }
} }
void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *cb) {
SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (pReq) { if (pReq) {
tFreeClientHbReq(pReq); tFreeClientHbReq(pReq);
...@@ -1129,4 +1141,7 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { ...@@ -1129,4 +1141,7 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
} }
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
if (cb) {
atomic_sub_fetch_32(&pAppHbMgr->nPassVerCb, 1);
}
} }
...@@ -136,6 +136,9 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type) ...@@ -136,6 +136,9 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)
case TAOS_NOTIFY_PASSVER: { case TAOS_NOTIFY_PASSVER: {
pObj->passInfo.fp = fp; pObj->passInfo.fp = fp;
pObj->passInfo.param = param; pObj->passInfo.param = param;
if (fp) {
atomic_add_fetch_32(&pObj->pAppInfo->pAppHbMgr->nPassVerCb, 1);
}
break; break;
} }
default: { default: {
......
...@@ -16,6 +16,11 @@ ...@@ -16,6 +16,11 @@
// TAOS standard API example. The same syntax as MySQL, but only a subset // TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o demo demo.c -ltaos // to compile: gcc -o demo demo.c -ltaos
/**
* passwdTest.c
* - Run the test case in clear TDengine environment with default root passwd 'taosdata'
*/
#include <inttypes.h> #include <inttypes.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
...@@ -23,18 +28,23 @@ ...@@ -23,18 +28,23 @@
#include <unistd.h> #include <unistd.h>
#include "taos.h" // TAOS header file #include "taos.h" // TAOS header file
#define nRepetition 1 #define nDup 1
#define nTaos 10 #define nRoot 10
#define nUser 10
#define USER_LEN 30
void Test(TAOS *taos, char *qstr); void Test(TAOS *taos, char *qstr);
void createUers(TAOS *taos, const char *host, char *qstr);
void passVerTestMulti(const char *host, char *qstr); void passVerTestMulti(const char *host, char *qstr);
int nPassVerNotifiedMulti = 0; int nPassVerNotified = 0;
TAOS *taosu[nRoot] = {0};
char users[nUser][USER_LEN] = {0};
void __taos_notify_cb(void *param, void *ext, int type) { void __taos_notify_cb(void *param, void *ext, int type) {
switch (type) { switch (type) {
case TAOS_NOTIFY_PASSVER: { case TAOS_NOTIFY_PASSVER: {
++nPassVerNotifiedMulti; ++nPassVerNotified;
printf("%s:%d type:%d user:%s ver:%d\n", __func__, __LINE__, type, param ? (char *)param : "NULL", *(int *)ext); printf("%s:%d type:%d user:%s ver:%d\n", __func__, __LINE__, type, param ? (char *)param : "NULL", *(int *)ext);
break; break;
} }
...@@ -49,7 +59,7 @@ static void queryDB(TAOS *taos, char *command) { ...@@ -49,7 +59,7 @@ static void queryDB(TAOS *taos, char *command) {
TAOS_RES *pSql = NULL; TAOS_RES *pSql = NULL;
int32_t code = -1; int32_t code = -1;
for (i = 0; i < nRepetition; ++i) { for (i = 0; i < nDup; ++i) {
if (NULL != pSql) { if (NULL != pSql) {
taos_free_result(pSql); taos_free_result(pSql);
pSql = NULL; pSql = NULL;
...@@ -88,19 +98,46 @@ int main(int argc, char *argv[]) { ...@@ -88,19 +98,46 @@ int main(int argc, char *argv[]) {
printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/); printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/);
exit(1); exit(1);
} }
createUers(taos, argv[1], qstr);
passVerTestMulti(argv[1], qstr); passVerTestMulti(argv[1], qstr);
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
} }
void createUers(TAOS *taos, const char *host, char *qstr) {
// users
for (int i = 0; i < nUser; ++i) {
sprintf(users[i], "user%d", i);
sprintf(qstr, "CREATE USER %s PASS 'taosdata'", users[i]);
queryDB(taos, qstr);
taosu[i] = taos_connect(host, users[i], "taosdata", NULL, 0);
if (taosu[i] == NULL) {
printf("failed to connect to server, user:%s, reason:%s\n", users[i], "null taos" /*taos_errstr(taos)*/);
exit(1);
}
int code = taos_set_notify_cb(taosu[i], __taos_notify_cb, users[i], TAOS_NOTIFY_PASSVER);
if (code != 0) {
fprintf(stderr, "failed to run: taos_set_notify_cb for user:%s since %d\n", users[i], code);
} else {
fprintf(stderr, "success to run: taos_set_notify_cb for user:%s\n", users[i]);
}
// alter pass for users
sprintf(qstr, "alter user %s pass 'taos'", users[i]);
queryDB(taos, qstr);
}
}
void passVerTestMulti(const char *host, char *qstr) { void passVerTestMulti(const char *host, char *qstr) {
TAOS *taos[nTaos] = {0}; // root
char *userName = calloc(1, 24); TAOS *taos[nRoot] = {0};
strcpy(userName, "root"); char userName[USER_LEN] = "root";
for (int i = 0; i < nTaos; ++i) { for (int i = 0; i < nRoot; ++i) {
taos[i] = taos_connect(host, "root", "taosdata", NULL, 0); taos[i] = taos_connect(host, "root", "taosdata", NULL, 0);
if (taos[i] == NULL) { if (taos[i] == NULL) {
printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/); printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/);
...@@ -127,18 +164,31 @@ void passVerTestMulti(const char *host, char *qstr) { ...@@ -127,18 +164,31 @@ void passVerTestMulti(const char *host, char *qstr) {
strcpy(qstr, "alter user root pass 'taos'"); strcpy(qstr, "alter user root pass 'taos'");
queryDB(taos[0], qstr); queryDB(taos[0], qstr);
for (int i = 0; i < 10; ++i) { // calculate the nPassVerNotified for root and users
if (nPassVerNotifiedMulti >= nTaos) break; int nConn = nRoot + nUser;
for (int i = 0; i < 15; ++i) {
if (nPassVerNotified >= nConn) break;
sleep(1); sleep(1);
} }
if (nPassVerNotifiedMulti >= nTaos) { // close the taos_conn
fprintf(stderr, "success to get passVer notification\n"); for (int i = 0; i < nRoot; ++i) {
} else { taos_close(taos[i]);
fprintf(stderr, "failed to get passVer notification\n"); printf("%s:%d close taos[%d]\n", __func__, __LINE__, i);
sleep(1);
} }
// sleep(1000); for (int i = 0; i < nUser; ++i) {
taos_close(taosu[i]);
printf("%s:%d close taosu[%d]\n", __func__, __LINE__, i);
sleep(1);
}
free(userName); if (nPassVerNotified >= nConn) {
fprintf(stderr, "succeed to get passVer notification since nNotify %d >= nConn %d\n", nPassVerNotified, nConn);
} else {
fprintf(stderr, "failed to get passVer notification since nNotify %d < nConn %d\n", nPassVerNotified, nConn);
}
// sleep(300);
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册