未验证 提交 b64d3555 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #10182 from taosdata/feature/trans_impl

fix crash
......@@ -35,6 +35,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime;
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
pRpc->parent = pInit->parent;
return pRpc;
}
......
......@@ -124,10 +124,10 @@ static void clientHandleResp(SCliConn* conn) {
rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle;
if (pCtx->pSem == NULL) {
tDebug("conn %p handle resp", conn);
(pRpc->cfp)(NULL, &rpcMsg, NULL);
tDebug("client conn %p handle resp, ", conn);
(pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
} else {
tDebug("conn %p handle resp", conn);
tDebug("client conn(sync) %p handle resp", conn);
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
tsem_post(pCtx->pSem);
}
......@@ -154,7 +154,7 @@ static void clientHandleExcept(SCliConn* pConn) {
clientConnDestroy(pConn, true);
return;
}
tDebug("conn %p start to destroy", pConn);
tDebug("client conn %p start to destroy", pConn);
SCliMsg* pMsg = pConn->data;
destroyUserdata(&pMsg->msg);
......@@ -166,7 +166,7 @@ static void clientHandleExcept(SCliConn* pConn) {
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
if (pCtx->pSem == NULL) {
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL);
(pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL);
} else {
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
// SRpcMsg rpcMsg
......@@ -184,7 +184,7 @@ static void clientTimeoutCb(uv_timer_t* handle) {
SCliThrdObj* pThrd = handle->data;
SRpcInfo* pRpc = pThrd->pTransInst;
int64_t currentTime = pThrd->nextTimeout;
tDebug("timeout, try to remove expire conn from conn pool");
tDebug("client conn timeout, try to remove expire conn from conn pool");
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
while (p != NULL) {
......@@ -253,7 +253,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
tstrncpy(key, ip, strlen(ip));
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
tDebug("conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
tDebug("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
......@@ -294,10 +294,10 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
pBuf->len += nread;
if (clientReadComplete(pBuf)) {
uv_read_stop((uv_stream_t*)conn->stream);
tDebug("conn %p read complete", conn);
tDebug("client conn %p read complete", conn);
clientHandleResp(conn);
} else {
tDebug("conn %p read partial packet, continue to read", conn);
tDebug("client conn %p read partial packet, continue to read", conn);
}
return;
}
......@@ -309,7 +309,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
return;
}
if (nread < 0 || nread == UV_EOF) {
tError("conn %p read error: %s", conn, uv_err_name(nread));
tError("client conn %p read error: %s", conn, uv_err_name(nread));
clientHandleExcept(conn);
}
// tDebug("Read error %s\n", uv_err_name(nread));
......@@ -320,9 +320,9 @@ static void clientConnDestroy(SCliConn* conn, bool clear) {
//
conn->ref--;
if (conn->ref == 0) {
tDebug("conn %p remove from conn pool", conn);
tDebug("client conn %p remove from conn pool", conn);
QUEUE_REMOVE(&conn->conn);
tDebug("conn %p remove from conn pool successfully", conn);
tDebug("client conn %p remove from conn pool successfully", conn);
if (clear) {
uv_close((uv_handle_t*)conn->stream, clientDestroy);
}
......@@ -334,7 +334,7 @@ static void clientDestroy(uv_handle_t* handle) {
free(conn->stream);
free(conn->writeReq);
tDebug("conn %p destroy successfully", conn);
tDebug("client conn %p destroy successfully", conn);
free(conn);
// clientConnDestroy(conn, false);
......@@ -343,7 +343,7 @@ static void clientDestroy(uv_handle_t* handle) {
static void clientWriteCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data;
if (status == 0) {
tDebug("conn %p data already was written out", pConn);
tDebug("client conn %p data already was written out", pConn);
SCliMsg* pMsg = pConn->data;
if (pMsg == NULL) {
// handle
......@@ -351,7 +351,7 @@ static void clientWriteCb(uv_write_t* req, int status) {
}
destroyUserdata(&pMsg->msg);
} else {
tError("conn %p failed to write: %s", pConn, uv_err_name(status));
tError("client conn %p failed to write: %s", pConn, uv_err_name(status));
clientHandleExcept(pConn);
return;
}
......@@ -370,7 +370,7 @@ static void clientWrite(SCliConn* pConn) {
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("conn %p data write out, msgType : %d, len: %d", pConn, pHead->msgType, msgLen);
tDebug("client conn %p data write out, msgType : %s, len: %d", pConn, TMSG_INFO(pHead->msgType), msgLen);
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
}
static void clientConnCb(uv_connect_t* req, int status) {
......@@ -378,11 +378,11 @@ static void clientConnCb(uv_connect_t* req, int status) {
SCliConn* pConn = req->data;
if (status != 0) {
// tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
tError("conn %p failed to connect server: %s", pConn, uv_strerror(status));
tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status));
clientHandleExcept(pConn);
return;
}
tDebug("conn %p create", pConn);
tDebug("client conn %p create", pConn);
assert(pConn->stream == req->handle);
clientWrite(pConn);
......@@ -400,14 +400,14 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st;
tDebug("msg tran time cost: %" PRIu64 "", el);
tDebug("client msg tran time cost: %" PRIu64 "", el);
et = taosGetTimestampUs();
STransConnCtx* pCtx = pMsg->ctx;
SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
if (conn != NULL) {
// impl later
tDebug("conn %p get from conn pool", conn);
tDebug("client get conn %p from pool", conn);
conn->data = pMsg;
conn->writeReq->data = conn;
transDestroyBuffer(&conn->readBuf);
......
......@@ -266,6 +266,7 @@ static void uvHandleReq(SSrvConn* pConn) {
transClearBuffer(&pConn->readBuf);
pConn->ref++;
tDebug("%s received on %p", TMSG_INFO(rpcMsg.msgType), pConn);
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth
......@@ -278,7 +279,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) {
pBuf->len += nread;
tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
tDebug("conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
if (readComplete(pBuf)) {
tDebug("conn %p alread read complete packet", conn);
uvHandleReq(conn);
......@@ -717,6 +718,9 @@ void taosCloseServer(void* arg) {
}
void rpcSendResponse(const SRpcMsg* pMsg) {
if (pMsg->handle == NULL) {
return;
}
SSrvConn* pConn = pMsg->handle;
SWorkThrdObj* pThrd = pConn->hostThrd;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册