From 42e8a73f91bf39ecd9c3260442972dee079575e2 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 5 Mar 2021 18:26:24 +0800 Subject: [PATCH] [TD-3147] : support insert interval. stb case passed. --- src/kit/taosdemo/taosdemo.c | 171 +++++++++++++++++++++++------------- 1 file changed, 112 insertions(+), 59 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 80cbecb96b..30370cd797 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -2061,7 +2061,7 @@ static int createDatabases() { for (int i = 0; i < g_Dbs.dbCount; i++) { if (g_Dbs.db[i].drop) { sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName); - debugPrint("DEBUG %s() %d \n", __func__, __LINE__); + debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { taos_close(taos); return -1; @@ -2129,7 +2129,7 @@ static int createDatabases() { "precision \'%s\';", g_Dbs.db[i].dbCfg.precision); } - debugPrint("DEBUG %s() %d \n", __func__, __LINE__); + debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { taos_close(taos); printf("\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName); @@ -2141,7 +2141,7 @@ static int createDatabases() { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { // describe super table, if exists sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName); - debugPrint("DEBUG %s() %d \n", __func__, __LINE__); + debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS; ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric); @@ -2244,7 +2244,7 @@ static void* createTable(void *sarg) } if (0 != len) { - debugPrint("DEBUG %s() %d \n", __func__, __LINE__); + debugPrint("DEBUG %s() %d buffer: %s\n", __func__, __LINE__, buffer); (void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE); } @@ -3777,14 +3777,6 @@ static void syncWriteForNumberOfTblInOneSql( int64_t st = 0; int64_t et = 0; for (int i = 0; i < superTblInfo->insertRows;) { - if (g_args.insert_interval && (g_args.insert_interval > (et - st))) { - taosMsleep(g_args.insert_interval - (et - st)); // ms - } - - if (g_args.insert_interval) { - st = taosGetTimestampMs(); - } - int32_t tbl_id = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { int inserted = i; @@ -3921,6 +3913,16 @@ static void syncWriteForNumberOfTblInOneSql( inserted += superTblInfo->rowsPerTbl; send_to_server: + if (g_args.insert_interval && (g_args.insert_interval > (et - st))) { + int sleep_time = g_args.insert_interval - (et -st); + debugPrint("DEBUG sleep: %d ms\n", sleep_time); + taosMsleep(sleep_time); // ms + } + + if (g_args.insert_interval) { + st = taosGetTimestampMs(); + } + if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { @@ -3930,9 +3932,10 @@ send_to_server: int64_t endTs; startTs = taosGetTimestampUs(); - debugPrint("DEBUG %s() %d \n", __func__, __LINE__); + debugPrint("DEBUG %s() LN%d buff: %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec( winfo->taos, buffer, INSERT_TYPE); + if (0 > affectedRows) { goto free_and_statistics; } else { @@ -3967,7 +3970,10 @@ send_to_server: goto free_and_statistics; } } - + if (g_args.insert_interval) { + et = taosGetTimestampMs(); + } + break; } @@ -3980,13 +3986,10 @@ send_to_server: } } - if (g_args.insert_interval) { - et = taosGetTimestampMs(); - } //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); } - free_and_statistics: +free_and_statistics: tmfree(buffer); winfo->totalRowsInserted = totalRowsInserted; winfo->totalAffectedRows = totalAffectedRows; @@ -4080,24 +4083,33 @@ static void* syncWrite(void *sarg) { srand((uint32_t)time(NULL)); int64_t time_counter = winfo->start_time; + uint64_t st = 0; + uint64_t et = 0; + for (int i = 0; i < g_args.num_of_DPT;) { + for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { int inserted = i; int64_t tmp_time = time_counter; char *pstr = buffer; - pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, g_args.tb_prefix, tID); + pstr += sprintf(pstr, + "insert into %s.%s%d values", + winfo->db_name, g_args.tb_prefix, tID); int k; for (k = 0; k < g_args.num_of_RPR;) { int rand_num = rand() % 100; int len = -1; - if ((g_args.disorderRatio != 0) && (rand_num < g_args.disorderRange)) { + if ((g_args.disorderRatio != 0) + && (rand_num < g_args.disorderRange)) { int64_t d = tmp_time - rand() % 1000000 + rand_num; - len = generateData(data, data_type, ncols_per_record, d, len_of_binary); + len = generateData(data, data_type, + ncols_per_record, d, len_of_binary); } else { - len = generateData(data, data_type, ncols_per_record, tmp_time += 1000, len_of_binary); + len = generateData(data, data_type, + ncols_per_record, tmp_time += 1000, len_of_binary); } //assert(len + pstr - buffer < BUFFER_SIZE); @@ -4118,24 +4130,41 @@ static void* syncWrite(void *sarg) { int64_t endTs; startTs = taosGetTimestampUs(); //queryDB(winfo->taos, buffer); + if (i > 0 && g_args.insert_interval + && (g_args.insert_interval > (et - st) )) { + int sleep_time = g_args.insert_interval - (et -st); + debugPrint("DEBUG sleep: %d ms\n", sleep_time); + taosMsleep(sleep_time); // ms + } + + if (g_args.insert_interval) { + st = taosGetTimestampMs(); + } debugPrint("DEBUG - %s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, 1); if (0 <= affectedRows){ endTs = taosGetTimestampUs(); int64_t delay = endTs - startTs; - if (delay > winfo->maxDelay) winfo->maxDelay = delay; - if (delay < winfo->minDelay) winfo->minDelay = delay; + if (delay > winfo->maxDelay) + winfo->maxDelay = delay; + if (delay < winfo->minDelay) + winfo->minDelay = delay; winfo->cntDelay++; winfo->totalDelay += delay; //winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; - } + } + + if (g_args.insert_interval) { + et = taosGetTimestampMs(); + } if (tID == winfo->end_table_id) { i = inserted; time_counter = tmp_time; } } + } return NULL; } @@ -4199,19 +4228,12 @@ static void* syncWriteWithStb(void *sarg) { return NULL; } - uint64_t time_counter = winfo->start_time; + int64_t time_counter = winfo->start_time; uint64_t st = 0; uint64_t et = 0; + debugPrint("DEBUG - %s() LN%d insertRows=%ld\n", __func__, __LINE__, superTblInfo->insertRows); for (int i = 0; i < superTblInfo->insertRows;) { - if (i > 0 && g_args.insert_interval - && (g_args.insert_interval > (et - st) )) { - taosMsleep(g_args.insert_interval - (et - st)); // ms - } - - if (g_args.insert_interval) { - st = taosGetTimestampMs(); - } for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { uint64_t inserted = i; @@ -4219,8 +4241,8 @@ static void* syncWriteWithStb(void *sarg) { int sampleUsePos = samplePos; int k = 0; - while (1) - { + debugPrint("DEBUG - %s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); + for (k = 0; k < g_args.num_of_RPR;) { int len = 0; memset(buffer, 0, superTblInfo->maxSqlLen); char *pstr = buffer; @@ -4263,9 +4285,8 @@ static void* syncWriteWithStb(void *sarg) { tID); } - for (k = 0; k < g_args.num_of_RPR;) { - int retLen = 0; - if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { + int retLen = 0; + if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { retLen = getRowDataFromSample( pstr + len, superTblInfo->maxSqlLen - len, @@ -4277,7 +4298,7 @@ static void* syncWriteWithStb(void *sarg) { if (retLen < 0) { goto free_and_statistics_2; } - } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { + } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { int rand_num = rand_tinyint() % 100; if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) { @@ -4287,7 +4308,7 @@ static void* syncWriteWithStb(void *sarg) { superTblInfo->maxSqlLen - len, d, superTblInfo); //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d); - } else { + } else { retLen = generateRowData( pstr + len, superTblInfo->maxSqlLen - len, @@ -4297,17 +4318,31 @@ static void* syncWriteWithStb(void *sarg) { if (retLen < 0) { goto free_and_statistics_2; } - } - len += retLen; - inserted++; - k++; - totalRowsInserted++; + } +/* len += retLen; +*/ + inserted++; + k++; + totalRowsInserted++; - if (inserted >= superTblInfo->insertRows + debugPrint("DEBUG %s() LN%d inserted=%ld k=%d totalRowsInserted=%ld superTblInfo->insertRows=%ld\n", __func__, __LINE__, inserted, k, totalRowsInserted, superTblInfo->insertRows); + if (inserted > superTblInfo->insertRows) + break; +/* if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) break; +*/ + if (i > 0 && g_args.insert_interval + && (g_args.insert_interval > (et - st) )) { + int sleep_time = g_args.insert_interval - (et -st); + debugPrint("DEBUG sleep: %d ms\n", sleep_time); + taosMsleep(sleep_time); // ms } - + + if (g_args.insert_interval) { + st = taosGetTimestampMs(); + } + if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { //printf("===== sql: %s \n\n", buffer); //int64_t t1 = taosGetTimestampMs(); @@ -4317,6 +4352,7 @@ static void* syncWriteWithStb(void *sarg) { debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); + if (0 > affectedRows){ goto free_and_statistics_2; } else { @@ -4351,6 +4387,23 @@ static void* syncWriteWithStb(void *sarg) { goto free_and_statistics_2; } } + if (g_args.insert_interval) { + et = taosGetTimestampMs(); + } +/* + if (loop_cnt) { + loop_cnt--; + if ((1 == loop_cnt) && (0 != nrecords_last_req)) { + nrecords_cur_req = nrecords_last_req; + } else if (0 == loop_cnt){ + nrecords_cur_req = nrecords_no_last_req; + loop_cnt = loop_cnt_orig; + break; + } + } else { + break; + } + */ } if (tID == winfo->end_table_id) { @@ -4358,14 +4411,12 @@ static void* syncWriteWithStb(void *sarg) { superTblInfo->dataSource, "sample", strlen("sample"))) { samplePos = sampleUsePos; } + i = inserted; time_counter = tmp_time; } } - if (g_args.insert_interval) { - et = taosGetTimestampMs(); - } //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); } @@ -4502,7 +4553,6 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, } } - int64_t start_time; if (superTblInfo) { if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { @@ -4530,20 +4580,23 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, t_info->start_time = start_time; t_info->minDelay = INT16_MAX; - if ((NULL == superTblInfo) || (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) { + if ((NULL == superTblInfo) || + (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) { //t_info->taos = taos; t_info->taos = taos_connect( g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); if (NULL == t_info->taos) { - printf("connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL)); + printf("connect to server fail from insert sub thread, reason: %s\n", + taos_errstr(NULL)); exit(-1); } } else { t_info->taos = NULL; } - if ((NULL == superTblInfo) || (0 == superTblInfo->multiThreadWriteOneTbl)) { + if ((NULL == superTblInfo) + || (0 == superTblInfo->multiThreadWriteOneTbl)) { t_info->start_table_id = last; t_info->end_table_id = i < b ? last + a : last + a - 1; last = t_info->end_table_id + 1; @@ -5012,7 +5065,7 @@ static int queryTestProcess() { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); - debugPrint("DEBUG %s() %d \n", __func__, __LINE__); + debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); (void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE); } else { t_info->taos = NULL; @@ -5123,7 +5176,7 @@ void *subSubscribeProcess(void *sarg) { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); - debugPrint("DEBUG %s() %d \n", __func__, __LINE__); + debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)){ return NULL; } @@ -5189,7 +5242,7 @@ void *superSubscribeProcess(void *sarg) { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); - debugPrint("DEBUG %s() %d \n", __func__, __LINE__); + debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)) { return NULL; } @@ -5554,7 +5607,7 @@ void querySqlFile(TAOS* taos, char* sqlFile) } memcpy(cmd + cmd_len, line, read_len); - debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, cmd); + debugPrint("DEBUG %s() LN%d cmd: %s\n", __func__, __LINE__, cmd); queryDbExec(taos, cmd, NO_INSERT_TYPE); memset(cmd, 0, MAX_SQL_SIZE); cmd_len = 0; -- GitLab