From 7610e8111d637d8a77babc050ccb4680f0918d89 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Thu, 18 Mar 2021 17:43:09 +0800 Subject: [PATCH] Feature/sangshuduo/td 3317 taosdemo interlace (#5485) * [TD-3316] : add testcase for taosdemo limit and offset. check offset 0. * [TD-3316] : add testcase for taosdemo limit and offset. fix sample file import bug. * [TD-3316] : add test case for limit and offset. fix sample data issue. * [TD-3327] : fix taosdemo segfault when import data from sample data file. * [TD-3317] : make taosdemo support interlace mode. json parameter rows_per_tbl support. * [TD-3317] : support interlace mode. refactor * [TD-3317] : support interlace mode. refactor * [TD-3317] : support interlace mode insertion. refactor. * [TD-3317] : support interlace mode insertion. change json file. * [TD-3317] : support interlace mode insertion. fix multithread create table regression. * [TD-3317] : support interlace mode insertion. working but not perfect. * [TD-3317] : support interlace mode insertion. rename lowaTest with taosdemoTestWithJson * [TD-3317] : support interlace mode insertion. perfect * [TD-3317] : support interlace mode insertion. cleanup. * [TD-3317] : support interlace mode insertion. adjust algorithm of loop times. Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 378 ++++++++++++++++++++---------------- 1 file changed, 211 insertions(+), 167 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 83195ca15d..355f738885 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -567,12 +567,19 @@ static FILE * g_fpOfInsertResult = NULL; #define debugPrint(fmt, ...) \ do { if (g_args.debug_print || g_args.verbose_print) \ fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0) + #define verbosePrint(fmt, ...) \ do { if (g_args.verbose_print) \ fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0) +#define errorPrint(fmt, ...) \ + do { fprintf(stderr, "ERROR: "fmt, __VA_ARGS__); } while(0) + + /////////////////////////////////////////////////// +static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); } + void printHelp() { char indent[10] = " "; printf("%s%s%s%s\n", indent, "-f", indent, @@ -645,7 +652,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { } else if (strcmp(argv[i], "-c") == 0) { char *configPath = argv[++i]; if (wordexp(configPath, &full_path, 0) != 0) { - fprintf(stderr, "Invalid path %s\n", configPath); + errorPrint( "Invalid path %s\n", configPath); return; } taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]); @@ -694,8 +701,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { && strcasecmp(argv[i], "DOUBLE") && strcasecmp(argv[i], "BINARY") && strcasecmp(argv[i], "NCHAR")) { - fprintf(stderr, "Invalid data_type!\n"); printHelp(); + ERROR_EXIT( "Invalid data_type!\n"); exit(EXIT_FAILURE); } sptr[0] = argv[i]; @@ -715,8 +722,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { && strcasecmp(token, "DOUBLE") && strcasecmp(token, "BINARY") && strcasecmp(token, "NCHAR")) { - fprintf(stderr, "Invalid data_type!\n"); printHelp(); + ERROR_EXIT("Invalid data_type!\n"); exit(EXIT_FAILURE); } sptr[index++] = token; @@ -771,8 +778,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { printHelp(); exit(0); } else { - fprintf(stderr, "wrong options\n"); printHelp(); + ERROR_EXIT("ERROR: wrong options\n"); exit(EXIT_FAILURE); } } @@ -858,7 +865,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) { if (code != 0) { debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command); - fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res)); + errorPrint( "Failed to run %s, reason: %s\n", command, taos_errstr(res)); taos_free_result(res); //taos_close(taos); return -1; @@ -884,13 +891,13 @@ static void getResult(TAOS_RES *res, char* resultFileName) { if (resultFileName[0] != 0) { fp = fopen(resultFileName, "at"); if (fp == NULL) { - fprintf(stderr, "failed to open result file: %s, result will not save to file\n", resultFileName); + errorPrint("%s() LN%d, failed to open result file: %s, result will not save to file\n", __func__, __LINE__, resultFileName); } } char* databuf = (char*) calloc(1, 100*1024*1024); if (databuf == NULL) { - fprintf(stderr, "failed to malloc, warning: save result to file slowly!\n"); + errorPrint("%s() LN%d, failed to malloc, warning: save result to file slowly!\n", __func__, __LINE__); if (fp) fclose(fp); return ; @@ -1484,7 +1491,7 @@ static int xDumpResultToFile(const char* fname, TAOS_RES* tres) { FILE* fp = fopen(fname, "at"); if (fp == NULL) { - fprintf(stderr, "ERROR: failed to open file: %s\n", fname); + errorPrint("%s() LN%d, failed to open file: %s\n", __func__, __LINE__, fname); return -1; } @@ -1529,7 +1536,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) { int32_t code = taos_errno(res); if (code != 0) { - fprintf(stderr, "failed to run , reason: %s\n", taos_errstr(res)); + errorPrint( "failed to run , reason: %s\n", taos_errstr(res)); return -1; } @@ -1541,7 +1548,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) { dbInfos[count] = (SDbInfo *)calloc(1, sizeof(SDbInfo)); if (dbInfos[count] == NULL) { - fprintf(stderr, "failed to allocate memory for some dbInfo[%d]\n", count); + errorPrint( "failed to allocate memory for some dbInfo[%d]\n", count); return -1; } @@ -1576,7 +1583,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) { count++; if (count > MAX_DATABASE_COUNT) { - fprintf(stderr, "The database count overflow than %d\n", MAX_DATABASE_COUNT); + errorPrint( "The database count overflow than %d\n", MAX_DATABASE_COUNT); break; } } @@ -1590,7 +1597,7 @@ static void printfDbInfoForQueryToFile(char* filename, SDbInfo* dbInfos, int ind FILE *fp = fopen(filename, "at"); if (fp == NULL) { - fprintf(stderr, "failed to open file: %s\n", filename); + errorPrint( "failed to open file: %s\n", filename); return; } @@ -1646,7 +1653,7 @@ static void printfQuerySystemInfo(TAOS * taos) { res = taos_query(taos, "show databases;"); SDbInfo** dbInfos = (SDbInfo **)calloc(MAX_DATABASE_COUNT, sizeof(SDbInfo *)); if (dbInfos == NULL) { - fprintf(stderr, "failed to allocate memory\n"); + errorPrint("%s() LN%d, failed to allocate memory\n", __func__, __LINE__); return; } int dbCount = getDbFromServer(taos, dbInfos); @@ -1676,8 +1683,6 @@ static void printfQuerySystemInfo(TAOS * taos) { } -static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); } - static int postProceSql(char* host, uint16_t port, char* sqlstr) { char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s"; @@ -1725,9 +1730,9 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr) sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { #ifdef WINDOWS - fprintf(stderr, "Could not create socket : %d" , WSAGetLastError()); + errorPrint( "Could not create socket : %d" , WSAGetLastError()); #endif - debugPrint("%s() LN%d sockfd=%d\n", __func__, __LINE__, sockfd); + debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd); free(request_buf); ERROR_EXIT("ERROR opening socket"); } @@ -1847,7 +1852,7 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr) static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) { char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1); if (NULL == dataBuf) { - printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1); + errorPrint("%s() LN%d, calloc failed! size:%d\n", __func__, __LINE__, TSDB_MAX_SQL_LEN+1); return NULL; } @@ -2140,7 +2145,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, int childTblCount = 10000; superTbls->childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN); if (superTbls->childTblName == NULL) { - fprintf(stderr, "alloc memory failed!"); + errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__); return -1; } getAllChildNameOfSuperTable(taos, dbName, @@ -2279,7 +2284,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { - fprintf(stderr, "create supertable %s failed!\n\n", + errorPrint( "create supertable %s failed!\n\n", superTbls->sTblName); return -1; } @@ -2293,7 +2298,7 @@ static int createDatabases() { int ret = 0; taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, NULL, g_Dbs.port); if (taos == NULL) { - fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); + errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); return -1; } char command[BUFFER_SIZE] = "\0"; @@ -2378,7 +2383,7 @@ static int createDatabases() { debugPrint("%s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { taos_close(taos); - fprintf(stderr, "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName); + errorPrint( "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName); return -1; } printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName); @@ -2427,7 +2432,7 @@ static void* createTable(void *sarg) char *buffer = calloc(buff_len, 1); if (buffer == NULL) { - fprintf(stderr, "Memory allocated failed!"); + errorPrint("%s() LN%d, Memory allocated failed!\n", __func__, __LINE__); exit(-1); } @@ -2485,7 +2490,7 @@ static void* createTable(void *sarg) len = 0; verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ - fprintf(stderr, "queryDbExec() failed. buffer:\n%s\n", buffer); + errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer); free(buffer); return NULL; } @@ -2501,7 +2506,7 @@ static void* createTable(void *sarg) if (0 != len) { verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer); if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)) { - fprintf(stderr, "queryDbExec() failed. buffer:\n%s\n", buffer); + errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer); } } @@ -2546,7 +2551,7 @@ static int startMultiThreadCreateChildTable( db_name, g_Dbs.port); if (t_info->taos == NULL) { - fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); + errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); free(pids); free(infos); return -1; @@ -2724,7 +2729,7 @@ static int readSampleFromCsvFileToMem( FILE* fp = fopen(superTblInfo->sampleFile, "r"); if (fp == NULL) { - fprintf(stderr, "Failed to open sample file: %s, reason:%s\n", + errorPrint( "Failed to open sample file: %s, reason:%s\n", superTblInfo->sampleFile, strerror(errno)); return -1; } @@ -2736,7 +2741,7 @@ static int readSampleFromCsvFileToMem( readLen = tgetline(&line, &n, fp); if (-1 == readLen) { if(0 != fseek(fp, 0, SEEK_SET)) { - fprintf(stderr, "Failed to fseek file: %s, reason:%s\n", + errorPrint( "Failed to fseek file: %s, reason:%s\n", superTblInfo->sampleFile, strerror(errno)); fclose(fp); return -1; @@ -2805,8 +2810,8 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s int columnSize = cJSON_GetArraySize(columns); if (columnSize > MAX_COLUMN_COUNT) { - printf("ERROR: failed to read json, column size overflow, max column size is %d\n", - MAX_COLUMN_COUNT); + errorPrint("%s() LN%d, failed to read json, column size overflow, max column size is %d\n", + __func__, __LINE__, MAX_COLUMN_COUNT); goto PARSE_OVER; } @@ -2824,7 +2829,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s if (countObj && countObj->type == cJSON_Number) { count = countObj->valueint; } else if (countObj && countObj->type != cJSON_Number) { - printf("ERROR: failed to read json, column count not found\n"); + errorPrint("%s() LN%d, failed to read json, column count not found\n", __func__, __LINE__); goto PARSE_OVER; } else { count = 1; @@ -2834,7 +2839,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s memset(&columnCase, 0, sizeof(StrColumn)); cJSON *dataType = cJSON_GetObjectItem(column, "type"); if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) { - printf("ERROR: failed to read json, column type not found\n"); + errorPrint("%s() LN%d: failed to read json, column type not found\n", __func__, __LINE__); goto PARSE_OVER; } //tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE); @@ -2844,7 +2849,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s if (dataLen && dataLen->type == cJSON_Number) { columnCase.dataLen = dataLen->valueint; } else if (dataLen && dataLen->type != cJSON_Number) { - printf("ERROR: failed to read json, column len not found\n"); + debugPrint("%s() LN%d: failed to read json, column len not found\n", __func__, __LINE__); goto PARSE_OVER; } else { columnCase.dataLen = 8; @@ -2863,13 +2868,13 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s // tags cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags"); if (!tags || tags->type != cJSON_Array) { - printf("ERROR: failed to read json, tags not found\n"); + debugPrint("%s() LN%d, failed to read json, tags not found\n", __func__, __LINE__); goto PARSE_OVER; } int tagSize = cJSON_GetArraySize(tags); if (tagSize > MAX_TAG_COUNT) { - printf("ERROR: failed to read json, tags size overflow, max tag size is %d\n", MAX_TAG_COUNT); + debugPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n", __func__, __LINE__, MAX_TAG_COUNT); goto PARSE_OVER; } @@ -2991,47 +2996,47 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } - cJSON* gInsertInterval = cJSON_GetObjectItem(root, "insert_interval"); - if (gInsertInterval && gInsertInterval->type == cJSON_Number) { - g_args.insert_interval = gInsertInterval->valueint; - } else if (!gInsertInterval) { - g_args.insert_interval = 0; - } else { - fprintf(stderr, "ERROR: failed to read json, insert_interval input mistake\n"); - goto PARSE_OVER; - } - - cJSON* rowsPerTbl = cJSON_GetObjectItem(root, "rows_per_tbl"); - if (rowsPerTbl && rowsPerTbl->type == cJSON_Number) { - g_args.rows_per_tbl = rowsPerTbl->valueint; - } else if (!rowsPerTbl) { - g_args.rows_per_tbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req - } else { - fprintf(stderr, "ERROR: failed to read json, rows_per_tbl input mistake\n"); - goto PARSE_OVER; - } + cJSON* gInsertInterval = cJSON_GetObjectItem(root, "insert_interval"); + if (gInsertInterval && gInsertInterval->type == cJSON_Number) { + g_args.insert_interval = gInsertInterval->valueint; + } else if (!gInsertInterval) { + g_args.insert_interval = 0; + } else { + errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n", __func__, __LINE__); + goto PARSE_OVER; + } - cJSON* maxSqlLen = cJSON_GetObjectItem(root, "max_sql_len"); - if (maxSqlLen && maxSqlLen->type == cJSON_Number) { - g_args.max_sql_len = maxSqlLen->valueint; - } else if (!maxSqlLen) { - g_args.max_sql_len = TSDB_PAYLOAD_SIZE; - } else { - fprintf(stderr, "ERROR: failed to read json, max_sql_len input mistake\n"); - goto PARSE_OVER; - } - + cJSON* rowsPerTbl = cJSON_GetObjectItem(root, "rows_per_tbl"); + if (rowsPerTbl && rowsPerTbl->type == cJSON_Number) { + g_args.rows_per_tbl = rowsPerTbl->valueint; + } else if (!rowsPerTbl) { + g_args.rows_per_tbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req + } else { + errorPrint("%s() LN%d, failed to read json, rows_per_tbl input mistake\n", __func__, __LINE__); + goto PARSE_OVER; + } - cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req"); - if (numRecPerReq && numRecPerReq->type == cJSON_Number) { - g_args.num_of_RPR = numRecPerReq->valueint; - } else if (!numRecPerReq) { - g_args.num_of_RPR = 100; - } else { - printf("ERROR: failed to read json, num_of_records_per_req not found\n"); - goto PARSE_OVER; - } + cJSON* maxSqlLen = cJSON_GetObjectItem(root, "max_sql_len"); + if (maxSqlLen && maxSqlLen->type == cJSON_Number) { + g_args.max_sql_len = maxSqlLen->valueint; + } else if (!maxSqlLen) { + g_args.max_sql_len = TSDB_PAYLOAD_SIZE; + } else { + errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n", __func__, __LINE__); + goto PARSE_OVER; + } + + cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req"); + if (numRecPerReq && numRecPerReq->type == cJSON_Number) { + g_args.num_of_RPR = numRecPerReq->valueint; + } else if (!numRecPerReq) { + g_args.num_of_RPR = 100; + } else { + errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n", __func__, __LINE__); + goto PARSE_OVER; + } + cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no, if (answerPrompt && answerPrompt->type == cJSON_String @@ -3058,7 +3063,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { int dbSize = cJSON_GetArraySize(dbs); if (dbSize > MAX_DB_COUNT) { - fprintf(stderr, + errorPrint( "ERROR: failed to read json, databases size overflow, max database is %d\n", MAX_DB_COUNT); goto PARSE_OVER; @@ -3257,7 +3262,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { int stbSize = cJSON_GetArraySize(stables); if (stbSize > MAX_SUPER_TABLE_COUNT) { - fprintf(stderr, + errorPrint( "ERROR: failed to read json, databases size overflow, max database is %d\n", MAX_SUPER_TABLE_COUNT); goto PARSE_OVER; @@ -3384,9 +3389,11 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { cJSON *ts = cJSON_GetObjectItem(stbInfo, "start_timestamp"); if (ts && ts->type == cJSON_String && ts->valuestring != NULL) { - tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp, ts->valuestring, MAX_DB_NAME_SIZE); + tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp, + ts->valuestring, MAX_DB_NAME_SIZE); } else if (!ts) { - tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp, "now", MAX_DB_NAME_SIZE); + tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp, + "now", MAX_DB_NAME_SIZE); } else { printf("ERROR: failed to read json, start_timestamp not found\n"); goto PARSE_OVER; @@ -3493,7 +3500,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!rowsPerTbl) { g_Dbs.db[i].superTbls[j].rowsPerTbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req } else { - fprintf(stderr, "ERROR: failed to read json, rowsPerTbl input mistake\n"); + errorPrint("%s() LN%d, failed to read json, rowsPerTbl input mistake\n", __func__, __LINE__); goto PARSE_OVER; } @@ -3523,7 +3530,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!insertRows) { g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; } else { - fprintf(stderr, "failed to read json, insert_rows input mistake"); + errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n", __func__, __LINE__); goto PARSE_OVER; } @@ -3535,7 +3542,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { __func__, __LINE__, g_args.insert_interval); g_Dbs.db[i].superTbls[j].insertInterval = g_args.insert_interval; } else { - fprintf(stderr, "failed to read json, insert_interval input mistake"); + errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n", __func__, __LINE__); goto PARSE_OVER; } @@ -3942,7 +3949,7 @@ static bool getInfoFromJsonFile(char* file) { } else if (SUBSCRIBE_TEST == g_args.test_mode) { ret = getMetaFromQueryJsonFile(root); } else { - printf("ERROR: input json file type error! please input correct file type: insert or query or subscribe\n"); + errorPrint("%s() LN%d, input json file type error! please input correct file type: insert or query or subscribe\n", __func__, __LINE__); goto PARSE_OVER; } @@ -4024,14 +4031,14 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6)) || (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) { if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) { - printf("binary or nchar length overflow, max size:%u\n", + errorPrint( "binary or nchar length overflow, max size:%u\n", (uint32_t)TSDB_MAX_BINARY_LEN); return (-1); } char* buf = (char*)calloc(stbInfo->columns[i].dataLen+1, 1); if (NULL == buf) { - printf("calloc failed! size:%d\n", stbInfo->columns[i].dataLen); + errorPrint( "calloc failed! size:%d\n", stbInfo->columns[i].dataLen); return (-1); } rand_string(buf, stbInfo->columns[i].dataLen); @@ -4063,7 +4070,7 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "timestamp", 9)) { dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%"PRId64", ", rand_bigint()); } else { - printf("No support data type: %s\n", stbInfo->columns[i].dataType); + errorPrint( "No support data type: %s\n", stbInfo->columns[i].dataType); return (-1); } } @@ -4138,7 +4145,8 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { sampleDataBuf = calloc( superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); if (sampleDataBuf == NULL) { - fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", + errorPrint("%s() LN%d, Failed to calloc %d Bytes, reason:%s\n", + __func__, __LINE__, superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, strerror(errno)); return -1; @@ -4148,7 +4156,7 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { int ret = readSampleFromCsvFileToMem(superTblInfo); if (0 != ret) { - fprintf(stderr, "read sample from csv file failed.\n"); + errorPrint("%s() LN%d, read sample from csv file failed.\n", __func__, __LINE__); tmfree(sampleDataBuf); superTblInfo->sampleDataBuf = NULL; return -1; @@ -4157,29 +4165,26 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { return 0; } -static int execInsert(threadInfo *winfo, char *buffer, int k) +static int execInsert(threadInfo *pThreadInfo, char *buffer, int k) { int affectedRows; - SSuperTable* superTblInfo = winfo->superTblInfo; + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, + __func__, __LINE__, buffer); if (superTblInfo) { if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { - verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); - affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); + affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE); } else { - verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); - int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); - - if (0 != retCode) { + if (0 != postProceSql(g_Dbs.host, g_Dbs.port, buffer)) { affectedRows = -1; - printf("========restful return fail, threadID[%d]\n", winfo->threadID); + printf("========restful return fail, threadID[%d]\n", pThreadInfo->threadID); } else { affectedRows = k; } } } else { - verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); - affectedRows = queryDbExec(winfo->taos, buffer, 1); + affectedRows = queryDbExec(pThreadInfo->taos, buffer, 1); } return affectedRows; @@ -4195,8 +4200,9 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq) superTblInfo->childTblName + (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); } else { - verbosePrint("%s() LN%d: from=%d count=%d seq=%d\n", - __func__, __LINE__, pThreadInfo->start_table_from, + verbosePrint("[%d] %s() LN%d: from=%d count=%d seq=%d\n", + pThreadInfo->threadID, __func__, __LINE__, + pThreadInfo->start_table_from, pThreadInfo->ntables, tableSeq); snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); @@ -4323,7 +4329,7 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, threadInfo* pThrea tableSeq % superTblInfo->tagSampleCount); } if (NULL == tagsValBuf) { - fprintf(stderr, "tag buf failed to allocate memory\n"); + errorPrint("%s() LN%d, tag buf failed to allocate memory\n", __func__, __LINE__); return -1; } @@ -4396,13 +4402,14 @@ static int generateDataBuffer(char *pTblName, } static void* syncWriteInterlace(threadInfo *pThreadInfo) { - printf("### CBD: interlace write\n"); + debugPrint("[%d] %s() LN%d: ### interlace write\n", + pThreadInfo->threadID, __func__, __LINE__); SSuperTable* superTblInfo = pThreadInfo->superTblInfo; char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); if (NULL == buffer) { - fprintf(stderr, "Failed to alloc %d Bytes, reason:%s\n", + errorPrint( "Failed to alloc %d Bytes, reason:%s\n", superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, strerror(errno)); return NULL; @@ -4437,8 +4444,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { int tableSeq = pThreadInfo->start_table_from; - debugPrint("%s() LN%d: start_table_from=%d ntables=%d insertRows=%"PRId64"\n", - __func__, __LINE__, pThreadInfo->start_table_from, + debugPrint("[%d] %s() LN%d: start_table_from=%d ntables=%d insertRows=%"PRId64"\n", + pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->start_table_from, pThreadInfo->ntables, insertRows); int64_t startTime = pThreadInfo->start_time; @@ -4446,80 +4453,111 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { int batchPerTblTimes; int batchPerTbl; + assert(pThreadInfo->ntables > 0); + + if (rowsPerTbl > g_args.num_of_RPR) + rowsPerTbl = g_args.num_of_RPR; + + batchPerTbl = rowsPerTbl; if ((rowsPerTbl > 0) && (pThreadInfo->ntables > 1)) { - batchPerTblTimes = g_args.num_of_RPR / rowsPerTbl; - batchPerTbl = rowsPerTbl; + batchPerTblTimes = + (g_args.num_of_RPR / (rowsPerTbl * pThreadInfo->ntables)) + 1; } else { batchPerTblTimes = 1; - batchPerTbl = g_args.num_of_RPR; } int generatedRecPerTbl = 0; + bool flagSleep = true; + int sleepTimeTotal = 0; while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { - if (insert_interval) { + if ((flagSleep) && (insert_interval)) { st = taosGetTimestampUs(); + flagSleep = false; } // generate data memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); char *pstr = buffer; - int recGenerated = 0; + int recOfBatch = 0; for (int i = 0; i < batchPerTblTimes; i ++) { getTableName(tableName, pThreadInfo, tableSeq); int headLen; if (i == 0) { - headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo, pstr); + headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, + superTblInfo, pstr); } else { - headLen = snprintf(pstr, TSDB_TABLE_NAME_LEN, "%s.%s values", + headLen = snprintf(pstr, TSDB_TABLE_NAME_LEN, "%s.%s values", pThreadInfo->db_name, tableName); } // generate data buffer - verbosePrint("%s() LN%d i=%d buffer:\n%s\n", - __func__, __LINE__, i, buffer); + verbosePrint("[%d] %s() LN%d i=%d buffer:\n%s\n", + pThreadInfo->threadID, __func__, __LINE__, i, buffer); pstr += headLen; int dataLen = 0; - printf("%s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n", - __func__, __LINE__, i, batchPerTblTimes, batchPerTbl); - int numOfRecGenerated = generateDataTail( - tableName, tableSeq, pThreadInfo, superTblInfo, - batchPerTbl, pstr, insertRows, 0, - startTime + pThreadInfo->totalInsertRows * superTblInfo->timeStampStep, - &(pThreadInfo->samplePos), &dataLen); - verbosePrint("%s() LN%d numOfRecGenerated= %d\n", - __func__, __LINE__, numOfRecGenerated); + debugPrint("[%d] %s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n", + pThreadInfo->threadID, __func__, __LINE__, + i, batchPerTblTimes, batchPerTbl); + generateDataTail( + tableName, tableSeq, pThreadInfo, superTblInfo, + batchPerTbl, pstr, insertRows, 0, + startTime + sleepTimeTotal + + pThreadInfo->totalInsertRows * superTblInfo->timeStampStep, + &(pThreadInfo->samplePos), &dataLen); pstr += dataLen; - recGenerated += numOfRecGenerated; + recOfBatch += batchPerTbl; + pThreadInfo->totalInsertRows += batchPerTbl; + debugPrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n", + pThreadInfo->threadID, __func__, __LINE__, + batchPerTbl, recOfBatch); tableSeq ++; if (insertMode == INTERLACE_INSERT_MODE) { if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { // turn to first table tableSeq = pThreadInfo->start_table_from; - generatedRecPerTbl += numOfRecGenerated; + generatedRecPerTbl += batchPerTbl; + flagSleep = true; + if (generatedRecPerTbl >= insertRows) + break; + + if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR) + break; } } int remainRows = insertRows - generatedRecPerTbl; - if (batchPerTbl > remainRows) + if ((remainRows > 0) && (batchPerTbl > remainRows)) batchPerTbl = remainRows; - if ((g_args.num_of_RPR - recGenerated) < batchPerTbl) + debugPrint("[%d] %s() LN%d generatedRecPerTbl=%d insertRows=%"PRId64"\n", + pThreadInfo->threadID, __func__, __LINE__, + generatedRecPerTbl, insertRows); + + if ((g_args.num_of_RPR - recOfBatch) < batchPerTbl) break; } - pThreadInfo->totalInsertRows += recGenerated; - printf("%s() LN%d recGenerated=%d totalInsertRows=%"PRId64" buffer:\n%s\n", - __func__, __LINE__, recGenerated, - pThreadInfo->totalInsertRows, buffer); - int affectedRows = execInsert(pThreadInfo, buffer, recGenerated); - if (affectedRows < 0) + debugPrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRId64"\n", + pThreadInfo->threadID, __func__, __LINE__, recOfBatch, + pThreadInfo->totalInsertRows); + verbosePrint("[%d] %s() LN%d, buffer=%s\n", + pThreadInfo->threadID, __func__, __LINE__, buffer); + + int affectedRows = execInsert(pThreadInfo, buffer, recOfBatch); + verbosePrint("[%d] %s() LN%d affectedRows=%d\n", pThreadInfo->threadID, + __func__, __LINE__, affectedRows); + if (affectedRows < 0) { + errorPrint("[%d] %s() LN%d execInsert affected rows: %d\n%s\n", + pThreadInfo->threadID, __func__, __LINE__, + affectedRows, buffer); goto free_and_statistics_interlace; + } pThreadInfo->totalAffectedRows += affectedRows; @@ -4539,15 +4577,16 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { lastPrintTime = currentPrintTime; } - if (insert_interval) { - et = taosGetTimestampUs(); + if ((insert_interval) && flagSleep) { + et = taosGetTimestampUs(); - if (insert_interval > ((et - st)/1000) ) { - int sleep_time = insert_interval - (et -st)/1000; - verbosePrint("%s() LN%d sleep: %d ms for insert interval\n", - __func__, __LINE__, sleep_time); - taosMsleep(sleep_time); // ms - } + if (insert_interval > ((et - st)/1000) ) { + int sleepTime = insert_interval - (et -st)/1000; +// verbosePrint("%s() LN%d sleep: %d ms for insert interval\n", +// __func__, __LINE__, sleepTime); + taosMsleep(sleepTime); // ms + sleepTimeTotal += insert_interval; + } } } @@ -4570,12 +4609,13 @@ free_and_statistics_interlace: 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s */ static void* syncWriteProgressive(threadInfo *pThreadInfo) { + debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); SSuperTable* superTblInfo = pThreadInfo->superTblInfo; char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); if (NULL == buffer) { - fprintf(stderr, "Failed to alloc %d Bytes, reason:%s\n", + errorPrint( "Failed to alloc %d Bytes, reason:%s\n", superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, strerror(errno)); return NULL; @@ -4821,7 +4861,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, } else if (0 == strncasecmp(precision, "us", 2)) { timePrec = TSDB_TIME_PRECISION_MICRO; } else { - fprintf(stderr, "No support precision: %s\n", precision); + errorPrint( "No support precision: %s\n", precision); exit(-1); } } @@ -4831,15 +4871,15 @@ static void startMultiThreadInsertData(int threads, char* db_name, if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { start_time = taosGetTimestamp(timePrec); } else { - if (TSDB_CODE_SUCCESS != taosParseTime( - superTblInfo->startTimestamp, - &start_time, - strlen(superTblInfo->startTimestamp), - timePrec, 0)) { - fprintf(stderr, "ERROR to parse time!\n"); - exit(-1); - } - } + if (TSDB_CODE_SUCCESS != taosParseTime( + superTblInfo->startTimestamp, + &start_time, + strlen(superTblInfo->startTimestamp), + timePrec, 0)) { + errorPrint("%s() LN%d, failed to parse time!\n", __func__, __LINE__); + exit(-1); + } + } } else { start_time = 1500000000000; } @@ -4857,7 +4897,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample")))) { if (0 != prepareSampleDataForSTable(superTblInfo)) { - fprintf(stderr, "prepare sample data for stable failed!\n"); + errorPrint("%s() LN%d, prepare sample data for stable failed!\n", __func__, __LINE__); exit(-1); } } @@ -4869,15 +4909,15 @@ static void startMultiThreadInsertData(int threads, char* db_name, g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); if (NULL == taos) { - fprintf(stderr, "connect to server fail , reason: %s\n", - taos_errstr(NULL)); + errorPrint("%s() LN%d, connect to server fail , reason: %s\n", + __func__, __LINE__, taos_errstr(NULL)); exit(-1); } superTblInfo->childTblName = (char*)calloc(1, superTblInfo->childTblLimit * TSDB_TABLE_NAME_LEN); if (superTblInfo->childTblName == NULL) { - fprintf(stderr, "alloc memory failed!"); + errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__); taos_close(taos); exit(-1); } @@ -4896,7 +4936,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample")))) { if (0 != prepareSampleDataForSTable(superTblInfo)) { - fprintf(stderr, "prepare sample data for stable failed!\n"); + errorPrint("%s() LN%d, prepare sample data for stable failed!\n", __func__, __LINE__); exit(-1); } } @@ -4906,8 +4946,8 @@ static void startMultiThreadInsertData(int threads, char* db_name, g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); if (NULL == taos) { - fprintf(stderr, "connect to server fail , reason: %s\n", - taos_errstr(NULL)); + errorPrint("%s() LN%d, connect to server fail , reason: %s\n", + __func__, __LINE__, taos_errstr(NULL)); exit(-1); } @@ -4926,7 +4966,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, superTblInfo->childTblName = (char*)calloc(1, limit * TSDB_TABLE_NAME_LEN); if (superTblInfo->childTblName == NULL) { - fprintf(stderr, "alloc memory failed!"); + errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__); taos_close(taos); exit(-1); } @@ -4957,7 +4997,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); if (NULL == t_info->taos) { - fprintf(stderr, "connect to server fail from insert sub thread, reason: %s\n", + errorPrint( "connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL)); exit(-1); } @@ -5001,6 +5041,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, tsem_destroy(&(t_info->lock_sem)); taos_close(t_info->taos); + debugPrint("%s() LN%d, [%d] totalInsert=%"PRId64" totalAffected=%"PRId64"\n", + __func__, __LINE__, + t_info->threadID, t_info->totalInsertRows, + t_info->totalAffectedRows); if (superTblInfo) { superTblInfo->totalAffectedRows += t_info->totalAffectedRows; superTblInfo->totalInsertRows += t_info->totalInsertRows; @@ -5068,7 +5112,7 @@ void *readTable(void *sarg) { char *tb_prefix = rinfo->tb_prefix; FILE *fp = fopen(rinfo->fp, "a"); if (NULL == fp) { - fprintf(stderr, "fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno)); + errorPrint( "fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno)); return NULL; } @@ -5102,7 +5146,7 @@ void *readTable(void *sarg) { int32_t code = taos_errno(pSql); if (code != 0) { - fprintf(stderr, "Failed to query:%s\n", taos_errstr(pSql)); + errorPrint( "Failed to query:%s\n", taos_errstr(pSql)); taos_free_result(pSql); taos_close(taos); fclose(fp); @@ -5178,7 +5222,7 @@ void *readMetric(void *sarg) { int32_t code = taos_errno(pSql); if (code != 0) { - fprintf(stderr, "Failed to query:%s\n", taos_errstr(pSql)); + errorPrint( "Failed to query:%s\n", taos_errstr(pSql)); taos_free_result(pSql); taos_close(taos); fclose(fp); @@ -5216,7 +5260,7 @@ static int insertTestProcess() { debugPrint("%d result file: %s\n", __LINE__, g_Dbs.resultFile); g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a"); if (NULL == g_fpOfInsertResult) { - fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile); + errorPrint( "Failed to open %s for save result\n", g_Dbs.resultFile); return -1; } @@ -5407,7 +5451,7 @@ static int queryTestProcess() { NULL, g_queryInfo.port); if (taos == NULL) { - fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); + errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); exit(-1); } @@ -5707,7 +5751,7 @@ static int subscribeTestProcess() { g_queryInfo.dbName, g_queryInfo.port); if (taos == NULL) { - fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); + errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); exit(-1); } @@ -6065,7 +6109,7 @@ static void queryResult() { g_Dbs.db[0].dbName, g_Dbs.port); if (rInfo->taos == NULL) { - fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); + errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); free(rInfo); exit(-1); } -- GitLab