diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index edc589b7e6868f6b1835699886197eb6d275959e..fae13fd3a7e13e14828519aa157236d7a4178c43 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -75,6 +75,7 @@ extern char configDir[]; #define MAX_DATA_SIZE (16*TSDB_MAX_COLUMNS)+20 // max record len: 16*MAX_COLUMNS, timestamp string and ,('') need extra space #define OPT_ABORT 1 /* –abort */ #define MAX_FILE_NAME_LEN 256 // max file name length on linux is 255. +#define MAX_PATH_LEN 4096 #define DEFAULT_START_TIME 1500000000000 @@ -244,6 +245,7 @@ typedef struct SArguments_S { uint64_t insert_interval; uint64_t timestamp_step; int64_t query_times; + int64_t prepared_rand; uint32_t interlaceRows; uint32_t reqPerReq; // num_of_records_per_req uint64_t max_sql_len; @@ -303,6 +305,7 @@ typedef struct SSuperTable_S { uint64_t lenOfTagOfOneRow; char* sampleDataBuf; + bool useSampleTs; uint32_t tagSource; // 0: rand, 1: tag sample char* tagDataBuf; @@ -363,7 +366,7 @@ typedef struct SDataBase_S { bool drop; // 0: use exists, 1: if exists, drop then new create SDbCfg dbCfg; uint64_t superTblCount; - SSuperTable superTbls[MAX_SUPER_TABLE_COUNT]; + SSuperTable* superTbls; } SDataBase; typedef struct SDbs_S { @@ -382,12 +385,11 @@ typedef struct SDbs_S { uint32_t threadCount; uint32_t threadCountForCreateTbl; uint32_t dbCount; - SDataBase db[MAX_DB_COUNT]; - // statistics uint64_t totalInsertRows; uint64_t totalAffectedRows; + SDataBase* db; } SDbs; typedef struct SpecifiedQueryInfo_S { @@ -501,6 +503,7 @@ typedef struct SThreadInfo_S { uint64_t querySeq; // sequence number of sql command TAOS_SUB* tsub; + int sockfd; } threadInfo; #ifdef WINDOWS @@ -580,8 +583,7 @@ static void prompt(); static int createDatabasesAndStables(); static void createChildTables(); static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet); -static int postProceSql(char *host, struct sockaddr_in *pServAddr, - uint16_t port, char* sqlstr, threadInfo *pThreadInfo); +static int postProceSql(char *host, uint16_t port, char* sqlstr, threadInfo *pThreadInfo); static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, int disorderRatio, int disorderRange); static bool getInfoFromJsonFile(char* file); @@ -590,12 +592,12 @@ static int regexMatch(const char *s, const char *reg, int cflags); /* ************ Global variables ************ */ -int32_t g_randint[MAX_PREPARED_RAND]; -uint32_t g_randuint[MAX_PREPARED_RAND]; -int64_t g_randbigint[MAX_PREPARED_RAND]; -uint64_t g_randubigint[MAX_PREPARED_RAND]; -float g_randfloat[MAX_PREPARED_RAND]; -double g_randdouble[MAX_PREPARED_RAND]; +int32_t* g_randint; +uint32_t* g_randuint; +int64_t* g_randbigint; +uint64_t* g_randubigint; +float* g_randfloat; +double* g_randdouble; char *g_randbool_buff = NULL; char *g_randint_buff = NULL; @@ -662,6 +664,7 @@ SArguments g_args = { 0, // insert_interval DEFAULT_TIMESTAMP_STEP, // timestamp_step 1, // query_times + 10000, // prepared_rand DEFAULT_INTERLACE_ROWS, // interlaceRows; 30000, // reqPerReq (1024*1024), // max_sql_len @@ -796,6 +799,8 @@ static void printHelp() { "Set the replica parameters of the database, By default use 1, min: 1, max: 3."); printf("%s%s%s%s\n", indent, "-m, --table-prefix=TABLEPREFIX", "\t", "Table prefix name. By default use 'd'."); + printf("%s%s%s%s\n", indent, "-E, --escape-character", "\t", + "Use escape character for Both Stable and normmal table name"); printf("%s%s%s%s\n", indent, "-s, --sql-file=FILE", "\t\t", "The select sql file."); printf("%s%s%s%s\n", indent, "-N, --normal-table", "\t\t", "Use normal table flag."); @@ -1687,10 +1692,10 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->data_type[index] = TSDB_DATA_TYPE_DOUBLE; } else if (0 == strcasecmp(token, "TINYINT")) { arguments->data_type[index] = TSDB_DATA_TYPE_TINYINT; - } else if (1 == regexMatch(token, "^BINARY(\\([1-9][0-9]*\\))?$", REG_ICASE | + } else if (1 == regexMatch(token, "^BINARY(\\([1-9][0-9]*\\))?$", REG_ICASE | REG_EXTENDED)) { arguments->data_type[index] = TSDB_DATA_TYPE_BINARY; - } else if (1 == regexMatch(token, "^NCHAR(\\([1-9][0-9]*\\))?$", REG_ICASE | + } else if (1 == regexMatch(token, "^NCHAR(\\([1-9][0-9]*\\))?$", REG_ICASE | REG_EXTENDED)) { arguments->data_type[index] = TSDB_DATA_TYPE_NCHAR; } else if (0 == strcasecmp(token, "BOOL")) { @@ -2097,7 +2102,7 @@ static void tmfclose(FILE *fp) { } } -static void tmfree(char *buf) { +static void tmfree(void *buf) { if (NULL != buf) { free(buf); buf = NULL; @@ -2205,7 +2210,7 @@ static void selectAndGetResult( } else if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) { int retCode = postProceSql( - g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port, + g_queryInfo.host, g_queryInfo.port, command, pThreadInfo); if (0 != retCode) { @@ -2221,157 +2226,157 @@ static void selectAndGetResult( static char *rand_bool_str() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_randbool_buff + ((cursor % MAX_PREPARED_RAND) * BOOL_BUFF_LEN); + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_randbool_buff + ((cursor % g_args.prepared_rand) * BOOL_BUFF_LEN); } static int32_t rand_bool() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_randint[cursor % MAX_PREPARED_RAND] % 2; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_randint[cursor % g_args.prepared_rand] % 2; } static char *rand_tinyint_str() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; return g_randtinyint_buff + - ((cursor % MAX_PREPARED_RAND) * TINYINT_BUFF_LEN); + ((cursor % g_args.prepared_rand) * TINYINT_BUFF_LEN); } static int32_t rand_tinyint() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_randint[cursor % MAX_PREPARED_RAND] % 128; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_randint[cursor % g_args.prepared_rand] % 128; } static char *rand_utinyint_str() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; return g_randutinyint_buff + - ((cursor % MAX_PREPARED_RAND) * TINYINT_BUFF_LEN); + ((cursor % g_args.prepared_rand) * TINYINT_BUFF_LEN); } static int32_t rand_utinyint() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_randuint[cursor % MAX_PREPARED_RAND] % 255; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_randuint[cursor % g_args.prepared_rand] % 255; } static char *rand_smallint_str() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; return g_randsmallint_buff + - ((cursor % MAX_PREPARED_RAND) * SMALLINT_BUFF_LEN); + ((cursor % g_args.prepared_rand) * SMALLINT_BUFF_LEN); } static int32_t rand_smallint() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_randint[cursor % MAX_PREPARED_RAND] % 32768; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_randint[cursor % g_args.prepared_rand] % 32768; } static char *rand_usmallint_str() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; return g_randusmallint_buff + - ((cursor % MAX_PREPARED_RAND) * SMALLINT_BUFF_LEN); + ((cursor % g_args.prepared_rand) * SMALLINT_BUFF_LEN); } static int32_t rand_usmallint() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_randuint[cursor % MAX_PREPARED_RAND] % 65535; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_randuint[cursor % g_args.prepared_rand] % 65535; } static char *rand_int_str() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_randint_buff + ((cursor % MAX_PREPARED_RAND) * INT_BUFF_LEN); + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_randint_buff + ((cursor % g_args.prepared_rand) * INT_BUFF_LEN); } static int32_t rand_int() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_randint[cursor % MAX_PREPARED_RAND]; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_randint[cursor % g_args.prepared_rand]; } static char *rand_uint_str() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_randuint_buff + ((cursor % MAX_PREPARED_RAND) * INT_BUFF_LEN); + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_randuint_buff + ((cursor % g_args.prepared_rand) * INT_BUFF_LEN); } static int32_t rand_uint() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_randuint[cursor % MAX_PREPARED_RAND]; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_randuint[cursor % g_args.prepared_rand]; } static char *rand_bigint_str() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; return g_randbigint_buff + - ((cursor % MAX_PREPARED_RAND) * BIGINT_BUFF_LEN); + ((cursor % g_args.prepared_rand) * BIGINT_BUFF_LEN); } static int64_t rand_bigint() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_randbigint[cursor % MAX_PREPARED_RAND]; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_randbigint[cursor % g_args.prepared_rand]; } static char *rand_ubigint_str() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; return g_randubigint_buff + - ((cursor % MAX_PREPARED_RAND) * BIGINT_BUFF_LEN); + ((cursor % g_args.prepared_rand) * BIGINT_BUFF_LEN); } static int64_t rand_ubigint() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_randubigint[cursor % MAX_PREPARED_RAND]; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_randubigint[cursor % g_args.prepared_rand]; } static char *rand_float_str() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_randfloat_buff + ((cursor % MAX_PREPARED_RAND) * FLOAT_BUFF_LEN); + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_randfloat_buff + ((cursor % g_args.prepared_rand) * FLOAT_BUFF_LEN); } @@ -2379,58 +2384,58 @@ static float rand_float() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_randfloat[cursor % MAX_PREPARED_RAND]; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_randfloat[cursor % g_args.prepared_rand]; } static char *demo_current_float_str() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; return g_rand_current_buff + - ((cursor % MAX_PREPARED_RAND) * FLOAT_BUFF_LEN); + ((cursor % g_args.prepared_rand) * FLOAT_BUFF_LEN); } static float UNUSED_FUNC demo_current_float() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return (float)(9.8 + 0.04 * (g_randint[cursor % MAX_PREPARED_RAND] % 10) - + g_randfloat[cursor % MAX_PREPARED_RAND]/1000000000); + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return (float)(9.8 + 0.04 * (g_randint[cursor % g_args.prepared_rand] % 10) + + g_randfloat[cursor % g_args.prepared_rand]/1000000000); } static char *demo_voltage_int_str() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; return g_rand_voltage_buff + - ((cursor % MAX_PREPARED_RAND) * INT_BUFF_LEN); + ((cursor % g_args.prepared_rand) * INT_BUFF_LEN); } static int32_t UNUSED_FUNC demo_voltage_int() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return 215 + g_randint[cursor % MAX_PREPARED_RAND] % 10; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return 215 + g_randint[cursor % g_args.prepared_rand] % 10; } static char *demo_phase_float_str() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return g_rand_phase_buff + ((cursor % MAX_PREPARED_RAND) * FLOAT_BUFF_LEN); + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return g_rand_phase_buff + ((cursor % g_args.prepared_rand) * FLOAT_BUFF_LEN); } static float UNUSED_FUNC demo_phase_float() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; - return (float)((115 + g_randint[cursor % MAX_PREPARED_RAND] % 10 - + g_randfloat[cursor % MAX_PREPARED_RAND]/1000000000)/360); + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; + return (float)((115 + g_randint[cursor % g_args.prepared_rand] % 10 + + g_randfloat[cursor % g_args.prepared_rand]/1000000000)/360); } #if 0 @@ -2469,7 +2474,7 @@ static char *rand_double_str() { static int cursor; cursor++; - if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; + if (cursor > (g_args.prepared_rand - 1)) cursor = 0; return g_randdouble_buff + (cursor * DOUBLE_BUFF_LEN); } @@ -2477,42 +2482,54 @@ static double rand_double() { static int cursor; cursor++; - cursor = cursor % MAX_PREPARED_RAND; + cursor = cursor % g_args.prepared_rand; return g_randdouble[cursor]; } static void init_rand_data() { - g_randint_buff = calloc(1, INT_BUFF_LEN * MAX_PREPARED_RAND); + g_randint_buff = calloc(1, INT_BUFF_LEN * g_args.prepared_rand); assert(g_randint_buff); - g_rand_voltage_buff = calloc(1, INT_BUFF_LEN * MAX_PREPARED_RAND); + g_rand_voltage_buff = calloc(1, INT_BUFF_LEN * g_args.prepared_rand); assert(g_rand_voltage_buff); - g_randbigint_buff = calloc(1, BIGINT_BUFF_LEN * MAX_PREPARED_RAND); + g_randbigint_buff = calloc(1, BIGINT_BUFF_LEN * g_args.prepared_rand); assert(g_randbigint_buff); - g_randsmallint_buff = calloc(1, SMALLINT_BUFF_LEN * MAX_PREPARED_RAND); + g_randsmallint_buff = calloc(1, SMALLINT_BUFF_LEN * g_args.prepared_rand); assert(g_randsmallint_buff); - g_randtinyint_buff = calloc(1, TINYINT_BUFF_LEN * MAX_PREPARED_RAND); + g_randtinyint_buff = calloc(1, TINYINT_BUFF_LEN * g_args.prepared_rand); assert(g_randtinyint_buff); - g_randbool_buff = calloc(1, BOOL_BUFF_LEN * MAX_PREPARED_RAND); + g_randbool_buff = calloc(1, BOOL_BUFF_LEN * g_args.prepared_rand); assert(g_randbool_buff); - g_randfloat_buff = calloc(1, FLOAT_BUFF_LEN * MAX_PREPARED_RAND); + g_randfloat_buff = calloc(1, FLOAT_BUFF_LEN * g_args.prepared_rand); assert(g_randfloat_buff); - g_rand_current_buff = calloc(1, FLOAT_BUFF_LEN * MAX_PREPARED_RAND); + g_rand_current_buff = calloc(1, FLOAT_BUFF_LEN * g_args.prepared_rand); assert(g_rand_current_buff); - g_rand_phase_buff = calloc(1, FLOAT_BUFF_LEN * MAX_PREPARED_RAND); + g_rand_phase_buff = calloc(1, FLOAT_BUFF_LEN * g_args.prepared_rand); assert(g_rand_phase_buff); - g_randdouble_buff = calloc(1, DOUBLE_BUFF_LEN * MAX_PREPARED_RAND); + g_randdouble_buff = calloc(1, DOUBLE_BUFF_LEN * g_args.prepared_rand); assert(g_randdouble_buff); - g_randuint_buff = calloc(1, INT_BUFF_LEN * MAX_PREPARED_RAND); + g_randuint_buff = calloc(1, INT_BUFF_LEN * g_args.prepared_rand); assert(g_randuint_buff); - g_randutinyint_buff = calloc(1, TINYINT_BUFF_LEN * MAX_PREPARED_RAND); + g_randutinyint_buff = calloc(1, TINYINT_BUFF_LEN * g_args.prepared_rand); assert(g_randutinyint_buff); - g_randusmallint_buff = calloc(1, SMALLINT_BUFF_LEN * MAX_PREPARED_RAND); + g_randusmallint_buff = calloc(1, SMALLINT_BUFF_LEN * g_args.prepared_rand); assert(g_randusmallint_buff); - g_randubigint_buff = calloc(1, BIGINT_BUFF_LEN * MAX_PREPARED_RAND); + g_randubigint_buff = calloc(1, BIGINT_BUFF_LEN * g_args.prepared_rand); assert(g_randubigint_buff); - - for (int i = 0; i < MAX_PREPARED_RAND; i++) { + g_randint = calloc(1, sizeof(int32_t) * g_args.prepared_rand); + assert(g_randint); + g_randuint = calloc(1, sizeof(uint32_t) * g_args.prepared_rand); + assert(g_randuint); + g_randbigint = calloc(1, sizeof(int64_t) * g_args.prepared_rand); + assert(g_randbigint); + g_randubigint = calloc(1, sizeof(uint64_t) * g_args.prepared_rand); + assert(g_randubigint); + g_randfloat = calloc(1, sizeof(float) * g_args.prepared_rand); + assert(g_randfloat); + g_randdouble = calloc(1, sizeof(double) * g_args.prepared_rand); + assert(g_randdouble); + + for (int i = 0; i < g_args.prepared_rand; i++) { g_randint[i] = (int)(taosRandom() % RAND_MAX - (RAND_MAX >> 1)); g_randuint[i] = (int)(taosRandom()); sprintf(g_randint_buff + i * INT_BUFF_LEN, "%d", @@ -2755,6 +2772,8 @@ static int printfInsertMeta() { g_Dbs.db[i].superTbls[j].sampleFormat); printf(" sampleFile: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sampleFile); + printf(" useSampleTs: \033[33m%s\033[0m\n", + g_Dbs.db[i].superTbls[j].useSampleTs ? "yes (warning: disorderRange/disorderRatio is disabled)" : "no"); printf(" tagsFile: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].tagsFile); printf(" columnCount: \033[33m%d\033[0m\n ", @@ -2799,8 +2818,6 @@ static int printfInsertMeta() { printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_args.insertRows); } - - printf("\n"); } @@ -3384,7 +3401,7 @@ static void printfQuerySystemInfo(TAOS * taos) { free(dbInfos); } -static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port, +static int postProceSql(char *host, uint16_t port, char* sqlstr, threadInfo *pThreadInfo) { char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s"; @@ -3416,35 +3433,18 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '+', '/'}; - snprintf(userpass_buf, INPUT_BUF_LEN, "%s:%s", + if (g_args.test_mode == INSERT_TEST) { + snprintf(userpass_buf, INPUT_BUF_LEN, "%s:%s", g_Dbs.user, g_Dbs.password); + } else { + snprintf(userpass_buf, INPUT_BUF_LEN, "%s:%s", + g_queryInfo.user, g_queryInfo.password); + } + size_t userpass_buf_len = strlen(userpass_buf); size_t encoded_len = 4 * ((userpass_buf_len +2) / 3); char base64_buf[INPUT_BUF_LEN]; -#ifdef WINDOWS - WSADATA wsaData; - WSAStartup(MAKEWORD(2, 2), &wsaData); - SOCKET sockfd; -#else - int sockfd; -#endif - sockfd = socket(AF_INET, SOCK_STREAM, 0); - if (sockfd < 0) { -#ifdef WINDOWS - errorPrint( "Could not create socket : %d" , WSAGetLastError()); -#endif - debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd); - free(request_buf); - ERROR_EXIT("opening socket"); - } - - int retConn = connect(sockfd, (struct sockaddr *)pServAddr, sizeof(struct sockaddr)); - debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn); - if (retConn < 0) { - free(request_buf); - ERROR_EXIT("connecting"); - } memset(base64_buf, 0, INPUT_BUF_LEN); @@ -3484,9 +3484,9 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port sent = 0; do { #ifdef WINDOWS - bytes = send(sockfd, request_buf + sent, req_str_len - sent, 0); + bytes = send(pThreadInfo->sockfd, request_buf + sent, req_str_len - sent, 0); #else - bytes = write(sockfd, request_buf + sent, req_str_len - sent); + bytes = write(pThreadInfo->sockfd, request_buf + sent, req_str_len - sent); #endif if (bytes < 0) ERROR_EXIT("writing message to socket"); @@ -3505,9 +3505,9 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port do { #ifdef WINDOWS - bytes = recv(sockfd, response_buf + received, resp_len - received, 0); + bytes = recv(pThreadInfo->sockfds, response_buf + received, resp_len - received, 0); #else - bytes = read(sockfd, response_buf + received, resp_len - received); + bytes = read(pThreadInfo->sockfd, response_buf + received, resp_len - received); #endif verbosePrint("%s() LN%d: bytes:%d\n", __func__, __LINE__, bytes); if (bytes < 0) { @@ -3518,12 +3518,11 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port break; received += bytes; - response_buf[RESP_BUF_LEN - 1] = '\0'; + verbosePrint("%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n", + __func__, __LINE__, received, resp_len, response_buf); + response_buf[RESP_BUF_LEN - 1] = '\0'; if (strlen(response_buf)) { - verbosePrint("%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n", - __func__, __LINE__, received, resp_len, response_buf); - if (((NULL == strstr(response_buf, resEncodingChunk)) && (NULL != strstr(response_buf, resHttp))) || ((NULL != strstr(response_buf, resHttpOk)) @@ -3546,14 +3545,9 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port } free(request_buf); -#ifdef WINDOWS - closesocket(sockfd); - WSACleanup(); -#else - close(sockfd); -#endif - if (NULL == strstr(response_buf, "\"status\":\"succ\"")) { + response_buf[RESP_BUF_LEN - 1] = '\0'; + if (NULL == strstr(response_buf, resHttpOk)) { errorPrint("%s() LN%d, Response:\n%s\n", __func__, __LINE__, response_buf); return -1; @@ -3815,8 +3809,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, limit, offset); //get all child table name use cmd: select tbname from superTblName; - snprintf(command, 1024, "select tbname from %s.%s %s", - dbName, stbName, limitBuf); + snprintf(command, 1024, "select tbname from %s.%s %s", dbName, stbName, limitBuf); res = taos_query(taos, command); int32_t code = taos_errno(res); @@ -3997,21 +3990,21 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX], fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes); - + if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], - "INT", strlen("INT")) && + "INT", strlen("INT")) && strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) { superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_INT; } else if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], - "TINYINT", strlen("TINYINT")) && + "TINYINT", strlen("TINYINT")) && strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) { superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_TINYINT; } else if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], - "SMALLINT", strlen("SMALLINT")) && + "SMALLINT", strlen("SMALLINT")) && strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) { superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_SMALLINT; } else if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], - "BIGINT", strlen("BIGINT")) && + "BIGINT", strlen("BIGINT")) && strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) { superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_BIGINT; } else if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], @@ -4065,7 +4058,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], min(NOTE_BUFF_LEN, fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes) + 1); - + if (strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) { tstrncpy(superTbls->columns[columnIndex].dataType, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], @@ -4344,9 +4337,10 @@ static int createSuperTable( superTbl->lenOfTagOfOneRow = lenOfTagOfOneRow; + snprintf(command, BUFFER_SIZE, - "CREATE TABLE IF NOT EXISTS %s.%s (ts TIMESTAMP%s) TAGS %s", - dbName, superTbl->stbName, cols, tags); + "CREATE TABLE IF NOT EXISTS %s.%s (ts TIMESTAMP%s) TAGS %s", + dbName, superTbl->stbName, cols, tags); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) { errorPrint2("create supertable %s failed!\n\n", superTbl->stbName); @@ -4592,7 +4586,6 @@ static void* createTable(void *sarg) return NULL; } pThreadInfo->tables_created += batchNum; - uint64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n", @@ -4606,8 +4599,8 @@ static void* createTable(void *sarg) NO_INSERT_TYPE, false)) { errorPrint2("queryDbExec() failed. buffer:\n%s\n", pThreadInfo->buffer); } + pThreadInfo->tables_created += batchNum; } - free(pThreadInfo->buffer); return NULL; } @@ -4814,6 +4807,23 @@ static int readTagFromCsvFileToMem(SSuperTable * stbInfo) { return 0; } +static void getAndSetRowsFromCsvFile(SSuperTable *stbInfo) { + FILE *fp = fopen(stbInfo->sampleFile, "r"); + int line_count = 0; + if (fp == NULL) { + errorPrint("Failed to open sample file: %s, reason:%s\n", + stbInfo->sampleFile, strerror(errno)); + exit(EXIT_FAILURE); + } + char *buf = calloc(1, stbInfo->maxSqlLen); + while (fgets(buf, stbInfo->maxSqlLen, fp)) { + line_count++; + } + fclose(fp); + tmfree(buf); + stbInfo->insertRows = line_count; +} + /* Read 10000 lines at most. If more than 10000 lines, continue to read after using */ @@ -5267,6 +5277,22 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } + cJSON* prepareRand = cJSON_GetObjectItem(root, "prepared_rand"); + if (prepareRand && prepareRand->type == cJSON_Number) { + if (prepareRand->valueint <= 0) { + errorPrint("%s() LN%d, failed to read json, prepared_rand input mistake\n", + __func__, __LINE__); + goto PARSE_OVER; + } + g_args.prepared_rand = prepareRand->valueint; + } else if (!prepareRand) { + g_args.prepared_rand = 10000; + } else { + errorPrint("%s() LN%d, failed to read json, prepared_rand not found\n", + __func__, __LINE__); + goto PARSE_OVER; + } + cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no, if (answerPrompt && answerPrompt->type == cJSON_String @@ -5308,7 +5334,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { MAX_DB_COUNT); goto PARSE_OVER; } - + g_Dbs.db = calloc(1, sizeof(SDataBase)*dbSize); + assert(g_Dbs.db); g_Dbs.dbCount = dbSize; for (int i = 0; i < dbSize; ++i) { cJSON* dbinfos = cJSON_GetArrayItem(dbs, i); @@ -5508,7 +5535,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { MAX_SUPER_TABLE_COUNT); goto PARSE_OVER; } - + g_Dbs.db[i].superTbls = calloc(1, stbSize * sizeof(SSuperTable)); + assert(g_Dbs.db[i].superTbls); g_Dbs.db[i].superTblCount = stbSize; for (int j = 0; j < stbSize; ++j) { cJSON* stbInfo = cJSON_GetArrayItem(stables, j); @@ -5707,6 +5735,23 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } + cJSON *useSampleTs = cJSON_GetObjectItem(stbInfo, "use_sample_ts"); + if (useSampleTs && useSampleTs->type == cJSON_String + && useSampleTs->valuestring != NULL) { + if (0 == strncasecmp(useSampleTs->valuestring, "yes", 3)) { + g_Dbs.db[i].superTbls[j].useSampleTs = true; + } else if (0 == strncasecmp(useSampleTs->valuestring, "no", 2)){ + g_Dbs.db[i].superTbls[j].useSampleTs = false; + } else { + g_Dbs.db[i].superTbls[j].useSampleTs = false; + } + } else if (!useSampleTs) { + g_Dbs.db[i].superTbls[j].useSampleTs = false; + } else { + errorPrint("%s", "failed to read json, use_sample_ts not found\n"); + goto PARSE_OVER; + } + cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file"); if ((tagsFile && tagsFile->type == cJSON_String) && (tagsFile->valuestring != NULL)) { @@ -6364,9 +6409,12 @@ static bool getInfoFromJsonFile(char* file) { } if (INSERT_TEST == g_args.test_mode) { + memset(&g_Dbs, 0, sizeof(SDbs)); + g_Dbs.use_metric = g_args.use_metric; ret = getMetaFromInsertJsonFile(root); } else if ((QUERY_TEST == g_args.test_mode) || (SUBSCRIBE_TEST == g_args.test_mode)) { + memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo)); ret = getMetaFromQueryJsonFile(root); } else { errorPrint("%s", @@ -6431,8 +6479,9 @@ static void postFreeResource() { g_Dbs.db[i].superTbls[j].childTblName = NULL; } } + tmfree(g_Dbs.db[i].superTbls); } - + tmfree(g_Dbs.db); tmfree(g_randbool_buff); tmfree(g_randint_buff); tmfree(g_rand_voltage_buff); @@ -6455,6 +6504,7 @@ static void postFreeResource() { } } tmfree(g_sampleBindBatchArray); + #endif } @@ -6467,13 +6517,20 @@ static int getRowDataFromSample( } int dataLen = 0; - - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + if(stbInfo->useSampleTs) { + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + "(%s", + stbInfo->sampleDataBuf + + stbInfo->lenOfOneRow * (*sampleUsePos)); + } else { + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp); - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%s", stbInfo->sampleDataBuf + stbInfo->lenOfOneRow * (*sampleUsePos)); + } + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); (*sampleUsePos)++; @@ -6545,7 +6602,7 @@ static int64_t generateStbRowData( tmpLen = strlen(tmp); tstrncpy(pstr + dataLen, tmp, min(tmpLen + 1, BIGINT_BUFF_LEN)); break; - + case TSDB_DATA_TYPE_UBIGINT: tmp = rand_ubigint_str(); tmpLen = strlen(tmp); @@ -6906,6 +6963,9 @@ static int prepareSampleForStb(SSuperTable *stbInfo) { int ret; if (0 == strncasecmp(stbInfo->dataSource, "sample", strlen("sample"))) { + if(stbInfo->useSampleTs) { + getAndSetRowsFromCsvFile(stbInfo); + } ret = generateSampleFromCsvForStb(stbInfo); } else { ret = generateSampleFromRandForStb(stbInfo); @@ -6956,7 +7016,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer); - if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, + if (0 != postProceSql(g_Dbs.host, g_Dbs.port, pThreadInfo->buffer, pThreadInfo)) { affectedRows = -1; printf("========restful return fail, threadID[%d]\n", @@ -7007,12 +7067,11 @@ static void getTableName(char *pTblName, stbInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); } } else { - snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", - stbInfo->childTblPrefix, tableSeq); + snprintf(pTblName, TSDB_TABLE_NAME_LEN, + "%s%"PRIu64"", stbInfo->childTblPrefix, tableSeq); } } else { - snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", - g_args.tb_prefix, tableSeq); + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", g_args.tb_prefix, tableSeq); } } @@ -7432,7 +7491,7 @@ static int32_t prepareStmtBindArrayByType( bind->length = &bind->buffer_length; bind->is_null = NULL; break; - + case TSDB_DATA_TYPE_UINT: bind_uint = malloc(sizeof(uint32_t)); assert(bind_uint); @@ -10495,6 +10554,33 @@ static void startMultiThreadInsertData(int threads, char* db_name, pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint(); } */ + + if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) { +#ifdef WINDOWS + WSADATA wsaData; + WSAStartup(MAKEWORD(2, 2), &wsaData); + SOCKET sockfd; +#else + int sockfd; +#endif + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) { +#ifdef WINDOWS + errorPrint( "Could not create socket : %d" , WSAGetLastError()); +#endif + debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd); + ERROR_EXIT("opening socket"); + } + + int retConn = connect(sockfd, (struct sockaddr *)&(g_Dbs.serv_addr), sizeof(struct sockaddr)); + debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn); + if (retConn < 0) { + ERROR_EXIT("connecting"); + } + pThreadInfo->sockfd = sockfd; + } + + tsem_init(&(pThreadInfo->lock_sem), 0, 0); if (ASYNC_MODE == g_Dbs.asyncMode) { pthread_create(pids + i, NULL, asyncWrite, pThreadInfo); @@ -10532,6 +10618,14 @@ static void startMultiThreadInsertData(int threads, char* db_name, tmfree((char *)pThreadInfo->bind_ts_array); tmfree(pThreadInfo->bindParams); tmfree(pThreadInfo->is_null); + if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) { +#ifdef WINDOWS + closesocket(pThreadInfo->sockfd); + WSACleanup(); +#else + close(pThreadInfo->sockfd); +#endif + } #else if (pThreadInfo->sampleBindArray) { for (int k = 0; k < MAX_SAMPLES; k++) { @@ -11194,6 +11288,31 @@ static int queryTestProcess() { } } + if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) { +#ifdef WINDOWS + WSADATA wsaData; + WSAStartup(MAKEWORD(2, 2), &wsaData); + SOCKET sockfd; +#else + int sockfd; +#endif + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) { +#ifdef WINDOWS + errorPrint( "Could not create socket : %d" , WSAGetLastError()); +#endif + debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd); + ERROR_EXIT("opening socket"); + } + + int retConn = connect(sockfd, (struct sockaddr *)&(g_queryInfo.serv_addr), + sizeof(struct sockaddr)); + debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn); + if (retConn < 0) { + ERROR_EXIT("connecting"); + } + pThreadInfo->sockfd = sockfd; + } pThreadInfo->taos = NULL;// workaround to use separate taos connection; pthread_create(pids + seq, NULL, specifiedTableQuery, @@ -11245,6 +11364,31 @@ static int queryTestProcess() { pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; tableFrom = pThreadInfo->end_table_to + 1; pThreadInfo->taos = NULL; // workaround to use separate taos connection; + if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) { +#ifdef WINDOWS + WSADATA wsaData; + WSAStartup(MAKEWORD(2, 2), &wsaData); + SOCKET sockfd; +#else + int sockfd; +#endif + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) { +#ifdef WINDOWS + errorPrint( "Could not create socket : %d" , WSAGetLastError()); +#endif + debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd); + ERROR_EXIT("opening socket"); + } + + int retConn = connect(sockfd, (struct sockaddr *)&(g_queryInfo.serv_addr), + sizeof(struct sockaddr)); + debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn); + if (retConn < 0) { + ERROR_EXIT("connecting"); + } + pThreadInfo->sockfd = sockfd; + } pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo); } @@ -11257,6 +11401,15 @@ static int queryTestProcess() { for (int i = 0; i < nConcurrent; i++) { for (int j = 0; j < nSqlCount; j++) { pthread_join(pids[i * nSqlCount + j], NULL); + if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) { + threadInfo *pThreadInfo = infos + i * nSqlCount + j; +#ifdef WINDOWS + closesocket(pThreadInfo->sockfd); + WSACleanup(); +#else + close(pThreadInfo->sockfd); +#endif + } } } } @@ -11266,6 +11419,15 @@ static int queryTestProcess() { for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) { pthread_join(pidsOfSub[i], NULL); + if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) { + threadInfo *pThreadInfo = infosOfSub + i; +#ifdef WINDOWS + closesocket(pThreadInfo->sockfd); + WSACleanup(); +#else + close(pThreadInfo->sockfd); +#endif + } } tmfree((char*)pidsOfSub); @@ -11768,29 +11930,6 @@ static int subscribeTestProcess() { return 0; } -static void initOfInsertMeta() { - memset(&g_Dbs, 0, sizeof(SDbs)); - - // set default values - tstrncpy(g_Dbs.host, "127.0.0.1", MAX_HOSTNAME_SIZE); - g_Dbs.port = 6030; - tstrncpy(g_Dbs.user, TSDB_DEFAULT_USER, MAX_USERNAME_SIZE); - tstrncpy(g_Dbs.password, TSDB_DEFAULT_PASS, SHELL_MAX_PASSWORD_LEN); - g_Dbs.threadCount = 2; - - g_Dbs.use_metric = g_args.use_metric; -} - -static void initOfQueryMeta() { - memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo)); - - // set default values - tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_HOSTNAME_SIZE); - g_queryInfo.port = 6030; - tstrncpy(g_queryInfo.user, TSDB_DEFAULT_USER, MAX_USERNAME_SIZE); - tstrncpy(g_queryInfo.password, TSDB_DEFAULT_PASS, SHELL_MAX_PASSWORD_LEN); -} - static void setParaFromArg() { char type[20]; char length[20]; @@ -11823,7 +11962,7 @@ static void setParaFromArg() { tstrncpy(g_Dbs.resultFile, g_args.output_file, MAX_FILE_NAME_LEN); g_Dbs.use_metric = g_args.use_metric; - + g_args.prepared_rand = min(g_args.insertRows, MAX_PREPARED_RAND); g_Dbs.aggr_func = g_args.aggr_func; char dataString[TSDB_MAX_BYTES_PER_ROW]; @@ -11940,7 +12079,6 @@ static int regexMatch(const char *s, const char *reg, int cflags) { printf("Regex match failed: %s\n", msgbuf); exit(EXIT_FAILURE); } - return 0; } @@ -12100,8 +12238,6 @@ int main(int argc, char *argv[]) { if (g_args.metaFile) { g_totalChildTables = 0; - initOfInsertMeta(); - initOfQueryMeta(); if (false == getInfoFromJsonFile(g_args.metaFile)) { printf("Failed to read %s\n", g_args.metaFile); @@ -12111,6 +12247,10 @@ int main(int argc, char *argv[]) { testMetaFile(); } else { memset(&g_Dbs, 0, sizeof(SDbs)); + g_Dbs.db = calloc(1, sizeof(SDataBase)); + assert(g_Dbs.db); + g_Dbs.db[0].superTbls = calloc(1, sizeof(SSuperTable)); + assert(g_Dbs.db[0].superTbls); setParaFromArg(); if (NULL != g_args.sqlFile) {