diff --git a/include/client/taos.h b/include/client/taos.h index e46cd99c73c824a123cde51f3f4eea08ba380970..44703c2797d3e39bbc05f817a0cfd65acd6af2cf 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -101,6 +101,7 @@ typedef struct TAOS_FIELD_E { #endif typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *res, int code); +typedef void (*__taos_notify_fn_t)(void *param, void *ext, int type); typedef struct TAOS_MULTI_BIND { int buffer_type; @@ -121,6 +122,10 @@ typedef enum { SET_CONF_RET_ERR_TOO_LONG = -6 } SET_CONF_RET_CODE; +typedef enum { + TAOS_NOTIFY_PASSVER = 0, +} TAOS_NOTIFY_TYPE; + #define RET_MSG_LENGTH 1024 typedef struct setConfRet { SET_CONF_RET_CODE retCode; @@ -225,6 +230,8 @@ DLL_EXPORT int taos_get_tables_vgId(TAOS *taos, const char *db, const char *tabl DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); +DLL_EXPORT int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type); + /* --------------------------schemaless INTERFACE------------------------------- */ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f07d31192d19711443dddd779ccbe33a903c46c7..f377ad0d63e140a8e5e4e7c46130b473d8bd684e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -106,6 +106,7 @@ enum { HEARTBEAT_KEY_DBINFO, HEARTBEAT_KEY_STBINFO, HEARTBEAT_KEY_TMQ, + HEARTBEAT_KEY_USER_PASSINFO, }; typedef enum _mgmt_table { @@ -634,6 +635,7 @@ typedef struct { int8_t connType; SEpSet epSet; int32_t svrTimestamp; + int32_t passVer; char sVer[TSDB_VERSION_LEN]; char sDetailVer[128]; } SConnectRsp; @@ -716,6 +718,14 @@ int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pR int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp); void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp); +typedef struct SUserPassVersion { + char user[TSDB_USER_LEN]; + int32_t version; +} SUserPassVersion; + +typedef SGetUserAuthReq SGetUserPassReq; +typedef SUserPassVersion SGetUserPassRsp; + /* * for client side struct, only column id, type, bytes are necessary * But for data in vnode side, we need all the following information. @@ -1046,6 +1056,14 @@ int32_t tSerializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp int32_t tDeserializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp); void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp); +typedef struct { + SArray* pArray; // Array of SGetUserPassRsp +} SUserPassBatchRsp; + +int32_t tSerializeSUserPassBatchRsp(void* buf, int32_t bufLen, SUserPassBatchRsp* pRsp); +int32_t tDeserializeSUserPassBatchRsp(void* buf, int32_t bufLen, SUserPassBatchRsp* pRsp); +void tFreeSUserPassBatchRsp(SUserPassBatchRsp* pRsp); + typedef struct { char db[TSDB_DB_FNAME_LEN]; STimeWindow timeRange; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 41f87379a9c7766348c342b4f97273cb4704ab1a..8e20d7d2752d143d59766ac6eca7887e90f08333 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -63,6 +63,7 @@ typedef struct { // statistics int32_t reportCnt; int32_t connKeyCnt; + int32_t passKeyCnt; // with passVer call back int64_t reportBytes; // not implemented int64_t startTime; // ctl @@ -125,6 +126,12 @@ typedef struct SAppInfo { TdThreadMutex mutex; } SAppInfo; +typedef struct { + int32_t ver; + void* param; + __taos_notify_fn_t fp; +} SPassInfo; + typedef struct STscObj { char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; @@ -140,6 +147,7 @@ typedef struct STscObj { int32_t numOfReqs; // number of sqlObj bound to this connection SAppInstInfo* pAppInfo; SHashObj* pRequests; + SPassInfo passInfo; } STscObj; typedef struct STscDbg { @@ -353,7 +361,7 @@ void stopAllRequests(SHashObj* pRequests); // conn level int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); -void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); +void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* param); typedef struct SSqlCallbackWrapper { SParseContext* pParseCtx; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 418103f2a6f97cab51eea3d83955b1224b8428b7..99569fdb574bd2860e8163c9a1ba0e2d30eb5750 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -239,7 +239,7 @@ void destroyTscObj(void *pObj) { tscTrace("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; - hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); + hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, pTscObj->passInfo.fp); destroyAllRequests(pTscObj->pRequests); taosHashCleanup(pTscObj->pRequests); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index c9c2e7a5f82453035100f2e4a67d806c70a7ff57..79435da89fb5f85fa0fdace11e573e7c85c716b9 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -19,6 +19,15 @@ #include "scheduler.h" #include "trpc.h" +typedef struct { + union { + struct { + int64_t clusterId; + int32_t passKeyCnt; + }; + }; +} SHbParam; + static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); @@ -49,6 +58,52 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC return TSDB_CODE_SUCCESS; } +static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHbKey *connKey, SAppHbMgr *pAppHbMgr) { + int32_t code = 0; + int32_t numOfBatchs = 0; + SUserPassBatchRsp batchRsp = {0}; + if (tDeserializeSUserPassBatchRsp(value, valueLen, &batchRsp) != 0) { + code = TSDB_CODE_INVALID_MSG; + return code; + } + + numOfBatchs = taosArrayGetSize(batchRsp.pArray); + + SClientHbReq *pReq = NULL; + while ((pReq = taosHashIterate(pAppHbMgr->activeInfo, pReq))) { + STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid); + if (!pTscObj) { + continue; + } + SPassInfo *passInfo = &pTscObj->passInfo; + if (!passInfo->fp) { + releaseTscObj(pReq->connKey.tscRid); + continue; + } + + for (int32_t i = 0; i < numOfBatchs; ++i) { + SGetUserPassRsp *rsp = taosArrayGet(batchRsp.pArray, i); + if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) { + int32_t oldVer = atomic_load_32(&passInfo->ver); + if (oldVer < rsp->version) { + atomic_store_32(&passInfo->ver, rsp->version); + if (passInfo->fp) { + (*passInfo->fp)(passInfo->param, &passInfo->ver, TAOS_NOTIFY_PASSVER); + } + tscDebug("update passVer of user %s from %d to %d, tscRid:%" PRIi64, rsp->user, oldVer, + atomic_load_32(&passInfo->ver), pTscObj->id); + } + break; + } + } + releaseTscObj(pReq->connKey.tscRid); + } + + taosArrayDestroy(batchRsp.pArray); + + return code; +} + static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) { int32_t code = 0; SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo)); @@ -291,6 +346,15 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog); break; } + case HEARTBEAT_KEY_USER_PASSINFO: { + if (kv->valueLen <= 0 || NULL == kv->value) { + tscError("invalid hb user pass info, len:%d, value:%p", kv->valueLen, kv->value); + break; + } + + hbProcessUserPassInfoRsp(kv->value, kv->valueLen, &pRsp->connKey, pAppHbMgr); + break; + } default: tscError("invalid hb key type:%d", kv->key); break; @@ -472,6 +536,49 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { return TSDB_CODE_SUCCESS; } +static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { + STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); + if (!pTscObj) { + tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); + return TSDB_CODE_APP_ERROR; + } + + int32_t code = 0; + SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion)); + if (!user) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return; + } + strncpy(user->user, pTscObj->user, TSDB_USER_LEN); + user->version = htonl(pTscObj->passInfo.ver); + + SKv kv = { + .key = HEARTBEAT_KEY_USER_PASSINFO, + .valueLen = sizeof(SUserPassVersion), + .value = user, + }; + + tscDebug("hb got user basic info, valueLen:%d, user:%s, passVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user, + pTscObj->passInfo.ver, connKey->tscRid); + + if (!req->info) { + req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + } + + if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) < 0) { + code = terrno ? terrno : TSDB_CODE_APP_ERROR; + goto _return; + } + +_return: + releaseTscObj(connKey->tscRid); + if (code) { + tscError("hb got user basic info failed since %s", terrstr(code)); + } + + return code; +} + int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { SUserAuthVersion *users = NULL; uint32_t userNum = 0; @@ -607,19 +714,23 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { } int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { - int64_t *clusterId = (int64_t *)param; + SHbParam *hbParam = (SHbParam *)param; struct SCatalog *pCatalog = NULL; - int32_t code = catalogGetHandle(*clusterId, &pCatalog); + int32_t code = catalogGetHandle(hbParam->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); return code; } - hbGetAppInfo(*clusterId, req); + hbGetAppInfo(hbParam->clusterId, req); hbGetQueryBasicInfo(connKey, req); + if (hbParam->passKeyCnt > 0) { + hbGetUserBasicInfo(connKey, req); + } + code = hbGetExpiredUserInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { return code; @@ -673,7 +784,26 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { while (pIter != NULL) { pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq); - code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &pOneReq->clusterId, pOneReq); + SHbParam param; + switch (pOneReq->connKey.connType) { + case CONN_TYPE__QUERY: { + param.clusterId = pOneReq->clusterId; + param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt); + break; + } + default: + break; + } + if (clientHbMgr.reqHandle[pOneReq->connKey.connType]) { + code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, ¶m, pOneReq); + if (code) { + tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code), + pOneReq->connKey.tscRid, pOneReq->connKey.connType); + } + } + break; + +#if 0 if (code) { pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pOneReq = pIter; @@ -682,6 +812,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pOneReq = pIter; +#endif } releaseTscObj(rid); @@ -1023,7 +1154,7 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in } } -void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { +void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *param) { SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); if (pReq) { tFreeClientHbReq(pReq); @@ -1036,4 +1167,7 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { } atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); + if (param) { + atomic_sub_fetch_32(&pAppHbMgr->passKeyCnt, 1); + } } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 60c7b44b3d078e4102c6a6a5920f4fce61f8afbc..aed11b4fb153983e02d05d2d2d59f5d4a0b7a94d 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -119,6 +119,39 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha return NULL; } +int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type) { + if (taos == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + STscObj *pObj = acquireTscObj(*(int64_t *)taos); + if (NULL == pObj) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + tscError("invalid parameter for %s", __func__); + return terrno; + } + + switch (type) { + case TAOS_NOTIFY_PASSVER: { + pObj->passInfo.fp = fp; + pObj->passInfo.param = param; + if (fp) { + atomic_add_fetch_32(&pObj->pAppInfo->pAppHbMgr->passKeyCnt, 1); + } + break; + } + default: { + terrno = TSDB_CODE_INVALID_PARA; + releaseTscObj(*(int64_t *)taos); + return terrno; + } + } + + releaseTscObj(*(int64_t *)taos); + return 0; +} + void taos_close_internal(void *taos) { if (taos == NULL) { return; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index a0146cfa39fee78fee7287526a3a1c7ea7a8c2da..52c5fc79408a05cf499d33059c6e7c07d176b615 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -130,6 +130,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { lastClusterId = connectRsp.clusterId; pTscObj->connType = connectRsp.connType; + pTscObj->passInfo.ver = connectRsp.passVer; hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 79c12df96bd58d2fdf9313aa76a6309b581421b4..4139d6c7d449ec4b5c529cd2b0b6814258a9e380 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2936,6 +2936,59 @@ void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp *pRsp) { taosArrayDestroy(pRsp->pArray); } +int32_t tSerializeSUserPassBatchRsp(void *buf, int32_t bufLen, SUserPassBatchRsp *pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + + int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); + if (tEncodeI32(&encoder, numOfBatch) < 0) return -1; + for (int32_t i = 0; i < numOfBatch; ++i) { + SGetUserPassRsp *pUserPassRsp = taosArrayGet(pRsp->pArray, i); + if (tEncodeCStr(&encoder, pUserPassRsp->user) < 0) return -1; + if (tEncodeI32(&encoder, pUserPassRsp->version) < 0) return -1; + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSUserPassBatchRsp(void *buf, int32_t bufLen, SUserPassBatchRsp *pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); + if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1; + + pRsp->pArray = taosArrayInit(numOfBatch, sizeof(SGetUserPassRsp)); + if (pRsp->pArray == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + for (int32_t i = 0; i < numOfBatch; ++i) { + SGetUserPassRsp rsp = {0}; + if (tDecodeCStrTo(&decoder, rsp.user) < 0) return -1; + if (tDecodeI32(&decoder, &rsp.version) < 0) return -1; + taosArrayPush(pRsp->pArray, &rsp); + } + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +void tFreeSUserPassBatchRsp(SUserPassBatchRsp *pRsp) { + if(pRsp) { + taosArrayDestroy(pRsp->pArray); + } +} + int32_t tSerializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -3976,6 +4029,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tEncodeI32(&encoder, pRsp->svrTimestamp) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->passVer) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -3999,6 +4053,13 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tDecodeI32(&decoder, &pRsp->svrTimestamp) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->sVer) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->sDetailVer) < 0) return -1; + + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI32(&decoder, &pRsp->passVer) < 0) return -1; + } else { + pRsp->passVer = 0; + } + tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index cc295f0d24d185a21d876b76bc00d62682a045b9..f547ce025d67d5e04441699b2e6b48eb3efd52e2 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -280,6 +280,7 @@ typedef struct { int8_t reserve; int32_t acctId; int32_t authVersion; + int32_t passVersion; SHashObj* readDbs; SHashObj* writeDbs; SHashObj* topics; diff --git a/source/dnode/mnode/impl/inc/mndUser.h b/source/dnode/mnode/impl/inc/mndUser.h index 95d15f6e5a608ebd218581d78f6f67f6b5e5fdc6..aa7f97f0870dfd83ba4c3296ba1293e091b6fdba 100644 --- a/source/dnode/mnode/impl/inc/mndUser.h +++ b/source/dnode/mnode/impl/inc/mndUser.h @@ -35,6 +35,8 @@ SHashObj *mndDupTableHash(SHashObj *pOld); SHashObj *mndDupTopicHash(SHashObj *pOld); int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp, int32_t *pRspLen); +int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_t numOfUses, void **ppRsp, + int32_t *pRspLen); int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db); int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic); diff --git a/source/dnode/mnode/impl/src/mndDump.c b/source/dnode/mnode/impl/src/mndDump.c index 44a7d49fff672e33af43bb892350ee0b5e02debd..a991bddda8a997c6074ee21e811f3829e0370fe8 100644 --- a/source/dnode/mnode/impl/src/mndDump.c +++ b/source/dnode/mnode/impl/src/mndDump.c @@ -421,6 +421,7 @@ void dumpUser(SSdb *pSdb, SJson *json) { tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime)); tjsonAddStringToObject(item, "superUser", i642str(pObj->superUser)); tjsonAddStringToObject(item, "authVersion", i642str(pObj->authVersion)); + tjsonAddStringToObject(item, "passVersion", i642str(pObj->passVersion)); tjsonAddStringToObject(item, "numOfReadDbs", i642str(taosHashGetSize(pObj->readDbs))); tjsonAddStringToObject(item, "numOfWriteDbs", i642str(taosHashGetSize(pObj->writeDbs))); sdbRelease(pSdb, pObj); diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 50e502f4ab9b240e8518bdb4fe531929149e974d..2ebb5aeb9904cbabcc95086846b9ded2cdd2f1ef 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -283,6 +283,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { connectRsp.connType = connReq.connType; connectRsp.dnodeNum = mndGetDnodeSize(pMnode); connectRsp.svrTimestamp = taosGetTimestampSec(); + connectRsp.passVer = pUser->passVersion; strcpy(connectRsp.sVer, version); snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, @@ -547,6 +548,16 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb } break; } + case HEARTBEAT_KEY_USER_PASSINFO: { + void *rspMsg = NULL; + int32_t rspLen = 0; + mndValidateUserPassInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserPassVersion), &rspMsg, &rspLen); + if (rspMsg && rspLen > 0) { + SKv kv1 = {.key = HEARTBEAT_KEY_USER_PASSINFO, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp.info, &kv1); + } + break; + } default: mError("invalid kv key:%d", kv->key); hbRsp.status = TSDB_CODE_APP_ERROR; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index d08227927aae2d0d6fd2443df36650e8445a08f8..f6e5895cda2ee31b838f1203c861a04f3c49ac17 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -23,7 +23,7 @@ #include "mndTrans.h" #include "tbase64.h" -#define USER_VER_NUMBER 3 +#define USER_VER_NUMBER 4 #define USER_RESERVE_SIZE 64 static int32_t mndCreateDefaultUsers(SMnode *pMnode); @@ -174,6 +174,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) { SDB_SET_INT8(pRaw, dataPos, pUser->enable, _OVER) SDB_SET_INT8(pRaw, dataPos, pUser->reserve, _OVER) SDB_SET_INT32(pRaw, dataPos, pUser->authVersion, _OVER) + SDB_SET_INT32(pRaw, dataPos, pUser->passVersion, _OVER) SDB_SET_INT32(pRaw, dataPos, numOfReadDbs, _OVER) SDB_SET_INT32(pRaw, dataPos, numOfWriteDbs, _OVER) SDB_SET_INT32(pRaw, dataPos, numOfTopics, _OVER) @@ -263,7 +264,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; - if (sver != 1 && sver != 2 && sver != 3) { + if (sver < 1 || sver > USER_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; goto _OVER; } @@ -285,6 +286,9 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { SDB_GET_INT8(pRaw, dataPos, &pUser->enable, _OVER) SDB_GET_INT8(pRaw, dataPos, &pUser->reserve, _OVER) SDB_GET_INT32(pRaw, dataPos, &pUser->authVersion, _OVER) + if (sver >= 4) { + SDB_GET_INT32(pRaw, dataPos, &pUser->passVersion, _OVER) + } int32_t numOfReadDbs = 0; int32_t numOfWriteDbs = 0; @@ -529,6 +533,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) { taosWLockLatch(&pOld->lock); pOld->updateTime = pNew->updateTime; pOld->authVersion = pNew->authVersion; + pOld->passVersion = pNew->passVersion; pOld->sysInfo = pNew->sysInfo; pOld->enable = pNew->enable; memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN); @@ -818,10 +823,14 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { if (mndUserDupObj(pUser, &newUser) != 0) goto _OVER; + newUser.passVersion = pUser->passVersion; if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) { char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)alterReq.pass, strlen(alterReq.pass), pass); memcpy(newUser.pass, pass, TSDB_PASSWORD_LEN); + if (0 != strncmp(pUser->pass, pass, TSDB_PASSWORD_LEN)) { + ++newUser.passVersion; + } } if (alterReq.alterType == TSDB_ALTER_USER_SUPERUSER) { @@ -1420,6 +1429,69 @@ _OVER: return code; } +int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_t numOfUses, void **ppRsp, + int32_t *pRspLen) { + int32_t code = 0; + SUserPassBatchRsp batchRsp = {0}; + + for (int32_t i = 0; i < numOfUses; ++i) { + SUserObj *pUser = mndAcquireUser(pMnode, pUsers[i].user); + if (pUser == NULL) { + mError("user:%s, failed to validate user pass since %s", pUsers[i].user, terrstr()); + continue; + } + + pUsers[i].version = ntohl(pUsers[i].version); + if (pUser->passVersion <= pUsers[i].version) { + mTrace("user:%s, not update since mnd passVer %d <= client passVer %d", pUsers[i].user, pUser->passVersion, + pUsers[i].version); + mndReleaseUser(pMnode, pUser); + continue; + } + + SGetUserPassRsp rsp = {0}; + memcpy(rsp.user, pUser->user, TSDB_USER_LEN); + rsp.version = pUser->passVersion; + + if (!batchRsp.pArray && !(batchRsp.pArray = taosArrayInit(numOfUses, sizeof(SGetUserPassRsp)))) { + code = TSDB_CODE_OUT_OF_MEMORY; + mndReleaseUser(pMnode, pUser); + goto _OVER; + } + + taosArrayPush(batchRsp.pArray, &rsp); + mndReleaseUser(pMnode, pUser); + } + + if (taosArrayGetSize(batchRsp.pArray) <= 0) { + goto _OVER; + } + + int32_t rspLen = tSerializeSUserPassBatchRsp(NULL, 0, &batchRsp); + if (rspLen < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + void *pRsp = taosMemoryMalloc(rspLen); + if (pRsp == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + tSerializeSUserPassBatchRsp(pRsp, rspLen, &batchRsp); + + *ppRsp = pRsp; + *pRspLen = rspLen; + +_OVER: + if (code) { + *ppRsp = NULL; + *pRspLen = 0; + } + + tFreeSUserPassBatchRsp(&batchRsp); + return code; +} + int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) { int32_t code = 0; SSdb *pSdb = pMnode->pSdb; diff --git a/tests/script/api/makefile b/tests/script/api/makefile index 6739794cc8e09025ec18ee92100453edce082f77..6d55d8a75f7efe4383c81781165b2246e7a893a0 100644 --- a/tests/script/api/makefile +++ b/tests/script/api/makefile @@ -15,8 +15,11 @@ exe: gcc $(CFLAGS) ./stopquery.c -o $(ROOT)stopquery $(LFLAGS) gcc $(CFLAGS) ./dbTableRoute.c -o $(ROOT)dbTableRoute $(LFLAGS) gcc $(CFLAGS) ./insertSameTs.c -o $(ROOT)insertSameTs $(LFLAGS) + gcc $(CFLAGS) ./passwdTest.c -o $(ROOT)passwdTest $(LFLAGS) clean: rm $(ROOT)batchprepare rm $(ROOT)stopquery rm $(ROOT)dbTableRoute + rm $(ROOT)insertSameTs + rm $(ROOT)passwdTest diff --git a/tests/script/api/passwdTest.c b/tests/script/api/passwdTest.c new file mode 100644 index 0000000000000000000000000000000000000000..8a2b0a0390dad3acef1fca93abc89095164bb401 --- /dev/null +++ b/tests/script/api/passwdTest.c @@ -0,0 +1,194 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// TAOS standard API example. The same syntax as MySQL, but only a subset +// to compile: gcc -o demo demo.c -ltaos + +/** + * passwdTest.c + * - Run the test case in clear TDengine environment with default root passwd 'taosdata' + */ + +#include +#include +#include +#include +#include +#include "taos.h" // TAOS header file + +#define nDup 1 +#define nRoot 10 +#define nUser 10 +#define USER_LEN 24 + +void Test(TAOS *taos, char *qstr); +void createUers(TAOS *taos, const char *host, char *qstr); +void passVerTestMulti(const char *host, char *qstr); + +int nPassVerNotified = 0; +TAOS *taosu[nRoot] = {0}; +char users[nUser][USER_LEN] = {0}; + +void __taos_notify_cb(void *param, void *ext, int type) { + switch (type) { + case TAOS_NOTIFY_PASSVER: { + ++nPassVerNotified; + printf("%s:%d type:%d user:%s ver:%d\n", __func__, __LINE__, type, param ? (char *)param : "NULL", *(int *)ext); + break; + } + default: + printf("%s:%d unknown type:%d\n", __func__, __LINE__, type); + break; + } +} + +static void queryDB(TAOS *taos, char *command) { + int i; + TAOS_RES *pSql = NULL; + int32_t code = -1; + + for (i = 0; i < nDup; ++i) { + if (NULL != pSql) { + taos_free_result(pSql); + pSql = NULL; + } + + pSql = taos_query(taos, command); + code = taos_errno(pSql); + if (0 == code) { + break; + } + } + + if (code != 0) { + fprintf(stderr, "failed to run: %s, reason: %s\n", command, taos_errstr(pSql)); + taos_free_result(pSql); + taos_close(taos); + exit(EXIT_FAILURE); + } else { + fprintf(stderr, "success to run: %s\n", command); + } + + taos_free_result(pSql); +} + +int main(int argc, char *argv[]) { + char qstr[1024]; + + // connect to server + if (argc < 2) { + printf("please input server-ip \n"); + return 0; + } + + TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0); + if (taos == NULL) { + printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/); + exit(1); + } + createUers(taos, argv[1], qstr); + passVerTestMulti(argv[1], qstr); + + taos_close(taos); + taos_cleanup(); +} + +void createUers(TAOS *taos, const char *host, char *qstr) { + // users + for (int i = 0; i < nUser; ++i) { + sprintf(users[i], "user%d", i); + sprintf(qstr, "CREATE USER %s PASS 'taosdata'", users[i]); + queryDB(taos, qstr); + + taosu[i] = taos_connect(host, users[i], "taosdata", NULL, 0); + if (taosu[i] == NULL) { + printf("failed to connect to server, user:%s, reason:%s\n", users[i], "null taos" /*taos_errstr(taos)*/); + exit(1); + } + + int code = taos_set_notify_cb(taosu[i], __taos_notify_cb, users[i], TAOS_NOTIFY_PASSVER); + + if (code != 0) { + fprintf(stderr, "failed to run: taos_set_notify_cb for user:%s since %d\n", users[i], code); + } else { + fprintf(stderr, "success to run: taos_set_notify_cb for user:%s\n", users[i]); + } + + // alter pass for users + sprintf(qstr, "alter user %s pass 'taos'", users[i]); + queryDB(taos, qstr); + } +} + +void passVerTestMulti(const char *host, char *qstr) { + // root + TAOS *taos[nRoot] = {0}; + char userName[USER_LEN] = "root"; + + for (int i = 0; i < nRoot; ++i) { + taos[i] = taos_connect(host, "root", "taosdata", NULL, 0); + if (taos[i] == NULL) { + printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/); + exit(1); + } + + int code = taos_set_notify_cb(taos[i], __taos_notify_cb, userName, TAOS_NOTIFY_PASSVER); + + if (code != 0) { + fprintf(stderr, "failed to run: taos_set_notify_cb since %d\n", code); + } else { + fprintf(stderr, "success to run: taos_set_notify_cb\n"); + } + } + + queryDB(taos[0], "create database if not exists demo1 vgroups 1 minrows 10"); + queryDB(taos[0], "create database if not exists demo2 vgroups 1 minrows 10"); + queryDB(taos[0], "create database if not exists demo3 vgroups 1 minrows 10"); + + queryDB(taos[0], "create table demo1.stb (ts timestamp, c1 int) tags(t1 int)"); + queryDB(taos[0], "create table demo2.stb (ts timestamp, c1 int) tags(t1 int)"); + queryDB(taos[0], "create table demo3.stb (ts timestamp, c1 int) tags(t1 int)"); + + strcpy(qstr, "alter user root pass 'taos'"); + queryDB(taos[0], qstr); + + // calculate the nPassVerNotified for root and users + int nConn = nRoot + nUser; + + for (int i = 0; i < 15; ++i) { + if (nPassVerNotified >= nConn) break; + sleep(1); + } + + // close the taos_conn + for (int i = 0; i < nRoot; ++i) { + taos_close(taos[i]); + printf("%s:%d close taos[%d]\n", __func__, __LINE__, i); + sleep(1); + } + + for (int i = 0; i < nUser; ++i) { + taos_close(taosu[i]); + printf("%s:%d close taosu[%d]\n", __func__, __LINE__, i); + sleep(1); + } + + if (nPassVerNotified >= nConn) { + fprintf(stderr, "succeed to get passVer notification since nNotify %d >= nConn %d\n", nPassVerNotified, nConn); + } else { + fprintf(stderr, "failed to get passVer notification since nNotify %d < nConn %d\n", nPassVerNotified, nConn); + } + // sleep(300); +} \ No newline at end of file