未验证 提交 71853a64 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #10226 from taosdata/feature/trans_impl

fix push crashed
...@@ -151,7 +151,8 @@ typedef struct { ...@@ -151,7 +151,8 @@ typedef struct {
char version : 4; // RPC version char version : 4; // RPC version
char comp : 4; // compression algorithm, 0:no compression 1:lz4 char comp : 4; // compression algorithm, 0:no compression 1:lz4
char resflag : 2; // reserved bits char resflag : 2; // reserved bits
char spi : 3; // security parameter index char spi : 1; // security parameter index
char secured : 2;
char encrypt : 3; // encrypt algorithm, 0: no encryption char encrypt : 3; // encrypt algorithm, 0: no encryption
uint32_t code; // del later uint32_t code; // del later
...@@ -170,6 +171,10 @@ typedef struct { ...@@ -170,6 +171,10 @@ typedef struct {
uint8_t auth[TSDB_AUTH_LEN]; uint8_t auth[TSDB_AUTH_LEN];
} STransDigestMsg; } STransDigestMsg;
typedef struct {
uint8_t user[TSDB_UNI_LEN];
} STransUserMsg;
#pragma pack(pop) #pragma pack(pop)
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
...@@ -236,4 +241,8 @@ int transClearBuffer(SConnBuffer* buf); ...@@ -236,4 +241,8 @@ int transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf);
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
// int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen);
// int transUnpackMsg(char *msg, SRpcMsg *pMsg, bool );
#endif #endif
...@@ -36,6 +36,9 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -36,6 +36,9 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->idleTime = pInit->idleTime; pRpc->idleTime = pInit->idleTime;
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
pRpc->parent = pInit->parent; pRpc->parent = pInit->parent;
if (pInit->user) {
memcpy(pRpc->user, pInit->user, strlen(pInit->user));
}
return pRpc; return pRpc;
} }
......
...@@ -154,7 +154,7 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -154,7 +154,7 @@ static void clientHandleResp(SCliConn* conn) {
SCliThrdObj* pThrd = conn->hostThrd; SCliThrdObj* pThrd = conn->hostThrd;
// user owns conn->persist = 1 // user owns conn->persist = 1
if (conn->push != NULL) { if (conn->push == NULL) {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
} }
...@@ -382,10 +382,32 @@ static void clientWriteCb(uv_write_t* req, int status) { ...@@ -382,10 +382,32 @@ static void clientWriteCb(uv_write_t* req, int status) {
static void clientWrite(SCliConn* pConn) { static void clientWrite(SCliConn* pConn) {
SCliMsg* pCliMsg = pConn->data; SCliMsg* pCliMsg = pConn->data;
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); STransConnCtx* pCtx = pCliMsg->ctx;
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg);
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
int msgLen = transMsgLenFromCont(pMsg->contLen);
if (!pConn->secured) {
char* buf = calloc(1, msgLen + sizeof(STransUserMsg));
memcpy(buf, (char*)pHead, msgLen);
STransUserMsg* uMsg = (STransUserMsg*)(buf + msgLen);
memcpy(uMsg->user, pCtx->pTransInst->user, tListLen(uMsg->user));
int msgLen = transMsgLenFromCont(pMsg->contLen); // to avoid mem leak
destroyUserdata(pMsg);
pMsg->pCont = (char*)buf + sizeof(STransMsgHead);
pMsg->contLen = msgLen + sizeof(STransUserMsg) - sizeof(STransMsgHead);
pConn->secured = 1; // del later
pHead = (STransMsgHead*)buf;
pHead->secured = 0;
msgLen += sizeof(STransUserMsg);
}
pHead->msgType = pMsg->msgType; pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
......
...@@ -211,7 +211,8 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { ...@@ -211,7 +211,8 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
/* /*
* formate of data buffer: * formate of data buffer:
* |<--------------------------data from socket------------------------------->| * |<--------------------------data from socket------------------------------->|
* |<------STransMsgHead------->|<-------------------other data--------------->| * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user
* info--->|
*/ */
static const int CAPACITY = 1024; static const int CAPACITY = 1024;
...@@ -239,6 +240,9 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { ...@@ -239,6 +240,9 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
} }
return 0; return 0;
} }
int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) {}
int transUnpackMsg(STransMsgHead* msgHead) {}
int transDestroyBuffer(SConnBuffer* buf) { int transDestroyBuffer(SConnBuffer* buf) {
if (buf->cap > 0) { if (buf->cap > 0) {
tfree(buf->buf); tfree(buf->buf);
......
...@@ -231,6 +231,10 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -231,6 +231,10 @@ static void uvHandleReq(SSrvConn* pConn) {
p->chandle = NULL; p->chandle = NULL;
STransMsgHead* pHead = (STransMsgHead*)p->msg; STransMsgHead* pHead = (STransMsgHead*)p->msg;
if (pHead->secured == 0) {
STransUserMsg* uMsg = (p->msg + p->msgLen - sizeof(STransUserMsg));
memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
}
pConn->inType = pHead->msgType; pConn->inType = pHead->msgType;
assert(transIsReq(pHead->msgType)); assert(transIsReq(pHead->msgType));
...@@ -339,6 +343,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { ...@@ -339,6 +343,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
} }
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->msgType = smsg->pConn->inType + 1; pHead->msgType = smsg->pConn->inType + 1;
pHead->code = htonl(pMsg->code);
// add more info // add more info
char* msg = (char*)pHead; char* msg = (char*)pHead;
int32_t len = transMsgLenFromCont(pMsg->contLen); int32_t len = transMsgLenFromCont(pMsg->contLen);
......
...@@ -3,12 +3,12 @@ add_executable(client "") ...@@ -3,12 +3,12 @@ add_executable(client "")
add_executable(server "") add_executable(server "")
add_executable(transUT "") add_executable(transUT "")
add_executable(syncClient "") add_executable(syncClient "")
add_executable(pushClient "")
target_sources(transUT target_sources(transUT
PRIVATE PRIVATE
"transUT.cc" "transUT.cc"
) )
target_sources(transportTest target_sources(transportTest
PRIVATE PRIVATE
"transportTests.cc" "transportTests.cc"
...@@ -26,6 +26,11 @@ target_sources (syncClient ...@@ -26,6 +26,11 @@ target_sources (syncClient
"syncClient.c" "syncClient.c"
) )
target_sources(pushClient
PRIVATE
"pushClient.c"
)
target_include_directories(transportTest target_include_directories(transportTest
PUBLIC PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/transport" "${CMAKE_SOURCE_DIR}/include/libs/transport"
...@@ -92,4 +97,16 @@ target_link_libraries (syncClient ...@@ -92,4 +97,16 @@ target_link_libraries (syncClient
transport transport
) )
target_include_directories(pushClient
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (pushClient
os
util
common
gtest_main
transport
)
...@@ -49,9 +49,8 @@ static int tcount = 0; ...@@ -49,9 +49,8 @@ static int tcount = 0;
typedef struct SPushArg { typedef struct SPushArg {
tsem_t sem; tsem_t sem;
} SPushArg; } SPushArg;
// ping
int pushCallback(void *arg, SRpcMsg *msg) { int pushCallback(void *arg, SRpcMsg *msg) {
SPushArg *push = arg; SPushArg *push = arg;
tsem_post(&push->sem); tsem_post(&push->sem);
...@@ -59,7 +58,8 @@ int pushCallback(void *arg, SRpcMsg *msg) { ...@@ -59,7 +58,8 @@ int pushCallback(void *arg, SRpcMsg *msg) {
SRpcPush *createPushArg() { SRpcPush *createPushArg() {
SRpcPush *push = calloc(1, sizeof(SRpcPush)); SRpcPush *push = calloc(1, sizeof(SRpcPush));
push->arg = calloc(1, sizeof(SPushArg)); push->arg = calloc(1, sizeof(SPushArg));
tsem_init(&push->arg->sem, 0, 0);
tsem_init(&(((SPushArg *)push->arg)->sem), 0, 0);
push->callback = pushCallback; push->callback = pushCallback;
return push; return push;
} }
...@@ -83,14 +83,17 @@ static void *sendRequest(void *param) { ...@@ -83,14 +83,17 @@ static void *sendRequest(void *param) {
rpcMsg.ahandle = pInfo; rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1; rpcMsg.msgType = 1;
rpcMsg.push = push; rpcMsg.push = push;
;
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
int64_t start = taosGetTimestampUs(); int64_t start = taosGetTimestampUs();
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem); // ping->pong
// tsem_wait(&pInfo->rspSem); // tsem_wait(&pInfo->rspSem);
tsem_wait(&push->sem); SPushArg *arg = push->arg;
/// e
tsem_wait(&arg->sem); // push callback
// query_fetch(client->h)
int64_t end = taosGetTimestampUs() - start; int64_t end = taosGetTimestampUs() - start;
if (end <= 100) { if (end <= 100) {
u100++; u100++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册