提交 2d0c5ff5 编写于 作者: dengyihao's avatar dengyihao

handle except

上级 9d12273a
......@@ -138,11 +138,12 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tDebug("cli conn %p receive release request", conn); \
if (T_REF_VAL_GET(conn) == 1) { \
SCliThrdObj* thrd = conn->hostThrd; \
addConnToPool(thrd->pool, conn); \
} \
goto _RETURN; \
return; \
} \
} while (0)
......@@ -150,7 +151,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
do { \
if (thrd->quit) { \
cliHandleExcept(conn); \
goto _RETURE; \
return; \
} \
} while (0)
......@@ -158,9 +159,9 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
do { \
if (conn->broken) { \
cliHandleExcept(conn); \
goto _RETURE; \
return; \
} \
} while (0);
} while (0)
#define CONN_SET_PERSIST_BY_APP(conn) \
do { \
......@@ -389,12 +390,11 @@ static void addConnToPool(void* pool, SCliConn* conn) {
// list already create before
assert(plist != NULL);
QUEUE_PUSH(&plist->conn, &conn->conn);
assert(!QUEUE_IS_EMPTY(&plist->conn));
}
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
// avoid conn
QUEUE_REMOVE(&conn->conn);
transAllocBuffer(pBuf, buf);
}
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
......@@ -420,6 +420,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
// nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
// read(2).
tTrace("%s cli conn %p read empty", CONN_GET_INST_LABEL(conn), conn);
return;
}
if (nread < 0) {
......@@ -555,8 +556,6 @@ void cliSend(SCliConn* pConn) {
pConn->writeReq.data = pConn;
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return;
_RETURE:
return;
}
......@@ -594,6 +593,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = pMsg->msg.handle;
tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
transUnrefCliHandle(conn);
taosArrayPush(conn->cliMsgs, &pMsg);
if (taosArrayGetSize(conn->cliMsgs) >= 2) {
return; // send one by one
......@@ -613,6 +613,8 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
if (conn != NULL) {
tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
} else {
tTrace("not found conn in conn pool %p", pThrd->pool);
}
}
return conn;
......
......@@ -99,7 +99,19 @@ static const char* notify = "a";
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
goto _RETURE; \
tTrace("server conn %p received release request", conn); \
\
STransMsg tmsg = {.handle = (void*)conn, .code = 0}; \
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
taosArrayPush(conn->srvMsgs, &srvMsg); \
if (taosArrayGetSize(conn->srvMsgs) > 1) { \
return; \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
} while (0)
// refactor later
......@@ -242,6 +254,7 @@ static void uvHandleReq(SSrvConn* pConn) {
pHead->msgLen -= sizeof(STransUserMsg);
}
}
CONN_SHOULD_RELEASE(pConn, pHead);
STransMsg transMsg;
......@@ -280,8 +293,6 @@ static void uvHandleReq(SSrvConn* pConn) {
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth
_RETURE:
return;
}
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
......@@ -798,11 +809,10 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
// release handle to rpc init
SSrvConn* conn = msg->pConn;
if (conn->status == ConnAcquire) {
if (taosArrayGetSize(conn->srvMsgs) > 0) {
taosArrayPush(conn->srvMsgs, &msg);
taosArrayPush(conn->srvMsgs, &msg);
if (taosArrayGetSize(conn->srvMsgs) > 1) {
return;
}
taosArrayPush(conn->srvMsgs, &msg);
uvStartSendRespInternal(msg);
return;
} else if (conn->status == ConnRelease) {
......
......@@ -30,24 +30,8 @@ const char *ckey = "ckey";
class Server;
int port = 7000;
// server process
typedef struct CbArgs {
tmsg_t msgType;
} CbArgs;
static void *ConstructArgForSpecificMsgType(void *parent, tmsg_t msgType) {
if (msgType == 1 || msgType == 2) {
CbArgs *args = (CbArgs *)calloc(1, sizeof(CbArgs));
args->msgType = msgType;
return args;
}
return NULL;
}
// server except
static bool handleExcept(void *parent, tmsg_t msgType) {
//
return msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP;
}
typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
......@@ -86,10 +70,6 @@ class Client {
rpcClose(this->transCli);
this->transCli = NULL;
}
void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) {
rpcClose(this->transCli);
this->transCli = rpcOpen(&rpcInit_);
}
void SendAndRecv(SRpcMsg *req, SRpcMsg *resp) {
SEpSet epSet = {0};
......@@ -108,7 +88,6 @@ class Client {
SendAndRecv(req, resp);
}
void SendWithHandle(SRpcMsg *req, SRpcMsg *resp) {}
void SemWait() { tsem_wait(&this->sem); }
void SemPost() { tsem_post(&this->sem); }
void Reset() {}
......@@ -141,12 +120,17 @@ class Server {
this->transSrv = rpcOpen(&this->rpcInit_);
taosMsleep(1000);
}
void SetSrvContinueSend(CB cb) {
this->Stop();
rpcInit_.cfp = cb;
this->Start();
}
void Stop() {
if (this->transSrv == NULL) return;
rpcClose(this->transSrv);
this->transSrv = NULL;
}
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
void SetSrvSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
this->Stop();
rpcInit_.cfp = cfp;
this->Start();
......@@ -174,9 +158,6 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
for (int i = 0; i < 9; i++) {
rpcRefHandle(pMsg->handle, TAOS_CONN_SERVER);
}
for (int i = 0; i < 10; i++) {
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(100);
......@@ -238,10 +219,6 @@ class TransObj {
//
srv->Stop();
}
void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) {
// do nothing
cli->SetConstructFP(mfp);
}
// call when link broken, and notify query or fetch stop
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
///////
......@@ -279,7 +256,7 @@ class TransEnv : public ::testing::Test {
};
TEST_F(TransEnv, 01sendAndRec) {
for (int i = 0; i < 1; i++) {
for (int i = 0; i < 10; i++) {
SRpcMsg req = {0}, resp = {0};
req.msgType = 0;
req.pCont = rpcMallocCont(10);
......@@ -322,22 +299,33 @@ TEST_F(TransEnv, clientUserDefined) {
}
TEST_F(TransEnv, cliPersistHandle) {
// tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0};
void * handle = NULL;
for (int i = 0; i < 10; i++) {
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1};
req.msgType = 1;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
tr->cliSendAndRecv(&req, &resp);
if (i == 5) {
std::cout << "stop server" << std::endl;
tr->StopSrv();
}
if (i >= 6) {
EXPECT_TRUE(resp.code != 0);
}
// if (i == 5) {
// std::cout << "stop server" << std::endl;
// tr->StopSrv();
//}
// if (i >= 6) {
// EXPECT_TRUE(resp.code != 0);
//}
handle = resp.handle;
}
rpcReleaseHandle(handle, TAOS_CONN_CLIENT);
for (int i = 0; i < 10; i++) {
SRpcMsg req = {0};
req.msgType = 1;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
tr->cliSendAndRecv(&req, &resp);
}
taosMsleep(1000);
//////////////////
}
......@@ -425,11 +413,7 @@ TEST_F(TransEnv, cliPersistHandleExcept) {
TEST_F(TransEnv, multiCliPersistHandleExcept) {
// conn broken
}
TEST_F(TransEnv, queryExcept) {
// tr->SetSrvExceptFp(handleExcept);
// query and conn is broken
}
TEST_F(TransEnv, queryExcept) {}
TEST_F(TransEnv, noResp) {
SRpcMsg resp = {0};
for (int i = 0; i < 5; i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册