提交 9322928a 编写于 作者: dengyihao's avatar dengyihao

opt http module

上级 4fdc98d3
......@@ -45,6 +45,8 @@ typedef struct SHttpMsg {
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
static SHttpModule* http = NULL;
static void httpHandleReq(SHttpMsg* msg);
static void* httpThread(void* arg) {
SHttpModule* http = (SHttpModule*)arg;
setThreadName("http-cli-send-thread");
......@@ -52,6 +54,13 @@ static void* httpThread(void* arg) {
return NULL;
}
static void httpDestroyMsg(SHttpMsg* msg) {
if (msg == NULL) return;
taosMemoryFree(msg->server);
taosMemoryFree(msg->cont);
taosMemoryFree(msg);
}
static void httpAsyncCb(uv_async_t* handle) {
SAsyncItem* item = handle->data;
SHttpModule* http = item->pThrd;
......@@ -68,6 +77,7 @@ static void httpAsyncCb(uv_async_t* handle) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
msg = QUEUE_DATA(h, SHttpMsg, q);
httpHandleReq(msg);
}
}
......@@ -355,7 +365,6 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
}
int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
msg->server = strdup(server);
msg->port = port;
msg->cont = taosMemoryMalloc(contLen);
......@@ -365,3 +374,57 @@ int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, i
transAsyncSend(http->asyncPool, &(msg->q));
return 0;
}
static void httpHandleReq(SHttpMsg* msg) {
struct sockaddr_in dest = {0};
if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) {
httpDestroyMsg(msg);
return;
}
if (msg->flag == HTTP_GZIP) {
int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len);
if (dstLen > 0) {
msg->len = dstLen;
} else {
msg->flag = HTTP_FLAT;
}
}
terrno = 0;
int32_t len = 2048;
char* header = taosMemoryCalloc(1, len);
int32_t headLen = taosBuildHttpHeader(msg->server, msg->len, header, len, msg->flag);
uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var
wb[1] = uv_buf_init((char*)msg->cont, msg->len); // heap var
SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient));
cli->conn.data = cli;
cli->tcp.data = cli;
cli->req.data = cli;
cli->wbuf = wb;
cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE);
cli->addr = msg->server;
cli->port = msg->port;
taosMemoryFree(msg);
uv_tcp_init(http->loop, &cli->tcp);
// set up timeout to avoid stuck;
int32_t fd = taosCreateSocketWithTimeout(5);
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
if (ret != 0) {
uError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
destroyHttpClient(cli);
return;
}
ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb);
if (ret != 0) {
uError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr,
cli->port);
destroyHttpClient(cli);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册