diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index ad8ab9361ed4aab1899259f370de022e45ecd368..2f5161345fa2d7659b52537ef60d54dd76f54bd5 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -504,6 +504,7 @@ typedef struct SThreadInfo_S { uint64_t querySeq; // sequence number of sql command TAOS_SUB* tsub; + int sockfd; } threadInfo; #ifdef WINDOWS @@ -583,8 +584,7 @@ static void prompt(); static int createDatabasesAndStables(); static void createChildTables(); static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet); -static int postProceSql(char *host, struct sockaddr_in *pServAddr, - uint16_t port, char* sqlstr, threadInfo *pThreadInfo); +static int postProceSql(char *host, uint16_t port, char* sqlstr, threadInfo *pThreadInfo); static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, int disorderRatio, int disorderRange); static bool getInfoFromJsonFile(char* file); @@ -2218,7 +2218,7 @@ static void selectAndGetResult( } else if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) { int retCode = postProceSql( - g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port, + g_queryInfo.host, g_queryInfo.port, command, pThreadInfo); if (0 != retCode) { @@ -3397,7 +3397,7 @@ static void printfQuerySystemInfo(TAOS * taos) { free(dbInfos); } -static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port, +static int postProceSql(char *host, uint16_t port, char* sqlstr, threadInfo *pThreadInfo) { char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s"; @@ -3435,29 +3435,6 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port size_t encoded_len = 4 * ((userpass_buf_len +2) / 3); char base64_buf[INPUT_BUF_LEN]; -#ifdef WINDOWS - WSADATA wsaData; - WSAStartup(MAKEWORD(2, 2), &wsaData); - SOCKET sockfd; -#else - int sockfd; -#endif - sockfd = socket(AF_INET, SOCK_STREAM, 0); - if (sockfd < 0) { -#ifdef WINDOWS - errorPrint( "Could not create socket : %d" , WSAGetLastError()); -#endif - debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd); - free(request_buf); - ERROR_EXIT("opening socket"); - } - - int retConn = connect(sockfd, (struct sockaddr *)pServAddr, sizeof(struct sockaddr)); - debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn); - if (retConn < 0) { - free(request_buf); - ERROR_EXIT("connecting"); - } memset(base64_buf, 0, INPUT_BUF_LEN); @@ -3497,9 +3474,9 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port sent = 0; do { #ifdef WINDOWS - bytes = send(sockfd, request_buf + sent, req_str_len - sent, 0); + bytes = send(pThreadInfo->sockfd, request_buf + sent, req_str_len - sent, 0); #else - bytes = write(sockfd, request_buf + sent, req_str_len - sent); + bytes = write(pThreadInfo->sockfd, request_buf + sent, req_str_len - sent); #endif if (bytes < 0) ERROR_EXIT("writing message to socket"); @@ -3511,12 +3488,18 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port memset(response_buf, 0, RESP_BUF_LEN); resp_len = sizeof(response_buf) - 1; received = 0; + + char resEncodingChunk[] = "Encoding: chunked"; + char resHttp[] = "HTTP/1.1 "; + char resHttpOk[] = "HTTP/1.1 200 OK"; + do { #ifdef WINDOWS - bytes = recv(sockfd, response_buf + received, resp_len - received, 0); + bytes = recv(pThreadInfo->sockfds, response_buf + received, resp_len - received, 0); #else - bytes = read(sockfd, response_buf + received, resp_len - received); + bytes = read(pThreadInfo->sockfd, response_buf + received, resp_len - received); #endif + verbosePrint("%s() LN%d: bytes:%d\n", __func__, __LINE__, bytes); if (bytes < 0) { free(request_buf); ERROR_EXIT("reading response from socket"); @@ -3524,6 +3507,19 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port if (bytes == 0) break; received += bytes; + + verbosePrint("%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n", + __func__, __LINE__, received, resp_len, response_buf); + + if (((NULL != strstr(response_buf, resEncodingChunk)) + && (NULL != strstr(response_buf, resHttp))) + || ((NULL != strstr(response_buf, resHttpOk)) + && (NULL != strstr(response_buf, "\"status\":")))) { + debugPrint( + "%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n", + __func__, __LINE__, received, resp_len, response_buf); + break; + } } while(received < resp_len); if (received == resp_len) { @@ -3532,20 +3528,18 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port } response_buf[RESP_BUF_LEN - 1] = '\0'; - printf("Response:\n%s\n", response_buf); if (strlen(pThreadInfo->filePath) > 0) { appendResultBufToFile(response_buf, pThreadInfo); } free(request_buf); -#ifdef WINDOWS - closesocket(sockfd); - WSACleanup(); -#else - close(sockfd); -#endif + if (NULL == strstr(response_buf, resHttpOk)) { + errorPrint("%s() LN%d, Response:\n%s\n", + __func__, __LINE__, response_buf); + return -1; + } return 0; } @@ -4583,7 +4577,6 @@ static void* createTable(void *sarg) return NULL; } pThreadInfo->tables_created += batchNum; - uint64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n", @@ -4597,8 +4590,8 @@ static void* createTable(void *sarg) NO_INSERT_TYPE, false)) { errorPrint2("queryDbExec() failed. buffer:\n%s\n", pThreadInfo->buffer); } + pThreadInfo->tables_created += batchNum; } - free(pThreadInfo->buffer); return NULL; } @@ -7009,7 +7002,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer); - if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, + if (0 != postProceSql(g_Dbs.host, g_Dbs.port, pThreadInfo->buffer, pThreadInfo)) { affectedRows = -1; printf("========restful return fail, threadID[%d]\n", @@ -10551,6 +10544,33 @@ static void startMultiThreadInsertData(int threads, char* db_name, pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint(); } */ + + if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) { +#ifdef WINDOWS + WSADATA wsaData; + WSAStartup(MAKEWORD(2, 2), &wsaData); + SOCKET sockfd; +#else + int sockfd; +#endif + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) { +#ifdef WINDOWS + errorPrint( "Could not create socket : %d" , WSAGetLastError()); +#endif + debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd); + ERROR_EXIT("opening socket"); + } + + int retConn = connect(sockfd, (struct sockaddr *)&(g_Dbs.serv_addr), sizeof(struct sockaddr)); + debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn); + if (retConn < 0) { + ERROR_EXIT("connecting"); + } + pThreadInfo->sockfd = sockfd; + } + + tsem_init(&(pThreadInfo->lock_sem), 0, 0); if (ASYNC_MODE == g_Dbs.asyncMode) { pthread_create(pids + i, NULL, asyncWrite, pThreadInfo); @@ -10588,6 +10608,14 @@ static void startMultiThreadInsertData(int threads, char* db_name, tmfree((char *)pThreadInfo->bind_ts_array); tmfree(pThreadInfo->bindParams); tmfree(pThreadInfo->is_null); + if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) { +#ifdef WINDOWS + closesocket(pThreadInfo->sockfd); + WSACleanup(); +#else + close(pThreadInfo->sockfd); +#endif + } #else if (pThreadInfo->sampleBindArray) { for (int k = 0; k < MAX_SAMPLES; k++) { @@ -11250,6 +11278,31 @@ static int queryTestProcess() { } } + if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) { +#ifdef WINDOWS + WSADATA wsaData; + WSAStartup(MAKEWORD(2, 2), &wsaData); + SOCKET sockfd; +#else + int sockfd; +#endif + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) { +#ifdef WINDOWS + errorPrint( "Could not create socket : %d" , WSAGetLastError()); +#endif + debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd); + ERROR_EXIT("opening socket"); + } + + int retConn = connect(sockfd, (struct sockaddr *)&(g_queryInfo.serv_addr), + sizeof(struct sockaddr)); + debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn); + if (retConn < 0) { + ERROR_EXIT("connecting"); + } + pThreadInfo->sockfd = sockfd; + } pThreadInfo->taos = NULL;// workaround to use separate taos connection; pthread_create(pids + seq, NULL, specifiedTableQuery, @@ -11301,6 +11354,31 @@ static int queryTestProcess() { pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; tableFrom = pThreadInfo->end_table_to + 1; pThreadInfo->taos = NULL; // workaround to use separate taos connection; + if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) { +#ifdef WINDOWS + WSADATA wsaData; + WSAStartup(MAKEWORD(2, 2), &wsaData); + SOCKET sockfd; +#else + int sockfd; +#endif + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) { +#ifdef WINDOWS + errorPrint( "Could not create socket : %d" , WSAGetLastError()); +#endif + debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd); + ERROR_EXIT("opening socket"); + } + + int retConn = connect(sockfd, (struct sockaddr *)&(g_queryInfo.serv_addr), + sizeof(struct sockaddr)); + debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn); + if (retConn < 0) { + ERROR_EXIT("connecting"); + } + pThreadInfo->sockfd = sockfd; + } pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo); } @@ -11313,6 +11391,15 @@ static int queryTestProcess() { for (int i = 0; i < nConcurrent; i++) { for (int j = 0; j < nSqlCount; j++) { pthread_join(pids[i * nSqlCount + j], NULL); + if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) { + threadInfo *pThreadInfo = infos + i * nSqlCount + j; +#ifdef WINDOWS + closesocket(pThreadInfo->sockfd); + WSACleanup(); +#else + close(pThreadInfo->sockfd); +#endif + } } } } @@ -11322,6 +11409,15 @@ static int queryTestProcess() { for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) { pthread_join(pidsOfSub[i], NULL); + if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) { + threadInfo *pThreadInfo = infosOfSub + i; +#ifdef WINDOWS + closesocket(pThreadInfo->sockfd); + WSACleanup(); +#else + close(pThreadInfo->sockfd); +#endif + } } tmfree((char*)pidsOfSub); @@ -11997,7 +12093,6 @@ static int regexMatch(const char *s, const char *reg, int cflags) { printf("Regex match failed: %s\n", msgbuf); exit(EXIT_FAILURE); } - return 0; }