未验证 提交 25569755 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #10809 from taosdata/feature/supportQuery

handle except
...@@ -38,13 +38,13 @@ typedef struct SRpcConnInfo { ...@@ -38,13 +38,13 @@ typedef struct SRpcConnInfo {
typedef struct SRpcMsg { typedef struct SRpcMsg {
tmsg_t msgType; tmsg_t msgType;
tmsg_t expectMsgType;
void * pCont; void * pCont;
int contLen; int contLen;
int32_t code; int32_t code;
void * handle; // rpc handle returned to app void * handle; // rpc handle returned to app
void * ahandle; // app handle set by client void * ahandle; // app handle set by client
int noResp; // has response or not(default 0 indicate resp); int noResp; // has response or not(default 0, 0: resp, 1: no resp);
int persistHandle; // persist handle or not
} SRpcMsg; } SRpcMsg;
...@@ -69,18 +69,19 @@ typedef struct SRpcInit { ...@@ -69,18 +69,19 @@ typedef struct SRpcInit {
// call back to retrieve the client auth info, for server app only // call back to retrieve the client auth info, for server app only
int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
// call back to keep conn or not
bool (*pfp)(void *parent, tmsg_t msgType);
// to support Send messages multiple times on a link
void *(*mfp)(void *parent, tmsg_t msgType);
// call back to handle except when query/fetch in progress
bool (*efp)(void *parent, tmsg_t msgType);
void *parent; void *parent;
} SRpcInit; } SRpcInit;
typedef struct {
void * val;
int32_t len;
void (*free)(void *arg);
} SRpcCtxVal;
typedef struct {
SHashObj *args;
} SRpcCtx;
int32_t rpcInit(); int32_t rpcInit();
void rpcCleanup(); void rpcCleanup();
void * rpcOpen(const SRpcInit *pRpc); void * rpcOpen(const SRpcInit *pRpc);
...@@ -89,16 +90,17 @@ void * rpcMallocCont(int contLen); ...@@ -89,16 +90,17 @@ void * rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void * rpcReallocCont(void *ptr, int contLen); void * rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);
void rpcRegisterBrokenLinkArg(SRpcMsg *msg);
// just release client conn to rpc instance, no close sock // just release client conn to rpc instance, no close sock
void rpcReleaseHandle(void *handle, int8_t type); void rpcReleaseHandle(void *handle, int8_t type); //
void rpcRefHandle(void *handle, int8_t type); void rpcRefHandle(void *handle, int8_t type);
void rpcUnrefHandle(void *handle, int8_t type); void rpcUnrefHandle(void *handle, int8_t type);
......
...@@ -90,7 +90,6 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { ...@@ -90,7 +90,6 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.label = "TSC"; rpcInit.label = "TSC";
rpcInit.numOfThreads = numOfThread; rpcInit.numOfThreads = numOfThread;
rpcInit.cfp = processMsgFromServer; rpcInit.cfp = processMsgFromServer;
rpcInit.pfp = persistConnForSpecificMsg;
rpcInit.sessions = tsMaxConnections; rpcInit.sessions = tsMaxConnections;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user; rpcInit.user = (char *)user;
......
...@@ -155,6 +155,10 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp ...@@ -155,6 +155,10 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp
.ahandle = (void*)pInfo, .ahandle = (void*)pInfo,
.handle = pInfo->msgInfo.handle, .handle = pInfo->msgInfo.handle,
.code = 0}; .code = 0};
if (pInfo->msgType == TDMT_VND_QUERY || pInfo->msgType == TDMT_VND_FETCH ||
pInfo->msgType == TDMT_VND_QUERY_CONTINUE) {
rpcMsg.persistHandle = 1;
}
assert(pInfo->fp != NULL); assert(pInfo->fp != NULL);
......
...@@ -14,6 +14,10 @@ ...@@ -14,6 +14,10 @@
*/ */
#ifdef USE_UV #ifdef USE_UV
#ifdef __cplusplus
extern "C" {
#endif
#include <uv.h> #include <uv.h>
#include "lz4.h" #include "lz4.h"
#include "os.h" #include "os.h"
...@@ -121,6 +125,8 @@ typedef struct { ...@@ -121,6 +125,8 @@ typedef struct {
} SRpcReqContext; } SRpcReqContext;
typedef SRpcMsg STransMsg; typedef SRpcMsg STransMsg;
typedef SRpcCtx STransCtx;
typedef SRpcCtxVal STransCtxVal;
typedef SRpcInfo STrans; typedef SRpcInfo STrans;
typedef SRpcConnInfo STransHandleInfo; typedef SRpcConnInfo STransHandleInfo;
...@@ -128,15 +134,10 @@ typedef struct { ...@@ -128,15 +134,10 @@ typedef struct {
SEpSet epSet; // ip list provided by app SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app void* ahandle; // handle provided by app
tmsg_t msgType; // message type tmsg_t msgType; // message type
uint8_t* pCont; // content provided by app
int32_t contLen; // content length
// int32_t code; // error code
// int16_t numOfTry; // number of try for different servers
// int8_t oldInUse; // server EP inUse passed by app
// int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type cli/srv int8_t connType; // connection type cli/srv
int64_t rid; // refId returned by taosAddRef int64_t rid; // refId returned by taosAddRef
STransCtx appCtx; //
STransMsg* pRsp; // for synchronous API STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API tsem_t* pSem; // for synchronous API
...@@ -150,11 +151,12 @@ typedef struct { ...@@ -150,11 +151,12 @@ typedef struct {
typedef struct { typedef struct {
char version : 4; // RPC version char version : 4; // RPC version
char comp : 4; // compression algorithm, 0:no compression 1:lz4 char comp : 2; // compression algorithm, 0:no compression 1:lz4
char resflag : 2; // reserved bits char noResp : 2; // noResp bits, 0: resp, 1: resp
char spi : 1; // security parameter index char persist : 2; // persist handle,0: no persit, 1: persist handle
char release : 2;
char secured : 2; char secured : 2;
char encrypt : 3; // encrypt algorithm, 0: no encryption char spi : 2;
uint32_t code; // del later uint32_t code; // del later
uint32_t msgType; uint32_t msgType;
...@@ -179,6 +181,9 @@ typedef struct { ...@@ -179,6 +181,9 @@ typedef struct {
#pragma pack(pop) #pragma pack(pop)
typedef enum { Normal, Quit, Release, Register } STransMsgType;
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken } ConnStatus;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) #define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
...@@ -255,9 +260,10 @@ void transUnrefCliHandle(void* handle); ...@@ -255,9 +260,10 @@ void transUnrefCliHandle(void* handle);
void transReleaseCliHandle(void* handle); void transReleaseCliHandle(void* handle);
void transReleaseSrvHandle(void* handle); void transReleaseSrvHandle(void* handle);
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg); void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* pCtx);
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp); void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp);
void transSendResponse(const STransMsg* pMsg); void transSendResponse(const STransMsg* msg);
void transRegisterMsg(const STransMsg* msg);
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo);
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
...@@ -266,4 +272,14 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -266,4 +272,14 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
void transCloseClient(void* arg); void transCloseClient(void* arg);
void transCloseServer(void* arg); void transCloseServer(void* arg);
void transCtxInit(STransCtx* ctx);
void transCtxDestroy(STransCtx* ctx);
void transCtxClear(STransCtx* ctx);
void transCtxMerge(STransCtx* dst, STransCtx* src);
void* transCtxDumpVal(STransCtx* ctx, int32_t key);
#ifdef __cplusplus
}
#endif
#endif #endif
...@@ -63,9 +63,6 @@ typedef struct { ...@@ -63,9 +63,6 @@ typedef struct {
void (*cfp)(void* parent, SRpcMsg*, SEpSet*); void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
bool (*pfp)(void* parent, tmsg_t msgType);
void* (*mfp)(void* parent, tmsg_t msgType);
bool (*efp)(void* parent, tmsg_t msgType);
int32_t refCount; int32_t refCount;
void* parent; void* parent;
......
...@@ -39,9 +39,6 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -39,9 +39,6 @@ void* rpcOpen(const SRpcInit* pInit) {
// register callback handle // register callback handle
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->pfp = pInit->pfp;
pRpc->mfp = pInit->mfp;
pRpc->efp = pInit->efp;
if (pInit->connType == TAOS_CONN_SERVER) { if (pInit->connType == TAOS_CONN_SERVER) {
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
...@@ -121,7 +118,12 @@ void rpcCancelRequest(int64_t rid) { return; } ...@@ -121,7 +118,12 @@ void rpcCancelRequest(int64_t rid) { return; }
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port; uint32_t port = pEpSet->eps[pEpSet->inUse].port;
transSendRequest(shandle, ip, port, pMsg); transSendRequest(shandle, ip, port, pMsg, NULL);
}
void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
transSendRequest(shandle, ip, port, pMsg, pCtx);
} }
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
...@@ -142,6 +144,7 @@ void rpcUnrefHandle(void* handle, int8_t type) { ...@@ -142,6 +144,7 @@ void rpcUnrefHandle(void* handle, int8_t type) {
(*taosUnRefHandle[type])(handle); (*taosUnRefHandle[type])(handle);
} }
void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { rpcSendResponse(msg); }
void rpcReleaseHandle(void* handle, int8_t type) { void rpcReleaseHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*transReleaseHandle[type])(handle); (*transReleaseHandle[type])(handle);
......
...@@ -17,11 +17,6 @@ ...@@ -17,11 +17,6 @@
#include "transComm.h" #include "transComm.h"
// Normal(default): send/recv msg
// Quit: quit rpc inst
// Release: release handle to rpc inst
typedef enum { Normal, Quit, Release } SCliMsgType;
typedef struct SCliConn { typedef struct SCliConn {
T_REF_DECLARE() T_REF_DECLARE()
uv_connect_t connReq; uv_connect_t connReq;
...@@ -35,8 +30,10 @@ typedef struct SCliConn { ...@@ -35,8 +30,10 @@ typedef struct SCliConn {
uint64_t expireTime; uint64_t expireTime;
int hThrdIdx; int hThrdIdx;
bool broken; // link broken or not bool broken; // link broken or not
STransCtx ctx;
int persist; // ConnStatus status; //
int release; // 1: release
// spi configure // spi configure
char spi; char spi;
char secured; char secured;
...@@ -55,7 +52,7 @@ typedef struct SCliMsg { ...@@ -55,7 +52,7 @@ typedef struct SCliMsg {
STransMsg msg; STransMsg msg;
queue q; queue q;
uint64_t st; uint64_t st;
SCliMsgType type; STransMsgType type;
} SCliMsg; } SCliMsg;
typedef struct SCliThrdObj { typedef struct SCliThrdObj {
...@@ -113,10 +110,12 @@ static void cliSend(SCliConn* pConn); ...@@ -113,10 +110,12 @@ static void cliSend(SCliConn* pConn);
static void cliHandleResp(SCliConn* conn); static void cliHandleResp(SCliConn* conn);
// handle except about conn // handle except about conn
static void cliHandleExcept(SCliConn* conn); static void cliHandleExcept(SCliConn* conn);
// handle req from app // handle req from app
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease};
static void cliSendQuit(SCliThrdObj* thrd); static void cliSendQuit(SCliThrdObj* thrd);
static void destroyUserdata(STransMsg* userdata); static void destroyUserdata(STransMsg* userdata);
...@@ -133,6 +132,20 @@ static void destroyThrdObj(SCliThrdObj* pThrd); ...@@ -133,6 +132,20 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
if (T_REF_VAL_GET(conn) == 1) { \
SCliThrdObj* thrd = conn->hostThrd; \
addConnToPool(thrd->pool, conn); \
} \
goto _RETURN; \
} \
} while (0)
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
do { \ do { \
if (thrd->quit) { \ if (thrd->quit) { \
...@@ -151,14 +164,16 @@ static void destroyThrdObj(SCliThrdObj* pThrd); ...@@ -151,14 +164,16 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_SET_PERSIST_BY_APP(conn) \ #define CONN_SET_PERSIST_BY_APP(conn) \
do { \ do { \
if (conn->persist == false) { \ if (conn->status == ConnNormal) { \
conn->persist = true; \ conn->status = ConnAcquire; \
transRefCliHandle(conn); \ transRefCliHandle(conn); \
} \ } \
} while (0) } while (0)
#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false) #define CONN_NO_PERSIST_BY_APP(conn) ((conn)->status == ConnNormal && T_REF_VAL_GET(conn) == 1)
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
static void* cliWorkThread(void* arg); static void* cliWorkThread(void* arg);
...@@ -177,7 +192,6 @@ void cliHandleResp(SCliConn* conn) { ...@@ -177,7 +192,6 @@ void cliHandleResp(SCliConn* conn) {
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
STransMsg transMsg = {0}; STransMsg transMsg = {0};
transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.contLen = transContLenFromMsg(pHead->msgLen);
transMsg.pCont = transContFromHead((char*)pHead); transMsg.pCont = transContFromHead((char*)pHead);
...@@ -185,6 +199,8 @@ void cliHandleResp(SCliConn* conn) { ...@@ -185,6 +199,8 @@ void cliHandleResp(SCliConn* conn) {
transMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
transMsg.ahandle = NULL; transMsg.ahandle = NULL;
CONN_SHOULD_RELEASE(conn, pHead);
SCliMsg* pMsg = NULL; SCliMsg* pMsg = NULL;
if (taosArrayGetSize(conn->cliMsgs) > 0) { if (taosArrayGetSize(conn->cliMsgs) > 0) {
pMsg = taosArrayGetP(conn->cliMsgs, 0); pMsg = taosArrayGetP(conn->cliMsgs, 0);
...@@ -193,16 +209,15 @@ void cliHandleResp(SCliConn* conn) { ...@@ -193,16 +209,15 @@ void cliHandleResp(SCliConn* conn) {
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
} else { } else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
} }
// buf's mem alread translated to transMsg.pCont // buf's mem alread translated to transMsg.pCont
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, transMsg.msgType)) { if (!CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.handle = conn; transMsg.handle = conn;
CONN_SET_PERSIST_BY_APP(conn);
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
} }
...@@ -241,6 +256,8 @@ void cliHandleResp(SCliConn* conn) { ...@@ -241,6 +256,8 @@ void cliHandleResp(SCliConn* conn) {
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
_RETURN:
return;
} }
void cliHandleExcept(SCliConn* pConn) { void cliHandleExcept(SCliConn* pConn) {
...@@ -268,7 +285,7 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -268,7 +285,7 @@ void cliHandleExcept(SCliConn* pConn) {
transMsg.ahandle = NULL; transMsg.ahandle = NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
} else { } else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
} }
...@@ -359,6 +376,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { ...@@ -359,6 +376,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
static void addConnToPool(void* pool, SCliConn* conn) { static void addConnToPool(void* pool, SCliConn* conn) {
char key[128] = {0}; char key[128] = {0};
transCtxDestroy(&conn->ctx);
tstrncpy(key, conn->ip, strlen(conn->ip)); tstrncpy(key, conn->ip, strlen(conn->ip));
tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port)); tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port));
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
...@@ -367,6 +385,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { ...@@ -367,6 +385,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
conn->status = ConnNormal;
// list already create before // list already create before
assert(plist != NULL); assert(plist != NULL);
QUEUE_PUSH(&plist->conn, &conn->conn); QUEUE_PUSH(&plist->conn, &conn->conn);
...@@ -420,16 +439,16 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { ...@@ -420,16 +439,16 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
conn->writeReq.data = conn; conn->writeReq.data = conn;
conn->connReq.data = conn; conn->connReq.data = conn;
conn->cliMsgs = taosArrayInit(2, sizeof(void*)); conn->cliMsgs = taosArrayInit(2, sizeof(void*));
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->conn);
conn->hostThrd = pThrd; conn->hostThrd = pThrd;
conn->persist = false; conn->status = ConnNormal;
conn->broken = false; conn->broken = 0;
transRefCliHandle(conn); transRefCliHandle(conn);
return conn; return conn;
} }
static void cliDestroyConn(SCliConn* conn, bool clear) { static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->conn);
if (clear) { if (clear) {
uv_close((uv_handle_t*)conn->stream, cliDestroy); uv_close((uv_handle_t*)conn->stream, cliDestroy);
...@@ -439,6 +458,7 @@ static void cliDestroy(uv_handle_t* handle) { ...@@ -439,6 +458,7 @@ static void cliDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
free(conn->ip); free(conn->ip);
free(conn->stream); free(conn->stream);
transCtxDestroy(&conn->ctx);
taosArrayDestroy(conn->cliMsgs); taosArrayDestroy(conn->cliMsgs);
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
free(conn); free(conn);
...@@ -490,7 +510,10 @@ void cliSend(SCliConn* pConn) { ...@@ -490,7 +510,10 @@ void cliSend(SCliConn* pConn) {
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0;
}
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
int msgLen = transMsgLenFromCont(pMsg->contLen); int msgLen = transMsgLenFromCont(pMsg->contLen);
...@@ -513,15 +536,22 @@ void cliSend(SCliConn* pConn) { ...@@ -513,15 +536,22 @@ void cliSend(SCliConn* pConn) {
msgLen += sizeof(STransUserMsg); msgLen += sizeof(STransUserMsg);
} }
pHead->resflag = REQUEST_NO_RESP(pMsg) ? 1 : 0; pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
pHead->msgType = pMsg->msgType; pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
if (pHead->persist == 1) {
CONN_SET_PERSIST_BY_APP(pConn);
}
pConn->writeReq.data = pConn; pConn->writeReq.data = pConn;
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
...@@ -562,22 +592,13 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -562,22 +592,13 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
} }
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = pMsg->msg.handle; SCliConn* conn = pMsg->msg.handle;
tDebug("%s cli conn %p release to inst", CONN_GET_INST_LABEL(conn), conn); tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
while (taosArrayGetSize(conn->cliMsgs) > 0) { taosArrayPush(conn->cliMsgs, &pMsg);
SCliMsg* pMsg = taosArrayGetP(conn->cliMsgs, 0); if (taosArrayGetSize(conn->cliMsgs) >= 2) {
destroyCmsg(pMsg); return; // send one by one
taosArrayRemove(conn->cliMsgs, 0);
}
transDestroyBuffer(&conn->readBuf);
if (conn->persist && T_REF_VAL_GET(conn) >= 2) {
conn->persist = false;
transUnrefCliHandle(conn);
addConnToPool(pThrd->pool, conn);
} else {
transUnrefCliHandle(conn);
} }
cliSend(conn);
} }
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
...@@ -609,10 +630,12 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -609,10 +630,12 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
if (conn != NULL) { if (conn != NULL) {
conn->hThrdIdx = pCtx->hThrdIdx; conn->hThrdIdx = pCtx->hThrdIdx;
transCtxMerge(&conn->ctx, &pCtx->appCtx);
if (taosArrayGetSize(conn->cliMsgs) > 0) { if (taosArrayGetSize(conn->cliMsgs) > 0) {
taosArrayPush(conn->cliMsgs, &pMsg); taosArrayPush(conn->cliMsgs, &pMsg);
return; return;
} }
taosArrayPush(conn->cliMsgs, &pMsg); taosArrayPush(conn->cliMsgs, &pMsg);
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
cliSend(conn); cliSend(conn);
...@@ -652,14 +675,10 @@ static void cliAsyncCb(uv_async_t* handle) { ...@@ -652,14 +675,10 @@ static void cliAsyncCb(uv_async_t* handle) {
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg == NULL) {
if (pMsg->type == Normal) { continue;
cliHandleReq(pMsg, pThrd);
} else if (pMsg->type == Quit) {
cliHandleQuit(pMsg, pThrd);
} else if (pMsg->type == Release) {
cliHandleRelease(pMsg, pThrd);
} }
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
count++; count++;
} }
if (count >= 2) { if (count >= 2) {
...@@ -802,13 +821,13 @@ void transReleaseCliHandle(void* handle) { ...@@ -802,13 +821,13 @@ void transReleaseCliHandle(void* handle) {
STransMsg tmsg = {.handle = handle}; STransMsg tmsg = {.handle = handle};
SCliMsg* cmsg = calloc(1, sizeof(SCliMsg)); SCliMsg* cmsg = calloc(1, sizeof(SCliMsg));
cmsg->type = Release;
cmsg->msg = tmsg; cmsg->msg = tmsg;
cmsg->type = Release;
transSendAsync(thrd->asyncPool, &cmsg->q); transSendAsync(thrd->asyncPool, &cmsg->q);
} }
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) { void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* ctx) {
STrans* pTransInst = (STrans*)shandle; STrans* pTransInst = (STrans*)shandle;
int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle); int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle);
if (index == -1) { if (index == -1) {
...@@ -818,7 +837,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p ...@@ -818,7 +837,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
// imp later // imp later
} }
tDebug("send request at thread:%d %p", index, pMsg); tDebug("send request at thread:%d %p, dst: %s:%d", index, pMsg, ip, port);
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
pCtx->ahandle = pMsg->ahandle; pCtx->ahandle = pMsg->ahandle;
pCtx->msgType = pMsg->msgType; pCtx->msgType = pMsg->msgType;
...@@ -826,6 +845,10 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p ...@@ -826,6 +845,10 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
pCtx->port = port; pCtx->port = port;
pCtx->hThrdIdx = index; pCtx->hThrdIdx = index;
if (ctx != NULL) {
pCtx->appCtx = *ctx;
}
assert(pTransInst->connType == TAOS_CONN_CLIENT); assert(pTransInst->connType == TAOS_CONN_CLIENT);
// atomic or not // atomic or not
...@@ -833,10 +856,12 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p ...@@ -833,10 +856,12 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
cliMsg->ctx = pCtx; cliMsg->ctx = pCtx;
cliMsg->msg = *pMsg; cliMsg->msg = *pMsg;
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
cliMsg->type = Normal;
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
} }
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) { void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) {
STrans* pTransInst = (STrans*)shandle; STrans* pTransInst = (STrans*)shandle;
int index = CONN_HOST_THREAD_INDEX(pReq->handle); int index = CONN_HOST_THREAD_INDEX(pReq->handle);
...@@ -858,6 +883,7 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq ...@@ -858,6 +883,7 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq
cliMsg->ctx = pCtx; cliMsg->ctx = pCtx;
cliMsg->msg = *pReq; cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
cliMsg->type = Normal;
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
......
...@@ -155,9 +155,9 @@ bool transReadComplete(SConnBuffer* connBuf) { ...@@ -155,9 +155,9 @@ bool transReadComplete(SConnBuffer* connBuf) {
} }
return false; return false;
} }
int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) {return 0;} int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) { return 0; }
int transUnpackMsg(STransMsgHead* msgHead) {return 0;} int transUnpackMsg(STransMsgHead* msgHead) { return 0; }
int transDestroyBuffer(SConnBuffer* buf) { int transDestroyBuffer(SConnBuffer* buf) {
if (buf->cap > 0) { if (buf->cap > 0) {
tfree(buf->buf); tfree(buf->buf);
...@@ -224,4 +224,56 @@ int transSendAsync(SAsyncPool* pool, queue* q) { ...@@ -224,4 +224,56 @@ int transSendAsync(SAsyncPool* pool, queue* q) {
return uv_async_send(async); return uv_async_send(async);
} }
void transCtxInit(STransCtx* ctx) {
// init transCtx
ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
}
void transCtxDestroy(STransCtx* ctx) {
if (ctx->args == NULL) {
return;
}
STransCtxVal* iter = taosHashIterate(ctx->args, NULL);
while (iter) {
iter->free(iter->val);
iter = taosHashIterate(ctx->args, iter);
}
taosHashCleanup(ctx->args);
}
void transCtxMerge(STransCtx* dst, STransCtx* src) {
if (dst->args == NULL) {
dst->args = src->args;
src->args = NULL;
return;
}
void* key = NULL;
size_t klen = 0;
void* iter = taosHashIterate(src->args, NULL);
while (iter) {
STransCtxVal* sVal = (STransCtxVal*)iter;
key = taosHashGetKey(sVal, &klen);
STransCtxVal* dVal = taosHashGet(dst->args, key, klen);
if (dVal) {
dVal->free(dVal->val);
}
taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
iter = taosHashIterate(src->args, iter);
}
taosHashCleanup(src->args);
}
void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
if (ctx->args == NULL) {
return NULL;
}
STransCtxVal* cVal = taosHashGet(ctx->args, (const void*)&key, sizeof(key));
if (cVal == NULL) {
return NULL;
}
char* ret = calloc(1, cVal->len);
memcpy(ret, (char*)cVal->val, cVal->len);
return (void*)ret;
}
#endif #endif
...@@ -17,6 +17,12 @@ ...@@ -17,6 +17,12 @@
#include "transComm.h" #include "transComm.h"
typedef struct {
int notifyCount; //
int init; // init or not
STransMsg msg;
} SSrvRegArg;
typedef struct SSrvConn { typedef struct SSrvConn {
T_REF_DECLARE() T_REF_DECLARE()
uv_tcp_t* pTcp; uv_tcp_t* pTcp;
...@@ -33,8 +39,10 @@ typedef struct SSrvConn { ...@@ -33,8 +39,10 @@ typedef struct SSrvConn {
void* hostThrd; void* hostThrd;
SArray* srvMsgs; SArray* srvMsgs;
SSrvRegArg regArg;
bool broken; // conn broken; bool broken; // conn broken;
ConnStatus status;
struct sockaddr_in addr; struct sockaddr_in addr;
struct sockaddr_in locaddr; struct sockaddr_in locaddr;
...@@ -50,6 +58,7 @@ typedef struct SSrvMsg { ...@@ -50,6 +58,7 @@ typedef struct SSrvMsg {
SSrvConn* pConn; SSrvConn* pConn;
STransMsg msg; STransMsg msg;
queue q; queue q;
STransMsgType type;
} SSrvMsg; } SSrvMsg;
typedef struct SWorkThrdObj { typedef struct SWorkThrdObj {
...@@ -58,7 +67,6 @@ typedef struct SWorkThrdObj { ...@@ -58,7 +67,6 @@ typedef struct SWorkThrdObj {
uv_os_fd_t fd; uv_os_fd_t fd;
uv_loop_t* loop; uv_loop_t* loop;
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
queue msg; queue msg;
pthread_mutex_t msgMtx; pthread_mutex_t msgMtx;
...@@ -85,6 +93,15 @@ typedef struct SServerObj { ...@@ -85,6 +93,15 @@ typedef struct SServerObj {
static const char* notify = "a"; static const char* notify = "a";
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
goto _RETURE; \
} \
} while (0)
// refactor later // refactor later
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen); static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
...@@ -113,6 +130,13 @@ static void destroySmsg(SSrvMsg* smsg); ...@@ -113,6 +130,13 @@ static void destroySmsg(SSrvMsg* smsg);
static SSrvConn* createConn(void* hThrd); static SSrvConn* createConn(void* hThrd);
static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleSendResp, uvHandleQuit, uvHandleRelease,
uvHandleRegister};
static void uvDestroyConn(uv_handle_t* handle); static void uvDestroyConn(uv_handle_t* handle);
// server and worker thread // server and worker thread
...@@ -217,8 +241,8 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -217,8 +241,8 @@ static void uvHandleReq(SSrvConn* pConn) {
if (pHead->secured == 1) { if (pHead->secured == 1) {
pHead->msgLen -= sizeof(STransUserMsg); pHead->msgLen -= sizeof(STransUserMsg);
} }
//
} }
CONN_SHOULD_RELEASE(pConn, pHead);
STransMsg transMsg; STransMsg transMsg;
transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.contLen = transContLenFromMsg(pHead->msgLen);
...@@ -230,24 +254,34 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -230,24 +254,34 @@ static void uvHandleReq(SSrvConn* pConn) {
transClearBuffer(&pConn->readBuf); transClearBuffer(&pConn->readBuf);
pConn->inType = pHead->msgType; pConn->inType = pHead->msgType;
if (pConn->status == ConnNormal) {
if (pHead->resflag == 0) { if (pHead->persist == 1) {
pConn->status = ConnAcquire;
transRefSrvHandle(pConn);
}
}
if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
transMsg.handle = pConn;
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port), transMsg.contLen); ntohs(pConn->locaddr.sin_port), transMsg.contLen);
} else { } else {
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, no resp ", pConn, tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen); taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
// no ref here
}
if (pHead->noResp == 0) {
transMsg.handle = pConn;
} }
STrans* pTransInst = (STrans*)p->shandle; STrans* pTransInst = (STrans*)p->shandle;
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth // auth
// validate msg type _RETURE:
return;
} }
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
...@@ -272,11 +306,13 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -272,11 +306,13 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
tError("server conn %p read error: %s", conn, uv_err_name(nread)); tError("server conn %p read error: %s", conn, uv_err_name(nread));
if (nread < 0) { if (nread < 0) {
conn->broken = true; conn->broken = true;
uvNotifyLinkBrokenToApp(conn); if (conn->status == ConnAcquire) {
if (conn->regArg.init) {
// STrans* pTransInst = conn->pTransInst; STrans* pTransInst = conn->pTransInst;
// if (pTransInst->efp != NULL && (pTransInst->efp)(NULL, conn->inType)) { (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
//} memset(&conn->regArg, 0, sizeof(conn->regArg));
}
}
transUnrefSrvHandle(conn); transUnrefSrvHandle(conn);
} }
} }
...@@ -301,8 +337,22 @@ void uvOnSendCb(uv_write_t* req, int status) { ...@@ -301,8 +337,22 @@ void uvOnSendCb(uv_write_t* req, int status) {
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
taosArrayRemove(conn->srvMsgs, 0); taosArrayRemove(conn->srvMsgs, 0);
if (msg->type == Release && conn->status != ConnNormal) {
conn->status = ConnNormal;
transUnrefSrvHandle(conn);
} else if (msg->type == Register && conn->status == ConnAcquire) {
conn->regArg.notifyCount = 0;
conn->regArg.init = 1;
conn->regArg.msg = msg->msg;
if (conn->broken) {
STrans* pTransInst = conn->pTransInst;
(pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg));
}
free(msg);
return;
}
destroySmsg(msg); destroySmsg(msg);
// send second data, just use for push // send second data, just use for push
if (taosArrayGetSize(conn->srvMsgs) > 0) { if (taosArrayGetSize(conn->srvMsgs) > 0) {
tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
...@@ -312,7 +362,7 @@ void uvOnSendCb(uv_write_t* req, int status) { ...@@ -312,7 +362,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
} }
} else { } else {
tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
conn->broken = false; conn->broken = true;
transUnrefSrvHandle(conn); transUnrefSrvHandle(conn);
} }
} }
...@@ -339,6 +389,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { ...@@ -339,6 +389,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
pHead->secured = pMsg->code == 0 ? 1 : 0; // pHead->secured = pMsg->code == 0 ? 1 : 0; //
pHead->msgType = smsg->pConn->inType + 1; pHead->msgType = smsg->pConn->inType + 1;
pHead->release = smsg->type == Release ? 1 : 0;
pHead->code = htonl(pMsg->code); pHead->code = htonl(pMsg->code);
// add more info // add more info
char* msg = (char*)pHead; char* msg = (char*)pHead;
...@@ -368,13 +419,16 @@ static void uvStartSendResp(SSrvMsg* smsg) { ...@@ -368,13 +419,16 @@ static void uvStartSendResp(SSrvMsg* smsg) {
SSrvConn* pConn = smsg->pConn; SSrvConn* pConn = smsg->pConn;
if (pConn->broken == true) { if (pConn->broken == true) {
// persist by
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
return; return;
} }
if (pConn->status == ConnNormal) {
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
}
if (taosArrayGetSize(pConn->srvMsgs) > 0) { if (taosArrayGetSize(pConn->srvMsgs) > 0) {
tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr), tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
taosArrayPush(pConn->srvMsgs, &smsg); taosArrayPush(pConn->srvMsgs, &smsg);
return; return;
...@@ -384,16 +438,6 @@ static void uvStartSendResp(SSrvMsg* smsg) { ...@@ -384,16 +438,6 @@ static void uvStartSendResp(SSrvMsg* smsg) {
return; return;
} }
static void uvNotifyLinkBrokenToApp(SSrvConn* conn) {
STrans* pTransInst = conn->pTransInst;
if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) {
STransMsg transMsg = {0};
transMsg.msgType = conn->inType;
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
// transRefSrvHandle(conn);
(*pTransInst->cfp)(pTransInst->parent, &transMsg, 0);
}
}
static void destroySmsg(SSrvMsg* smsg) { static void destroySmsg(SSrvMsg* smsg) {
if (smsg == NULL) { if (smsg == NULL) {
return; return;
...@@ -408,6 +452,9 @@ static void destroyAllConn(SWorkThrdObj* pThrd) { ...@@ -408,6 +452,9 @@ static void destroyAllConn(SWorkThrdObj* pThrd) {
QUEUE_INIT(h); QUEUE_INIT(h);
SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue); SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue);
while (T_REF_VAL_GET(c) >= 2) {
transUnrefSrvHandle(c);
}
transUnrefSrvHandle(c); transUnrefSrvHandle(c);
} }
} }
...@@ -431,20 +478,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { ...@@ -431,20 +478,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
tError("unexcept occurred, continue"); tError("unexcept occurred, continue");
continue; continue;
} }
if (msg->pConn == NULL) { (*transAsyncHandle[msg->type])(msg, pThrd);
free(msg);
bool noConn = QUEUE_IS_EMPTY(&pThrd->conn);
if (noConn == true) {
uv_loop_close(pThrd->loop);
uv_stop(pThrd->loop);
} else {
destroyAllConn(pThrd);
// uv_loop_close(pThrd->loop);
pThrd->quit = true;
}
} else {
uvStartSendResp(msg);
}
} }
} }
static void uvAcceptAsyncCb(uv_async_t* async) { static void uvAcceptAsyncCb(uv_async_t* async) {
...@@ -632,7 +666,9 @@ static SSrvConn* createConn(void* hThrd) { ...@@ -632,7 +666,9 @@ static SSrvConn* createConn(void* hThrd) {
pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); //
tTrace("conn %p created", pConn); tTrace("conn %p created", pConn);
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
pConn->broken = false; pConn->broken = false;
pConn->status = ConnNormal;
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
return pConn; return pConn;
...@@ -748,7 +784,58 @@ End: ...@@ -748,7 +784,58 @@ End:
transCloseServer(srv); transCloseServer(srv);
return NULL; return NULL;
} }
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
if (QUEUE_IS_EMPTY(&thrd->conn)) {
uv_loop_close(thrd->loop);
uv_stop(thrd->loop);
} else {
destroyAllConn(thrd);
thrd->quit = true;
}
free(msg);
}
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
// release handle to rpc init
SSrvConn* conn = msg->pConn;
if (conn->status == ConnAcquire) {
if (taosArrayGetSize(conn->srvMsgs) > 0) {
taosArrayPush(conn->srvMsgs, &msg);
return;
}
taosArrayPush(conn->srvMsgs, &msg);
uvStartSendRespInternal(msg);
return;
} else if (conn->status == ConnRelease) {
// already release by server app, do nothing
} else if (conn->status == ConnNormal) {
// no nothing
// user should not call this rpcRelease handle;
}
free(msg);
}
void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
// send msg to client
uvStartSendResp(msg);
}
void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
SSrvConn* conn = msg->pConn;
if (conn->status == ConnAcquire) {
if (taosArrayGetSize(conn->srvMsgs) > 0) {
taosArrayPush(conn->srvMsgs, &msg);
return;
}
conn->regArg.notifyCount = 0;
conn->regArg.init = 1;
conn->regArg.msg = msg->msg;
if (conn->broken) {
STrans* pTransInst = conn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg));
}
free(msg);
}
}
void destroyWorkThrd(SWorkThrdObj* pThrd) { void destroyWorkThrd(SWorkThrdObj* pThrd) {
if (pThrd == NULL) { if (pThrd == NULL) {
return; return;
...@@ -759,10 +846,10 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { ...@@ -759,10 +846,10 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
free(pThrd); free(pThrd);
} }
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); SSrvMsg* msg = calloc(1, sizeof(SSrvMsg));
msg->type = Quit;
tDebug("server send quit msg to work thread"); tDebug("server send quit msg to work thread");
transSendAsync(pThrd->asyncPool, &msg->q);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
} }
void transCloseServer(void* arg) { void transCloseServer(void* arg) {
...@@ -813,8 +900,21 @@ void transUnrefSrvHandle(void* handle) { ...@@ -813,8 +900,21 @@ void transUnrefSrvHandle(void* handle) {
} }
void transReleaseSrvHandle(void* handle) { void transReleaseSrvHandle(void* handle) {
// do nothing currently if (handle == NULL) {
// return;
}
SSrvConn* pConn = handle;
SWorkThrdObj* pThrd = pConn->hostThrd;
STransMsg tmsg = {.handle = handle, .code = 0};
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
srvMsg->msg = tmsg;
srvMsg->type = Release;
srvMsg->pConn = pConn;
tTrace("server conn %p start to release", pConn);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
} }
void transSendResponse(const STransMsg* pMsg) { void transSendResponse(const STransMsg* pMsg) {
if (pMsg->handle == NULL) { if (pMsg->handle == NULL) {
...@@ -826,6 +926,21 @@ void transSendResponse(const STransMsg* pMsg) { ...@@ -826,6 +926,21 @@ void transSendResponse(const STransMsg* pMsg) {
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
srvMsg->pConn = pConn; srvMsg->pConn = pConn;
srvMsg->msg = *pMsg; srvMsg->msg = *pMsg;
srvMsg->type = Normal;
tTrace("server conn %p start to send resp", pConn);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
}
void transRegisterMsg(const STransMsg* msg) {
if (msg->handle == NULL) {
return;
}
SSrvConn* pConn = msg->handle;
SWorkThrdObj* pThrd = pConn->hostThrd;
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
srvMsg->pConn = pConn;
srvMsg->msg = *msg;
srvMsg->type = Register;
tTrace("server conn %p start to send resp", pConn); tTrace("server conn %p start to send resp", pConn);
transSendAsync(pThrd->asyncPool, &srvMsg->q); transSendAsync(pThrd->asyncPool, &srvMsg->q);
} }
......
...@@ -31,11 +31,6 @@ class Server; ...@@ -31,11 +31,6 @@ class Server;
int port = 7000; int port = 7000;
// server process // server process
static bool cliPersistHandle(void *parent, tmsg_t msgType) {
// client persist handle
return msgType == 2 || msgType == 4;
}
typedef struct CbArgs { typedef struct CbArgs {
tmsg_t msgType; tmsg_t msgType;
} CbArgs; } CbArgs;
...@@ -91,21 +86,8 @@ class Client { ...@@ -91,21 +86,8 @@ class Client {
rpcClose(this->transCli); rpcClose(this->transCli);
this->transCli = NULL; this->transCli = NULL;
} }
void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) {
rpcClose(this->transCli);
rpcInit_.pfp = pfp;
this->transCli = rpcOpen(&rpcInit_);
}
void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) { void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) {
rpcClose(this->transCli); rpcClose(this->transCli);
rpcInit_.mfp = mfp;
this->transCli = rpcOpen(&rpcInit_);
}
void SetPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
rpcClose(this->transCli);
rpcInit_.pfp = pfp;
rpcInit_.mfp = mfp;
this->transCli = rpcOpen(&rpcInit_); this->transCli = rpcOpen(&rpcInit_);
} }
...@@ -149,7 +131,6 @@ class Server { ...@@ -149,7 +131,6 @@ class Server {
rpcInit_.label = (char *)label; rpcInit_.label = (char *)label;
rpcInit_.numOfThreads = 5; rpcInit_.numOfThreads = 5;
rpcInit_.cfp = processReq; rpcInit_.cfp = processReq;
rpcInit_.efp = NULL;
rpcInit_.user = (char *)user; rpcInit_.user = (char *)user;
rpcInit_.secret = (char *)secret; rpcInit_.secret = (char *)secret;
rpcInit_.ckey = (char *)ckey; rpcInit_.ckey = (char *)ckey;
...@@ -165,11 +146,6 @@ class Server { ...@@ -165,11 +146,6 @@ class Server {
rpcClose(this->transSrv); rpcClose(this->transSrv);
this->transSrv = NULL; this->transSrv = NULL;
} }
void SetExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) {
this->Stop();
rpcInit_.efp = efp;
this->Start();
}
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
this->Stop(); this->Stop();
rpcInit_.cfp = cfp; rpcInit_.cfp = cfp;
...@@ -262,23 +238,11 @@ class TransObj { ...@@ -262,23 +238,11 @@ class TransObj {
// //
srv->Stop(); srv->Stop();
} }
void SetCliPersistFp(bool (*pfp)(void *parent, tmsg_t msgType)) {
// do nothing
cli->SetPersistFP(pfp);
}
void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) { void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) {
// do nothing // do nothing
cli->SetConstructFP(mfp); cli->SetConstructFP(mfp);
} }
void SetCliMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
// do nothing
cli->SetPAndMFp(pfp, mfp);
}
// call when link broken, and notify query or fetch stop // call when link broken, and notify query or fetch stop
void SetSrvExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) {
////////
srv->SetExceptFp(efp);
}
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
/////// ///////
srv->SetSrvContinueSend(cfp); srv->SetSrvContinueSend(cfp);
...@@ -358,10 +322,10 @@ TEST_F(TransEnv, clientUserDefined) { ...@@ -358,10 +322,10 @@ TEST_F(TransEnv, clientUserDefined) {
} }
TEST_F(TransEnv, cliPersistHandle) { TEST_F(TransEnv, cliPersistHandle) {
tr->SetCliPersistFp(cliPersistHandle); // tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0}; SRpcMsg resp = {0};
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
SRpcMsg req = {.handle = resp.handle, .noResp = 0}; SRpcMsg req = {.handle = resp.handle, .persistHandle = 1};
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
...@@ -378,31 +342,22 @@ TEST_F(TransEnv, cliPersistHandle) { ...@@ -378,31 +342,22 @@ TEST_F(TransEnv, cliPersistHandle) {
} }
TEST_F(TransEnv, cliReleaseHandle) { TEST_F(TransEnv, cliReleaseHandle) {
tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0}; SRpcMsg resp = {0};
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
SRpcMsg req = {.handle = resp.handle}; SRpcMsg req = {.handle = resp.handle, .persistHandle = 1};
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
tr->cliSendAndRecvNoHandle(&req, &resp); tr->cliSendAndRecvNoHandle(&req, &resp);
// if (i == 5) {
// std::cout << "stop server" << std::endl;
// tr->StopSrv();
//}
// if (i >= 6) {
EXPECT_TRUE(resp.code == 0); EXPECT_TRUE(resp.code == 0);
//} //}
} }
////////////////// //////////////////
} }
TEST_F(TransEnv, cliReleaseHandleExcept) { TEST_F(TransEnv, cliReleaseHandleExcept) {
tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0}; SRpcMsg resp = {0};
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
SRpcMsg req = {.handle = resp.handle}; SRpcMsg req = {.handle = resp.handle, .persistHandle = 1};
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
...@@ -431,7 +386,7 @@ TEST_F(TransEnv, srvContinueSend) { ...@@ -431,7 +386,7 @@ TEST_F(TransEnv, srvContinueSend) {
TEST_F(TransEnv, srvPersistHandleExcept) { TEST_F(TransEnv, srvPersistHandleExcept) {
tr->SetSrvContinueSend(processContinueSend); tr->SetSrvContinueSend(processContinueSend);
tr->SetCliPersistFp(cliPersistHandle); // tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0}; SRpcMsg resp = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
SRpcMsg req = {.handle = resp.handle}; SRpcMsg req = {.handle = resp.handle};
...@@ -450,7 +405,6 @@ TEST_F(TransEnv, srvPersistHandleExcept) { ...@@ -450,7 +405,6 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
} }
TEST_F(TransEnv, cliPersistHandleExcept) { TEST_F(TransEnv, cliPersistHandleExcept) {
tr->SetSrvContinueSend(processContinueSend); tr->SetSrvContinueSend(processContinueSend);
tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0}; SRpcMsg resp = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
SRpcMsg req = {.handle = resp.handle}; SRpcMsg req = {.handle = resp.handle};
...@@ -472,7 +426,7 @@ TEST_F(TransEnv, multiCliPersistHandleExcept) { ...@@ -472,7 +426,7 @@ TEST_F(TransEnv, multiCliPersistHandleExcept) {
// conn broken // conn broken
} }
TEST_F(TransEnv, queryExcept) { TEST_F(TransEnv, queryExcept) {
tr->SetSrvExceptFp(handleExcept); // tr->SetSrvExceptFp(handleExcept);
// query and conn is broken // query and conn is broken
} }
......
...@@ -136,4 +136,98 @@ TEST_F(QueueEnv, testIter) { ...@@ -136,4 +136,98 @@ TEST_F(QueueEnv, testIter) {
assert(result.size() == vals.size()); assert(result.size() == vals.size());
} }
class TransCtxEnv : public ::testing::Test {
protected:
virtual void SetUp() {
ctx = (STransCtx *)calloc(1, sizeof(STransCtx));
transCtxInit(ctx);
// TODO
}
virtual void TearDown() {
transCtxDestroy(ctx);
// formate
}
STransCtx *ctx;
};
TEST_F(TransCtxEnv, mergeTest) {
int key = 1;
{
STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx));
transCtxInit(src);
{
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free};
val1.val = malloc(12);
val1.len = 12;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++;
}
{
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free};
val1.val = malloc(12);
val1.len = 12;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++;
}
transCtxMerge(ctx, src);
free(src);
}
EXPECT_EQ(2, taosHashGetSize(ctx->args));
{
STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx));
transCtxInit(src);
{
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free};
val1.val = malloc(12);
val1.len = 12;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++;
}
{
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free};
val1.val = malloc(12);
val1.len = 12;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++;
}
transCtxMerge(ctx, src);
free(src);
}
std::string val("Hello");
EXPECT_EQ(4, taosHashGetSize(ctx->args));
{
key = 1;
STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx));
transCtxInit(src);
{
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free};
val1.val = calloc(1, 11);
memcpy(val1.val, val.c_str(), val.size());
val1.len = 11;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++;
}
{
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free};
val1.val = calloc(1, 11);
memcpy(val1.val, val.c_str(), val.size());
val1.len = 11;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++;
}
transCtxMerge(ctx, src);
free(src);
}
EXPECT_EQ(4, taosHashGetSize(ctx->args));
char *skey = (char *)transCtxDumpVal(ctx, 1);
EXPECT_EQ(0, strcmp(skey, val.c_str()));
free(skey);
skey = (char *)transCtxDumpVal(ctx, 2);
EXPECT_EQ(0, strcmp(skey, val.c_str()));
}
#endif #endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册