未验证 提交 22743e49 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #11721 from taosdata/feature/authImp

enh(rpc):add auth
...@@ -326,6 +326,13 @@ int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp); ...@@ -326,6 +326,13 @@ int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp); int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(const void* buf, SEpSet* pEp); void* taosDecodeSEpSet(const void* buf, SEpSet* pEp);
typedef struct {
SEpSet epSet;
} SMEpSet;
int32_t tSerializeSMEpSet(void* buf, int32_t bufLen, SMEpSet* pReq);
int32_t tDeserializeSMEpSet(void* buf, int32_t buflen, SMEpSet* pReq);
typedef struct { typedef struct {
int8_t connType; int8_t connType;
int32_t pid; int32_t pid;
...@@ -2725,6 +2732,7 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p ...@@ -2725,6 +2732,7 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
} }
return buf; return buf;
} }
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -54,12 +54,13 @@ typedef struct { ...@@ -54,12 +54,13 @@ typedef struct {
uint16_t clientPort; uint16_t clientPort;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
int32_t rspLen; int32_t rspLen;
void *pRsp; void * pRsp;
void *pNode; void * pNode;
} SNodeMsg; } SNodeMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *); typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *);
typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
typedef int (*RpcRfp)(void *parent, SRpcMsg *, SEpSet *);
typedef struct SRpcInit { typedef struct SRpcInit {
uint16_t localPort; // local port uint16_t localPort; // local port
...@@ -80,22 +81,25 @@ typedef struct SRpcInit { ...@@ -80,22 +81,25 @@ typedef struct SRpcInit {
RpcCfp cfp; RpcCfp cfp;
// call back to retrieve the client auth info, for server app only // call back to retrieve the client auth info, for server app only
RpcAfp afp;; RpcAfp afp;
// user defined retry func
RpcRfp rfp;
void *parent; void *parent;
} SRpcInit; } SRpcInit;
typedef struct { typedef struct {
void *val; void *val;
int32_t (*clone)(void *src, void **dst); int32_t (*clone)(void *src, void **dst);
void (*freeFunc)(const void *arg); void (*freeFunc)(const void *arg);
} SRpcCtxVal; } SRpcCtxVal;
typedef struct { typedef struct {
int32_t msgType; int32_t msgType;
void *val; void * val;
int32_t (*clone)(void *src, void **dst); int32_t (*clone)(void *src, void **dst);
void (*freeFunc)(const void *arg); void (*freeFunc)(const void *arg);
} SRpcBrokenlinkVal; } SRpcBrokenlinkVal;
typedef struct { typedef struct {
......
...@@ -844,6 +844,27 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) { ...@@ -844,6 +844,27 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) {
taosArrayDestroy(pReq->pFields); taosArrayDestroy(pReq->pFields);
pReq->pFields = NULL; pReq->pFields = NULL;
} }
int32_t tSerializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeSEpSet(&encoder, &pReq->epSet) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeSEpSet(&decoder, &pReq->epSet) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq) { int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq) {
SCoder encoder = {0}; SCoder encoder = {0};
......
...@@ -63,13 +63,14 @@ typedef struct { ...@@ -63,13 +63,14 @@ typedef struct {
void (*cfp)(void* parent, SRpcMsg*, SEpSet*); void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
int (*retry)(void* parent, SRpcMsg*, SEpSet*);
int32_t refCount; int32_t refCount;
void* parent; void* parent;
void* idPool; // handle to ID pool void* idPool; // handle to ID pool
void* tmrCtrl; // handle to timer void* tmrCtrl; // handle to timer
SHashObj* hash; // handle returned by hash utility SHashObj* hash; // handle returned by hash utility
void* tcphandle; // returned handle from TCP initialization void* tcphandle; // returned handle from TCP initialization
TdThreadMutex mutex; TdThreadMutex mutex;
} SRpcInfo; } SRpcInfo;
......
...@@ -39,6 +39,7 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -39,6 +39,7 @@ void* rpcOpen(const SRpcInit* pInit) {
// register callback handle // register callback handle
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->retry = pInit->rfp;
if (pInit->connType == TAOS_CONN_SERVER) { if (pInit->connType == TAOS_CONN_SERVER) {
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
...@@ -100,11 +101,10 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) { ...@@ -100,11 +101,10 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
memset(&rpcMsg, 0, sizeof(rpcMsg)); memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.contLen = sizeof(SEpSet); SMEpSet msg = {.epSet = *pEpSet};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
if (rpcMsg.pCont == NULL) return; rpcMsg.pCont = rpcMallocCont(len);
tSerializeSMEpSet(rpcMsg.pCont, len, &msg);
memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet));
rpcMsg.code = TSDB_CODE_RPC_REDIRECT; rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
rpcMsg.handle = thandle; rpcMsg.handle = thandle;
......
...@@ -31,12 +31,8 @@ typedef struct SCliConn { ...@@ -31,12 +31,8 @@ typedef struct SCliConn {
int hThrdIdx; int hThrdIdx;
STransCtx ctx; STransCtx ctx;
bool broken; // link broken or not bool broken; // link broken or not
ConnStatus status; // ConnStatus status; //
int release; // 1: release
// spi configure
char spi;
char secured;
char* ip; char* ip;
uint32_t port; uint32_t port;
...@@ -44,7 +40,6 @@ typedef struct SCliConn { ...@@ -44,7 +40,6 @@ typedef struct SCliConn {
// debug and log info // debug and log info
struct sockaddr_in addr; struct sockaddr_in addr;
struct sockaddr_in locaddr; struct sockaddr_in locaddr;
} SCliConn; } SCliConn;
typedef struct SCliMsg { typedef struct SCliMsg {
...@@ -102,6 +97,8 @@ static void cliSendCb(uv_write_t* req, int status); ...@@ -102,6 +97,8 @@ static void cliSendCb(uv_write_t* req, int status);
static void cliConnCb(uv_connect_t* req, int status); static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle); static void cliAsyncCb(uv_async_t* handle);
static void cliAppCb(SCliConn* pConn, STransMsg* pMsg);
static SCliConn* cliCreateConn(SCliThrdObj* thrd); static SCliConn* cliCreateConn(SCliThrdObj* thrd);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void cliDestroy(uv_handle_t* handle); static void cliDestroy(uv_handle_t* handle);
...@@ -303,8 +300,6 @@ void cliHandleResp(SCliConn* conn) { ...@@ -303,8 +300,6 @@ void cliHandleResp(SCliConn* conn) {
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
conn->secured = pHead->secured;
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
tTrace("except, server continue send while cli ignore it"); tTrace("except, server continue send while cli ignore it");
// transUnrefCliHandle(conn); // transUnrefCliHandle(conn);
...@@ -318,7 +313,8 @@ void cliHandleResp(SCliConn* conn) { ...@@ -318,7 +313,8 @@ void cliHandleResp(SCliConn* conn) {
if (pCtx == NULL || pCtx->pSem == NULL) { if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, conn); tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); cliAppCb(conn, &transMsg);
//(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
} else { } else {
tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn); tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn);
memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg)); memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg));
...@@ -384,7 +380,8 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -384,7 +380,8 @@ void cliHandleExcept(SCliConn* pConn) {
once = true; once = true;
continue; continue;
} }
(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); cliAppCb(pConn, &transMsg);
//(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
} else { } else {
tTrace("%s cli conn(sync) %p handle except", pTransInst->label, pConn); tTrace("%s cli conn(sync) %p handle except", pTransInst->label, pConn);
memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg)); memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg));
...@@ -884,6 +881,18 @@ int cliRBChoseIdx(STrans* pTransInst) { ...@@ -884,6 +881,18 @@ int cliRBChoseIdx(STrans* pTransInst) {
} }
return index % pTransInst->numOfThreads; return index % pTransInst->numOfThreads;
} }
void cliAppCb(SCliConn* pConn, STransMsg* transMsg) {
SCliThrdObj* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
if (transMsg->code == TSDB_CODE_RPC_REDIRECT && pTransInst->retry != NULL) {
SMEpSet emsg = {0};
tDeserializeSMEpSet(transMsg->pCont, transMsg->contLen, &emsg);
pTransInst->retry(pTransInst, transMsg, &(emsg.epSet));
} else {
pTransInst->cfp(pTransInst->parent, transMsg, NULL);
}
}
void transCloseClient(void* arg) { void transCloseClient(void* arg) {
SCliObj* cli = arg; SCliObj* cli = arg;
......
...@@ -30,7 +30,6 @@ typedef struct SSrvConn { ...@@ -30,7 +30,6 @@ typedef struct SSrvConn {
uv_timer_t pTimer; uv_timer_t pTimer;
queue queue; queue queue;
int ref;
int persist; // persist connection or not int persist; // persist connection or not
SConnBuffer readBuf; // read buf, SConnBuffer readBuf; // read buf,
int inType; int inType;
...@@ -692,8 +691,6 @@ static void uvDestroyConn(uv_handle_t* handle) { ...@@ -692,8 +691,6 @@ static void uvDestroyConn(uv_handle_t* handle) {
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
tTrace("work thread quit"); tTrace("work thread quit");
uv_walk(thrd->loop, uvWalkCb, NULL); uv_walk(thrd->loop, uvWalkCb, NULL);
// uv_loop_close(thrd->loop);
// uv_stop(thrd->loop);
} }
} }
...@@ -756,8 +753,6 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { ...@@ -756,8 +753,6 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
thrd->quit = true; thrd->quit = true;
if (QUEUE_IS_EMPTY(&thrd->conn)) { if (QUEUE_IS_EMPTY(&thrd->conn)) {
uv_walk(thrd->loop, uvWalkCb, NULL); uv_walk(thrd->loop, uvWalkCb, NULL);
// uv_loop_close(thrd->loop);
// uv_stop(thrd->loop);
} else { } else {
destroyAllConn(thrd); destroyAllConn(thrd);
} }
...@@ -851,10 +846,8 @@ void transRefSrvHandle(void* handle) { ...@@ -851,10 +846,8 @@ void transRefSrvHandle(void* handle) {
if (handle == NULL) { if (handle == NULL) {
return; return;
} }
SSrvConn* conn = handle;
int ref = T_REF_INC((SSrvConn*)handle); int ref = T_REF_INC((SSrvConn*)handle);
UNUSED(ref); tDebug("server conn %p ref count: %d", handle, ref);
} }
void transUnrefSrvHandle(void* handle) { void transUnrefSrvHandle(void* handle) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册