提交 ec4ffd65 编写于 作者: S Shuduo Sang

[TD-3113] <fix>: remove curl from TDengine. works now.

上级 dec80349
...@@ -1658,73 +1658,97 @@ int curlProceSql(char* host, uint16_t port, char* sqlstr, CURL *curl_handle) ...@@ -1658,73 +1658,97 @@ int curlProceSql(char* host, uint16_t port, char* sqlstr, CURL *curl_handle)
} }
#endif #endif
#define REQ_BUF_LEN 1024000 #define REQ_EXTRA_BUF_LEN 1024
#define REP_BUF_LEN 4096000 #define RESP_BUF_LEN 4096
void ERROR(const char *msg) { perror(msg); exit(0); } void ERROR(const char *msg) { perror(msg); exit(0); }
int postProceSql(char* host, uint16_t port, char* sqlstr) int postProceSql(char* host, uint16_t port, char* sqlstr)
{ {
char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nUser-Agent: postress/0.1\r\nAccept: */*\r\n%s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s"; char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\n%s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s";
char *url = "/rest/sql"; char *url = "/rest/sql";
char *auth = "Authorization: Basic cm9vdDp0YW9zZGF0YQ=="; char *auth = "Authorization: Basic cm9vdDp0YW9zZGF0YQ==";
struct hostent *server; struct hostent *server;
struct sockaddr_in serv_addr; struct sockaddr_in serv_addr;
int sockfd, bytes, sent, received, total; int sockfd, bytes, sent, received, req_str_len, resp_len;
char request[REQ_BUF_LEN], response[REP_BUF_LEN]; char *request_buf;
char response_buf[RESP_BUF_LEN];
uint16_t rest_port = port + TSDB_PORT_HTTP;
int r = snprintf(request, int req_buf_len = strlen(sqlstr) + REQ_EXTRA_BUF_LEN;
REQ_BUF_LEN,
req_fmt, url, host, port + TSDB_PORT_HTTP, request_buf = malloc(req_buf_len);
if (NULL == request_buf)
ERROR("ERROR, cannot allocate memory.");
int r = snprintf(request_buf,
req_buf_len,
req_fmt, url, host, rest_port,
auth, strlen(sqlstr), sqlstr); auth, strlen(sqlstr), sqlstr);
if (r >= REQ_BUF_LEN) if (r >= req_buf_len) {
free(request_buf);
ERROR("ERROR too long request"); ERROR("ERROR too long request");
printf("Request:\n%s\n", request); }
printf("Request:\n%s\n", request_buf);
sockfd = socket(AF_INET, SOCK_STREAM, 0); sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) ERROR("ERROR opening socket"); if (sockfd < 0) {
free(request_buf);
ERROR("ERROR opening socket");
}
server = gethostbyname(host); server = gethostbyname(host);
if (server == NULL) ERROR("ERROR, no such host"); if (server == NULL) {
free(request_buf);
ERROR("ERROR, no such host");
}
memset(&serv_addr,0,sizeof(serv_addr)); memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET; serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(port); serv_addr.sin_port = htons(rest_port);
memcpy(&serv_addr.sin_addr.s_addr,server->h_addr,server->h_length); memcpy(&serv_addr.sin_addr.s_addr,server->h_addr,server->h_length);
if (connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0) if (connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0) {
free(request_buf);
ERROR("ERROR connecting"); ERROR("ERROR connecting");
}
total = strlen(request); req_str_len = strlen(request_buf);
sent = 0; sent = 0;
do { do {
bytes = write(sockfd, request + sent, total - sent); bytes = write(sockfd, request_buf + sent, req_str_len - sent);
if (bytes < 0) if (bytes < 0)
ERROR("ERROR writing message to socket"); ERROR("ERROR writing message to socket");
if (bytes == 0) if (bytes == 0)
break; break;
sent+=bytes; sent+=bytes;
} while (sent < total); } while (sent < req_str_len);
memset(response, 0, sizeof(response)); memset(response_buf, 0, RESP_BUF_LEN);
total = sizeof(response)-1; resp_len = sizeof(response_buf) - 1;
received = 0; received = 0;
do { do {
bytes = read(sockfd, response + received, total - received); bytes = read(sockfd, response_buf + received, resp_len - received);
if (bytes < 0) if (bytes < 0) {
free(request_buf);
ERROR("ERROR reading response from socket"); ERROR("ERROR reading response from socket");
}
if (bytes == 0) if (bytes == 0)
break; break;
received+=bytes; received += bytes;
} while (received < total); } while (received < resp_len);
if (received == total) if (received == resp_len) {
free(request_buf);
ERROR("ERROR storing complete response from socket"); ERROR("ERROR storing complete response from socket");
}
printf("Response:\n%s\n", response_buf);
free(request_buf);
close(sockfd); close(sockfd);
printf("Response:\n%s\n",response);
return 0; return 0;
} }
...@@ -3801,7 +3825,7 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa ...@@ -3801,7 +3825,7 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
//printf("http insert sql return, Spent %ld ms \n", t2 - t1); //printf("http insert sql return, Spent %ld ms \n", t2 - t1);
if (0 != retCode) { if (0 != retCode) {
printf("========curl return fail, threadID[%d]\n", winfo->threadID); printf("========restful return fail, threadID[%d]\n", winfo->threadID);
goto free_and_statistics; goto free_and_statistics;
} }
} }
...@@ -4030,7 +4054,7 @@ void *syncWrite(void *sarg) { ...@@ -4030,7 +4054,7 @@ void *syncWrite(void *sarg) {
//printf("http insert sql return, Spent %ld ms \n", t2 - t1); //printf("http insert sql return, Spent %ld ms \n", t2 - t1);
if (0 != retCode) { if (0 != retCode) {
printf("========curl return fail, threadID[%d]\n", winfo->threadID); printf("========restful return fail, threadID[%d]\n", winfo->threadID);
goto free_and_statistics_2; goto free_and_statistics_2;
} }
} }
...@@ -4242,9 +4266,6 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu ...@@ -4242,9 +4266,6 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
} }
} else { } else {
t_info->taos = NULL; t_info->taos = NULL;
#ifdef TD_LOWA_CURL
t_info->curl_handle = curl_easy_init();
#endif
} }
if (0 == superTblInfo->multiThreadWriteOneTbl) { if (0 == superTblInfo->multiThreadWriteOneTbl) {
...@@ -4289,11 +4310,6 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu ...@@ -4289,11 +4310,6 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
cntDelay += t_info->cntDelay; cntDelay += t_info->cntDelay;
if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay;
if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; if (t_info->minDelay < minDelay) minDelay = t_info->minDelay;
#ifdef TD_LOWA_CURL
if (t_info->curl_handle) {
curl_easy_cleanup(t_info->curl_handle);
}
#endif
} }
cntDelay -= 1; cntDelay -= 1;
...@@ -4578,17 +4594,15 @@ void *superQueryProcess(void *sarg) { ...@@ -4578,17 +4594,15 @@ void *superQueryProcess(void *sarg) {
int64_t t2 = taosGetTimestampUs(); int64_t t2 = taosGetTimestampUs();
printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0); printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
} else { } else {
#ifdef TD_LOWA_CURL
int64_t t1 = taosGetTimestampUs(); int64_t t1 = taosGetTimestampUs();
int retCode = postProceSql(g_queryInfo.host, g_queryInfo.port, g_queryInfo.superQueryInfo.sql[i]); int retCode = postProceSql(g_queryInfo.host, g_queryInfo.port, g_queryInfo.superQueryInfo.sql[i]);
int64_t t2 = taosGetTimestampUs(); int64_t t2 = taosGetTimestampUs();
printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0); printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
if (0 != retCode) { if (0 != retCode) {
printf("====curl return fail, threadID[%d]\n", winfo->threadID); printf("====restful return fail, threadID[%d]\n", winfo->threadID);
return NULL; return NULL;
} }
#endif
} }
} }
et = taosGetTimestampMs(); et = taosGetTimestampMs();
...@@ -4692,9 +4706,6 @@ int queryTestProcess() { ...@@ -4692,9 +4706,6 @@ int queryTestProcess() {
(void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE); (void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE);
} else { } else {
t_info->taos = NULL; t_info->taos = NULL;
#ifdef TD_LOWA_CURL
t_info->curl_handle = curl_easy_init();
#endif
} }
pthread_create(pids + i, NULL, superQueryProcess, t_info); pthread_create(pids + i, NULL, superQueryProcess, t_info);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册