提交 457c544d 编写于 作者: dengyihao's avatar dengyihao

fix push crashed

上级 66f7a305
......@@ -151,7 +151,8 @@ typedef struct {
char version : 4; // RPC version
char comp : 4; // compression algorithm, 0:no compression 1:lz4
char resflag : 2; // reserved bits
char spi : 3; // security parameter index
char spi : 1; // security parameter index
char secured : 2;
char encrypt : 3; // encrypt algorithm, 0: no encryption
uint32_t code; // del later
......@@ -170,6 +171,10 @@ typedef struct {
uint8_t auth[TSDB_AUTH_LEN];
} STransDigestMsg;
typedef struct {
uint8_t user[TSDB_UNI_LEN];
} STransUserMsg;
#pragma pack(pop)
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
......@@ -236,4 +241,8 @@ int transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf);
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
// int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen);
// int transUnpackMsg(char *msg, SRpcMsg *pMsg, bool );
#endif
......@@ -36,6 +36,9 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->idleTime = pInit->idleTime;
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
pRpc->parent = pInit->parent;
if (pInit->user) {
memcpy(pRpc->user, pInit->user, strlen(pInit->user));
}
return pRpc;
}
......
......@@ -154,7 +154,7 @@ static void clientHandleResp(SCliConn* conn) {
SCliThrdObj* pThrd = conn->hostThrd;
// user owns conn->persist = 1
if (conn->push != NULL) {
if (conn->push == NULL) {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
}
......@@ -382,10 +382,32 @@ static void clientWriteCb(uv_write_t* req, int status) {
static void clientWrite(SCliConn* pConn) {
SCliMsg* pCliMsg = pConn->data;
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg);
STransConnCtx* pCtx = pCliMsg->ctx;
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg);
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
int msgLen = transMsgLenFromCont(pMsg->contLen);
if (!pConn->secured) {
char* buf = calloc(1, msgLen + sizeof(STransUserMsg));
memcpy(buf, (char*)pHead, msgLen);
STransUserMsg* uMsg = (STransUserMsg*)(buf + msgLen);
memcpy(uMsg->user, pCtx->pTransInst->user, tListLen(uMsg->user));
int msgLen = transMsgLenFromCont(pMsg->contLen);
// to avoid mem leak
destroyUserdata(pMsg);
pMsg->pCont = (char*)buf + sizeof(STransMsgHead);
pMsg->contLen = msgLen + sizeof(STransUserMsg) - sizeof(STransMsgHead);
pConn->secured = 1; // del later
pHead = (STransMsgHead*)buf;
pHead->secured = 0;
msgLen += sizeof(STransUserMsg);
}
pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
......
......@@ -211,7 +211,8 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
/*
* formate of data buffer:
* |<--------------------------data from socket------------------------------->|
* |<------STransMsgHead------->|<-------------------other data--------------->|
* |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user
* info--->|
*/
static const int CAPACITY = 1024;
......@@ -239,6 +240,9 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
}
return 0;
}
int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) {}
int transUnpackMsg(STransMsgHead* msgHead) {}
int transDestroyBuffer(SConnBuffer* buf) {
if (buf->cap > 0) {
tfree(buf->buf);
......
......@@ -231,6 +231,10 @@ static void uvHandleReq(SSrvConn* pConn) {
p->chandle = NULL;
STransMsgHead* pHead = (STransMsgHead*)p->msg;
if (pHead->secured == 0) {
STransUserMsg* uMsg = (p->msg + p->msgLen - sizeof(STransUserMsg));
memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
}
pConn->inType = pHead->msgType;
assert(transIsReq(pHead->msgType));
......@@ -339,6 +343,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
}
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->msgType = smsg->pConn->inType + 1;
pHead->code = htonl(pMsg->code);
// add more info
char* msg = (char*)pHead;
int32_t len = transMsgLenFromCont(pMsg->contLen);
......
......@@ -3,12 +3,12 @@ add_executable(client "")
add_executable(server "")
add_executable(transUT "")
add_executable(syncClient "")
add_executable(pushClient "")
target_sources(transUT
PRIVATE
"transUT.cc"
)
target_sources(transportTest
PRIVATE
"transportTests.cc"
......@@ -26,6 +26,11 @@ target_sources (syncClient
"syncClient.c"
)
target_sources(pushClient
PRIVATE
"pushClient.c"
)
target_include_directories(transportTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/transport"
......@@ -92,4 +97,16 @@ target_link_libraries (syncClient
transport
)
target_include_directories(pushClient
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (pushClient
os
util
common
gtest_main
transport
)
......@@ -49,9 +49,8 @@ static int tcount = 0;
typedef struct SPushArg {
tsem_t sem;
} SPushArg;
// ping
int pushCallback(void *arg, SRpcMsg *msg) {
SPushArg *push = arg;
tsem_post(&push->sem);
......@@ -59,7 +58,8 @@ int pushCallback(void *arg, SRpcMsg *msg) {
SRpcPush *createPushArg() {
SRpcPush *push = calloc(1, sizeof(SRpcPush));
push->arg = calloc(1, sizeof(SPushArg));
tsem_init(&push->arg->sem, 0, 0);
tsem_init(&(((SPushArg *)push->arg)->sem), 0, 0);
push->callback = pushCallback;
return push;
}
......@@ -83,14 +83,17 @@ static void *sendRequest(void *param) {
rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1;
rpcMsg.push = push;
;
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
int64_t start = taosGetTimestampUs();
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
tsem_wait(&pInfo->rspSem);
tsem_wait(&pInfo->rspSem); // ping->pong
// tsem_wait(&pInfo->rspSem);
tsem_wait(&push->sem);
SPushArg *arg = push->arg;
/// e
tsem_wait(&arg->sem); // push callback
// query_fetch(client->h)
int64_t end = taosGetTimestampUs() - start;
if (end <= 100) {
u100++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册