提交 8ff4a07d 编写于 作者: H Haojun Liao

[td-10564] remove unused attributes in user structure, add new log for performance metric.

上级 04057a39
......@@ -358,6 +358,7 @@ typedef struct {
int32_t pid;
char app[TSDB_APP_NAME_LEN];
char db[TSDB_DB_NAME_LEN];
int64_t startTime;
} SConnectMsg;
typedef struct SEpSet {
......@@ -368,14 +369,12 @@ typedef struct SEpSet {
} SEpSet;
typedef struct {
int32_t acctId;
int32_t clusterId;
int32_t connId;
int8_t superAuth;
int8_t readAuth;
int8_t writeAuth;
int8_t reserved[5];
SEpSet epSet;
int32_t acctId;
uint32_t clusterId;
int32_t connId;
int8_t superUser;
int8_t reserved[5];
SEpSet epSet;
} SConnectRsp;
typedef struct {
......
......@@ -117,8 +117,6 @@ extern SAppInfo appInfo;
extern int32_t tscReqRef;
extern void *tscQhandle;
extern int32_t tscConnRef;
extern void *tscRpcCache;
extern pthread_mutex_t rpcObjMutex;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SRequestObj *pRequest, SRequestMsgBody *pMsg);
extern int (*handleRequestRspFp[TSDB_SQL_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen);
......@@ -126,7 +124,7 @@ extern int (*handleRequestRspFp[TSDB_SQL_MAX])(SRequestObj *pRequest, const char
int taos_init();
void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t port, SAppInstInfo* pAppInfo);
void destroyTscObj(void* pTscObj);
void destroyTscObj(void*pObj);
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(void* p);
......
......@@ -201,11 +201,10 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody)
tstrncpy(pConnect->db, db, sizeof(pConnect->db));
pthread_mutex_unlock(&pObj->mutex);
// tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
// tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));
pConnect->pid = htonl(appInfo.pid);
pConnect->startTime = htobe64(appInfo.startTime);
tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
// pConnect->pid = htonl(taosGetPId());
// taosGetCurrentAPPName(pConnect->appName, NULL);
pMsgBody->pData = pConnect;
return 0;
}
......@@ -232,23 +231,6 @@ int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody
return TSDB_CODE_SUCCESS;
}
//
//int tscBuildAndSendRequest(SRequestObj *pRequest) {
// assert(pRequest != NULL);
// char name[TSDB_TABLE_FNAME_LEN] = {0};
//
// uint32_t type = 0;
// tscDebug("0x%"PRIx64" SQL cmd:%s will be processed, name:%s, type:%d", pRequest->requestId, taosMsg[pRequest->type], name, type);
// if (pRequest->type < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
//
// } else if (pCmd->command >= TSDB_SQL_LOCAL) {
// return (*tscProcessMsgRsp[pCmd->command])(pSql);
// }
//
// return buildConnectMsg(pRequest);
//}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
int64_t requestRefId = (int64_t)pMsg->ahandle;
......@@ -275,13 +257,13 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
* The actual inserted number of points is the first number.
*/
if (pMsg->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d", pRequest->requestId, taosMsg[pRequest->type],
tstrerror(pMsg->code), pMsg->contLen);
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%"PRId64 " ms", pRequest->requestId, taosMsg[pMsg->msgType],
tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start);
if (handleRequestRspFp[pRequest->type]) {
pMsg->code = (*handleRequestRspFp[pRequest->type])(pRequest, pMsg->pCont, pMsg->contLen);
}
} else {
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d", pRequest->requestId, taosMsg[pRequest->type],
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d", pRequest->requestId, taosMsg[pMsg->msgType],
tstrerror(pMsg->code), pMsg->contLen);
}
......
......@@ -61,14 +61,6 @@ void taos_cleanup(void) {
tscConnRef = -1;
taosCloseRef(id);
p = tscRpcCache;
tscRpcCache = NULL;
if (p != NULL) {
taosCacheCleanup(p);
pthread_mutex_destroy(&rpcObjMutex);
}
rpcCleanup();
taosCloseLog();
}
......
......@@ -34,7 +34,6 @@ SAppInfo appInfo;
int32_t tscReqRef = -1;
int32_t tscConnRef = -1;
void *tscQhandle = NULL;
void *tscRpcCache= NULL; // TODO removed from here.
int32_t tsNumOfThreads = 1;
......@@ -55,7 +54,7 @@ static void registerRequest(SRequestObj* pRequest) {
int32_t total = atomic_add_fetch_32(&pActivity->totalRequests, 1);
int32_t currentInst = atomic_add_fetch_32(&pActivity->currentRequests, 1);
tscDebug("0x%" PRIx64 " new Request from 0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self,
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self,
pRequest->pTscObj->id, num, currentInst, total);
}
}
......@@ -100,12 +99,14 @@ void tscFreeRpcObj(void *param) {
#endif
}
void tscReleaseRpc(void *param) {
if (param == NULL) {
void closeTransporter(STscObj* pTscObj) {
if (pTscObj == NULL || pTscObj->pTransporter == NULL) {
return;
}
taosCacheRelease(tscRpcCache, (void *)&param, false);
tscDebug("free transporter:%p in connObj: 0x%"PRIx64, pTscObj->pTransporter, pTscObj->id);
rpcClose(pTscObj->pTransporter);
pTscObj->pTransporter = NULL;
}
// TODO refactor
......@@ -133,11 +134,13 @@ void* openTransporter(const char *user, const char *auth) {
return pDnodeConn;
}
void destroyTscObj(void *pTscObj) {
STscObj *pObj = pTscObj;
// tscReleaseRpc(pObj->pRpcObj);
pthread_mutex_destroy(&pObj->mutex);
tfree(pObj);
void destroyTscObj(void *pObj) {
STscObj *pTscObj = pObj;
tscDebug("connect obj destroyed, 0x%"PRIx64, pTscObj->id);
closeTransporter(pTscObj);
pthread_mutex_destroy(&pTscObj->mutex);
tfree(pTscObj);
}
void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t port, SAppInstInfo* pAppInfo) {
......@@ -157,6 +160,9 @@ void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t
pthread_mutex_init(&pObj->mutex, NULL);
pObj->id = taosAddRef(tscConnRef, pObj);
tscDebug("connect obj created, 0x%"PRIx64, pObj->id);
return pObj;
}
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type) {
......@@ -219,7 +225,7 @@ void taos_init_imp(void) {
rpcInit();
tscDebug("starting to initialize TAOS client ...\nLocal End Point is:%s", tsLocalEp);
tscDebug("starting to initialize TAOS driver, local ep: %s", tsLocalEp);
taosSetCoreDump(true);
......@@ -234,12 +240,7 @@ void taos_init_imp(void) {
return;
}
tscDebug("client task queue is initialized, numOfWorkers: %d", numOfThreads);
int refreshTime = 5;
tscRpcCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, true, tscFreeRpcObj, "rpcObj");
pthread_mutex_init(&rpcObjMutex, NULL);
tscDebug("client task queue is initialized, numOfThreads: %d", numOfThreads);
tscConnRef = taosOpenRef(200, destroyTscObj);
tscReqRef = taosOpenRef(40960, destroyRequest);
......
......@@ -17,6 +17,7 @@
#include "dndDnode.h"
#include "dndTransport.h"
#include "dndVnodes.h"
#include "tep.h"
int32_t dndGetDnodeId(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
......
......@@ -184,9 +184,7 @@ typedef struct SUserObj {
char acct[TSDB_USER_LEN];
int64_t createdTime;
int64_t updateTime;
int8_t superAuth;
int8_t readAuth;
int8_t writeAuth;
int8_t superUser;
int32_t acctId;
SHashObj *prohibitDbHash;
} SUserObj;
......
......@@ -29,6 +29,7 @@ typedef struct {
char user[TSDB_USER_LEN];
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
int32_t pid; // pid of app that invokes taosc
int64_t appStartTime; // app start time
int32_t id;
int8_t killed;
int8_t align;
......@@ -44,7 +45,7 @@ typedef struct {
SQueryDesc *pQueries;
} SConnObj;
static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app);
static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app, int64_t startTime);
static void mndFreeConn(SConnObj *pConn);
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
......@@ -102,13 +103,14 @@ void mndCleanupProfile(SMnode *pMnode) {
}
}
static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app) {
static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app, int64_t startTime) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1);
if (connId == 0) atomic_add_fetch_32(&pMgmt->connId, 1);
SConnObj connObj = {.pid = pid,
.appStartTime = startTime,
.id = connId,
.killed = 0,
.port = port,
......@@ -195,6 +197,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SConnectMsg *pReq = pMsg->rpcMsg.pCont;
pReq->pid = htonl(pReq->pid);
pReq->startTime = htobe64(pReq->startTime);
SRpcConnInfo info = {0};
if (rpcGetConnInfo(pMsg->rpcMsg.handle, &info) != 0) {
......@@ -216,7 +219,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
mndReleaseDb(pMnode, pDb);
}
SConnObj *pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app);
SConnObj *pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app, pReq->startTime);
if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pMsg->user, ip, terrstr());
return -1;
......@@ -233,9 +236,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser != NULL) {
pRsp->acctId = htonl(pUser->acctId);
pRsp->superAuth = pUser->superAuth;
pRsp->readAuth = pUser->readAuth;
pRsp->writeAuth = pUser->writeAuth;
pRsp->superUser = pUser->superUser;
mndReleaseUser(pMnode, pUser);
}
......@@ -246,7 +247,8 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
pMsg->contLen = sizeof(SConnectRsp);
pMsg->pCont = pRsp;
mDebug("user:%s, login from %s, conn:%d", info.user, ip, pConn->id);
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pReq->app);
return 0;
}
......@@ -301,7 +303,7 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
SConnObj *pConn = mndAcquireConn(pMnode, pReq->connId);
if (pConn == NULL) {
pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app);
pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app, 0);
if (pConn == NULL) {
mError("user:%s, conn:%d is freed and failed to create new conn since %s", pMsg->user, pReq->connId, terrstr());
return -1;
......@@ -368,7 +370,7 @@ static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) {
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
......@@ -399,7 +401,7 @@ static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg) {
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
......@@ -430,7 +432,7 @@ static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) {
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
......@@ -459,7 +461,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
......@@ -587,7 +589,7 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
......@@ -803,7 +805,7 @@ static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
......
......@@ -65,11 +65,9 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
userObj.readAuth = 1;
userObj.writeAuth = 1;
if (strcmp(user, TSDB_DEFAULT_USER) == 0) {
userObj.superAuth = 1;
userObj.superUser = 1;
}
SSdbRaw *pRaw = mndUserActionEncode(&userObj);
......@@ -102,9 +100,7 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN)
SDB_SET_INT64(pRaw, dataPos, pUser->createdTime)
SDB_SET_INT64(pRaw, dataPos, pUser->updateTime)
SDB_SET_INT8(pRaw, dataPos, pUser->superAuth)
SDB_SET_INT8(pRaw, dataPos, pUser->readAuth)
SDB_SET_INT8(pRaw, dataPos, pUser->writeAuth)
SDB_SET_INT8(pRaw, dataPos, pUser->superUser)
SDB_SET_DATALEN(pRaw, dataPos);
return pRaw;
......@@ -130,9 +126,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->acct, TSDB_USER_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->updateTime)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->superAuth)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->readAuth)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->writeAuth)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->superUser)
return pRow;
}
......@@ -175,9 +169,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOldUser, SUserObj *pNe
memcpy(pOldUser->acct, pNewUser->acct, TSDB_USER_LEN);
pOldUser->createdTime = pNewUser->createdTime;
pOldUser->updateTime = pNewUser->updateTime;
pOldUser->superAuth = pNewUser->superAuth;
pOldUser->readAuth = pNewUser->readAuth;
pOldUser->writeAuth = pNewUser->writeAuth;
pOldUser->superUser = pNewUser->superUser;
return 0;
}
......@@ -198,9 +190,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
userObj.superAuth = 0;
userObj.readAuth = 1;
userObj.writeAuth = 1;
userObj.superUser = 0;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) {
......@@ -513,15 +503,15 @@ static int32_t mndRetrieveUsers(SMnodeMsg *pMsg, SShowObj *pShow, char *data, in
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pUser->superAuth) {
if (pUser->superUser) {
const char *src = "super";
STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
} else if (pUser->writeAuth) {
const char *src = "writable";
STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
} else {
const char *src = "readable";
STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
// } else if (pUser->writeAuth) {
// const char *src = "writable";
// STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
// } else {
// const char *src = "readable";
// STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
}
cols++;
......
......@@ -229,8 +229,6 @@ static void rpcInitImp(void) {
tsRpcOverhead = sizeof(SRpcReqContext);
tsRpcRefId = taosOpenRef(200, rpcFree);
return 0;
}
int32_t rpcInit(void) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册