未验证 提交 2eb40c24 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #10844 from taosdata/feature/supportQuery

Feature/support query
...@@ -138,11 +138,12 @@ static void destroyThrdObj(SCliThrdObj* pThrd); ...@@ -138,11 +138,12 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
conn->status = ConnRelease; \ conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \ transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \ transFreeMsg(transContFromHead((char*)head)); \
tDebug("cli conn %p receive release request", conn); \
if (T_REF_VAL_GET(conn) == 1) { \ if (T_REF_VAL_GET(conn) == 1) { \
SCliThrdObj* thrd = conn->hostThrd; \ SCliThrdObj* thrd = conn->hostThrd; \
addConnToPool(thrd->pool, conn); \ addConnToPool(thrd->pool, conn); \
} \ } \
goto _RETURN; \ return; \
} \ } \
} while (0) } while (0)
...@@ -150,7 +151,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd); ...@@ -150,7 +151,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
do { \ do { \
if (thrd->quit) { \ if (thrd->quit) { \
cliHandleExcept(conn); \ cliHandleExcept(conn); \
goto _RETURE; \ return; \
} \ } \
} while (0) } while (0)
...@@ -158,9 +159,9 @@ static void destroyThrdObj(SCliThrdObj* pThrd); ...@@ -158,9 +159,9 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
do { \ do { \
if (conn->broken) { \ if (conn->broken) { \
cliHandleExcept(conn); \ cliHandleExcept(conn); \
goto _RETURE; \ return; \
} \ } \
} while (0); } while (0)
#define CONN_SET_PERSIST_BY_APP(conn) \ #define CONN_SET_PERSIST_BY_APP(conn) \
do { \ do { \
...@@ -388,13 +389,13 @@ static void addConnToPool(void* pool, SCliConn* conn) { ...@@ -388,13 +389,13 @@ static void addConnToPool(void* pool, SCliConn* conn) {
conn->status = ConnNormal; conn->status = ConnNormal;
// list already create before // list already create before
assert(plist != NULL); assert(plist != NULL);
taosArrayClear(conn->cliMsgs);
QUEUE_PUSH(&plist->conn, &conn->conn); QUEUE_PUSH(&plist->conn, &conn->conn);
assert(!QUEUE_IS_EMPTY(&plist->conn));
} }
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
// avoid conn
QUEUE_REMOVE(&conn->conn);
transAllocBuffer(pBuf, buf); transAllocBuffer(pBuf, buf);
} }
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
...@@ -420,6 +421,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { ...@@ -420,6 +421,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
// nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
// read(2). // read(2).
tTrace("%s cli conn %p read empty", CONN_GET_INST_LABEL(conn), conn);
return; return;
} }
if (nread < 0) { if (nread < 0) {
...@@ -555,8 +557,6 @@ void cliSend(SCliConn* pConn) { ...@@ -555,8 +557,6 @@ void cliSend(SCliConn* pConn) {
pConn->writeReq.data = pConn; pConn->writeReq.data = pConn;
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return;
_RETURE:
return; return;
} }
...@@ -594,6 +594,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -594,6 +594,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = pMsg->msg.handle; SCliConn* conn = pMsg->msg.handle;
tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
transUnrefCliHandle(conn);
taosArrayPush(conn->cliMsgs, &pMsg); taosArrayPush(conn->cliMsgs, &pMsg);
if (taosArrayGetSize(conn->cliMsgs) >= 2) { if (taosArrayGetSize(conn->cliMsgs) >= 2) {
return; // send one by one return; // send one by one
...@@ -613,6 +614,8 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -613,6 +614,8 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
if (conn != NULL) { if (conn != NULL) {
tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
} else {
tTrace("not found conn in conn pool %p", pThrd->pool);
} }
} }
return conn; return conn;
......
...@@ -99,7 +99,19 @@ static const char* notify = "a"; ...@@ -99,7 +99,19 @@ static const char* notify = "a";
conn->status = ConnRelease; \ conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \ transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \ transFreeMsg(transContFromHead((char*)head)); \
goto _RETURE; \ tTrace("server conn %p received release request", conn); \
\
STransMsg tmsg = {.handle = (void*)conn, .code = 0}; \
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
taosArrayPush(conn->srvMsgs, &srvMsg); \
if (taosArrayGetSize(conn->srvMsgs) > 1) { \
return; \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \ } \
} while (0) } while (0)
// refactor later // refactor later
...@@ -242,6 +254,7 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -242,6 +254,7 @@ static void uvHandleReq(SSrvConn* pConn) {
pHead->msgLen -= sizeof(STransUserMsg); pHead->msgLen -= sizeof(STransUserMsg);
} }
} }
CONN_SHOULD_RELEASE(pConn, pHead); CONN_SHOULD_RELEASE(pConn, pHead);
STransMsg transMsg; STransMsg transMsg;
...@@ -280,8 +293,6 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -280,8 +293,6 @@ static void uvHandleReq(SSrvConn* pConn) {
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth // auth
_RETURE:
return;
} }
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
...@@ -798,11 +809,10 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { ...@@ -798,11 +809,10 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
// release handle to rpc init // release handle to rpc init
SSrvConn* conn = msg->pConn; SSrvConn* conn = msg->pConn;
if (conn->status == ConnAcquire) { if (conn->status == ConnAcquire) {
if (taosArrayGetSize(conn->srvMsgs) > 0) { taosArrayPush(conn->srvMsgs, &msg);
taosArrayPush(conn->srvMsgs, &msg); if (taosArrayGetSize(conn->srvMsgs) > 1) {
return; return;
} }
taosArrayPush(conn->srvMsgs, &msg);
uvStartSendRespInternal(msg); uvStartSendRespInternal(msg);
return; return;
} else if (conn->status == ConnRelease) { } else if (conn->status == ConnRelease) {
......
...@@ -30,24 +30,8 @@ const char *ckey = "ckey"; ...@@ -30,24 +30,8 @@ const char *ckey = "ckey";
class Server; class Server;
int port = 7000; int port = 7000;
// server process // server process
typedef struct CbArgs {
tmsg_t msgType;
} CbArgs;
static void *ConstructArgForSpecificMsgType(void *parent, tmsg_t msgType) {
if (msgType == 1 || msgType == 2) {
CbArgs *args = (CbArgs *)calloc(1, sizeof(CbArgs));
args->msgType = msgType;
return args;
}
return NULL;
}
// server except // server except
static bool handleExcept(void *parent, tmsg_t msgType) {
//
return msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP;
}
typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
...@@ -86,10 +70,6 @@ class Client { ...@@ -86,10 +70,6 @@ class Client {
rpcClose(this->transCli); rpcClose(this->transCli);
this->transCli = NULL; this->transCli = NULL;
} }
void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) {
rpcClose(this->transCli);
this->transCli = rpcOpen(&rpcInit_);
}
void SendAndRecv(SRpcMsg *req, SRpcMsg *resp) { void SendAndRecv(SRpcMsg *req, SRpcMsg *resp) {
SEpSet epSet = {0}; SEpSet epSet = {0};
...@@ -108,7 +88,6 @@ class Client { ...@@ -108,7 +88,6 @@ class Client {
SendAndRecv(req, resp); SendAndRecv(req, resp);
} }
void SendWithHandle(SRpcMsg *req, SRpcMsg *resp) {}
void SemWait() { tsem_wait(&this->sem); } void SemWait() { tsem_wait(&this->sem); }
void SemPost() { tsem_post(&this->sem); } void SemPost() { tsem_post(&this->sem); }
void Reset() {} void Reset() {}
...@@ -141,12 +120,17 @@ class Server { ...@@ -141,12 +120,17 @@ class Server {
this->transSrv = rpcOpen(&this->rpcInit_); this->transSrv = rpcOpen(&this->rpcInit_);
taosMsleep(1000); taosMsleep(1000);
} }
void SetSrvContinueSend(CB cb) {
this->Stop();
rpcInit_.cfp = cb;
this->Start();
}
void Stop() { void Stop() {
if (this->transSrv == NULL) return; if (this->transSrv == NULL) return;
rpcClose(this->transSrv); rpcClose(this->transSrv);
this->transSrv = NULL; this->transSrv = NULL;
} }
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { void SetSrvSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
this->Stop(); this->Stop();
rpcInit_.cfp = cfp; rpcInit_.cfp = cfp;
this->Start(); this->Start();
...@@ -174,9 +158,6 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -174,9 +158,6 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
} }
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
for (int i = 0; i < 9; i++) {
rpcRefHandle(pMsg->handle, TAOS_CONN_SERVER);
}
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(100); rpcMsg.pCont = rpcMallocCont(100);
...@@ -238,10 +219,6 @@ class TransObj { ...@@ -238,10 +219,6 @@ class TransObj {
// //
srv->Stop(); srv->Stop();
} }
void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) {
// do nothing
cli->SetConstructFP(mfp);
}
// call when link broken, and notify query or fetch stop // call when link broken, and notify query or fetch stop
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
/////// ///////
...@@ -279,7 +256,7 @@ class TransEnv : public ::testing::Test { ...@@ -279,7 +256,7 @@ class TransEnv : public ::testing::Test {
}; };
TEST_F(TransEnv, 01sendAndRec) { TEST_F(TransEnv, 01sendAndRec) {
for (int i = 0; i < 1; i++) { for (int i = 0; i < 10; i++) {
SRpcMsg req = {0}, resp = {0}; SRpcMsg req = {0}, resp = {0};
req.msgType = 0; req.msgType = 0;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
...@@ -322,22 +299,33 @@ TEST_F(TransEnv, clientUserDefined) { ...@@ -322,22 +299,33 @@ TEST_F(TransEnv, clientUserDefined) {
} }
TEST_F(TransEnv, cliPersistHandle) { TEST_F(TransEnv, cliPersistHandle) {
// tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0}; SRpcMsg resp = {0};
void * handle = NULL;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; SRpcMsg req = {.handle = resp.handle, .persistHandle = 1};
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
tr->cliSendAndRecv(&req, &resp); tr->cliSendAndRecv(&req, &resp);
if (i == 5) { // if (i == 5) {
std::cout << "stop server" << std::endl; // std::cout << "stop server" << std::endl;
tr->StopSrv(); // tr->StopSrv();
} //}
if (i >= 6) { // if (i >= 6) {
EXPECT_TRUE(resp.code != 0); // EXPECT_TRUE(resp.code != 0);
} //}
handle = resp.handle;
} }
rpcReleaseHandle(handle, TAOS_CONN_CLIENT);
for (int i = 0; i < 10; i++) {
SRpcMsg req = {0};
req.msgType = 1;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
tr->cliSendAndRecv(&req, &resp);
}
taosMsleep(1000);
////////////////// //////////////////
} }
...@@ -425,11 +413,7 @@ TEST_F(TransEnv, cliPersistHandleExcept) { ...@@ -425,11 +413,7 @@ TEST_F(TransEnv, cliPersistHandleExcept) {
TEST_F(TransEnv, multiCliPersistHandleExcept) { TEST_F(TransEnv, multiCliPersistHandleExcept) {
// conn broken // conn broken
} }
TEST_F(TransEnv, queryExcept) { TEST_F(TransEnv, queryExcept) {}
// tr->SetSrvExceptFp(handleExcept);
// query and conn is broken
}
TEST_F(TransEnv, noResp) { TEST_F(TransEnv, noResp) {
SRpcMsg resp = {0}; SRpcMsg resp = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册