提交 ec8cd0c6 编写于 作者: K kailixu

enh: support passwd version

上级 8d0b239b
......@@ -101,6 +101,7 @@ typedef struct TAOS_FIELD_E {
#endif
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *res, int code);
typedef void (*__taos_notify_fn_t)(void *param);
typedef struct TAOS_MULTI_BIND {
int buffer_type;
......@@ -225,6 +226,8 @@ DLL_EXPORT int taos_get_tables_vgId(TAOS *taos, const char *db, const char *tabl
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
DLL_EXPORT int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t *fp, void *param);
/* --------------------------schemaless INTERFACE------------------------------- */
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
......
......@@ -106,6 +106,7 @@ enum {
HEARTBEAT_KEY_DBINFO,
HEARTBEAT_KEY_STBINFO,
HEARTBEAT_KEY_TMQ,
HEARTBEAT_KEY_USER_PASSINFO,
};
typedef enum _mgmt_table {
......@@ -704,6 +705,13 @@ int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pR
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp);
typedef SGetUserAuthReq SGetUserPassReq;
typedef struct {
char user[TSDB_USER_LEN];
int32_t version;
} SGetUserPassRsp;
/*
* for client side struct, only column id, type, bytes are necessary
* But for data in vnode side, we need all the following information.
......@@ -1034,6 +1042,14 @@ int32_t tSerializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp
int32_t tDeserializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp);
void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp);
typedef struct {
SArray* pArray; // Array of SGetUserPassRsp
} SUserPassBatchRsp;
int32_t tSerializeSUserPassBatchRsp(void* buf, int32_t bufLen, SUserPassBatchRsp* pRsp);
int32_t tDeserializeSUserPassBatchRsp(void* buf, int32_t bufLen, SUserPassBatchRsp* pRsp);
void tFreeSUserPassBatchRsp(SUserPassBatchRsp* pRsp);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
STimeWindow timeRange;
......
......@@ -140,6 +140,11 @@ typedef struct SUserAuthVersion {
int32_t version;
} SUserAuthVersion;
typedef struct SUserPassVersion {
char user[TSDB_USER_LEN];
int32_t version;
} SUserPassVersion;
typedef SDbCfgRsp SDbCfgInfo;
typedef SUserIndexRsp SIndexInfo;
......@@ -320,6 +325,8 @@ int32_t catalogChkAuthFromCache(SCatalog* pCtg, const char* user, const char* db
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
// int32_t catalogUpdateUserPassInfo(SCatalog* pCtg, SGetUserPassRsp* pPass);
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet* epSet);
int32_t catalogGetServerVersion(SCatalog* pCtg, SRequestConnInfo* pConn, char** pVersion);
......
......@@ -133,6 +133,11 @@ typedef struct SAppInfo {
TdThreadMutex mutex;
} SAppInfo;
typedef struct {
int32_t ver;
__taos_notify_fn_t* fp;
} SPassInfo;
typedef struct STscObj {
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
......@@ -148,6 +153,7 @@ typedef struct STscObj {
int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo* pAppInfo;
SHashObj* pRequests;
SPassInfo passInfo;
} STscObj;
typedef struct STscDbg {
......
......@@ -49,6 +49,42 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
return TSDB_CODE_SUCCESS;
}
static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHbKey *connKey) {
int32_t code = 0;
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (NULL == pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
return TSDB_CODE_SUCCESS;
}
SUserPassBatchRsp batchRsp = {0};
if (tDeserializeSUserPassBatchRsp(value, valueLen, &batchRsp) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
releaseTscObj(connKey->tscRid);
assert(0);
return -1;
}
SPassInfo *passInfo = &pTscObj->passInfo;
int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) {
SGetUserPassRsp *rsp = taosArrayGet(batchRsp.pArray, i);
if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
tscError("update user:%s passVer from %d to %d", rsp->user, passInfo->ver, rsp->version);
if (atomic_load_32(&passInfo->ver) < rsp->version) {
atomic_store_32(&passInfo->ver, rsp->version);
if (passInfo->fp) {
(*passInfo->fp)(NULL);
}
}
}
}
taosArrayDestroy(batchRsp.pArray);
releaseTscObj(connKey->tscRid);
return TSDB_CODE_SUCCESS;
}
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
int32_t code = 0;
SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
......@@ -291,6 +327,15 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
break;
}
case HEARTBEAT_KEY_USER_PASSINFO: {
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid hb user pass info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
hbProcessUserPassInfoRsp(kv->value, kv->valueLen, &pRsp->connKey);
break;
}
default:
tscError("invalid hb key type:%d", kv->key);
break;
......@@ -472,6 +517,48 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_SUCCESS;
}
static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (!pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
return TSDB_CODE_APP_ERROR;
}
int32_t code = 0;
SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion));
if (!user) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
strncpy(user->user, pTscObj->user, TSDB_USER_LEN);
user->version = htonl(pTscObj->passInfo.ver);
SKv kv = {
.key = HEARTBEAT_KEY_USER_PASSINFO,
.valueLen = sizeof(SUserPassVersion),
.value = user,
};
tscDebug("hb got user basic info, valueLen:%d", kv.valueLen);
if (!req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
}
if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) < 0) {
code = terrno ? terrno : TSDB_CODE_APP_ERROR;
goto _return;
}
_return:
releaseTscObj(connKey->tscRid);
if (code) {
tscError("hb got user basic info failed since %s", terrstr(code));
}
return code;
}
int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SUserAuthVersion *users = NULL;
uint32_t userNum = 0;
......@@ -620,6 +707,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
hbGetQueryBasicInfo(connKey, req);
hbGetUserBasicInfo(connKey, req);
code = hbGetExpiredUserInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
......
......@@ -119,6 +119,23 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
return NULL;
}
int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t *fp, void *param) {
if (taos == NULL) {
return 0;
}
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pObj) {
tscError("invalid parameter for %s", __func__);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return -1;
}
pObj->passInfo.fp = fp;
return 0;
}
void taos_close_internal(void *taos) {
if (taos == NULL) {
return;
......
......@@ -192,6 +192,8 @@ void *taosDecodeSEpSet(const void *buf, SEpSet *pEp) {
static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pReq) {
if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1;
if (pReq->connKey.connType == CONN_TYPE__QUERY) {
if (tEncodeI64(pEncoder, pReq->app.appId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->app.pid) < 0) return -1;
......@@ -212,6 +214,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR
queryNum = 1;
if (tEncodeI32(pEncoder, queryNum) < 0) return -1;
if (tEncodeU32(pEncoder, pReq->query->connId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->clusterId) < 0) return -1;
int32_t num = taosArrayGetSize(pReq->query->queryDesc);
if (tEncodeI32(pEncoder, num) < 0) return -1;
......@@ -276,7 +279,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
pReq->query = taosMemoryCalloc(1, sizeof(*pReq->query));
if (NULL == pReq->query) return -1;
if (tDecodeU32(pDecoder, &pReq->query->connId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->clusterId) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(pDecoder, &num) < 0) return -1;
if (num > 0) {
......@@ -2781,6 +2784,59 @@ void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp *pRsp) {
taosArrayDestroy(pRsp->pArray);
}
int32_t tSerializeSUserPassBatchRsp(void *buf, int32_t bufLen, SUserPassBatchRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tEncodeI32(&encoder, numOfBatch) < 0) return -1;
for (int32_t i = 0; i < numOfBatch; ++i) {
SGetUserPassRsp *pUserPassRsp = taosArrayGet(pRsp->pArray, i);
if (tEncodeCStr(&encoder, pUserPassRsp->user) < 0) return -1;
if (tEncodeI32(&encoder, pUserPassRsp->version) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSUserPassBatchRsp(void *buf, int32_t bufLen, SUserPassBatchRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1;
pRsp->pArray = taosArrayInit(numOfBatch, sizeof(SGetUserPassRsp));
if (pRsp->pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfBatch; ++i) {
SGetUserPassRsp rsp = {0};
if (tDecodeCStrTo(&decoder, rsp.user) < 0) return -1;
if (tDecodeI32(&decoder, &rsp.version) < 0) return -1;
taosArrayPush(pRsp->pArray, &rsp);
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSUserPassBatchRsp(SUserPassBatchRsp *pRsp) {
if(pRsp) {
taosArrayDestroy(pRsp->pArray);
}
}
int32_t tSerializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......
......@@ -278,6 +278,7 @@ typedef struct {
int8_t reserve;
int32_t acctId;
int32_t authVersion;
int32_t passVersion;
SHashObj* readDbs;
SHashObj* writeDbs;
SHashObj* topics;
......
......@@ -34,6 +34,8 @@ SHashObj *mndDupDbHash(SHashObj *pOld);
SHashObj *mndDupTopicHash(SHashObj *pOld);
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen);
int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen);
int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db);
int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic);
......
......@@ -421,6 +421,7 @@ void dumpUser(SSdb *pSdb, SJson *json) {
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
tjsonAddStringToObject(item, "superUser", i642str(pObj->superUser));
tjsonAddStringToObject(item, "authVersion", i642str(pObj->authVersion));
tjsonAddStringToObject(item, "passVersion", i642str(pObj->passVersion));
tjsonAddStringToObject(item, "numOfReadDbs", i642str(taosHashGetSize(pObj->readDbs)));
tjsonAddStringToObject(item, "numOfWriteDbs", i642str(taosHashGetSize(pObj->writeDbs)));
sdbRelease(pSdb, pObj);
......
......@@ -547,6 +547,16 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
}
break;
}
case HEARTBEAT_KEY_USER_PASSINFO: {
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateUserPassInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserPassVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv1 = {.key = HEARTBEAT_KEY_USER_PASSINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
default:
mError("invalid kv key:%d", kv->key);
hbRsp.status = TSDB_CODE_APP_ERROR;
......
......@@ -22,7 +22,7 @@
#include "mndTrans.h"
#include "tbase64.h"
#define USER_VER_NUMBER 2
#define USER_VER_NUMBER 3
#define USER_RESERVE_SIZE 64
static int32_t mndCreateDefaultUsers(SMnode *pMnode);
......@@ -142,6 +142,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
SDB_SET_INT8(pRaw, dataPos, pUser->enable, _OVER)
SDB_SET_INT8(pRaw, dataPos, pUser->reserve, _OVER)
SDB_SET_INT32(pRaw, dataPos, pUser->authVersion, _OVER)
SDB_SET_INT32(pRaw, dataPos, pUser->passVersion, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfReadDbs, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfWriteDbs, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfTopics, _OVER)
......@@ -188,7 +189,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != 1 && sver != 2) {
if (sver < 1 || sver > USER_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
}
......@@ -210,6 +211,9 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT8(pRaw, dataPos, &pUser->enable, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pUser->reserve, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pUser->authVersion, _OVER)
if (sver >= 3) {
SDB_GET_INT32(pRaw, dataPos, &pUser->passVersion, _OVER)
}
int32_t numOfReadDbs = 0;
int32_t numOfWriteDbs = 0;
......@@ -322,6 +326,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
taosWLockLatch(&pOld->lock);
pOld->updateTime = pNew->updateTime;
pOld->authVersion = pNew->authVersion;
pOld->passVersion = pNew->passVersion;
pOld->sysInfo = pNew->sysInfo;
pOld->enable = pNew->enable;
memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN);
......@@ -543,10 +548,14 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
if (mndUserDupObj(pUser, &newUser) != 0) goto _OVER;
newUser.passVersion = pUser->passVersion;
if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) {
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)alterReq.pass, strlen(alterReq.pass), pass);
memcpy(newUser.pass, pass, TSDB_PASSWORD_LEN);
if (0 != strncmp(pUser->pass, pass, TSDB_PASSWORD_LEN)) {
++newUser.passVersion;
}
}
if (alterReq.alterType == TSDB_ALTER_USER_SUPERUSER) {
......@@ -782,6 +791,50 @@ _OVER:
return code;
}
#if 0
static int32_t mndProcessGetUserPassReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SUserObj *pUser = NULL;
SGetUserPassReq req = {0};
SGetUserPassRsp rsp = {0};
if (tDeserializeSGetUserPassReq(pReq->pCont, pReq->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
mTrace("user:%s, start to get pass", req.user);
pUser = mndAcquireUser(pMnode, req.user);
if (pUser == NULL) {
terrno = TSDB_CODE_MND_USER_NOT_EXIST;
goto _OVER;
}
memcpy(rsp.user, pUser->user, TSDB_USER_LEN);
rsp.version = pUser->passVersion;
int32_t contLen = tSerializeSGetUserPassRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
tSerializeSGetUserPassRsp(pRsp, contLen, &rsp);
pReq->info.rsp = pRsp;
pReq->info.rspLen = contLen;
code = 0;
_OVER:
mndReleaseUser(pMnode, pUser);
return code;
}
#endif
static int32_t mndRetrieveUsers(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
......@@ -1019,6 +1072,72 @@ _OVER:
return code;
}
int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen) {
int32_t code = 0;
SUserPassBatchRsp batchRsp = {0};
for (int32_t i = 0; i < numOfUses; ++i) {
SUserObj *pUser = mndAcquireUser(pMnode, pUsers[i].user);
if (pUser == NULL) {
mError("user:%s, failed to validate user pass since %s", pUsers[i].user, terrstr());
continue;
}
pUsers[i].version = ntohl(pUsers[i].version);
if (pUser->passVersion <= pUsers[i].version) {
mDebug("user:%s, not update since mnd passVer %d <= client passVer %d", pUsers[i].user, pUser->passVersion,
pUsers[i].version);
mndReleaseUser(pMnode, pUser);
continue;
}
SGetUserPassRsp rsp = {0};
memcpy(rsp.user, pUser->user, TSDB_USER_LEN);
rsp.version = pUser->passVersion;
if (!batchRsp.pArray && !(batchRsp.pArray = taosArrayInit(numOfUses, sizeof(SGetUserPassRsp)))) {
code = TSDB_CODE_OUT_OF_MEMORY;
assert(0);
goto _OVER;
}
taosArrayPush(batchRsp.pArray, &rsp);
mndReleaseUser(pMnode, pUser);
}
if (taosArrayGetSize(batchRsp.pArray) <= 0) {
goto _OVER;
}
int32_t rspLen = tSerializeSUserPassBatchRsp(NULL, 0, &batchRsp);
if (rspLen < 0) {
assert(0);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
void *pRsp = taosMemoryMalloc(rspLen);
if (pRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
assert(0);
goto _OVER;
}
tSerializeSUserPassBatchRsp(pRsp, rspLen, &batchRsp);
*ppRsp = pRsp;
*pRspLen = rspLen;
_OVER:
if (code) {
*ppRsp = NULL;
*pRspLen = 0;
assert(0);
}
tFreeSUserPassBatchRsp(&batchRsp);
return code;
}
int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册