未验证 提交 ff5f5224 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-10578]<fix>: taosdemo rest interface slow. (#8229)

* [TD-10578]<fix>: taosdemo rest interface slow.

* fix missed 200

* fix create table count

* make socket per thread

* fix sockfd read timeout

* add socket to each query thread

* [TD-10469]<docs>: use blm3 for devops

* resolve conflict.

* rerun drone
Co-authored-by: Nzhaoyanggh <yzhao@taosdata.com>
上级 e9ec776c
......@@ -504,6 +504,7 @@ typedef struct SThreadInfo_S {
uint64_t querySeq; // sequence number of sql command
TAOS_SUB* tsub;
int sockfd;
} threadInfo;
#ifdef WINDOWS
......@@ -583,8 +584,7 @@ static void prompt();
static int createDatabasesAndStables();
static void createChildTables();
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
static int postProceSql(char *host, struct sockaddr_in *pServAddr,
uint16_t port, char* sqlstr, threadInfo *pThreadInfo);
static int postProceSql(char *host, uint16_t port, char* sqlstr, threadInfo *pThreadInfo);
static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
int disorderRatio, int disorderRange);
static bool getInfoFromJsonFile(char* file);
......@@ -2218,7 +2218,7 @@ static void selectAndGetResult(
} else if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
int retCode = postProceSql(
g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port,
g_queryInfo.host, g_queryInfo.port,
command,
pThreadInfo);
if (0 != retCode) {
......@@ -3397,7 +3397,7 @@ static void printfQuerySystemInfo(TAOS * taos) {
free(dbInfos);
}
static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port,
static int postProceSql(char *host, uint16_t port,
char* sqlstr, threadInfo *pThreadInfo)
{
char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s";
......@@ -3435,29 +3435,6 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
size_t encoded_len = 4 * ((userpass_buf_len +2) / 3);
char base64_buf[INPUT_BUF_LEN];
#ifdef WINDOWS
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET sockfd;
#else
int sockfd;
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
errorPrint( "Could not create socket : %d" , WSAGetLastError());
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
free(request_buf);
ERROR_EXIT("opening socket");
}
int retConn = connect(sockfd, (struct sockaddr *)pServAddr, sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
if (retConn < 0) {
free(request_buf);
ERROR_EXIT("connecting");
}
memset(base64_buf, 0, INPUT_BUF_LEN);
......@@ -3497,9 +3474,9 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
sent = 0;
do {
#ifdef WINDOWS
bytes = send(sockfd, request_buf + sent, req_str_len - sent, 0);
bytes = send(pThreadInfo->sockfd, request_buf + sent, req_str_len - sent, 0);
#else
bytes = write(sockfd, request_buf + sent, req_str_len - sent);
bytes = write(pThreadInfo->sockfd, request_buf + sent, req_str_len - sent);
#endif
if (bytes < 0)
ERROR_EXIT("writing message to socket");
......@@ -3511,12 +3488,18 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
memset(response_buf, 0, RESP_BUF_LEN);
resp_len = sizeof(response_buf) - 1;
received = 0;
char resEncodingChunk[] = "Encoding: chunked";
char resHttp[] = "HTTP/1.1 ";
char resHttpOk[] = "HTTP/1.1 200 OK";
do {
#ifdef WINDOWS
bytes = recv(sockfd, response_buf + received, resp_len - received, 0);
bytes = recv(pThreadInfo->sockfds, response_buf + received, resp_len - received, 0);
#else
bytes = read(sockfd, response_buf + received, resp_len - received);
bytes = read(pThreadInfo->sockfd, response_buf + received, resp_len - received);
#endif
verbosePrint("%s() LN%d: bytes:%d\n", __func__, __LINE__, bytes);
if (bytes < 0) {
free(request_buf);
ERROR_EXIT("reading response from socket");
......@@ -3524,6 +3507,19 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
if (bytes == 0)
break;
received += bytes;
verbosePrint("%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n",
__func__, __LINE__, received, resp_len, response_buf);
if (((NULL != strstr(response_buf, resEncodingChunk))
&& (NULL != strstr(response_buf, resHttp)))
|| ((NULL != strstr(response_buf, resHttpOk))
&& (NULL != strstr(response_buf, "\"status\":")))) {
debugPrint(
"%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n",
__func__, __LINE__, received, resp_len, response_buf);
break;
}
} while(received < resp_len);
if (received == resp_len) {
......@@ -3532,20 +3528,18 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
}
response_buf[RESP_BUF_LEN - 1] = '\0';
printf("Response:\n%s\n", response_buf);
if (strlen(pThreadInfo->filePath) > 0) {
appendResultBufToFile(response_buf, pThreadInfo);
}
free(request_buf);
#ifdef WINDOWS
closesocket(sockfd);
WSACleanup();
#else
close(sockfd);
#endif
if (NULL == strstr(response_buf, resHttpOk)) {
errorPrint("%s() LN%d, Response:\n%s\n",
__func__, __LINE__, response_buf);
return -1;
}
return 0;
}
......@@ -4583,7 +4577,6 @@ static void* createTable(void *sarg)
return NULL;
}
pThreadInfo->tables_created += batchNum;
uint64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n",
......@@ -4597,8 +4590,8 @@ static void* createTable(void *sarg)
NO_INSERT_TYPE, false)) {
errorPrint2("queryDbExec() failed. buffer:\n%s\n", pThreadInfo->buffer);
}
pThreadInfo->tables_created += batchNum;
}
free(pThreadInfo->buffer);
return NULL;
}
......@@ -7009,7 +7002,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, pThreadInfo->buffer);
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
if (0 != postProceSql(g_Dbs.host, g_Dbs.port,
pThreadInfo->buffer, pThreadInfo)) {
affectedRows = -1;
printf("========restful return fail, threadID[%d]\n",
......@@ -10551,6 +10544,33 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint();
}
*/
if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) {
#ifdef WINDOWS
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET sockfd;
#else
int sockfd;
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
errorPrint( "Could not create socket : %d" , WSAGetLastError());
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
ERROR_EXIT("opening socket");
}
int retConn = connect(sockfd, (struct sockaddr *)&(g_Dbs.serv_addr), sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
if (retConn < 0) {
ERROR_EXIT("connecting");
}
pThreadInfo->sockfd = sockfd;
}
tsem_init(&(pThreadInfo->lock_sem), 0, 0);
if (ASYNC_MODE == g_Dbs.asyncMode) {
pthread_create(pids + i, NULL, asyncWrite, pThreadInfo);
......@@ -10588,6 +10608,14 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tmfree((char *)pThreadInfo->bind_ts_array);
tmfree(pThreadInfo->bindParams);
tmfree(pThreadInfo->is_null);
if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) {
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
WSACleanup();
#else
close(pThreadInfo->sockfd);
#endif
}
#else
if (pThreadInfo->sampleBindArray) {
for (int k = 0; k < MAX_SAMPLES; k++) {
......@@ -11250,6 +11278,31 @@ static int queryTestProcess() {
}
}
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
#ifdef WINDOWS
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET sockfd;
#else
int sockfd;
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
errorPrint( "Could not create socket : %d" , WSAGetLastError());
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
ERROR_EXIT("opening socket");
}
int retConn = connect(sockfd, (struct sockaddr *)&(g_queryInfo.serv_addr),
sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
if (retConn < 0) {
ERROR_EXIT("connecting");
}
pThreadInfo->sockfd = sockfd;
}
pThreadInfo->taos = NULL;// workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedTableQuery,
......@@ -11301,6 +11354,31 @@ static int queryTestProcess() {
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // workaround to use separate taos connection;
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
#ifdef WINDOWS
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET sockfd;
#else
int sockfd;
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
errorPrint( "Could not create socket : %d" , WSAGetLastError());
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
ERROR_EXIT("opening socket");
}
int retConn = connect(sockfd, (struct sockaddr *)&(g_queryInfo.serv_addr),
sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
if (retConn < 0) {
ERROR_EXIT("connecting");
}
pThreadInfo->sockfd = sockfd;
}
pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
}
......@@ -11313,6 +11391,15 @@ static int queryTestProcess() {
for (int i = 0; i < nConcurrent; i++) {
for (int j = 0; j < nSqlCount; j++) {
pthread_join(pids[i * nSqlCount + j], NULL);
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
threadInfo *pThreadInfo = infos + i * nSqlCount + j;
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
WSACleanup();
#else
close(pThreadInfo->sockfd);
#endif
}
}
}
}
......@@ -11322,6 +11409,15 @@ static int queryTestProcess() {
for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL);
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
threadInfo *pThreadInfo = infosOfSub + i;
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
WSACleanup();
#else
close(pThreadInfo->sockfd);
#endif
}
}
tmfree((char*)pidsOfSub);
......@@ -11997,7 +12093,6 @@ static int regexMatch(const char *s, const char *reg, int cflags) {
printf("Regex match failed: %s\n", msgbuf);
exit(EXIT_FAILURE);
}
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册