提交 76a6b2ff 编写于 作者: S Shuduo Sang

[TD-3147] <fix>: support insert internal instead of insert rate.

上级 e891ecc8
......@@ -35,7 +35,8 @@
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_rate": 0,
"insert_interval": 0,
"num_of_records_per_req": 100,
"insert_rows": 100000,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 1,
......
......@@ -266,7 +266,8 @@ typedef struct SSuperTable_S {
char childTblPrefix[MAX_TB_NAME_SIZE];
char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample
char insertMode[MAX_TB_NAME_SIZE]; // taosc, restful
int insertInterval; // interval time between insert twice
uint32_t insertInterval; // interval time between insert twice
uint32_t numRecPerReq;
int multiThreadWriteOneTbl; // 0: no, 1: yes
int numberOfTblInOneSql; // 0/1: one table, > 1: number of tbl
......@@ -431,7 +432,7 @@ typedef struct SThreadInfo_S {
int start_table_id;
int end_table_id;
int data_of_rate;
int64_t start_time;
uint64_t start_time;
char* cols;
bool use_metric;
SSuperTable* superTblInfo;
......@@ -439,10 +440,9 @@ typedef struct SThreadInfo_S {
// for async insert
tsem_t lock_sem;
int64_t counter;
int64_t st;
int64_t et;
uint64_t st;
uint64_t et;
int64_t lastTs;
int nrecords_per_request;
// statistics
int64_t totalRowsInserted;
......@@ -458,6 +458,7 @@ typedef struct SThreadInfo_S {
} threadInfo;
#if 0
#ifdef LINUX
/* The options we understand. */
static struct argp_option options[] = {
......@@ -645,23 +646,213 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
}
#else
#endif
#endif
void printHelp() {
char indent[10] = " ";
printf("%s%s\n", indent, "-f");
printf("%s%s%s\n", indent, indent, "The meta file to the execution procedure. Default is './meta.json'.");
printf("%s%s%s\n", indent, indent,
"The meta file to the execution procedure. Default is './meta.json'.");
#ifdef _TD_POWER_
printf("%s%s\n", indent, "-c");
printf("%s%s%s\n", indent, indent, "config_directory, Configuration directory. Default is '/etc/taos/'.");
printf("%s%s%s\n", indent, indent,
"Configuration directory. Default is '/etc/power/'.");
printf("%s%s\n", indent, "-P");
printf("%s%s%s\n", indent, indent,
"The password to use when connecting to the server. Default is 'powerdb'.");
#else
printf("%s%s\n", indent, "-c");
printf("%s%s%s\n", indent, indent,
"Configuration directory. Default is '/etc/taos/'.");
printf("%s%s\n", indent, "-P");
printf("%s%s%s\n", indent, indent,
"The password to use when connecting to the server. Default is 'taosdata'.");
#endif
printf("%s%s\n", indent, "-h");
printf("%s%s%s\n", indent, indent,
"The host to connect to TDengine. Default is localhost.");
printf("%s%s\n", indent, "-p");
printf("%s%s%s\n", indent, indent,
"The TCP/IP port number to use for the connection. Default is 0.");
printf("%s%s\n", indent, "-u");
printf("%s%s%s\n", indent, indent,
"The TDengine user name to use when connecting to the server. Default is 'root'.");
printf("%s%s\n", indent, "-d");
printf("%s%s%s\n", indent, indent,
"Destination database. Default is 'test'.");
printf("%s%s\n", indent, "-a");
printf("%s%s%s\n", indent, indent,
"Set the replica parameters of the database, Default 1, min: 1, max: 3.");
printf("%s%s\n", indent, "-m");
printf("%s%s%s\n", indent, indent,
"Table prefix name. Default is 't'.");
printf("%s%s\n", indent, "-s");
printf("%s%s%s\n", indent, indent,
"The select sql file.");
printf("%s%s\n", indent, "-M");
printf("%s%s%s\n", indent, indent,
"Use metric flag.");
printf("%s%s\n", indent, "-o");
printf("%s%s%s\n", indent, indent,
"Direct output to the named file. Default is './output.txt'.");
printf("%s%s\n", indent, "-q");
printf("%s%s%s\n", indent, indent,
"Query mode--0: SYNC, 1: ASYNC. Default is SYNC.");
printf("%s%s\n", indent, "-b");
printf("%s%s%s\n", indent, indent,
"The data_type of columns, default: TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,BINARY,NCHAR,BOOL,TIMESTAMP.");
printf("%s%s\n", indent, "-w");
printf("%s%s%s\n", indent, indent,
"The length of data_type 'BINARY' or 'NCHAR'. Default is 16");
printf("%s%s\n", indent, "-l");
printf("%s%s%s\n", indent, indent,
"The number of columns per record. Default is 10.");
printf("%s%s\n", indent, "-T");
printf("%s%s%s\n", indent, indent,
"The number of threads. Default is 10.");
printf("%s%s\n", indent, "-r");
printf("%s%s%s\n", indent, indent,
"The number of records per request. Default is 100.");
printf("%s%s\n", indent, "-t");
printf("%s%s%s\n", indent, indent,
"The number of tables. Default is 10000.");
printf("%s%s\n", indent, "-n");
printf("%s%s%s\n", indent, indent,
"The number of records per table. Default is 10000.");
printf("%s%s\n", indent, "-x");
printf("%s%s%s\n", indent, indent,
"Not insert only flag.");
printf("%s%s\n", indent, "-y");
printf("%s%s%s\n", indent, indent,
"Default input yes for prompt.");
printf("%s%s\n", indent, "-O");
printf("%s%s%s\n", indent, indent,
"Insert mode--0: In order, > 0: disorder ratio. Default is in order.");
printf("%s%s\n", indent, "-R");
printf("%s%s%s\n", indent, indent,
"Out of order data's range, ms, default is 1000.");
/* printf("%s%s\n", indent, "-D");
printf("%s%s%s\n", indent, indent,
"if elete database if exists. 0: no, 1: yes, default is 1");
*/
}
void parse_args(int argc, char *argv[], SArguments *arguments) {
char **sptr;
wordexp_t full_path;
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "-f") == 0) {
arguments->metaFile = argv[++i];
} else if (strcmp(argv[i], "-c") == 0) {
char *configPath = argv[++i];
if (wordexp(configPath, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", configPath);
return;
}
taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]);
wordfree(&full_path);
} else if (strcmp(argv[i], "-h") == 0) {
arguments->host = argv[++i];
} else if (strcmp(argv[i], "-p") == 0) {
arguments->port = atoi(argv[++i]);
} else if (strcmp(argv[i], "-u") == 0) {
arguments->user = argv[++i];
} else if (strcmp(argv[i], "-P") == 0) {
arguments->password = argv[++i];
} else if (strcmp(argv[i], "-o") == 0) {
arguments->output_file = argv[++i];
} else if (strcmp(argv[i], "-s") == 0) {
arguments->sqlFile = argv[++i];
} else if (strcmp(argv[i], "-q") == 0) {
arguments->mode = atoi(argv[++i]);
} else if (strcmp(argv[i], "-T") == 0) {
arguments->num_of_threads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-r") == 0) {
arguments->num_of_RPR = atoi(argv[++i]);
} else if (strcmp(argv[i], "-t") == 0) {
arguments->num_of_tables = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0) {
arguments->num_of_DPT = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d") == 0) {
arguments->database = argv[++i];
} else if (strcmp(argv[i], "-l") == 0) {
arguments->num_of_CPR = atoi(argv[++i]);
} else if (strcmp(argv[i], "-b") == 0) {
sptr = arguments->datatype;
++i;
if (strstr(argv[i], ",") == NULL) {
if (strcasecmp(argv[i], "INT") != 0 && strcasecmp(argv[i], "FLOAT") != 0 &&
strcasecmp(argv[i], "TINYINT") != 0 && strcasecmp(argv[i], "BOOL") != 0 &&
strcasecmp(argv[i], "SMALLINT") != 0 &&
strcasecmp(argv[i], "BIGINT") != 0 && strcasecmp(argv[i], "DOUBLE") != 0 &&
strcasecmp(argv[i], "BINARY") && strcasecmp(argv[i], "NCHAR")) {
fprintf(stderr, "Invalid data_type!\n");
printHelp();
exit(EXIT_FAILURE);
}
sptr[0] = argv[i];
} else {
int index = 0;
char *dupstr = strdup(argv[i]);
char *running = dupstr;
char *token = strsep(&running, ",");
while (token != NULL) {
if (strcasecmp(token, "INT") != 0 &&
strcasecmp(token, "FLOAT") != 0 &&
strcasecmp(token, "TINYINT") != 0 &&
strcasecmp(token, "BOOL") != 0 &&
strcasecmp(token, "SMALLINT") != 0 &&
strcasecmp(token, "BIGINT") != 0 &&
strcasecmp(token, "DOUBLE") != 0 && strcasecmp(token, "BINARY") && strcasecmp(token, "NCHAR")) {
fprintf(stderr, "Invalid data_type!\n");
printHelp();
exit(EXIT_FAILURE);
}
sptr[index++] = token;
token = strsep(&running, ",");
if (index >= MAX_NUM_DATATYPE) break;
}
}
} else if (strcmp(argv[i], "-w") == 0) {
arguments->len_of_binary = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m") == 0) {
arguments->tb_prefix = argv[++i];
} else if (strcmp(argv[i], "-M") == 0) {
arguments->use_metric = true;
} else if (strcmp(argv[i], "-x") == 0) {
arguments->insert_only = true;
} else if (strcmp(argv[i], "-y") == 0) {
arguments->answer_yes = true;
} else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]);
} else if (strcmp(argv[i], "-O") == 0) {
arguments->disorderRatio = atoi(argv[++i]);
if (arguments->disorderRatio > 1 || arguments->disorderRatio < 0) {
arguments->disorderRatio = 0;
} else if (arguments->disorderRatio == 1) {
arguments->disorderRange = 10;
}
} else if (strcmp(argv[i], "-R") == 0) {
arguments->disorderRange = atoi(argv[++i]);
if (arguments->disorderRange == 1
&& (arguments->disorderRange > 50
|| arguments->disorderRange <= 0)) {
arguments->disorderRange = 10;
}
} else if (strcmp(argv[i], "-a") == 0) {
arguments->replica = atoi(argv[++i]);
if (arguments->replica > 3 || arguments->replica < 1) {
arguments->replica = 1;
}
} else if (strcmp(argv[i], "-D") == 0) {
arguments->method_of_delete = atoi(argv[++i]);
if (arguments->method_of_delete < 0 || arguments->method_of_delete > 3) {
arguments->method_of_delete = 0;
}
} else if (strcmp(argv[i], "--help") == 0) {
printHelp();
exit(EXIT_FAILURE);
exit(0);
} else {
fprintf(stderr, "wrong options\n");
printHelp();
......@@ -669,7 +860,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
}
}
}
#endif
//#endif
static bool getInfoFromJsonFile(char* file);
//static int generateOneRowDataForStb(SSuperTable* stbInfo);
......@@ -1016,6 +1207,7 @@ static int printfInsertMeta() {
printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource);
printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode);
printf(" insertInterval: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].insertInterval);
printf(" numRecPerReq: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].numRecPerReq);
printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows);
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
......@@ -1154,6 +1346,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource);
fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode);
fprintf(fp, " insertInterval: %d\n", g_Dbs.db[i].superTbls[j].insertInterval);
fprintf(fp, " numRecPerReq: %d\n", g_Dbs.db[i].superTbls[j].numRecPerReq);
fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows);
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
......@@ -3003,10 +3196,21 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!insertInterval) {
g_Dbs.db[i].superTbls[j].insertInterval = 0;
} else {
printf("failed to read json, insert_rate not found");
printf("failed to read json, insert_interval not found");
goto PARSE_OVER;
}
cJSON* numRecPerReq = cJSON_GetObjectItem(stbInfo, "num_of_records_per_req");
if (numRecPerReq && numRecPerReq->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].numRecPerReq = numRecPerReq->valueint;
} else if (!numRecPerReq) {
g_Dbs.db[i].superTbls[j].numRecPerReq = 0;
} else {
printf("failed to read json, num_of_records_per_req not found");
goto PARSE_OVER;
}
cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows");
if (insertRows && insertRows->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint;
......@@ -3570,9 +3774,7 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) {
int inserted = i;
int k = 0;
int batchRowsSql = 0;
while (1)
for (int k = 0; k < winfo->superTblInfo->numRecPerReq;)
{
int len = 0;
memset(buffer, 0, superTblInfo->maxSqlLen);
......@@ -3582,6 +3784,7 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
if (end_tbl_id > winfo->end_table_id) {
end_tbl_id = winfo->end_table_id+1;
}
for (tbl_id = tID; tbl_id < end_tbl_id; tbl_id++) {
sampleUsePos = samplePos;
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
......@@ -3589,47 +3792,96 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo);
} else {
tagsValBuf = getTagValueFromTagSample(superTblInfo, tbl_id % superTblInfo->tagSampleCount);
tagsValBuf = getTagValueFromTagSample(
superTblInfo, tbl_id % superTblInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
goto free_and_statistics;
}
if (0 == len) {
len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s%d using %s.%s tags %s values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
} else {
len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, " %s.%s%d using %s.%s tags %s values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s%d using %s.%s tags %s values ",
winfo->db_name,
superTblInfo->childTblPrefix,
tbl_id,
winfo->db_name,
superTblInfo->sTblName,
tagsValBuf);
} else {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
" %s.%s%d using %s.%s tags %s values ",
winfo->db_name,
superTblInfo->childTblPrefix,
tbl_id,
winfo->db_name,
superTblInfo->sTblName,
tagsValBuf);
}
tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
if (0 == len) {
len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s values ", winfo->db_name, superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN);
} else {
len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, " %s.%s values ", winfo->db_name, superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN);
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s values ",
winfo->db_name,
superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN);
} else {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
" %s.%s values ",
winfo->db_name,
superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN);
}
} else { // pre-create child table
if (0 == len) {
len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s%d values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id);
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s%d values ",
winfo->db_name,
superTblInfo->childTblPrefix,
tbl_id);
} else {
len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, " %s.%s%d values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id);
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
" %s.%s%d values ",
winfo->db_name,
superTblInfo->childTblPrefix,
tbl_id);
}
}
tmp_time = time_counter;
for (k = 0; k < superTblInfo->rowsPerTbl;) {
int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource, "sample", 6)) {
retLen = getRowDataFromSample(pstr + len, superTblInfo->maxSqlLen - len, tmp_time += superTblInfo->timeStampStep, superTblInfo, &sampleUsePos, fp, sampleDataBuf);
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
retLen = getRowDataFromSample(pstr + len,
superTblInfo->maxSqlLen - len,
tmp_time += superTblInfo->timeStampStep,
superTblInfo,
&sampleUsePos,
fp,
sampleDataBuf);
if (retLen < 0) {
goto free_and_statistics;
}
} else if (0 == strncasecmp(superTblInfo->dataSource, "rand", 8)) {
} else if (0 == strncasecmp(
superTblInfo->dataSource, "rand", strlen("rand"))) {
int rand_num = rand_tinyint() % 100;
if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) {
if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) {
int64_t d = tmp_time - rand() % superTblInfo->disorderRange;
retLen = generateRowData(pstr + len, superTblInfo->maxSqlLen - len, d, superTblInfo);
retLen = generateRowData(pstr + len,
superTblInfo->maxSqlLen - len,
d,
superTblInfo);
} else {
retLen = generateRowData(pstr + len, superTblInfo->maxSqlLen - len, tmp_time += superTblInfo->timeStampStep, superTblInfo);
retLen = generateRowData(pstr + len,
superTblInfo->maxSqlLen - len,
tmp_time += superTblInfo->timeStampStep,
superTblInfo);
}
if (retLen < 0) {
goto free_and_statistics;
......@@ -3639,11 +3891,12 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
//inserted++;
k++;
totalRowsInserted++;
batchRowsSql++;
if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128) || batchRowsSql >= INT16_MAX - 1) {
if (inserted >= superTblInfo->insertRows ||
(superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) {
tID = tbl_id + 1;
printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", superTblInfo->lenOfOneRow);
printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n",
superTblInfo->lenOfOneRow);
goto send_to_server;
}
}
......@@ -3654,15 +3907,17 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
inserted += superTblInfo->rowsPerTbl;
send_to_server:
batchRowsSql = 0;
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
if (0 == strncasecmp(superTblInfo->insertMode,
"taosc",
strlen("taosc"))) {
//printf("multi table===== sql: %s \n\n", buffer);
//int64_t t1 = taosGetTimestampMs();
int64_t startTs;
int64_t endTs;
startTs = taosGetTimestampUs();
int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
int affectedRows = queryDbExec(
winfo->taos, buffer, INSERT_TYPE);
if (0 > affectedRows) {
goto free_and_statistics;
} else {
......@@ -3678,7 +3933,10 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", winfo->threadID, totalRowsInserted, totalAffectedRows);
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
winfo->threadID,
totalRowsInserted,
totalAffectedRows);
lastPrintTime = currentPrintTime;
}
//int64_t t2 = taosGetTimestampMs();
......@@ -3695,12 +3953,11 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
}
}
//printf("========tID:%d, k:%d, loop_cnt:%d\n", tID, k, loop_cnt);
break;
}
if (tID > winfo->end_table_id) {
if (0 == strncasecmp(superTblInfo->dataSource, "sample", 6)) {
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
samplePos = sampleUsePos;
}
i = inserted;
......@@ -3731,9 +3988,9 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
*/
void *syncWrite(void *sarg) {
int64_t totalRowsInserted = 0;
int64_t totalAffectedRows = 0;
int64_t lastPrintTime = taosGetTimestampMs();
uint64_t totalRowsInserted = 0;
uint64_t totalAffectedRows = 0;
uint64_t lastPrintTime = taosGetTimestampMs();
threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo;
......@@ -3743,20 +4000,27 @@ void *syncWrite(void *sarg) {
int samplePos = 0;
// each thread read sample data from csv file
if (0 == strncasecmp(superTblInfo->dataSource, "sample", 6)) {
sampleDataBuf = calloc(superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (0 == strncasecmp(superTblInfo->dataSource,
"sample",
strlen("sample"))) {
sampleDataBuf = calloc(
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) {
printf("Failed to calloc %d Bytes, reason:%s\n", superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, strerror(errno));
printf("Failed to calloc %d Bytes, reason:%s\n",
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno));
return NULL;
}
fp = fopen(superTblInfo->sampleFile, "r");
if (fp == NULL) {
printf("Failed to open sample file: %s, reason:%s\n", superTblInfo->sampleFile, strerror(errno));
printf("Failed to open sample file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
tmfree(sampleDataBuf);
return NULL;
}
int ret = readSampleFromCsvFileToMem(fp, superTblInfo, sampleDataBuf);
int ret = readSampleFromCsvFileToMem(fp,
superTblInfo, sampleDataBuf);
if (0 != ret) {
tmfree(sampleDataBuf);
tmfclose(fp);
......@@ -3771,52 +4035,23 @@ void *syncWrite(void *sarg) {
return NULL;
}
//printf("========threadID[%d], table rang: %d - %d \n", winfo->threadID, winfo->start_table_id, winfo->end_table_id);
char* buffer = calloc(superTblInfo->maxSqlLen, 1);
int nrecords_per_request = 0;
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
nrecords_per_request = (superTblInfo->maxSqlLen - 1280 - superTblInfo->lenOfTagOfOneRow) / superTblInfo->lenOfOneRow;
} else {
nrecords_per_request = (superTblInfo->maxSqlLen - 1280) / superTblInfo->lenOfOneRow;
}
int nrecords_no_last_req = nrecords_per_request;
int nrecords_last_req = 0;
int loop_cnt = 0;
if (0 != superTblInfo->insertRate) {
if (nrecords_no_last_req >= superTblInfo->insertRate) {
nrecords_no_last_req = superTblInfo->insertRate;
} else {
nrecords_last_req = superTblInfo->insertRate % nrecords_per_request;
loop_cnt = (superTblInfo->insertRate / nrecords_per_request) + (superTblInfo->insertRate % nrecords_per_request ? 1 : 0) ;
}
}
if (nrecords_no_last_req <= 0) {
nrecords_no_last_req = 1;
}
if (nrecords_no_last_req >= INT16_MAX) {
nrecords_no_last_req = INT16_MAX - 1;
}
if (nrecords_last_req >= INT16_MAX) {
nrecords_last_req = INT16_MAX - 1;
if (NULL == buffer) {
printf("Failed to calloc %d Bytes, reason:%s\n",
superTblInfo->maxSqlLen,
strerror(errno));
tmfree(sampleDataBuf);
tmfclose(fp);
return NULL;
}
int nrecords_cur_req = nrecords_no_last_req;
int loop_cnt_orig = loop_cnt;
//printf("========nrecords_per_request:%d, nrecords_no_last_req:%d, nrecords_last_req:%d, loop_cnt:%d\n", nrecords_per_request, nrecords_no_last_req, nrecords_last_req, loop_cnt);
uint64_t time_counter = winfo->start_time;
uint64_t st = 0;
uint64_t et = 0;
int64_t time_counter = winfo->start_time;
int64_t st = 0;
int64_t et = 0;
for (int i = 0; i < superTblInfo->insertRows;) {
if (superTblInfo->insertInterval && (superTblInfo->insertInterval > (et - st) )) {
if (i > 0 && superTblInfo->insertInterval
&& (superTblInfo->insertInterval > (et - st) )) {
taosMsleep(superTblInfo->insertInterval - (et - st)); // ms
}
......@@ -3824,9 +4059,9 @@ void *syncWrite(void *sarg) {
st = taosGetTimestampMs();
}
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
int inserted = i;
int64_t tmp_time = time_counter;
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
uint64_t inserted = i;
uint64_t tmp_time = time_counter;
int sampleUsePos = samplePos;
int k = 0;
......@@ -3841,24 +4076,50 @@ void *syncWrite(void *sarg) {
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo);
} else {
tagsValBuf = getTagValueFromTagSample(superTblInfo, tID % superTblInfo->tagSampleCount);
tagsValBuf = getTagValueFromTagSample(
superTblInfo,
tID % superTblInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
goto free_and_statistics_2;
}
len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s%d using %s.%s tags %s values", winfo->db_name, superTblInfo->childTblPrefix, tID, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s%d using %s.%s tags %s values",
winfo->db_name,
superTblInfo->childTblPrefix,
tID,
winfo->db_name,
superTblInfo->sTblName,
tagsValBuf);
tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s values", winfo->db_name, superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN);
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s values",
winfo->db_name,
superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN);
} else {
len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s%d values", winfo->db_name, superTblInfo->childTblPrefix, tID);
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s%d values",
winfo->db_name,
superTblInfo->childTblPrefix,
tID);
}
for (k = 0; k < nrecords_cur_req;) {
for (k = 0; k < superTblInfo->numRecPerReq;) {
int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource, "sample", 6)) {
retLen = getRowDataFromSample(pstr + len, superTblInfo->maxSqlLen - len, tmp_time += superTblInfo->timeStampStep, superTblInfo, &sampleUsePos, fp, sampleDataBuf);
retLen = getRowDataFromSample(
pstr + len,
superTblInfo->maxSqlLen - len,
tmp_time += superTblInfo->timeStampStep,
superTblInfo,
&sampleUsePos,
fp,
sampleDataBuf);
if (retLen < 0) {
goto free_and_statistics_2;
}
......@@ -3866,10 +4127,17 @@ void *syncWrite(void *sarg) {
int rand_num = rand_tinyint() % 100;
if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) {
int64_t d = tmp_time - rand() % superTblInfo->disorderRange;
retLen = generateRowData(pstr + len, superTblInfo->maxSqlLen - len, d, superTblInfo);
retLen = generateRowData(
pstr + len,
superTblInfo->maxSqlLen - len, d,
superTblInfo);
//printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d);
} else {
retLen = generateRowData(pstr + len, superTblInfo->maxSqlLen - len, tmp_time += superTblInfo->timeStampStep, superTblInfo);
retLen = generateRowData(
pstr + len,
superTblInfo->maxSqlLen - len,
tmp_time += superTblInfo->timeStampStep,
superTblInfo);
}
if (retLen < 0) {
goto free_and_statistics_2;
......@@ -3880,7 +4148,9 @@ void *syncWrite(void *sarg) {
k++;
totalRowsInserted++;
if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) break;
if (inserted >= superTblInfo->insertRows
|| (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128))
break;
}
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
......@@ -3906,7 +4176,10 @@ void *syncWrite(void *sarg) {
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", winfo->threadID, totalRowsInserted, totalAffectedRows);
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
winfo->threadID,
totalRowsInserted,
totalAffectedRows);
lastPrintTime = currentPrintTime;
}
//int64_t t2 = taosGetTimestampMs();
......@@ -3922,25 +4195,11 @@ void *syncWrite(void *sarg) {
goto free_and_statistics_2;
}
}
//printf("========tID:%d, k:%d, loop_cnt:%d\n", tID, k, loop_cnt);
if (loop_cnt) {
loop_cnt--;
if ((1 == loop_cnt) && (0 != nrecords_last_req)) {
nrecords_cur_req = nrecords_last_req;
} else if (0 == loop_cnt){
nrecords_cur_req = nrecords_no_last_req;
loop_cnt = loop_cnt_orig;
break;
}
} else {
break;
}
}
if (tID == winfo->end_table_id) {
if (0 == strncasecmp(superTblInfo->dataSource, "sample", 6)) {
if (0 == strncasecmp(
superTblInfo->dataSource, "sample", 6)) {
samplePos = sampleUsePos;
}
i = inserted;
......@@ -3954,7 +4213,7 @@ void *syncWrite(void *sarg) {
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
}
free_and_statistics_2:
free_and_statistics_2:
tmfree(buffer);
tmfree(sampleDataBuf);
tmfclose(fp);
......@@ -3962,7 +4221,10 @@ void *syncWrite(void *sarg) {
winfo->totalRowsInserted = totalRowsInserted;
winfo->totalAffectedRows = totalAffectedRows;
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", winfo->threadID, totalRowsInserted, totalAffectedRows);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
winfo->threadID,
totalRowsInserted,
totalAffectedRows);
return NULL;
}
......@@ -3992,7 +4254,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
return;
}
for (int i = 0; i < winfo->nrecords_per_request; i++) {
for (int i = 0; i < winfo->superTblInfo->numRecPerReq; i++) {
int rand_num = rand() % 100;
if (0 != winfo->superTblInfo->disorderRatio && rand_num < winfo->superTblInfo->disorderRatio)
{
......@@ -4024,31 +4286,6 @@ void callBack(void *param, TAOS_RES *res, int code) {
void *asyncWrite(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg;
winfo->nrecords_per_request = 0;
//if (AUTO_CREATE_SUBTBL == winfo->superTblInfo->autoCreateTable) {
winfo->nrecords_per_request = (winfo->superTblInfo->maxSqlLen - 1280 - winfo->superTblInfo->lenOfTagOfOneRow) / winfo->superTblInfo->lenOfOneRow;
//} else {
// winfo->nrecords_per_request = (winfo->superTblInfo->maxSqlLen - 1280) / winfo->superTblInfo->lenOfOneRow;
//}
if (0 != winfo->superTblInfo->insertInterval) {
if (winfo->nrecords_per_request >= winfo->superTblInfo->insertInterval) {
winfo->nrecords_per_request = winfo->superTblInfo->insertInterval;
}
}
if (winfo->nrecords_per_request <= 0) {
winfo->nrecords_per_request = 1;
}
if (winfo->nrecords_per_request >= INT16_MAX) {
winfo->nrecords_per_request = INT16_MAX - 1;
}
if (winfo->nrecords_per_request >= INT16_MAX) {
winfo->nrecords_per_request = INT16_MAX - 1;
}
winfo->st = 0;
winfo->et = 0;
winfo->lastTs = winfo->start_time;
......@@ -4950,6 +5187,7 @@ void setParaFromArg(){
g_Dbs.db[0].superTbls[0].superTblExists = TBL_NO_EXISTS;
g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS;
g_Dbs.db[0].superTbls[0].insertInterval = 0;
g_Dbs.db[0].superTbls[0].numRecPerReq = 0;
g_Dbs.db[0].superTbls[0].disorderRange = g_args.disorderRange;
g_Dbs.db[0].superTbls[0].disorderRatio = g_args.disorderRatio;
tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix, g_args.tb_prefix, MAX_TB_NAME_SIZE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册