提交 76386b9f 编写于 作者: K kailixu

enh: assign passVer during taos_connect

上级 8c70dca4
...@@ -101,7 +101,7 @@ typedef struct TAOS_FIELD_E { ...@@ -101,7 +101,7 @@ typedef struct TAOS_FIELD_E {
#endif #endif
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *res, int code); typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *res, int code);
typedef void (*__taos_notify_fn_t)(void *param); typedef void (*__taos_notify_fn_t)(void *param, void *ext, int type);
typedef struct TAOS_MULTI_BIND { typedef struct TAOS_MULTI_BIND {
int buffer_type; int buffer_type;
...@@ -122,6 +122,10 @@ typedef enum { ...@@ -122,6 +122,10 @@ typedef enum {
SET_CONF_RET_ERR_TOO_LONG = -6 SET_CONF_RET_ERR_TOO_LONG = -6
} SET_CONF_RET_CODE; } SET_CONF_RET_CODE;
typedef enum {
TAOS_NOTIFY_PASSVER = 1,
} TAOS_NOTIFY_TYPE;
#define RET_MSG_LENGTH 1024 #define RET_MSG_LENGTH 1024
typedef struct setConfRet { typedef struct setConfRet {
SET_CONF_RET_CODE retCode; SET_CONF_RET_CODE retCode;
......
...@@ -629,6 +629,7 @@ typedef struct { ...@@ -629,6 +629,7 @@ typedef struct {
int8_t connType; int8_t connType;
SEpSet epSet; SEpSet epSet;
int32_t svrTimestamp; int32_t svrTimestamp;
int32_t passVer;
char sVer[TSDB_VERSION_LEN]; char sVer[TSDB_VERSION_LEN];
char sDetailVer[128]; char sDetailVer[128];
} SConnectRsp; } SConnectRsp;
......
...@@ -54,15 +54,14 @@ static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHb ...@@ -54,15 +54,14 @@ static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHb
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (NULL == pTscObj) { if (NULL == pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
return TSDB_CODE_SUCCESS; return code;
} }
SUserPassBatchRsp batchRsp = {0}; SUserPassBatchRsp batchRsp = {0};
if (tDeserializeSUserPassBatchRsp(value, valueLen, &batchRsp) != 0) { if (tDeserializeSUserPassBatchRsp(value, valueLen, &batchRsp) != 0) {
terrno = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
releaseTscObj(connKey->tscRid); releaseTscObj(connKey->tscRid);
assert(0); return code;
return -1;
} }
SPassInfo *passInfo = &pTscObj->passInfo; SPassInfo *passInfo = &pTscObj->passInfo;
...@@ -70,11 +69,11 @@ static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHb ...@@ -70,11 +69,11 @@ static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHb
for (int32_t i = 0; i < numOfBatchs; ++i) { for (int32_t i = 0; i < numOfBatchs; ++i) {
SGetUserPassRsp *rsp = taosArrayGet(batchRsp.pArray, i); SGetUserPassRsp *rsp = taosArrayGet(batchRsp.pArray, i);
if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) { if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
tscError("update user:%s passVer from %d to %d", rsp->user, passInfo->ver, rsp->version); tscDebug("update passVer of user %s from %d to %d", rsp->user, passInfo->ver, rsp->version);
if (atomic_load_32(&passInfo->ver) < rsp->version) { if (atomic_load_32(&passInfo->ver) < rsp->version) {
atomic_store_32(&passInfo->ver, rsp->version); atomic_store_32(&passInfo->ver, rsp->version);
if (passInfo->fp) { if (passInfo->fp) {
(*passInfo->fp)(NULL); (*passInfo->fp)(&pTscObj->id, NULL, TAOS_NOTIFY_PASSVER);
} }
} }
} }
...@@ -82,7 +81,7 @@ static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHb ...@@ -82,7 +81,7 @@ static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHb
taosArrayDestroy(batchRsp.pArray); taosArrayDestroy(batchRsp.pArray);
releaseTscObj(connKey->tscRid); releaseTscObj(connKey->tscRid);
return TSDB_CODE_SUCCESS; return code;
} }
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) { static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
......
...@@ -130,6 +130,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -130,6 +130,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
lastClusterId = connectRsp.clusterId; lastClusterId = connectRsp.clusterId;
pTscObj->connType = connectRsp.connType; pTscObj->connType = connectRsp.connType;
pTscObj->passInfo.ver = connectRsp.passVer;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType);
......
...@@ -3874,6 +3874,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { ...@@ -3874,6 +3874,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tEncodeI32(&encoder, pRsp->svrTimestamp) < 0) return -1; if (tEncodeI32(&encoder, pRsp->svrTimestamp) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->passVer) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -3897,6 +3898,13 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { ...@@ -3897,6 +3898,13 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tDecodeI32(&decoder, &pRsp->svrTimestamp) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->svrTimestamp) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->sVer) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->sVer) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->sDetailVer) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->sDetailVer) < 0) return -1;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pRsp->passVer) < 0) return -1;
} else {
pRsp->passVer = 0;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
......
...@@ -283,6 +283,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { ...@@ -283,6 +283,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
connectRsp.connType = connReq.connType; connectRsp.connType = connReq.connType;
connectRsp.dnodeNum = mndGetDnodeSize(pMnode); connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
connectRsp.svrTimestamp = taosGetTimestampSec(); connectRsp.svrTimestamp = taosGetTimestampSec();
connectRsp.passVer = pUser->passVersion;
strcpy(connectRsp.sVer, version); strcpy(connectRsp.sVer, version);
snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
......
...@@ -1042,7 +1042,7 @@ int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_ ...@@ -1042,7 +1042,7 @@ int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_
pUsers[i].version = ntohl(pUsers[i].version); pUsers[i].version = ntohl(pUsers[i].version);
if (pUser->passVersion <= pUsers[i].version) { if (pUser->passVersion <= pUsers[i].version) {
mDebug("user:%s, not update since mnd passVer %d <= client passVer %d", pUsers[i].user, pUser->passVersion, mTrace("user:%s, not update since mnd passVer %d <= client passVer %d", pUsers[i].user, pUser->passVersion,
pUsers[i].version); pUsers[i].version);
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
continue; continue;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册