diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 0ac8269b029403727cf8a89beffdfe9b61f511e7..8bc0ff7f121ba6726e900bc0e0d45d02ad3bef65 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -228,6 +228,7 @@ typedef struct SSuperTable_S { int disorderRange; // ms or us by database precision int maxSqlLen; // + int64_t insertRows; // 0: no limit int timeStampStep; char startTimestamp[MAX_TB_NAME_SIZE]; // char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json @@ -1085,6 +1086,7 @@ static int printfInsertMeta() { printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode); + printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n"); @@ -1231,6 +1233,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); + fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { fprintf(fp, " multiThreadWriteOneTbl: no\n"); @@ -1991,7 +1994,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, exit(-1); } snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols); - debugPrint("%s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); + verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); if (use_metric) { char tags[STRING_LEN] = "\0"; @@ -2044,7 +2047,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s (ts timestamp%s) tags %s", dbName, superTbls->sTblName, cols, tags); - debugPrint("%s() LN%d: %s\n", __func__, __LINE__, command); + verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName); @@ -2237,7 +2240,7 @@ static void* createTable(void *sarg) } len = 0; - debugPrint("%s() LN%d %s\n", __func__, __LINE__, buffer); + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ free(buffer); return NULL; @@ -2252,7 +2255,7 @@ static void* createTable(void *sarg) } if (0 != len) { - debugPrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer); + verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer); (void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE); } @@ -3228,6 +3231,20 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("failed to read json, disorderRange not found"); goto PARSE_OVER; } + + cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows"); + if (insertRows && insertRows->type == cJSON_Number) { + g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint; + //if (0 == g_Dbs.db[i].superTbls[j].insertRows) { + // g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; + //} + } else if (!insertRows) { + g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; + } else { + printf("failed to read json, insert_rows not found"); + goto PARSE_OVER; + } + if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { continue; @@ -3771,7 +3788,7 @@ static void syncWriteForNumberOfTblInOneSql( int64_t st = 0; int64_t et = 0; - for (int i = 0; i < g_args.num_of_RPR;) { + for (int i = 0; i < superTblInfo->insertRows;) { int32_t tbl_id = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { int inserted = i; @@ -3894,7 +3911,7 @@ static void syncWriteForNumberOfTblInOneSql( k++; totalRowsInserted++; - if (inserted >= g_args.num_of_RPR || + if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { tID = tbl_id + 1; printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", @@ -4080,10 +4097,13 @@ static void* syncWrite(void *sarg) { uint64_t st = 0; uint64_t et = 0; - for (int i = 0; i < g_args.num_of_DPT;) { + winfo->totalRowsInserted = 0; + winfo->totalAffectedRows = 0; - for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - int inserted = i; + for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { + for (int i = 0; i < g_args.num_of_DPT;) { + + int tblInserted = i; int64_t tmp_time = time_counter; char *pstr = buffer; @@ -4112,13 +4132,15 @@ static void* syncWrite(void *sarg) { } pstr += sprintf(pstr, " %s", data); - inserted++; + tblInserted++; k++; + i++; - if (inserted >= g_args.num_of_DPT) + if (tblInserted >= g_args.num_of_DPT) break; } + winfo->totalRowsInserted += k; /* puts(buffer); */ int64_t startTs; int64_t endTs; @@ -4134,9 +4156,10 @@ static void* syncWrite(void *sarg) { if (g_args.insert_interval) { st = taosGetTimestampMs(); } - debugPrint("%s() LN%d %s\n", __func__, __LINE__, buffer); + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, 1); - + + verbosePrint("%s() LN%d: affectedRows:%d\n", __func__, __LINE__, affectedRows); if (0 <= affectedRows){ endTs = taosGetTimestampUs(); int64_t delay = endTs - startTs; @@ -4146,20 +4169,26 @@ static void* syncWrite(void *sarg) { winfo->minDelay = delay; winfo->cntDelay++; winfo->totalDelay += delay; - //winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; + winfo->totalAffectedRows += affectedRows; + winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; } + verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64"\n", __func__, __LINE__, winfo->totalAffectedRows); if (g_args.insert_interval) { et = taosGetTimestampMs(); } - if (tID == winfo->end_table_id) { - i = inserted; - time_counter = tmp_time; - } - } + if (tblInserted >= g_args.num_of_DPT) { + break; + } + } // num_of_DPT + } // tId + + printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", + winfo->threadID, + winfo->totalRowsInserted, + winfo->totalAffectedRows); - } return NULL; } @@ -4225,11 +4254,14 @@ static void* syncWriteWithStb(void *sarg) { int64_t time_counter = winfo->start_time; uint64_t st = 0; uint64_t et = 0; - for (int i = 0; i < g_args.num_of_RPR;) { - for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; + debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, + superTblInfo->insertRows); + for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - int64_t inserted = 0; + for (int i = 0; i < superTblInfo->insertRows;) { + + int64_t inserted = i; uint64_t tmp_time = time_counter; if (i > 0 && g_args.insert_interval @@ -4244,9 +4276,8 @@ static void* syncWriteWithStb(void *sarg) { } int sampleUsePos = samplePos; - int k = 0; verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); - for (k = 0; k < g_args.num_of_RPR;) { + for (int k = 0; k < g_args.num_of_RPR;) { int len = 0; memset(buffer, 0, superTblInfo->maxSqlLen); char *pstr = buffer; @@ -4329,10 +4360,14 @@ static void* syncWriteWithStb(void *sarg) { k++; i++; totalRowsInserted++; - debugPrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); + verbosePrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); - if (inserted > g_args.num_of_RPR) + if (inserted > superTblInfo->insertRows) break; +/* if (inserted >= superTblInfo->insertRows + || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) + break; +*/ if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { //printf("===== sql: %s \n\n", buffer); @@ -4440,6 +4475,7 @@ void callBack(void *param, TAOS_RES *res, int code) { char *data = calloc(1, MAX_DATA_SIZE); char *pstr = buffer; pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id); +// if (winfo->counter >= winfo->superTblInfo->insertRows) { if (winfo->counter >= g_args.num_of_RPR) { winfo->start_table_id++; winfo->counter = 0; @@ -4466,7 +4502,7 @@ void callBack(void *param, TAOS_RES *res, int code) { pstr += sprintf(pstr, "%s", data); winfo->counter++; - if (winfo->counter >= g_args.num_of_RPR) { + if (winfo->counter >= winfo->superTblInfo->insertRows) { break; } } @@ -4631,12 +4667,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, if (superTblInfo) { superTblInfo->totalAffectedRows += t_info->totalAffectedRows; superTblInfo->totalRowsInserted += t_info->totalRowsInserted; - - totalDelay += t_info->totalDelay; - cntDelay += t_info->cntDelay; - if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; - if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; } + + totalDelay += t_info->totalDelay; + cntDelay += t_info->cntDelay; + if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; + if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; } cntDelay -= 1; @@ -4684,7 +4720,13 @@ void *readTable(void *sarg) { return NULL; } - int num_of_DPT = g_args.num_of_DPT; + int num_of_DPT; +/* if (rinfo->superTblInfo) { + num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; + } else { + */ + num_of_DPT = g_args.num_of_DPT; +// } int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; @@ -4747,7 +4789,7 @@ void *readMetric(void *sarg) { return NULL; } - int num_of_DPT = g_args.num_of_DPT; + int num_of_DPT = rinfo->superTblInfo->insertRows; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; bool do_aggreFunc = g_Dbs.do_aggreFunc; @@ -4866,7 +4908,7 @@ int insertTestProcess() { if (g_Dbs.db[i].superTblCount > 0) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; - if (0 == g_args.num_of_DPT) { + if (0 == g_Dbs.db[i].superTbls[j].insertRows) { continue; } startMultiThreadInsertData( @@ -5491,6 +5533,7 @@ void setParaFromArg(){ "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].timeStampStep = 10; + g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; g_Dbs.db[0].superTbls[0].columnCount = 0;