提交 3b9eb5ea 编写于 作者: S Shuduo Sang

[TD-3147] <fix>: support insert interval. use seperate func for normal table write.

上级 471dc330
...@@ -80,7 +80,6 @@ extern char configDir[]; ...@@ -80,7 +80,6 @@ extern char configDir[];
#define OPT_ABORT 1 /* –abort */ #define OPT_ABORT 1 /* –abort */
#define STRING_LEN 60000 #define STRING_LEN 60000
#define MAX_PREPARED_RAND 1000000 #define MAX_PREPARED_RAND 1000000
//#define MAX_SQL_SIZE 65536
#define MAX_FILE_NAME_LEN 256 #define MAX_FILE_NAME_LEN 256
#define MAX_SAMPLES_ONCE_FROM_FILE 10000 #define MAX_SAMPLES_ONCE_FROM_FILE 10000
...@@ -240,7 +239,7 @@ typedef struct SSuperTable_S { ...@@ -240,7 +239,7 @@ typedef struct SSuperTable_S {
StrColumn tags[MAX_TAG_COUNT]; StrColumn tags[MAX_TAG_COUNT];
char* childTblName; char* childTblName;
char* colsOfCreatChildTable; char* colsOfCreateChildTable;
int lenOfOneRow; int lenOfOneRow;
int lenOfTagOfOneRow; int lenOfTagOfOneRow;
...@@ -458,9 +457,9 @@ void resetAfterAnsiEscape(void) { ...@@ -458,9 +457,9 @@ void resetAfterAnsiEscape(void) {
} }
#endif #endif
int createDatabases(); static int createDatabases();
void createChildTables(); static void createChildTables();
int queryDbExec(TAOS *taos, char *command, int type); static int queryDbExec(TAOS *taos, char *command, int type);
/* ************ Global variables ************ */ /* ************ Global variables ************ */
...@@ -638,6 +637,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -638,6 +637,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
sptr = arguments->datatype; sptr = arguments->datatype;
++i; ++i;
if (strstr(argv[i], ",") == NULL) { if (strstr(argv[i], ",") == NULL) {
// only one col
if (strcasecmp(argv[i], "INT") if (strcasecmp(argv[i], "INT")
&& strcasecmp(argv[i], "FLOAT") && strcasecmp(argv[i], "FLOAT")
&& strcasecmp(argv[i], "TINYINT") && strcasecmp(argv[i], "TINYINT")
...@@ -653,6 +653,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -653,6 +653,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
} }
sptr[0] = argv[i]; sptr[0] = argv[i];
} else { } else {
// more than one col
int index = 0; int index = 0;
char *dupstr = strdup(argv[i]); char *dupstr = strdup(argv[i]);
char *running = dupstr; char *running = dupstr;
...@@ -675,6 +676,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -675,6 +676,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
token = strsep(&running, ","); token = strsep(&running, ",");
if (index >= MAX_NUM_DATATYPE) break; if (index >= MAX_NUM_DATATYPE) break;
} }
sptr[index] = NULL;
} }
} else if (strcmp(argv[i], "-w") == 0) { } else if (strcmp(argv[i], "-w") == 0) {
arguments->len_of_binary = atoi(argv[++i]); arguments->len_of_binary = atoi(argv[++i]);
...@@ -735,6 +737,15 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -735,6 +737,15 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
printf("# User: %s\n", arguments->user); printf("# User: %s\n", arguments->user);
printf("# Password: %s\n", arguments->password); printf("# Password: %s\n", arguments->password);
printf("# Use metric: %s\n", arguments->use_metric ? "true" : "false"); printf("# Use metric: %s\n", arguments->use_metric ? "true" : "false");
if (*(arguments->datatype)) {
printf("# Specified data type: ");
for (int i = 0; i < MAX_NUM_DATATYPE; i++)
if (arguments->datatype[i])
printf("%s,", arguments->datatype[i]);
else
break;
printf("\n");
}
printf("# Insertion interval: %d\n", arguments->insert_interval); printf("# Insertion interval: %d\n", arguments->insert_interval);
printf("# Number of Columns per record: %d\n", arguments->num_of_RPR); printf("# Number of Columns per record: %d\n", arguments->num_of_RPR);
printf("# Number of Threads: %d\n", arguments->num_of_threads); printf("# Number of Threads: %d\n", arguments->num_of_threads);
...@@ -774,7 +785,7 @@ void tmfree(char *buf) { ...@@ -774,7 +785,7 @@ void tmfree(char *buf) {
} }
} }
int queryDbExec(TAOS *taos, char *command, int type) { static int queryDbExec(TAOS *taos, char *command, int type) {
int i; int i;
TAOS_RES *res = NULL; TAOS_RES *res = NULL;
int32_t code = -1; int32_t code = -1;
...@@ -784,7 +795,7 @@ int queryDbExec(TAOS *taos, char *command, int type) { ...@@ -784,7 +795,7 @@ int queryDbExec(TAOS *taos, char *command, int type) {
taos_free_result(res); taos_free_result(res);
res = NULL; res = NULL;
} }
res = taos_query(taos, command); res = taos_query(taos, command);
code = taos_errno(res); code = taos_errno(res);
if (0 == code) { if (0 == code) {
...@@ -793,7 +804,7 @@ int queryDbExec(TAOS *taos, char *command, int type) { ...@@ -793,7 +804,7 @@ int queryDbExec(TAOS *taos, char *command, int type) {
} }
if (code != 0) { if (code != 0) {
debugPrint("DEBUG %s() %d - command: %s\n", __func__, __LINE__, command); debugPrint("DEBUG %s() LN%d - command: %s\n", __func__, __LINE__, command);
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res)); fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res));
taos_free_result(res); taos_free_result(res);
//taos_close(taos); //taos_close(taos);
...@@ -1965,13 +1976,14 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, ...@@ -1965,13 +1976,14 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
//printf("%s.%s column count:%d, column length:%d\n\n", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName, g_Dbs.db[i].superTbls[j].columnCount, lenOfOneRow); //printf("%s.%s column count:%d, column length:%d\n\n", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName, g_Dbs.db[i].superTbls[j].columnCount, lenOfOneRow);
// save for creating child table // save for creating child table
superTbls->colsOfCreatChildTable = (char*)calloc(len+20, 1); superTbls->colsOfCreateChildTable = (char*)calloc(len+20, 1);
if (NULL == superTbls->colsOfCreatChildTable) { if (NULL == superTbls->colsOfCreateChildTable) {
printf("Failed when calloc, size:%d", len+1); printf("Failed when calloc, size:%d", len+1);
taos_close(taos); taos_close(taos);
exit(-1); exit(-1);
} }
snprintf(superTbls->colsOfCreatChildTable, len+20, "(ts timestamp%s)", cols); snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols);
debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable);
if (use_metric) { if (use_metric) {
char tags[STRING_LEN] = "\0"; char tags[STRING_LEN] = "\0";
...@@ -2020,19 +2032,23 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, ...@@ -2020,19 +2032,23 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
len += snprintf(tags + len, STRING_LEN - len, ")"); len += snprintf(tags + len, STRING_LEN - len, ")");
superTbls->lenOfTagOfOneRow = lenOfTagOfOneRow; superTbls->lenOfTagOfOneRow = lenOfTagOfOneRow;
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s (ts timestamp%s) tags %s", dbName, superTbls->sTblName, cols, tags); snprintf(command, BUFFER_SIZE,
debugPrint("DEBUG %s() %d \n", __func__, __LINE__); "create table if not exists %s.%s (ts timestamp%s) tags %s",
dbName, superTbls->sTblName, cols, tags);
debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
return -1; fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName);
return -1;
} }
printf("\ncreate supertable %s success!\n\n", superTbls->sTblName); debugPrint("DEBUG - create supertable %s success!\n\n", superTbls->sTblName);
} }
return 0; return 0;
} }
int createDatabases() { static int createDatabases() {
TAOS * taos = NULL; TAOS * taos = NULL;
int ret = 0; int ret = 0;
taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, NULL, g_Dbs.port); taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, NULL, g_Dbs.port);
...@@ -2112,7 +2128,7 @@ int createDatabases() { ...@@ -2112,7 +2128,7 @@ int createDatabases() {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
"precision \'%s\';", g_Dbs.db[i].dbCfg.precision); "precision \'%s\';", g_Dbs.db[i].dbCfg.precision);
} }
debugPrint("DEBUG %s() %d \n", __func__, __LINE__); debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
taos_close(taos); taos_close(taos);
...@@ -2138,8 +2154,6 @@ int createDatabases() { ...@@ -2138,8 +2154,6 @@ int createDatabases() {
printf("\ncreate super table %d failed!\n\n", j); printf("\ncreate super table %d failed!\n\n", j);
taos_close(taos); taos_close(taos);
return -1; return -1;
} else {
printf("\ncreate super table %d success!\n\n", j);
} }
} }
} }
...@@ -2148,7 +2162,7 @@ int createDatabases() { ...@@ -2148,7 +2162,7 @@ int createDatabases() {
return 0; return 0;
} }
void * createTable(void *sarg) static void* createTable(void *sarg)
{ {
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo; SSuperTable* superTblInfo = winfo->superTblInfo;
...@@ -2161,7 +2175,7 @@ void * createTable(void *sarg) ...@@ -2161,7 +2175,7 @@ void * createTable(void *sarg)
else else
buff_len = BUFFER_SIZE; buff_len = BUFFER_SIZE;
char *buffer = calloc(superTblInfo->maxSqlLen, 1); char *buffer = calloc(buff_len, 1);
if (buffer == NULL) { if (buffer == NULL) {
fprintf(stderr, "Memory allocated failed!"); fprintf(stderr, "Memory allocated failed!");
exit(-1); exit(-1);
...@@ -2175,8 +2189,8 @@ void * createTable(void *sarg) ...@@ -2175,8 +2189,8 @@ void * createTable(void *sarg)
snprintf(buffer, buff_len, snprintf(buffer, buff_len,
"create table if not exists %s.%s%d %s;", "create table if not exists %s.%s%d %s;",
winfo->db_name, winfo->db_name,
superTblInfo->childTblPrefix, i, g_args.tb_prefix, i,
superTblInfo->colsOfCreatChildTable); winfo->cols);
} else { } else {
if (0 == len) { if (0 == len) {
batchNum = 0; batchNum = 0;
...@@ -2215,7 +2229,7 @@ void * createTable(void *sarg) ...@@ -2215,7 +2229,7 @@ void * createTable(void *sarg)
} }
len = 0; len = 0;
debugPrint("DEBUG %s() %d \n", __func__, __LINE__); debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer);
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
free(buffer); free(buffer);
return NULL; return NULL;
...@@ -2303,25 +2317,50 @@ int startMultiThreadCreateChildTable( ...@@ -2303,25 +2317,50 @@ int startMultiThreadCreateChildTable(
} }
void createChildTables() { static void createChildTables() {
char tblColsBuf[MAX_SQL_SIZE];
int len;
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.db[i].superTblCount > 0) { if (g_Dbs.db[i].superTblCount > 0) {
// with super table
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable)
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
continue; continue;
} }
debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__,
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
startMultiThreadCreateChildTable( startMultiThreadCreateChildTable(
g_Dbs.db[i].superTbls[j].colsOfCreatChildTable, g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
g_Dbs.threadCountByCreateTbl, g_Dbs.threadCountByCreateTbl,
g_Dbs.db[i].superTbls[j].childTblCount, g_Dbs.db[i].superTbls[j].childTblCount,
g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
} }
} else { } else {
// normal table
len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP");
for (int i = 0; i < MAX_COLUMN_COUNT; i++) {
if (g_args.datatype[i]) {
if ((strncasecmp(g_args.datatype[i], "BINARY", strlen("BINARY")) == 0)
|| (strncasecmp(g_args.datatype[i], "NCHAR", strlen("NCHAR")) == 0)) {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s(60)", i, g_args.datatype[i]);
} else {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s", i, g_args.datatype[i]);
}
len = strlen(tblColsBuf);
} else {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ")");
break;
}
}
debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__,
tblColsBuf);
startMultiThreadCreateChildTable( startMultiThreadCreateChildTable(
g_Dbs.db[i].superTbls[j].colsOfCreatChildTable, tblColsBuf,
g_Dbs.threadCountByCreateTbl, g_Dbs.threadCountByCreateTbl,
g_args.num_of_DPT, g_args.num_of_DPT,
g_Dbs.db[i].dbName, g_Dbs.db[i].dbName,
...@@ -2361,7 +2400,7 @@ int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { ...@@ -2361,7 +2400,7 @@ int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
size_t n = 0; size_t n = 0;
ssize_t readLen = 0; ssize_t readLen = 0;
char * line = NULL; char * line = NULL;
FILE *fp = fopen(superTblInfo->tagsFile, "r"); FILE *fp = fopen(superTblInfo->tagsFile, "r");
if (fp == NULL) { if (fp == NULL) {
printf("Failed to open tags file: %s, reason:%s\n", printf("Failed to open tags file: %s, reason:%s\n",
...@@ -2373,7 +2412,7 @@ int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { ...@@ -2373,7 +2412,7 @@ int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
free(superTblInfo->tagDataBuf); free(superTblInfo->tagDataBuf);
superTblInfo->tagDataBuf = NULL; superTblInfo->tagDataBuf = NULL;
} }
int tagCount = 10000; int tagCount = 10000;
int count = 0; int count = 0;
char* tagDataBuf = calloc(1, superTblInfo->lenOfTagOfOneRow * tagCount); char* tagDataBuf = calloc(1, superTblInfo->lenOfTagOfOneRow * tagCount);
...@@ -3612,7 +3651,7 @@ void prePareSampleData() { ...@@ -3612,7 +3651,7 @@ void prePareSampleData() {
//if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) { //if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) {
// readSampleFromFileToMem(&g_Dbs.db[i].superTbls[j]); // readSampleFromFileToMem(&g_Dbs.db[i].superTbls[j]);
//} //}
if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) { if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) {
(void)readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]); (void)readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]);
} }
...@@ -3624,9 +3663,9 @@ void postFreeResource() { ...@@ -3624,9 +3663,9 @@ void postFreeResource() {
tmfclose(g_fpOfInsertResult); tmfclose(g_fpOfInsertResult);
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (0 != g_Dbs.db[i].superTbls[j].colsOfCreatChildTable) { if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) {
free(g_Dbs.db[i].superTbls[j].colsOfCreatChildTable); free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
g_Dbs.db[i].superTbls[j].colsOfCreatChildTable = NULL; g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL;
} }
if (0 != g_Dbs.db[i].superTbls[j].sampleDataBuf) { if (0 != g_Dbs.db[i].superTbls[j].sampleDataBuf) {
free(g_Dbs.db[i].superTbls[j].sampleDataBuf); free(g_Dbs.db[i].superTbls[j].sampleDataBuf);
...@@ -3704,11 +3743,11 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* ...@@ -3704,11 +3743,11 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable*
} }
dataLen -= 2; dataLen -= 2;
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")");
return dataLen; return dataLen;
} }
void syncWriteForNumberOfTblInOneSql( static void syncWriteForNumberOfTblInOneSql(
threadInfo *winfo, FILE *fp, char* sampleDataBuf) { threadInfo *winfo, FILE *fp, char* sampleDataBuf) {
SSuperTable* superTblInfo = winfo->superTblInfo; SSuperTable* superTblInfo = winfo->superTblInfo;
...@@ -3731,7 +3770,7 @@ void syncWriteForNumberOfTblInOneSql( ...@@ -3731,7 +3770,7 @@ void syncWriteForNumberOfTblInOneSql(
numberOfTblInOneSql = tbls; numberOfTblInOneSql = tbls;
} }
int64_t time_counter = winfo->start_time; uint64_t time_counter = winfo->start_time;
int64_t tmp_time; int64_t tmp_time;
int sampleUsePos; int sampleUsePos;
...@@ -3955,6 +3994,64 @@ send_to_server: ...@@ -3955,6 +3994,64 @@ send_to_server:
return; return;
} }
int32_t generateData(char *res, char **data_type,
int num_of_cols, int64_t timestamp, int len_of_binary) {
memset(res, 0, MAX_DATA_SIZE);
char *pstr = res;
pstr += sprintf(pstr, "(%" PRId64, timestamp);
int c = 0;
for (; c < MAX_NUM_DATATYPE; c++) {
if (data_type[c] == NULL) {
break;
}
}
if (0 == c) {
perror("data type error!");
exit(-1);
}
for (int i = 0; i < num_of_cols; i++) {
if (strcasecmp(data_type[i % c], "tinyint") == 0) {
pstr += sprintf(pstr, ", %d", rand_tinyint() );
} else if (strcasecmp(data_type[i % c], "smallint") == 0) {
pstr += sprintf(pstr, ", %d", rand_smallint());
} else if (strcasecmp(data_type[i % c], "int") == 0) {
pstr += sprintf(pstr, ", %d", rand_int());
} else if (strcasecmp(data_type[i % c], "bigint") == 0) {
pstr += sprintf(pstr, ", %" PRId64, rand_bigint());
} else if (strcasecmp(data_type[i % c], "float") == 0) {
pstr += sprintf(pstr, ", %10.4f", rand_float());
} else if (strcasecmp(data_type[i % c], "double") == 0) {
double t = rand_double();
pstr += sprintf(pstr, ", %20.8f", t);
} else if (strcasecmp(data_type[i % c], "bool") == 0) {
bool b = rand() & 1;
pstr += sprintf(pstr, ", %s", b ? "true" : "false");
} else if (strcasecmp(data_type[i % c], "binary") == 0) {
char *s = malloc(len_of_binary);
rand_string(s, len_of_binary);
pstr += sprintf(pstr, ", \"%s\"", s);
free(s);
}else if (strcasecmp(data_type[i % c], "nchar") == 0) {
char *s = malloc(len_of_binary);
rand_string(s, len_of_binary);
pstr += sprintf(pstr, ", \"%s\"", s);
free(s);
}
if (pstr - res > MAX_DATA_SIZE) {
perror("column length too long, abort");
exit(-1);
}
}
pstr += sprintf(pstr, ")");
return (int32_t)(pstr - res);
}
// sync insertion // sync insertion
/* /*
1 thread: 100 tables * 2000 rows/s 1 thread: 100 tables * 2000 rows/s
...@@ -3963,7 +4060,88 @@ send_to_server: ...@@ -3963,7 +4060,88 @@ send_to_server:
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
*/ */
void *syncWrite(void *sarg) { static void* syncWrite(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg;
char buffer[BUFFER_SIZE] = "\0";
char data[MAX_DATA_SIZE];
char **data_type = g_args.datatype;
int len_of_binary = g_args.len_of_binary;
int ncols_per_record = 1; // count first col ts
for (int i = 0; i < MAX_COLUMN_COUNT; i ++) {
if (NULL == g_args.datatype[i])
break;
else
ncols_per_record ++;
}
srand((uint32_t)time(NULL));
int64_t time_counter = winfo->start_time;
for (int i = 0; i < g_args.num_of_DPT;) {
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
int inserted = i;
int64_t tmp_time = time_counter;
char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, g_args.tb_prefix, tID);
int k;
for (k = 0; k < g_args.num_of_RPR;) {
int rand_num = rand() % 100;
int len = -1;
if ((g_args.disorderRatio != 0) && (rand_num < g_args.disorderRange)) {
int64_t d = tmp_time - rand() % 1000000 + rand_num;
len = generateData(data, data_type, ncols_per_record, d, len_of_binary);
} else {
len = generateData(data, data_type, ncols_per_record, tmp_time += 1000, len_of_binary);
}
//assert(len + pstr - buffer < BUFFER_SIZE);
if (len + pstr - buffer >= BUFFER_SIZE) { // too long
break;
}
pstr += sprintf(pstr, " %s", data);
inserted++;
k++;
if (inserted >= g_args.num_of_DPT)
break;
}
/* puts(buffer); */
int64_t startTs;
int64_t endTs;
startTs = taosGetTimestampUs();
//queryDB(winfo->taos, buffer);
debugPrint("DEBUG - %s() LN%d %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec(winfo->taos, buffer, 1);
if (0 <= affectedRows){
endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
if (delay > winfo->maxDelay) winfo->maxDelay = delay;
if (delay < winfo->minDelay) winfo->minDelay = delay;
winfo->cntDelay++;
winfo->totalDelay += delay;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
}
if (tID == winfo->end_table_id) {
i = inserted;
time_counter = tmp_time;
}
}
}
return NULL;
}
static void* syncWriteWithStb(void *sarg) {
uint64_t totalRowsInserted = 0; uint64_t totalRowsInserted = 0;
uint64_t totalAffectedRows = 0; uint64_t totalAffectedRows = 0;
uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
...@@ -4137,7 +4315,7 @@ void *syncWrite(void *sarg) { ...@@ -4137,7 +4315,7 @@ void *syncWrite(void *sarg) {
int64_t endTs; int64_t endTs;
startTs = taosGetTimestampUs(); startTs = taosGetTimestampUs();
debugPrint("DEBUG %s() %d \n", __func__, __LINE__); debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
if (0 > affectedRows){ if (0 > affectedRows){
goto free_and_statistics_2; goto free_and_statistics_2;
...@@ -4280,10 +4458,11 @@ void *asyncWrite(void *sarg) { ...@@ -4280,10 +4458,11 @@ void *asyncWrite(void *sarg) {
void startMultiThreadInsertData(int threads, char* db_name, char* precision, void startMultiThreadInsertData(int threads, char* db_name, char* precision,
SSuperTable* superTblInfo) { SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t));
threadInfo *infos = malloc(threads * sizeof(threadInfo)); pthread_t *pids = malloc(threads * sizeof(pthread_t));
memset(pids, 0, threads * sizeof(pthread_t)); threadInfo *infos = malloc(threads * sizeof(threadInfo));
memset(infos, 0, threads * sizeof(threadInfo)); memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo));
int ntables = 0; int ntables = 0;
if (superTblInfo) if (superTblInfo)
...@@ -4323,15 +4502,20 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, ...@@ -4323,15 +4502,20 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
} }
} }
int64_t start_time;
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { int64_t start_time;
start_time = taosGetTimestamp(timePrec); if (superTblInfo) {
} else { if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
(void)taosParseTime( start_time = taosGetTimestamp(timePrec);
} else {
taosParseTime(
superTblInfo->startTimestamp, superTblInfo->startTimestamp,
&start_time, &start_time,
strlen(superTblInfo->startTimestamp), strlen(superTblInfo->startTimestamp),
timePrec, 0); timePrec, 0);
}
} else {
start_time = 1500000000000;
} }
double start = getCurrentTime(); double start = getCurrentTime();
...@@ -4346,7 +4530,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, ...@@ -4346,7 +4530,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
t_info->start_time = start_time; t_info->start_time = start_time;
t_info->minDelay = INT16_MAX; t_info->minDelay = INT16_MAX;
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { if ((NULL == superTblInfo) || (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) {
//t_info->taos = taos; //t_info->taos = taos;
t_info->taos = taos_connect( t_info->taos = taos_connect(
g_Dbs.host, g_Dbs.user, g_Dbs.host, g_Dbs.user,
...@@ -4359,7 +4543,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, ...@@ -4359,7 +4543,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
t_info->taos = NULL; t_info->taos = NULL;
} }
if (0 == superTblInfo->multiThreadWriteOneTbl) { if ((NULL == superTblInfo) || (0 == superTblInfo->multiThreadWriteOneTbl)) {
t_info->start_table_id = last; t_info->start_table_id = last;
t_info->end_table_id = i < b ? last + a : last + a - 1; t_info->end_table_id = i < b ? last + a : last + a - 1;
last = t_info->end_table_id + 1; last = t_info->end_table_id + 1;
...@@ -4371,7 +4555,11 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, ...@@ -4371,7 +4555,11 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
tsem_init(&(t_info->lock_sem), 0, 0); tsem_init(&(t_info->lock_sem), 0, 0);
if (SYNC == g_Dbs.queryMode) { if (SYNC == g_Dbs.queryMode) {
pthread_create(pids + i, NULL, syncWrite, t_info); if (superTblInfo) {
pthread_create(pids + i, NULL, syncWriteWithStb, t_info);
} else {
pthread_create(pids + i, NULL, syncWrite, t_info);
}
} else { } else {
pthread_create(pids + i, NULL, asyncWrite, t_info); pthread_create(pids + i, NULL, asyncWrite, t_info);
} }
...@@ -4393,13 +4581,15 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, ...@@ -4393,13 +4581,15 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
tsem_destroy(&(t_info->lock_sem)); tsem_destroy(&(t_info->lock_sem));
taos_close(t_info->taos); taos_close(t_info->taos);
superTblInfo->totalAffectedRows += t_info->totalAffectedRows; if (superTblInfo) {
superTblInfo->totalRowsInserted += t_info->totalRowsInserted; superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
superTblInfo->totalRowsInserted += t_info->totalRowsInserted;
totalDelay += t_info->totalDelay; totalDelay += t_info->totalDelay;
cntDelay += t_info->cntDelay; cntDelay += t_info->cntDelay;
if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay;
if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; if (t_info->minDelay < minDelay) minDelay = t_info->minDelay;
}
} }
cntDelay -= 1; cntDelay -= 1;
...@@ -4408,16 +4598,19 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, ...@@ -4408,16 +4598,19 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
double end = getCurrentTime(); double end = getCurrentTime();
double t = end - start; double t = end - start;
printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n",
if (superTblInfo) {
printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n",
t, superTblInfo->totalRowsInserted, t, superTblInfo->totalRowsInserted,
superTblInfo->totalAffectedRows, superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName, threads, db_name, superTblInfo->sTblName,
superTblInfo->totalRowsInserted / t); superTblInfo->totalRowsInserted / t);
fprintf(g_fpOfInsertResult, "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", fprintf(g_fpOfInsertResult, "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n",
t, superTblInfo->totalRowsInserted, t, superTblInfo->totalRowsInserted,
superTblInfo->totalAffectedRows, superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName, threads, db_name, superTblInfo->sTblName,
superTblInfo->totalRowsInserted / t); superTblInfo->totalRowsInserted / t);
}
printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n", printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n",
avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0); avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0);
...@@ -4436,7 +4629,7 @@ void *readTable(void *sarg) { ...@@ -4436,7 +4629,7 @@ void *readTable(void *sarg) {
threadInfo *rinfo = (threadInfo *)sarg; threadInfo *rinfo = (threadInfo *)sarg;
TAOS *taos = rinfo->taos; TAOS *taos = rinfo->taos;
char command[BUFFER_SIZE] = "\0"; char command[BUFFER_SIZE] = "\0";
int64_t sTime = rinfo->start_time; uint64_t sTime = rinfo->start_time;
char *tb_prefix = rinfo->tb_prefix; char *tb_prefix = rinfo->tb_prefix;
FILE *fp = fopen(rinfo->fp, "a"); FILE *fp = fopen(rinfo->fp, "a");
if (NULL == fp) { if (NULL == fp) {
...@@ -4444,7 +4637,13 @@ void *readTable(void *sarg) { ...@@ -4444,7 +4637,13 @@ void *readTable(void *sarg) {
return NULL; return NULL;
} }
int num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; int num_of_DPT;
if (rinfo->superTblInfo) {
num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table;
} else {
num_of_DPT = g_args.num_of_DPT;
}
int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1;
int totalData = num_of_DPT * num_of_tables; int totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc; bool do_aggreFunc = g_Dbs.do_aggreFunc;
...@@ -4594,7 +4793,7 @@ int insertTestProcess() { ...@@ -4594,7 +4793,7 @@ int insertTestProcess() {
init_rand_data(); init_rand_data();
// create database and super tables // create database and super tables
if( createDatabases() != 0) { if(createDatabases() != 0) {
fclose(g_fpOfInsertResult); fclose(g_fpOfInsertResult);
return -1; return -1;
} }
...@@ -4762,7 +4961,7 @@ void *subQueryProcess(void *sarg) { ...@@ -4762,7 +4961,7 @@ void *subQueryProcess(void *sarg) {
return NULL; return NULL;
} }
int queryTestProcess() { static int queryTestProcess() {
TAOS * taos = NULL; TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host, taos = taos_connect(g_queryInfo.host,
g_queryInfo.user, g_queryInfo.user,
...@@ -5052,7 +5251,7 @@ void *superSubscribeProcess(void *sarg) { ...@@ -5052,7 +5251,7 @@ void *superSubscribeProcess(void *sarg) {
return NULL; return NULL;
} }
int subscribeTestProcess() { static int subscribeTestProcess() {
printfQueryMeta(); printfQueryMeta();
if (!g_args.answer_yes) { if (!g_args.answer_yes) {
...@@ -5201,6 +5400,9 @@ void setParaFromArg(){ ...@@ -5201,6 +5400,9 @@ void setParaFromArg(){
g_Dbs.port = g_args.port; g_Dbs.port = g_args.port;
} }
g_Dbs.threadCount = g_args.num_of_threads;
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
g_Dbs.dbCount = 1; g_Dbs.dbCount = 1;
g_Dbs.db[0].drop = 1; g_Dbs.db[0].drop = 1;
...@@ -5352,7 +5554,7 @@ void querySqlFile(TAOS* taos, char* sqlFile) ...@@ -5352,7 +5554,7 @@ void querySqlFile(TAOS* taos, char* sqlFile)
} }
memcpy(cmd + cmd_len, line, read_len); memcpy(cmd + cmd_len, line, read_len);
debugPrint("DEBUG %s() %d \n", __func__, __LINE__); debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, cmd);
queryDbExec(taos, cmd, NO_INSERT_TYPE); queryDbExec(taos, cmd, NO_INSERT_TYPE);
memset(cmd, 0, MAX_SQL_SIZE); memset(cmd, 0, MAX_SQL_SIZE);
cmd_len = 0; cmd_len = 0;
...@@ -5367,26 +5569,24 @@ void querySqlFile(TAOS* taos, char* sqlFile) ...@@ -5367,26 +5569,24 @@ void querySqlFile(TAOS* taos, char* sqlFile)
return; return;
} }
static void testMetaFile() {
void testMetaFile() {
if (INSERT_MODE == g_args.test_mode) { if (INSERT_MODE == g_args.test_mode) {
if (g_Dbs.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_Dbs.cfgDir); if (g_Dbs.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_Dbs.cfgDir);
insertTestProcess(); insertTestProcess();
} else if (QUERY_MODE == g_args.test_mode) { } else if (QUERY_MODE == g_args.test_mode) {
if (g_queryInfo.cfgDir[0]) if (g_queryInfo.cfgDir[0])
taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir); taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir);
(void)queryTestProcess(); queryTestProcess();
} else if (SUBSCRIBE_MODE == g_args.test_mode) { } else if (SUBSCRIBE_MODE == g_args.test_mode) {
if (g_queryInfo.cfgDir[0]) if (g_queryInfo.cfgDir[0])
taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir); taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir);
(void)subscribeTestProcess(); subscribeTestProcess();
} else { } else {
; ;
} }
} }
void testCmdLine() { static void testCmdLine() {
g_args.test_mode = INSERT_MODE; g_args.test_mode = INSERT_MODE;
insertTestProcess(); insertTestProcess();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册