提交 e437f5c8 编写于 作者: S Shuduo Sang

[TD-3147] <fix>: support insert interval. json works.

上级 15418181
......@@ -9,6 +9,8 @@
"thread_count_create_tbl": 1,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 100,
"databases": [{
"dbinfo": {
"name": "db",
......@@ -35,8 +37,6 @@
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_interval": 0,
"num_of_records_per_req": 100,
"insert_rows": 100000,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 1,
......
......@@ -61,56 +61,6 @@
#define REQ_EXTRA_BUF_LEN 1024
#define RESP_BUF_LEN 4096
#ifdef WINDOWS
#include <windows.h>
// Some old MinGW/CYGWIN distributions don't define this:
#ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING
#define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x0004
#endif
static HANDLE g_stdoutHandle;
static DWORD g_consoleMode;
void setupForAnsiEscape(void) {
DWORD mode = 0;
g_stdoutHandle = GetStdHandle(STD_OUTPUT_HANDLE);
if(g_stdoutHandle == INVALID_HANDLE_VALUE) {
exit(GetLastError());
}
if(!GetConsoleMode(g_stdoutHandle, &mode)) {
exit(GetLastError());
}
g_consoleMode = mode;
// Enable ANSI escape codes
mode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING;
if(!SetConsoleMode(g_stdoutHandle, mode)) {
exit(GetLastError());
}
}
void resetAfterAnsiEscape(void) {
// Reset colors
printf("\x1b[0m");
// Reset console mode
if(!SetConsoleMode(g_stdoutHandle, g_consoleMode)) {
exit(GetLastError());
}
}
#else
void setupForAnsiEscape(void) {}
void resetAfterAnsiEscape(void) {
// Reset colors
printf("\x1b[0m");
}
#endif
extern char configDir[];
#define INSERT_JSON_NAME "insert.json"
......@@ -163,7 +113,7 @@ enum MODE {
ASYNC,
MODE_BUT
};
enum QUERY_TYPE {
NO_INSERT_TYPE,
INSERT_TYPE,
......@@ -233,6 +183,7 @@ typedef struct SArguments_S {
bool use_metric;
bool insert_only;
bool answer_yes;
bool debug_print;
char * output_file;
int mode;
char * datatype[MAX_NUM_DATATYPE + 1];
......@@ -267,8 +218,6 @@ typedef struct SSuperTable_S {
char childTblPrefix[MAX_TB_NAME_SIZE];
char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample
char insertMode[MAX_TB_NAME_SIZE]; // taosc, restful
uint32_t insertInterval; // interval time between insert twice
uint32_t numRecPerReq;
int multiThreadWriteOneTbl; // 0: no, 1: yes
int numberOfTblInOneSql; // 0/1: one table, > 1: number of tbl
......@@ -374,6 +323,9 @@ typedef struct SDbs_S {
int dbCount;
SDataBase db[MAX_DB_COUNT];
int insert_interval;
int num_of_RPR;
// statistics
int64_t totalRowsInserted;
int64_t totalAffectedRows;
......@@ -458,6 +410,125 @@ typedef struct SThreadInfo_S {
} threadInfo;
#ifdef WINDOWS
#include <windows.h>
// Some old MinGW/CYGWIN distributions don't define this:
#ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING
#define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x0004
#endif
static HANDLE g_stdoutHandle;
static DWORD g_consoleMode;
void setupForAnsiEscape(void) {
DWORD mode = 0;
g_stdoutHandle = GetStdHandle(STD_OUTPUT_HANDLE);
if(g_stdoutHandle == INVALID_HANDLE_VALUE) {
exit(GetLastError());
}
if(!GetConsoleMode(g_stdoutHandle, &mode)) {
exit(GetLastError());
}
g_consoleMode = mode;
// Enable ANSI escape codes
mode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING;
if(!SetConsoleMode(g_stdoutHandle, mode)) {
exit(GetLastError());
}
}
void resetAfterAnsiEscape(void) {
// Reset colors
printf("\x1b[0m");
// Reset console mode
if(!SetConsoleMode(g_stdoutHandle, g_consoleMode)) {
exit(GetLastError());
}
}
#else
void setupForAnsiEscape(void) {}
void resetAfterAnsiEscape(void) {
// Reset colors
printf("\x1b[0m");
}
#endif
static int createDatabases();
static void createChildTables();
static int queryDbExec(TAOS *taos, char *command, int type);
/* ************ Global variables ************ */
int32_t randint[MAX_PREPARED_RAND];
int64_t randbigint[MAX_PREPARED_RAND];
float randfloat[MAX_PREPARED_RAND];
double randdouble[MAX_PREPARED_RAND];
char *aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)",
"max(col0)", "min(col0)", "first(col0)", "last(col0)"};
SArguments g_args = {NULL,
"127.0.0.1", // host
6030, // port
"root", // user
#ifdef _TD_POWER_
"powerdb", // password
#else
"taosdata", // password
#endif
"test", // database
1, // replica
"t", // tb_prefix
NULL, // sqlFile
false, // use_metric
false, // insert_only
false, // debug_print
false, // answer_yes;
"./output.txt", // output_file
0, // mode : sync or async
{
"TINYINT", // datatype
"SMALLINT",
"INT",
"BIGINT",
"FLOAT",
"DOUBLE",
"BINARY",
"NCHAR",
"BOOL",
"TIMESTAMP"
},
16, // len_of_binary
10, // num_of_CPR
10, // num_of_connections/thread
0, // insert_interval
100, // num_of_RPR
10000, // num_of_tables
10000, // num_of_DPT
0, // abort
0, // disorderRatio
1000, // disorderRange
1, // method_of_delete
NULL // arg_list
};
static int g_jsonType = 0;
static SDbs g_Dbs;
static int g_totalChildTables = 0;
static SQueryMetaInfo g_queryInfo;
static FILE * g_fpOfInsertResult = NULL;
#define debugPrint(fmt, ...) \
do { if (g_args.debug_print) fprintf(stderr, fmt, __VA_ARGS__); } while(0)
///////////////////////////////////////////////////
void printHelp() {
char indent[10] = " ";
......@@ -514,6 +585,8 @@ void printHelp() {
"Insert mode--0: In order, > 0: disorder ratio. Default is in order.");
printf("%s%s%s%s\n", indent, "-R", indent,
"Out of order data's range, ms, default is 1000.");
printf("%s%s%s%s\n", indent, "-g", indent,
"Print debug info.");
/* printf("%s%s%s%s\n", indent, "-D", indent,
"if elete database if exists. 0: no, 1: yes, default is 1");
*/
......@@ -614,6 +687,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->insert_only = true;
} else if (strcmp(argv[i], "-y") == 0) {
arguments->answer_yes = true;
} else if (strcmp(argv[i], "-g") == 0) {
arguments->debug_print = true;
} else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]);
} else if (strcmp(argv[i], "-O") == 0) {
......@@ -651,77 +726,42 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
exit(EXIT_FAILURE);
}
}
if (arguments->debug_print) {
printf("###################################################################\n");
printf("# Server IP: %s:%hu\n",
arguments->host == NULL ? "localhost" : arguments->host,
arguments->port );
printf("# User: %s\n", arguments->user);
printf("# Password: %s\n", arguments->password);
printf("# Use metric: %s\n", arguments->use_metric ? "true" : "false");
printf("# Insertion interval: %d\n", arguments->insert_interval);
printf("# Number of Columns per record: %d\n", arguments->num_of_RPR);
printf("# Number of Threads: %d\n", arguments->num_of_threads);
printf("# Number of Tables: %d\n", arguments->num_of_tables);
printf("# Number of Data per Table: %d\n", arguments->num_of_DPT);
printf("# Database name: %s\n", arguments->database);
printf("# Table prefix: %s\n", arguments->tb_prefix);
if (arguments->disorderRatio) {
printf("# Data order: %d\n", arguments->disorderRatio);
printf("# Data out of order rate: %d\n", arguments->disorderRange);
}
printf("# Delete method: %d\n", arguments->method_of_delete);
printf("# Answer yes when prompt: %d\n", arguments->answer_yes);
printf("# Print debug info: %d\n", arguments->debug_print);
printf("###################################################################\n");
if (!arguments->answer_yes) {
printf("Press enter key to continue\n\n");
(void) getchar();
}
}
}
static bool getInfoFromJsonFile(char* file);
//static int generateOneRowDataForStb(SSuperTable* stbInfo);
//static int getDataIntoMemForStb(SSuperTable* stbInfo);
static void init_rand_data();
static int createDatabases();
static void createChildTables();
static int queryDbExec(TAOS *taos, char *command, int type);
/* ************ Global variables ************ */
int32_t randint[MAX_PREPARED_RAND];
int64_t randbigint[MAX_PREPARED_RAND];
float randfloat[MAX_PREPARED_RAND];
double randdouble[MAX_PREPARED_RAND];
char *aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)",
"max(col0)", "min(col0)", "first(col0)", "last(col0)"};
SArguments g_args = {NULL,
"127.0.0.1", // host
6030, // port
"root", // user
#ifdef _TD_POWER_
"powerdb", // password
#else
"taosdata", // password
#endif
"test", // database
1, // replica
"t", // tb_prefix
NULL, // sqlFile
false, // use_metric
false, // insert_only
false, // answer_yes;
"./output.txt", // output_file
0, // mode : sync or async
{
"TINYINT", // datatype
"SMALLINT",
"INT",
"BIGINT",
"FLOAT",
"DOUBLE",
"BINARY",
"NCHAR",
"BOOL",
"TIMESTAMP"
},
16, // len_of_binary
10, // num_of_CPR
10, // num_of_connections/thread
0, // insert_interval
100, // num_of_RPR
10000, // num_of_tables
10000, // num_of_DPT
0, // abort
0, // disorderRatio
1000, // disorderRange
1, // method_of_delete
NULL // arg_list
};
static int g_jsonType = 0;
static SDbs g_Dbs;
static int g_totalChildTables = 0;
static SQueryMetaInfo g_queryInfo;
static FILE * g_fpOfInsertResult = NULL;
void tmfclose(FILE *fp) {
if (NULL != fp) {
fclose(fp);
......@@ -917,6 +957,8 @@ static int printfInsertMeta() {
printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile);
printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount);
printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl);
printf("insert interval: \033[33m%d\033[0m\n", g_Dbs.insert_interval);
printf("number of records per req: \033[33m%d\033[0m\n", g_Dbs.num_of_RPR);
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
for (int i = 0; i < g_Dbs.dbCount; i++) {
......@@ -999,8 +1041,6 @@ static int printfInsertMeta() {
printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix);
printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource);
printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode);
printf(" insertInterval: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].insertInterval);
printf(" numRecPerReq: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].numRecPerReq);
printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows);
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
......@@ -1138,8 +1178,6 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix);
fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource);
fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode);
fprintf(fp, " insertInterval: %d\n", g_Dbs.db[i].superTbls[j].insertInterval);
fprintf(fp, " numRecPerReq: %d\n", g_Dbs.db[i].superTbls[j].numRecPerReq);
fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows);
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
......@@ -2530,6 +2568,26 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
cJSON* insertInterval = cJSON_GetObjectItem(root, "insert_interval");
if (insertInterval && insertInterval->type == cJSON_Number) {
g_Dbs.insert_interval = insertInterval->valueint;
} else if (!insertInterval) {
g_Dbs.insert_interval = 0;
} else {
printf("failed to read json, insert_interval not found");
goto PARSE_OVER;
}
cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req");
if (numRecPerReq && numRecPerReq->type == cJSON_Number) {
g_Dbs.num_of_RPR = numRecPerReq->valueint;
} else if (!numRecPerReq) {
g_Dbs.num_of_RPR = 0;
} else {
printf("failed to read json, num_of_records_per_req not found");
goto PARSE_OVER;
}
cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no,
if (answerPrompt && answerPrompt->type == cJSON_String && answerPrompt->valuestring != NULL) {
if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) {
......@@ -2983,26 +3041,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
cJSON* insertInterval = cJSON_GetObjectItem(stbInfo, "insert_interval");
if (insertInterval && insertInterval->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].insertInterval = insertInterval->valueint;
} else if (!insertInterval) {
g_Dbs.db[i].superTbls[j].insertInterval = 0;
} else {
printf("failed to read json, insert_interval not found");
goto PARSE_OVER;
}
cJSON* numRecPerReq = cJSON_GetObjectItem(stbInfo, "num_of_records_per_req");
if (numRecPerReq && numRecPerReq->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].numRecPerReq = numRecPerReq->valueint;
} else if (!numRecPerReq) {
g_Dbs.db[i].superTbls[j].numRecPerReq = 0;
} else {
printf("failed to read json, num_of_records_per_req not found");
goto PARSE_OVER;
}
cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows");
if (insertRows && insertRows->type == cJSON_Number) {
......@@ -3414,7 +3452,7 @@ static bool getInfoFromJsonFile(char* file) {
} else {
printf("input json file type error! please input correct file type: insert or query or subscribe\n");
goto PARSE_OVER;
}
}
PARSE_OVER:
free(content);
......@@ -3423,7 +3461,6 @@ PARSE_OVER:
return ret;
}
void prePareSampleData() {
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
......@@ -3526,7 +3563,8 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable*
return dataLen;
}
void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDataBuf) {
void syncWriteForNumberOfTblInOneSql(
threadInfo *winfo, FILE *fp, char* sampleDataBuf) {
SSuperTable* superTblInfo = winfo->superTblInfo;
int samplePos = 0;
......@@ -3555,11 +3593,11 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
int64_t st = 0;
int64_t et = 0;
for (int i = 0; i < superTblInfo->insertRows;) {
if (superTblInfo->insertInterval && (superTblInfo->insertInterval > (et - st))) {
taosMsleep(superTblInfo->insertInterval - (et - st)); // ms
if (g_Dbs.insert_interval && (g_Dbs.insert_interval > (et - st))) {
taosMsleep(g_Dbs.insert_interval - (et - st)); // ms
}
if (superTblInfo->insertInterval) {
if (g_Dbs.insert_interval) {
st = taosGetTimestampMs();
}
......@@ -3567,8 +3605,7 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) {
int inserted = i;
for (int k = 0; k < winfo->superTblInfo->numRecPerReq;)
{
for (int k = 0; k < g_Dbs.num_of_RPR;) {
int len = 0;
memset(buffer, 0, superTblInfo->maxSqlLen);
char *pstr = buffer;
......@@ -3649,7 +3686,8 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
tmp_time = time_counter;
for (k = 0; k < superTblInfo->rowsPerTbl;) {
int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
if (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample"))) {
retLen = getRowDataFromSample(pstr + len,
superTblInfo->maxSqlLen - len,
tmp_time += superTblInfo->timeStampStep,
......@@ -3684,7 +3722,7 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
//inserted++;
k++;
totalRowsInserted++;
if (inserted >= superTblInfo->insertRows ||
(superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) {
tID = tbl_id + 1;
......@@ -3693,13 +3731,12 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
goto send_to_server;
}
}
}
tID = tbl_id;
inserted += superTblInfo->rowsPerTbl;
send_to_server:
send_to_server:
if (0 == strncasecmp(superTblInfo->insertMode,
"taosc",
strlen("taosc"))) {
......@@ -3758,7 +3795,7 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
}
}
if (superTblInfo->insertInterval) {
if (g_Dbs.insert_interval) {
et = taosGetTimestampMs();
}
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
......@@ -3843,12 +3880,12 @@ void *syncWrite(void *sarg) {
uint64_t et = 0;
for (int i = 0; i < superTblInfo->insertRows;) {
if (i > 0 && superTblInfo->insertInterval
&& (superTblInfo->insertInterval > (et - st) )) {
taosMsleep(superTblInfo->insertInterval - (et - st)); // ms
if (i > 0 && g_Dbs.insert_interval
&& (g_Dbs.insert_interval > (et - st) )) {
taosMsleep(g_Dbs.insert_interval - (et - st)); // ms
}
if (superTblInfo->insertInterval) {
if (g_Dbs.insert_interval) {
st = taosGetTimestampMs();
}
......@@ -3901,10 +3938,10 @@ void *syncWrite(void *sarg) {
superTblInfo->childTblPrefix,
tID);
}
for (k = 0; k < superTblInfo->numRecPerReq;) {
for (k = 0; k < g_Dbs.num_of_RPR;) {
int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource, "sample", 6)) {
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
retLen = getRowDataFromSample(
pstr + len,
superTblInfo->maxSqlLen - len,
......@@ -3916,9 +3953,10 @@ void *syncWrite(void *sarg) {
if (retLen < 0) {
goto free_and_statistics_2;
}
} else if (0 == strncasecmp(superTblInfo->dataSource, "rand", 8)) {
} else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
int rand_num = rand_tinyint() % 100;
if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) {
if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) {
int64_t d = tmp_time - rand() % superTblInfo->disorderRange;
retLen = generateRowData(
pstr + len,
......@@ -3946,7 +3984,7 @@ void *syncWrite(void *sarg) {
break;
}
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
//printf("===== sql: %s \n\n", buffer);
//int64_t t1 = taosGetTimestampMs();
int64_t startTs;
......@@ -3975,7 +4013,7 @@ void *syncWrite(void *sarg) {
totalAffectedRows);
lastPrintTime = currentPrintTime;
}
//int64_t t2 = taosGetTimestampMs();
//int64_t t2 = taosGetTimestampMs();
//printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0);
} else {
//int64_t t1 = taosGetTimestampMs();
......@@ -3992,15 +4030,15 @@ void *syncWrite(void *sarg) {
if (tID == winfo->end_table_id) {
if (0 == strncasecmp(
superTblInfo->dataSource, "sample", 6)) {
superTblInfo->dataSource, "sample", strlen("sample"))) {
samplePos = sampleUsePos;
}
i = inserted;
time_counter = tmp_time;
}
}
if (superTblInfo->insertInterval) {
}
if (g_Dbs.insert_interval) {
et = taosGetTimestampMs();
}
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
......@@ -4024,7 +4062,7 @@ free_and_statistics_2:
void callBack(void *param, TAOS_RES *res, int code) {
threadInfo* winfo = (threadInfo*)param;
if (winfo->superTblInfo->insertInterval) {
if (g_Dbs.insert_interval) {
winfo->et = taosGetTimestampMs();
if (winfo->et - winfo->st < 1000) {
taosMsleep(1000 - (winfo->et - winfo->st)); // ms
......@@ -4047,7 +4085,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
return;
}
for (int i = 0; i < winfo->superTblInfo->numRecPerReq; i++) {
for (int i = 0; i < g_Dbs.num_of_RPR; i++) {
int rand_num = rand() % 100;
if (0 != winfo->superTblInfo->disorderRatio && rand_num < winfo->superTblInfo->disorderRatio)
{
......@@ -4066,7 +4104,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
}
}
if (winfo->superTblInfo->insertInterval) {
if (g_Dbs.insert_interval) {
winfo->st = taosGetTimestampMs();
}
taos_query_a(winfo->taos, buffer, callBack, winfo);
......@@ -4083,7 +4121,7 @@ void *asyncWrite(void *sarg) {
winfo->et = 0;
winfo->lastTs = winfo->start_time;
if (winfo->superTblInfo->insertInterval) {
if (g_Dbs.insert_interval) {
winfo->st = taosGetTimestampMs();
}
taos_query_a(winfo->taos, "show databases", callBack, winfo);
......@@ -4136,7 +4174,11 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
start_time = taosGetTimestamp(timePrec);
} else {
(void)taosParseTime(superTblInfo->startTimestamp, &start_time, strlen(superTblInfo->startTimestamp), timePrec, 0);
(void)taosParseTime(
superTblInfo->startTimestamp,
&start_time,
strlen(superTblInfo->startTimestamp),
timePrec, 0);
}
double start = getCurrentTime();
......@@ -4153,7 +4195,9 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
//t_info->taos = taos;
t_info->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
t_info->taos = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == t_info->taos) {
printf("connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL));
exit(-1);
......@@ -4173,7 +4217,6 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
}
tsem_init(&(t_info->lock_sem), 0, 0);
if (SYNC == g_Dbs.queryMode) {
pthread_create(pids + i, NULL, syncWrite, t_info);
} else {
......@@ -4217,7 +4260,6 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
fprintf(g_fpOfInsertResult, "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n",
t, superTblInfo->totalRowsInserted, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, superTblInfo->totalRowsInserted / t);
printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n",
avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0);
fprintf(g_fpOfInsertResult, "insert delay, avg:%10.6fms, max: %10.6fms, min: %10.6fms\n\n",
......@@ -4406,10 +4448,13 @@ int insertTestProcess() {
createChildTables();
end = getCurrentTime();
if (g_totalChildTables > 0) {
printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n", end - start, g_totalChildTables, g_Dbs.threadCount);
fprintf(g_fpOfInsertResult, "Spent %.4f seconds to create %d tables with %d thread(s)\n\n", end - start, g_totalChildTables, g_Dbs.threadCount);
printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
end - start, g_totalChildTables, g_Dbs.threadCount);
fprintf(g_fpOfInsertResult,
"Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
end - start, g_totalChildTables, g_Dbs.threadCount);
}
taosMsleep(1000);
// create sub threads for inserting data
......@@ -4420,11 +4465,15 @@ int insertTestProcess() {
if (0 == g_Dbs.db[i].superTbls[j].insertRows) {
continue;
}
startMultiThreadInsertData(g_Dbs.threadCount, g_Dbs.db[i].dbName, g_Dbs.db[i].dbCfg.precision, superTblInfo);
startMultiThreadInsertData(
g_Dbs.threadCount,
g_Dbs.db[i].dbName,
g_Dbs.db[i].dbCfg.precision,
superTblInfo);
}
}
//end = getCurrentTime();
//int64_t totalRowsInserted = 0;
//int64_t totalAffectedRows = 0;
//for (int i = 0; i < g_Dbs.dbCount; i++) {
......@@ -4443,7 +4492,12 @@ int insertTestProcess() {
//rInfo->do_aggreFunc = g_Dbs.do_aggreFunc;
//rInfo->nrecords_per_table = g_Dbs.db[0].superTbls[0].insertRows;
rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
rInfo->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, g_Dbs.db[0].dbName, g_Dbs.port);
rInfo->taos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
g_Dbs.db[0].dbName,
g_Dbs.port);
strcpy(rInfo->tb_prefix, g_Dbs.db[0].superTbls[0].childTblPrefix);
strcpy(rInfo->fp, g_Dbs.resultFile);
......@@ -4486,12 +4540,15 @@ void *superQueryProcess(void *sarg) {
}
selectAndGetResult(winfo->taos, g_queryInfo.superQueryInfo.sql[i], tmpFile);
int64_t t2 = taosGetTimestampUs();
printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
} else {
int64_t t1 = taosGetTimestampUs();
int retCode = postProceSql(g_queryInfo.host, g_queryInfo.port, g_queryInfo.superQueryInfo.sql[i]);
int retCode = postProceSql(g_queryInfo.host,
g_queryInfo.port, g_queryInfo.superQueryInfo.sql[i]);
int64_t t2 = taosGetTimestampUs();
printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
if (0 != retCode) {
printf("====restful return fail, threadID[%d]\n", winfo->threadID);
......@@ -4500,7 +4557,8 @@ void *superQueryProcess(void *sarg) {
}
}
et = taosGetTimestampMs();
printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0);
printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n",
taosGetSelfPthreadId(), (double)(et - st)/1000.0);
}
return NULL;
}
......@@ -4508,7 +4566,9 @@ void *superQueryProcess(void *sarg) {
void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
char sourceString[32] = "xxxx";
char subTblName[MAX_TB_NAME_SIZE*3];
sprintf(subTblName, "%s.%s", g_queryInfo.dbName, g_queryInfo.subQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN);
sprintf(subTblName, "%s.%s",
g_queryInfo.dbName,
g_queryInfo.subQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN);
//printf("inSql: %s\n", inSql);
......@@ -4543,27 +4603,41 @@ void *subQueryProcess(void *sarg) {
replaceSubTblName(g_queryInfo.subQueryInfo.sql[i], sqlstr, i);
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.subQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d", g_queryInfo.subQueryInfo.result[i], winfo->threadID);
sprintf(tmpFile, "%s-%d",
g_queryInfo.subQueryInfo.result[i],
winfo->threadID);
}
selectAndGetResult(winfo->taos, sqlstr, tmpFile);
}
}
et = taosGetTimestampMs();
printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n", taosGetSelfPthreadId(), winfo->start_table_id, winfo->end_table_id, (double)(et - st)/1000.0);
printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n",
taosGetSelfPthreadId(),
winfo->start_table_id,
winfo->end_table_id,
(double)(et - st)/1000.0);
}
return NULL;
}
int queryTestProcess() {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host, g_queryInfo.user, g_queryInfo.password, NULL, g_queryInfo.port);
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
NULL,
g_queryInfo.port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
exit(-1);
}
if (0 != g_queryInfo.subQueryInfo.sqlCount) {
(void)getAllChildNameOfSuperTable(taos, g_queryInfo.dbName, g_queryInfo.subQueryInfo.sTblName, &g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.subQueryInfo.childTblCount);
(void)getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName,
g_queryInfo.subQueryInfo.sTblName,
&g_queryInfo.subQueryInfo.childTblName,
&g_queryInfo.subQueryInfo.childTblCount);
}
printfQueryMeta();
......@@ -4683,9 +4757,14 @@ static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultF
TAOS_SUB* tsub = NULL;
if (g_queryInfo.superQueryInfo.subscribeMode) {
tsub = taos_subscribe(taos, g_queryInfo.superQueryInfo.subscribeRestart, topic, sql, subscribe_callback, (void*)resultFileName, g_queryInfo.superQueryInfo.subscribeInterval);
tsub = taos_subscribe(taos,
g_queryInfo.superQueryInfo.subscribeRestart,
topic, sql, subscribe_callback, (void*)resultFileName,
g_queryInfo.superQueryInfo.subscribeInterval);
} else {
tsub = taos_subscribe(taos, g_queryInfo.superQueryInfo.subscribeRestart, topic, sql, NULL, NULL, 0);
tsub = taos_subscribe(taos,
g_queryInfo.superQueryInfo.subscribeRestart,
topic, sql, NULL, NULL, 0);
}
if (tsub == NULL) {
......@@ -4837,7 +4916,11 @@ int subscribeTestProcess() {
}
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host, g_queryInfo.user, g_queryInfo.password, g_queryInfo.dbName, g_queryInfo.port);
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
g_queryInfo.dbName,
g_queryInfo.port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
exit(-1);
......@@ -4935,23 +5018,23 @@ int subscribeTestProcess() {
void initOfInsertMeta() {
memset(&g_Dbs, 0, sizeof(SDbs));
// set default values
tstrncpy(g_Dbs.host, "127.0.0.1", MAX_DB_NAME_SIZE);
g_Dbs.port = 6030;
tstrncpy(g_Dbs.user, TSDB_DEFAULT_USER, MAX_DB_NAME_SIZE);
tstrncpy(g_Dbs.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE);
g_Dbs.threadCount = 2;
g_Dbs.use_metric = true;
// set default values
tstrncpy(g_Dbs.host, "127.0.0.1", MAX_DB_NAME_SIZE);
g_Dbs.port = 6030;
tstrncpy(g_Dbs.user, TSDB_DEFAULT_USER, MAX_DB_NAME_SIZE);
tstrncpy(g_Dbs.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE);
g_Dbs.threadCount = 2;
g_Dbs.use_metric = true;
}
void initOfQueryMeta() {
memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo));
// set default values
tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_DB_NAME_SIZE);
g_queryInfo.port = 6030;
tstrncpy(g_queryInfo.user, TSDB_DEFAULT_USER, MAX_DB_NAME_SIZE);
tstrncpy(g_queryInfo.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE);
// set default values
tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_DB_NAME_SIZE);
g_queryInfo.port = 6030;
tstrncpy(g_queryInfo.user, TSDB_DEFAULT_USER, MAX_DB_NAME_SIZE);
tstrncpy(g_queryInfo.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE);
}
void setParaFromArg(){
......@@ -4995,8 +5078,6 @@ void setParaFromArg(){
g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL;
g_Dbs.db[0].superTbls[0].superTblExists = TBL_NO_EXISTS;
g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS;
g_Dbs.db[0].superTbls[0].insertInterval = 0;
g_Dbs.db[0].superTbls[0].numRecPerReq = 0;
g_Dbs.db[0].superTbls[0].disorderRange = g_args.disorderRange;
g_Dbs.db[0].superTbls[0].disorderRatio = g_args.disorderRatio;
tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix,
......@@ -5145,10 +5226,12 @@ int main(int argc, char *argv[]) {
if (g_args.metaFile) {
initOfInsertMeta();
initOfQueryMeta();
if (false == getInfoFromJsonFile(g_args.metaFile)) {
printf("Failed to read %s\n", g_args.metaFile);
return 1;
}
if (INSERT_MODE == g_jsonType) {
if (g_Dbs.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_Dbs.cfgDir);
(void)insertTestProcess();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册