未验证 提交 3f312f8b 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Feature/sangshuduo/td 3317 taosdemo interlace (#5476)

* [TD-3316] <fix>: add testcase for taosdemo limit and offset.

check offset 0.

* [TD-3316] <fix>: add testcase for taosdemo limit and offset.

fix sample file import bug.

* [TD-3316] <fix>: add test case for limit and offset. fix sample data issue.

* [TD-3327] <fix>: fix taosdemo segfault when import data from sample data file.

* [TD-3317] <feature>: make taosdemo support interlace mode.

json parameter rows_per_tbl support.

* [TD-3317] <feature>: support interlace mode.

refactor

* [TD-3317] <feature>: support interlace mode.

refactor

* [TD-3317] <feature>: support interlace mode insertion.

refactor.

* [TD-3317] <feature>: support interlace mode insertion.

change json file.

* [TD-3317] <feature>: support interlace mode insertion.

fix multithread create table regression.

* [TD-3317] <feature>: support interlace mode insertion.

working but not perfect.

* [TD-3317] <feature>: support interlace mode insertion.

rename lowaTest with taosdemoTestWithJson
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 5e94568d
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 5000,
"rows_per_tbl": 50,
"num_of_records_per_req": 100,
"max_sql_len": 1024000,
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes",
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"keep": 365,
"minRows": 100,
"maxRows": 4096,
"comp":2,
"walLevel":1,
"cachelast":0,
"quorum":1,
"fsync":3000,
"update": 0
},
"super_tables": [{
"name": "stb",
"child_table_exists":"no",
"childtable_count": 10,
"childtable_prefix": "stb_",
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 200,
"multi_thread_write_one_tbl": "no",
"rows_per_tbl": 20,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": [{"type": "INT"}, {"type": "DOUBLE", "count":10}, {"type": "BINARY", "len": 16, "count":3}, {"type": "BINARY", "len": 32, "count":6}],
"tags": [{"type": "TINYINT", "count":2}, {"type": "BINARY", "len": 16, "count":5}]
}]
}]
}
......@@ -60,9 +60,12 @@ extern char configDir[];
#define QUERY_JSON_NAME "query.json"
#define SUBSCRIBE_JSON_NAME "subscribe.json"
#define INSERT_MODE 0
#define QUERY_MODE 1
#define SUBSCRIBE_MODE 2
enum TEST_MODE {
INSERT_TEST, // 0
QUERY_TEST, // 1
SUBSCRIBE_TEST, // 2
INVAID_TEST
};
#define MAX_SQL_SIZE 65536
#define BUFFER_SIZE (65536*2)
......@@ -109,6 +112,12 @@ enum MODE {
MODE_BUT
};
typedef enum enum_INSERT_MODE {
PROGRESSIVE_INSERT_MODE,
INTERLACE_INSERT_MODE,
INVALID_INSERT_MODE
} INSERT_MODE;
enum QUERY_TYPE {
NO_INSERT_TYPE,
INSERT_TYPE,
......@@ -148,7 +157,8 @@ enum _show_stables_index {
TSDB_SHOW_STABLES_TID_INDEX,
TSDB_SHOW_STABLES_VGID_INDEX,
TSDB_MAX_SHOW_STABLES
};
};
enum _describe_table_index {
TSDB_DESCRIBE_METRIC_FIELD_INDEX,
TSDB_DESCRIBE_METRIC_TYPE_INDEX,
......@@ -188,6 +198,7 @@ typedef struct SArguments_S {
int num_of_CPR;
int num_of_threads;
int insert_interval;
int rows_per_tbl;
int num_of_RPR;
int max_sql_len;
int num_of_tables;
......@@ -197,6 +208,8 @@ typedef struct SArguments_S {
int disorderRange;
int method_of_delete;
char ** arg_list;
int64_t totalInsertRows;
int64_t totalAffectedRows;
} SArguments;
typedef struct SColumn_S {
......@@ -379,8 +392,9 @@ typedef struct SThreadInfo_S {
char db_name[MAX_DB_NAME_SIZE+1];
char fp[4096];
char tb_prefix[MAX_TB_NAME_SIZE];
int start_table_id;
int end_table_id;
int start_table_from;
int end_table_to;
int ntables;
int data_of_rate;
uint64_t start_time;
char* cols;
......@@ -531,6 +545,7 @@ SArguments g_args = {
10, // num_of_CPR
10, // num_of_connections/thread
0, // insert_interval
0, // rows_per_tbl;
100, // num_of_RPR
TSDB_PAYLOAD_SIZE, // max_sql_len
10000, // num_of_tables
......@@ -553,7 +568,8 @@ static FILE * g_fpOfInsertResult = NULL;
do { if (g_args.debug_print || g_args.verbose_print) \
fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0)
#define verbosePrint(fmt, ...) \
do { if (g_args.verbose_print) fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0)
do { if (g_args.verbose_print) \
fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0)
///////////////////////////////////////////////////
......@@ -652,6 +668,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->num_of_threads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") == 0) {
arguments->insert_interval = atoi(argv[++i]);
} else if (strcmp(argv[i], "-B") == 0) {
arguments->rows_per_tbl = atoi(argv[++i]);
} else if (strcmp(argv[i], "-r") == 0) {
arguments->num_of_RPR = atoi(argv[++i]);
} else if (strcmp(argv[i], "-t") == 0) {
......@@ -1299,33 +1317,45 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, " columnCount: %d\n ", g_Dbs.db[i].superTbls[j].columnCount);
for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) {
//printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "binary", 6)) || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "nchar", 5))) {
fprintf(fp, "column[%d]:%s(%d) ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
if ((0 == strncasecmp(
g_Dbs.db[i].superTbls[j].columns[k].dataType,
"binary", strlen("binary")))
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType,
"nchar", strlen("nchar")))) {
fprintf(fp, "column[%d]:%s(%d) ", k,
g_Dbs.db[i].superTbls[j].columns[k].dataType,
g_Dbs.db[i].superTbls[j].columns[k].dataLen);
} else {
fprintf(fp, "column[%d]:%s ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType);
}
}
fprintf(fp, "\n");
fprintf(fp, " tagCount: %d\n ", g_Dbs.db[i].superTbls[j].tagCount);
fprintf(fp, " tagCount: %d\n ",
g_Dbs.db[i].superTbls[j].tagCount);
for (int k = 0; k < g_Dbs.db[i].superTbls[j].tagCount; k++) {
//printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen);
if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "binary", 6)) || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "nchar", 5))) {
fprintf(fp, "tag[%d]:%s(%d) ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen);
if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType,
"binary", strlen("binary")))
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType,
"nchar", strlen("nchar")))) {
fprintf(fp, "tag[%d]:%s(%d) ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType,
g_Dbs.db[i].superTbls[j].tags[k].dataLen);
} else {
fprintf(fp, "tag[%d]:%s ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType);
}
}
}
fprintf(fp, "\n");
}
fprintf(fp, "\n");
}
SHOW_PARSE_RESULT_END_TO_FILE(fp);
SHOW_PARSE_RESULT_END_TO_FILE(fp);
}
static void printfQueryMeta() {
SHOW_PARSE_RESULT_START();
printf("host: \033[33m%s:%u\033[0m\n", g_queryInfo.host, g_queryInfo.port);
SHOW_PARSE_RESULT_START();
printf("host: \033[33m%s:%u\033[0m\n",
g_queryInfo.host, g_queryInfo.port);
printf("user: \033[33m%s\033[0m\n", g_queryInfo.user);
printf("password: \033[33m%s\033[0m\n", g_queryInfo.password);
printf("database name: \033[33m%s\033[0m\n", g_queryInfo.dbName);
......@@ -1336,7 +1366,7 @@ static void printfQueryMeta() {
printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.concurrent);
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount);
if (SUBSCRIBE_MODE == g_args.test_mode) {
if (SUBSCRIBE_TEST == g_args.test_mode) {
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeMode);
printf("interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeRestart);
......@@ -1353,7 +1383,7 @@ static void printfQueryMeta() {
printf("childTblCount: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.childTblCount);
printf("stable name: \033[33m%s\033[0m\n", g_queryInfo.subQueryInfo.sTblName);
if (SUBSCRIBE_MODE == g_args.test_mode) {
if (SUBSCRIBE_TEST == g_args.test_mode) {
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeMode);
printf("interval: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeRestart);
......@@ -1515,15 +1545,19 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
return -1;
}
tstrncpy(dbInfos[count]->name, (char *)row[TSDB_SHOW_DB_NAME_INDEX], fields[TSDB_SHOW_DB_NAME_INDEX].bytes);
xFormatTimestamp(dbInfos[count]->create_time, *(int64_t*)row[TSDB_SHOW_DB_CREATED_TIME_INDEX], TSDB_TIME_PRECISION_MILLI);
tstrncpy(dbInfos[count]->name, (char *)row[TSDB_SHOW_DB_NAME_INDEX],
fields[TSDB_SHOW_DB_NAME_INDEX].bytes);
xFormatTimestamp(dbInfos[count]->create_time,
*(int64_t*)row[TSDB_SHOW_DB_CREATED_TIME_INDEX],
TSDB_TIME_PRECISION_MILLI);
dbInfos[count]->ntables = *((int32_t *)row[TSDB_SHOW_DB_NTABLES_INDEX]);
dbInfos[count]->vgroups = *((int32_t *)row[TSDB_SHOW_DB_VGROUPS_INDEX]);
dbInfos[count]->replica = *((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX]);
dbInfos[count]->quorum = *((int16_t *)row[TSDB_SHOW_DB_QUORUM_INDEX]);
dbInfos[count]->days = *((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX]);
tstrncpy(dbInfos[count]->keeplist, (char *)row[TSDB_SHOW_DB_KEEP_INDEX], fields[TSDB_SHOW_DB_KEEP_INDEX].bytes);
tstrncpy(dbInfos[count]->keeplist, (char *)row[TSDB_SHOW_DB_KEEP_INDEX],
fields[TSDB_SHOW_DB_KEEP_INDEX].bytes);
dbInfos[count]->cache = *((int32_t *)row[TSDB_SHOW_DB_CACHE_INDEX]);
dbInfos[count]->blocks = *((int32_t *)row[TSDB_SHOW_DB_BLOCKS_INDEX]);
dbInfos[count]->minrows = *((int32_t *)row[TSDB_SHOW_DB_MINROWS_INDEX]);
......@@ -1537,8 +1571,9 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
(char *)row[TSDB_SHOW_DB_PRECISION_INDEX],
fields[TSDB_SHOW_DB_PRECISION_INDEX].bytes);
dbInfos[count]->update = *((int8_t *)row[TSDB_SHOW_DB_UPDATE_INDEX]);
tstrncpy(dbInfos[count]->status, (char *)row[TSDB_SHOW_DB_STATUS_INDEX], fields[TSDB_SHOW_DB_STATUS_INDEX].bytes);
tstrncpy(dbInfos[count]->status, (char *)row[TSDB_SHOW_DB_STATUS_INDEX],
fields[TSDB_SHOW_DB_STATUS_INDEX].bytes);
count++;
if (count > MAX_DATABASE_COUNT) {
fprintf(stderr, "The database count overflow than %d\n", MAX_DATABASE_COUNT);
......@@ -1593,8 +1628,10 @@ static void printfQuerySystemInfo(TAOS * taos) {
struct tm* lt;
time(&t);
lt = localtime(&t);
snprintf(filename, MAX_QUERY_SQL_LENGTH, "querySystemInfo-%d-%d-%d %d:%d:%d", lt->tm_year+1900, lt->tm_mon, lt->tm_mday, lt->tm_hour, lt->tm_min, lt->tm_sec);
snprintf(filename, MAX_QUERY_SQL_LENGTH, "querySystemInfo-%d-%d-%d %d:%d:%d",
lt->tm_year+1900, lt->tm_mon, lt->tm_mday, lt->tm_hour, lt->tm_min,
lt->tm_sec);
// show variables
res = taos_query(taos, "show variables;");
//getResult(res, filename);
......@@ -1604,7 +1641,7 @@ static void printfQuerySystemInfo(TAOS * taos) {
res = taos_query(taos, "show dnodes;");
xDumpResultToFile(filename, res);
//getResult(res, filename);
// show databases
res = taos_query(taos, "show databases;");
SDbInfo** dbInfos = (SDbInfo **)calloc(MAX_DATABASE_COUNT, sizeof(SDbInfo *));
......@@ -1621,7 +1658,7 @@ static void printfQuerySystemInfo(TAOS * taos) {
for (int i = 0; i < dbCount; i++) {
// printf database info
printfDbInfoForQueryToFile(filename, dbInfos[i], i);
// show db.vgroups
snprintf(buffer, MAX_QUERY_SQL_LENGTH, "show %s.vgroups;", dbInfos[i]->name);
res = taos_query(taos, buffer);
......@@ -1639,9 +1676,9 @@ static void printfQuerySystemInfo(TAOS * taos) {
}
void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); }
static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); }
int postProceSql(char* host, uint16_t port, char* sqlstr)
static int postProceSql(char* host, uint16_t port, char* sqlstr)
{
char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s";
......@@ -2346,7 +2383,8 @@ static int createDatabases() {
}
printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName);
debugPrint("%s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount);
debugPrint("%s() %d supertbl count:%d\n",
__func__, __LINE__, g_Dbs.db[i].superTblCount);
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
// describe super table, if exists
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
......@@ -2395,8 +2433,12 @@ static void* createTable(void *sarg)
int len = 0;
int batchNum = 0;
//printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
verbosePrint("%s() LN%d: Creating table from %d to %d\n",
__func__, __LINE__,
winfo->start_table_from, winfo->end_table_to);
for (int i = winfo->start_table_from; i <= winfo->end_table_to; i++) {
if (0 == g_Dbs.use_metric) {
snprintf(buffer, buff_len,
"create table if not exists %s.%s%d %s;",
......@@ -2451,7 +2493,7 @@ static void* createTable(void *sarg)
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] already create %d - %d tables\n",
winfo->threadID, winfo->start_table_id, i);
winfo->threadID, winfo->start_table_from, i);
lastPrintTime = currentPrintTime;
}
}
......@@ -2467,7 +2509,7 @@ static void* createTable(void *sarg)
return NULL;
}
int startMultiThreadCreateChildTable(
static int startMultiThreadCreateChildTable(
char* cols, int threads, int startFrom, int ntables,
char* db_name, SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t));
......@@ -2491,7 +2533,6 @@ int startMultiThreadCreateChildTable(
int b = 0;
b = ntables % threads;
int last = startFrom;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
......@@ -2510,9 +2551,11 @@ int startMultiThreadCreateChildTable(
free(infos);
return -1;
}
t_info->start_table_id = last;
t_info->end_table_id = i < b ? last + a : last + a - 1;
last = t_info->end_table_id + 1;
t_info->start_table_from = startFrom;
t_info->ntables = i<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1;
t_info->use_metric = 1;
t_info->cols = cols;
t_info->minDelay = INT16_MAX;
......@@ -2599,7 +2642,7 @@ static void createChildTables() {
/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
size_t n = 0;
ssize_t readLen = 0;
char * line = NULL;
......@@ -2669,7 +2712,6 @@ int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) {
return 0;
}
/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
......@@ -2716,7 +2758,8 @@ static int readSampleFromCsvFileToMem(
continue;
}
verbosePrint("readLen=%ld stb->lenOfOneRow=%d getRows=%d\n", readLen, superTblInfo->lenOfOneRow, getRows);
verbosePrint("readLen=%ld stb->lenOfOneRow=%d getRows=%d\n", readLen,
superTblInfo->lenOfOneRow, getRows);
memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow,
line, readLen);
......@@ -2958,6 +3001,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
cJSON* rowsPerTbl = cJSON_GetObjectItem(root, "rows_per_tbl");
if (rowsPerTbl && rowsPerTbl->type == cJSON_Number) {
g_args.rows_per_tbl = rowsPerTbl->valueint;
} else if (!rowsPerTbl) {
g_args.rows_per_tbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else {
fprintf(stderr, "ERROR: failed to read json, rows_per_tbl input mistake\n");
goto PARSE_OVER;
}
cJSON* maxSqlLen = cJSON_GetObjectItem(root, "max_sql_len");
if (maxSqlLen && maxSqlLen->type == cJSON_Number) {
g_args.max_sql_len = maxSqlLen->valueint;
......@@ -3005,7 +3058,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
int dbSize = cJSON_GetArraySize(dbs);
if (dbSize > MAX_DB_COUNT) {
printf("ERROR: failed to read json, databases size overflow, max database is %d\n", MAX_DB_COUNT);
fprintf(stderr,
"ERROR: failed to read json, databases size overflow, max database is %d\n",
MAX_DB_COUNT);
goto PARSE_OVER;
}
......@@ -3202,7 +3257,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
int stbSize = cJSON_GetArraySize(stables);
if (stbSize > MAX_SUPER_TABLE_COUNT) {
printf("ERROR: failed to read json, databases size overflow, max database is %d\n", MAX_SUPER_TABLE_COUNT);
fprintf(stderr,
"ERROR: failed to read json, databases size overflow, max database is %d\n",
MAX_SUPER_TABLE_COUNT);
goto PARSE_OVER;
}
......@@ -3429,6 +3486,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n");
goto PARSE_OVER;
}
cJSON* rowsPerTbl = cJSON_GetObjectItem(stbInfo, "rows_per_tbl");
if (rowsPerTbl && rowsPerTbl->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].rowsPerTbl = rowsPerTbl->valueint;
......@@ -3768,7 +3826,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
}
cJSON* subkeepProgress = cJSON_GetObjectItem(subQuery, "keepProgress");
if (subkeepProgress && subkeepProgress->type == cJSON_String && subkeepProgress->valuestring != NULL) {
if (subkeepProgress &&
subkeepProgress->type == cJSON_String
&& subkeepProgress->valuestring != NULL) {
if (0 == strcmp("yes", subkeepProgress->valuestring)) {
g_queryInfo.subQueryInfo.subscribeKeepProgress = 1;
} else if (0 == strcmp("no", subkeepProgress->valuestring)) {
......@@ -3859,27 +3919,27 @@ static bool getInfoFromJsonFile(char* file) {
cJSON* filetype = cJSON_GetObjectItem(root, "filetype");
if (filetype && filetype->type == cJSON_String && filetype->valuestring != NULL) {
if (0 == strcasecmp("insert", filetype->valuestring)) {
g_args.test_mode = INSERT_MODE;
g_args.test_mode = INSERT_TEST;
} else if (0 == strcasecmp("query", filetype->valuestring)) {
g_args.test_mode = QUERY_MODE;
g_args.test_mode = QUERY_TEST;
} else if (0 == strcasecmp("subscribe", filetype->valuestring)) {
g_args.test_mode = SUBSCRIBE_MODE;
g_args.test_mode = SUBSCRIBE_TEST;
} else {
printf("ERROR: failed to read json, filetype not support\n");
goto PARSE_OVER;
}
} else if (!filetype) {
g_args.test_mode = INSERT_MODE;
g_args.test_mode = INSERT_TEST;
} else {
printf("ERROR: failed to read json, filetype not found\n");
goto PARSE_OVER;
}
if (INSERT_MODE == g_args.test_mode) {
if (INSERT_TEST == g_args.test_mode) {
ret = getMetaFromInsertJsonFile(root);
} else if (QUERY_MODE == g_args.test_mode) {
} else if (QUERY_TEST == g_args.test_mode) {
ret = getMetaFromQueryJsonFile(root);
} else if (SUBSCRIBE_MODE == g_args.test_mode) {
} else if (SUBSCRIBE_TEST == g_args.test_mode) {
ret = getMetaFromQueryJsonFile(root);
} else {
printf("ERROR: input json file type error! please input correct file type: insert or query or subscribe\n");
......@@ -4125,13 +4185,34 @@ static int execInsert(threadInfo *winfo, char *buffer, int k)
return affectedRows;
}
static int generateDataBuffer(int32_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
int64_t insertRows,
int64_t startFrom, int64_t startTime, int *pSampleUsePos)
static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq)
{
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
if (superTblInfo) {
if ((superTblInfo->childTblOffset >= 0)
&& (superTblInfo->childTblLimit > 0)) {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN);
} else {
verbosePrint("%s() LN%d: from=%d count=%d seq=%d\n",
__func__, __LINE__, pThreadInfo->start_table_from,
pThreadInfo->ntables, tableSeq);
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
}
} else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%d",
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, tableSeq);
}
}
static int generateDataTail(char *tableName, int32_t tableSeq,
threadInfo* pThreadInfo,
SSuperTable* superTblInfo,
int batch, char* buffer, int64_t insertRows,
int64_t startFrom, uint64_t startTime, int *pSamplePos, int *dataLen) {
int len = 0;
int ncols_per_record = 1; // count first col ts
if (superTblInfo == NULL) {
......@@ -4142,91 +4223,21 @@ static int generateDataBuffer(int32_t tableSeq,
}
}
assert(buffer != NULL);
char *pChildTblName;
pChildTblName = calloc(TSDB_TABLE_NAME_LEN, 1);
if (NULL == pChildTblName) {
fprintf(stderr, "failed to alloc memory %d\n", TSDB_TABLE_NAME_LEN);
return -1;
}
if (superTblInfo && (superTblInfo->childTblOffset >= 0)
&& (superTblInfo->childTblLimit > 0)) {
snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN);
} else {
snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s%d",
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, tableSeq);
}
memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
char *pstr = buffer;
if (superTblInfo) {
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo);
} else {
tagsValBuf = getTagValueFromTagSample(
superTblInfo,
tableSeq % superTblInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
free(pChildTblName);
fprintf(stderr, "tag buf failed to allocate memory\n");
return -1;
}
pstr += snprintf(pstr,
superTblInfo->maxSqlLen,
"insert into %s.%s using %s.%s tags %s values",
pThreadInfo->db_name,
pChildTblName,
pThreadInfo->db_name,
superTblInfo->sTblName,
tagsValBuf);
tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
pstr += snprintf(pstr,
superTblInfo->maxSqlLen,
"insert into %s.%s values",
pThreadInfo->db_name,
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
} else {
pstr += snprintf(pstr,
(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len),
"insert into %s.%s values",
pThreadInfo->db_name,
pChildTblName);
}
} else {
pstr += snprintf(pstr,
g_args.max_sql_len,
"insert into %s.%s values",
pThreadInfo->db_name,
pChildTblName);
}
int k;
int len = 0;
verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch);
verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR);
for (k = 0; k < g_args.num_of_RPR;) {
int k = 0;
for (k = 0; k < batch;) {
if (superTblInfo) {
int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample"))) {
retLen = getRowDataFromSample(
pstr + len,
buffer + len,
superTblInfo->maxSqlLen - len,
startTime + superTblInfo->timeStampStep * startFrom,
superTblInfo,
pSampleUsePos);
pSamplePos);
} else if (0 == strncasecmp(superTblInfo->dataSource,
"rand", strlen("rand"))) {
int rand_num = rand_tinyint() % 100;
......@@ -4234,13 +4245,13 @@ static int generateDataBuffer(int32_t tableSeq,
&& rand_num < superTblInfo->disorderRatio) {
int64_t d = startTime - taosRandom() % superTblInfo->disorderRange;
retLen = generateRowData(
pstr + len,
buffer + len,
superTblInfo->maxSqlLen - len,
d,
superTblInfo);
} else {
retLen = generateRowData(
pstr + len,
buffer + len,
superTblInfo->maxSqlLen - len,
startTime + superTblInfo->timeStampStep * startFrom,
superTblInfo);
......@@ -4248,7 +4259,6 @@ static int generateDataBuffer(int32_t tableSeq,
}
if (retLen < 0) {
free(pChildTblName);
return -1;
}
......@@ -4277,28 +4287,288 @@ static int generateDataBuffer(int32_t tableSeq,
lenOfBinary);
}
pstr += sprintf(pstr, " %s", data);
if (len + pstr - buffer >= (g_args.max_sql_len - 256)) { // too long
buffer += sprintf(buffer, " %s", data);
if (strlen(buffer) >= (g_args.max_sql_len - 256)) { // too long
k++;
break;
}
}
verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer);
verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n",
__func__, __LINE__, len, k, buffer);
k++;
startFrom ++;
if (startFrom >= insertRows)
if (startFrom >= insertRows) {
break;
}
}
if (pChildTblName)
free(pChildTblName);
*dataLen = len;
return k;
}
static int generateSQLHead(char *tableName, int32_t tableSeq, threadInfo* pThreadInfo, SSuperTable* superTblInfo, char *buffer)
{
int len;
if (superTblInfo) {
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo);
} else {
tagsValBuf = getTagValueFromTagSample(
superTblInfo,
tableSeq % superTblInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
fprintf(stderr, "tag buf failed to allocate memory\n");
return -1;
}
len = snprintf(buffer,
superTblInfo->maxSqlLen,
"insert into %s.%s using %s.%s tags %s values",
pThreadInfo->db_name,
tableName,
pThreadInfo->db_name,
superTblInfo->sTblName,
tagsValBuf);
tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
len = snprintf(buffer,
superTblInfo->maxSqlLen,
"insert into %s.%s values",
pThreadInfo->db_name,
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
} else {
len = snprintf(buffer,
(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len),
"insert into %s.%s values",
pThreadInfo->db_name,
tableName);
}
} else {
len = snprintf(buffer,
g_args.max_sql_len,
"insert into %s.%s values",
pThreadInfo->db_name,
tableName);
}
return len;
}
static int generateDataBuffer(char *pTblName,
int32_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
int64_t insertRows,
int64_t startFrom, int64_t startTime, int *pSamplePos)
{
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int ncols_per_record = 1; // count first col ts
if (superTblInfo == NULL) {
int datatypeSeq = 0;
while(g_args.datatype[datatypeSeq]) {
datatypeSeq ++;
ncols_per_record ++;
}
}
assert(buffer != NULL);
memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
char *pstr = buffer;
int headLen = generateSQLHead(pTblName, tableSeq, pThreadInfo, superTblInfo, buffer);
pstr += headLen;
int k;
int dataLen;
k = generateDataTail(pTblName, tableSeq, pThreadInfo, superTblInfo,
g_args.num_of_RPR, pstr, insertRows, startFrom, startTime,
pSamplePos, &dataLen);
return k;
}
static void* syncWriteInterlace(threadInfo *pThreadInfo) {
printf("### CBD: interlace write\n");
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1);
if (NULL == buffer) {
fprintf(stderr, "Failed to alloc %d Bytes, reason:%s\n",
superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len,
strerror(errno));
return NULL;
}
int insertMode;
char tableName[TSDB_TABLE_NAME_LEN];
int rowsPerTbl = superTblInfo?superTblInfo->rowsPerTbl:g_args.rows_per_tbl;
if (rowsPerTbl > 0) {
insertMode = INTERLACE_INSERT_MODE;
} else {
insertMode = PROGRESSIVE_INSERT_MODE;
}
// rows per table need be less than insert batch
if (rowsPerTbl > g_args.num_of_RPR)
rowsPerTbl = g_args.num_of_RPR;
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0;
uint64_t et = 0xffffffff;
int64_t lastPrintTime = taosGetTimestampMs();
int64_t startTs = taosGetTimestampUs();
int64_t endTs;
int tableSeq = pThreadInfo->start_table_from;
debugPrint("%s() LN%d: start_table_from=%d ntables=%d insertRows=%"PRId64"\n",
__func__, __LINE__, pThreadInfo->start_table_from,
pThreadInfo->ntables, insertRows);
int64_t startTime = pThreadInfo->start_time;
int batchPerTblTimes;
int prevBatchPerTbl, lastBatchPerTbl;
if (pThreadInfo->ntables == 1) {
batchPerTblTimes = 1;
lastBatchPerTbl = rowsPerTbl;
prevBatchPerTbl = rowsPerTbl;
} else if (rowsPerTbl > 0) {
batchPerTblTimes = g_args.num_of_RPR / rowsPerTbl;
lastBatchPerTbl = g_args.num_of_RPR % rowsPerTbl;
if (lastBatchPerTbl > 0)
batchPerTblTimes += 1;
else
lastBatchPerTbl = rowsPerTbl;
prevBatchPerTbl = rowsPerTbl;
} else {
batchPerTblTimes = 1;
prevBatchPerTbl = g_args.num_of_RPR;
lastBatchPerTbl = g_args.num_of_RPR;
}
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
if (insert_interval) {
st = taosGetTimestampUs();
}
// generate data
memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
char *pstr = buffer;
int recGenerated = 0;
for (int i = 0; i < batchPerTblTimes; i ++) {
if (insertMode == INTERLACE_INSERT_MODE) {
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table
tableSeq = pThreadInfo->start_table_from;
}
}
getTableName(tableName, pThreadInfo, tableSeq);
int headLen;
if (i == 0) {
headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo, pstr);
} else {
headLen = snprintf(pstr, TSDB_TABLE_NAME_LEN, "%s.%s values",
pThreadInfo->db_name,
tableName);
}
// generate data buffer
verbosePrint("%s() LN%d i=%d buffer:\n%s\n",
__func__, __LINE__, i, buffer);
pstr += headLen;
int dataLen = 0;
int batchPerTbl;
if (i == batchPerTblTimes - 1) {
batchPerTbl = lastBatchPerTbl;
} else {
batchPerTbl = prevBatchPerTbl;
}
verbosePrint("%s() LN%d batchPerTbl = %d\n",
__func__, __LINE__, batchPerTbl);
int numOfRecGenerated = generateDataTail(
tableName, tableSeq, pThreadInfo, superTblInfo,
batchPerTbl, pstr, insertRows, 0,
startTime + pThreadInfo->totalInsertRows * superTblInfo->timeStampStep,
&(pThreadInfo->samplePos), &dataLen);
verbosePrint("%s() LN%d numOfRecGenerated= %d\n",
__func__, __LINE__, numOfRecGenerated);
pstr += dataLen;
recGenerated += numOfRecGenerated;
tableSeq ++;
}
verbosePrint("%s() LN%d buffer:\n%s\n",
__func__, __LINE__, buffer);
pThreadInfo->totalInsertRows += recGenerated;
int affectedRows = execInsert(pThreadInfo, buffer, recGenerated);
if (affectedRows < 0)
goto free_and_statistics_interlace;
pThreadInfo->totalAffectedRows += affectedRows;
endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay;
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
lastPrintTime = currentPrintTime;
}
if (insert_interval) {
et = taosGetTimestampUs();
if (insert_interval > ((et - st)/1000) ) {
int sleep_time = insert_interval - (et -st)/1000;
verbosePrint("%s() LN%d sleep: %d ms for insert interval\n",
__func__, __LINE__, sleep_time);
taosMsleep(sleep_time); // ms
}
}
}
free_and_statistics_interlace:
tmfree(buffer);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
return NULL;
}
// sync insertion
/*
1 thread: 100 tables * 2000 rows/s
......@@ -4307,10 +4577,9 @@ static int generateDataBuffer(int32_t tableSeq,
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
*/
static void* syncWrite(void *sarg) {
static void* syncWriteProgressive(threadInfo *pThreadInfo) {
threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1);
if (NULL == buffer) {
......@@ -4324,19 +4593,18 @@ static void* syncWrite(void *sarg) {
int64_t startTs = taosGetTimestampUs();
int64_t endTs;
int insert_interval = superTblInfo?superTblInfo->insertInterval:
g_args.insert_interval;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0;
uint64_t et = 0xffffffff;
winfo->totalInsertRows = 0;
winfo->totalAffectedRows = 0;
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
winfo->samplePos = 0;
pThreadInfo->samplePos = 0;
for (uint32_t tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id;
for (uint32_t tableSeq = pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to;
tableSeq ++) {
int64_t start_time = winfo->start_time;
int64_t start_time = pThreadInfo->start_time;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
......@@ -4346,34 +4614,39 @@ static void* syncWrite(void *sarg) {
st = taosGetTimestampUs();
}
int generated = generateDataBuffer(tableSeq, winfo, buffer, insertRows,
i, start_time, &(winfo->samplePos));
char tableName[TSDB_TABLE_NAME_LEN];
getTableName(tableName, pThreadInfo, tableSeq);
verbosePrint("%s() LN%d: tid=%d seq=%d tableName=%s\n",
__func__, __LINE__,
pThreadInfo->threadID, tableSeq, tableName);
int generated = generateDataBuffer(tableName, tableSeq, pThreadInfo, buffer, insertRows,
i, start_time, &(pThreadInfo->samplePos));
if (generated > 0)
i += generated;
else
goto free_and_statistics_2;
int affectedRows = execInsert(winfo, buffer, generated);
int affectedRows = execInsert(pThreadInfo, buffer, generated);
if (affectedRows < 0)
goto free_and_statistics_2;
winfo->totalInsertRows += generated;
winfo->totalAffectedRows += affectedRows;
pThreadInfo->totalInsertRows += generated;
pThreadInfo->totalAffectedRows += affectedRows;
endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
if (delay > winfo->maxDelay) winfo->maxDelay = delay;
if (delay < winfo->minDelay) winfo->minDelay = delay;
winfo->cntDelay++;
winfo->totalDelay += delay;
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay;
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
winfo->threadID,
winfo->totalInsertRows,
winfo->totalAffectedRows);
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
lastPrintTime = currentPrintTime;
}
......@@ -4391,10 +4664,11 @@ static void* syncWrite(void *sarg) {
}
} // num_of_DPT
if ((tableSeq == winfo->end_table_id) && superTblInfo &&
if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo &&
(0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample")))) {
printf("%s() LN%d samplePos=%d\n", __func__, __LINE__, winfo->samplePos);
printf("%s() LN%d samplePos=%d\n",
__func__, __LINE__, pThreadInfo->samplePos);
}
} // tableSeq
......@@ -4402,12 +4676,28 @@ free_and_statistics_2:
tmfree(buffer);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
winfo->threadID,
winfo->totalInsertRows,
winfo->totalAffectedRows);
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
return NULL;
}
static void* syncWrite(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo;
int rowsPerTbl = superTblInfo?superTblInfo->rowsPerTbl:g_args.rows_per_tbl;
if (rowsPerTbl > 0) {
// interlace mode
return syncWriteInterlace(winfo);
} else {
// progressive mode
return syncWriteProgressive(winfo);
}
}
void callBack(void *param, TAOS_RES *res, int code) {
threadInfo* winfo = (threadInfo*)param;
SSuperTable* superTblInfo = winfo->superTblInfo;
......@@ -4423,13 +4713,14 @@ void callBack(void *param, TAOS_RES *res, int code) {
char *buffer = calloc(1, winfo->superTblInfo->maxSqlLen);
char *data = calloc(1, MAX_DATA_SIZE);
char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id);
pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix,
winfo->start_table_from);
// if (winfo->counter >= winfo->superTblInfo->insertRows) {
if (winfo->counter >= g_args.num_of_RPR) {
winfo->start_table_id++;
winfo->start_table_from++;
winfo->counter = 0;
}
if (winfo->start_table_id > winfo->end_table_id) {
if (winfo->start_table_from > winfo->end_table_to) {
tsem_post(&winfo->lock_sem);
free(buffer);
free(data);
......@@ -4553,7 +4844,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
&start_time,
strlen(superTblInfo->startTimestamp),
timePrec, 0)) {
printf("ERROR to parse time!\n");
fprintf(stderr, "ERROR to parse time!\n");
exit(-1);
}
}
......@@ -4563,12 +4854,12 @@ static void startMultiThreadInsertData(int threads, char* db_name,
double start = getCurrentTime();
int last;
int startFrom;
if ((superTblInfo) && (superTblInfo->childTblOffset >= 0))
last = superTblInfo->childTblOffset;
startFrom = superTblInfo->childTblOffset;
else
last = 0;
startFrom = 0;
// read sample data from file first
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
......@@ -4609,6 +4900,55 @@ static void startMultiThreadInsertData(int threads, char* db_name,
taos_close(taos);
}
// read sample data from file first
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample")))) {
if (0 != prepareSampleDataForSTable(superTblInfo)) {
fprintf(stderr, "prepare sample data for stable failed!\n");
exit(-1);
}
}
TAOS* taos = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == taos) {
fprintf(stderr, "connect to server fail , reason: %s\n",
taos_errstr(NULL));
exit(-1);
}
if (superTblInfo) {
int limit, offset;
if (superTblInfo && (superTblInfo->childTblOffset >= 0)
&& (superTblInfo->childTblLimit > 0)) {
limit = superTblInfo->childTblLimit;
offset = superTblInfo->childTblOffset;
} else {
limit = superTblInfo->childTblCount;
offset = 0;
}
superTblInfo->childTblName = (char*)calloc(1,
limit * TSDB_TABLE_NAME_LEN);
if (superTblInfo->childTblName == NULL) {
fprintf(stderr, "alloc memory failed!");
taos_close(taos);
exit(-1);
}
int childTblCount;
getChildNameOfSuperTableWithLimitAndOffset(
taos,
db_name, superTblInfo->sTblName,
&superTblInfo->childTblName, &childTblCount,
limit,
offset);
}
taos_close(taos);
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
......@@ -4625,7 +4965,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == t_info->taos) {
printf("connect to server fail from insert sub thread, reason: %s\n",
fprintf(stderr, "connect to server fail from insert sub thread, reason: %s\n",
taos_errstr(NULL));
exit(-1);
}
......@@ -4635,12 +4975,13 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if ((NULL == superTblInfo)
|| (0 == superTblInfo->multiThreadWriteOneTbl)) {
t_info->start_table_id = last;
t_info->end_table_id = i < b ? last + a : last + a - 1;
last = t_info->end_table_id + 1;
t_info->start_table_from = startFrom;
t_info->ntables = i<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1;
} else {
t_info->start_table_id = 0;
t_info->end_table_id = superTblInfo->childTblCount - 1;
t_info->start_table_from = 0;
t_info->ntables = superTblInfo->childTblCount;
t_info->start_time = t_info->start_time + rand_int() % 10000 - rand_tinyint();
}
......@@ -4671,6 +5012,9 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if (superTblInfo) {
superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
superTblInfo->totalInsertRows += t_info->totalInsertRows;
} else {
g_args.totalAffectedRows += t_info->totalAffectedRows;
g_args.totalInsertRows += t_info->totalInsertRows;
}
totalDelay += t_info->totalDelay;
......@@ -4698,6 +5042,18 @@ static void startMultiThreadInsertData(int threads, char* db_name,
superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName,
superTblInfo->totalInsertRows/ t);
} else {
printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s %2.f records/second\n\n",
t, g_args.totalInsertRows,
g_args.totalAffectedRows,
threads, db_name,
g_args.totalInsertRows / t);
fprintf(g_fpOfInsertResult,
"Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s %2.f records/second\n\n",
t, g_args.totalInsertRows,
g_args.totalAffectedRows,
threads, db_name,
g_args.totalInsertRows / t);
}
printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n",
......@@ -4720,7 +5076,7 @@ void *readTable(void *sarg) {
char *tb_prefix = rinfo->tb_prefix;
FILE *fp = fopen(rinfo->fp, "a");
if (NULL == fp) {
printf("fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno));
fprintf(stderr, "fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno));
return NULL;
}
......@@ -4732,7 +5088,7 @@ void *readTable(void *sarg) {
num_of_DPT = g_args.num_of_DPT;
// }
int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1;
int num_of_tables = rinfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
......@@ -4794,7 +5150,7 @@ void *readMetric(void *sarg) {
}
int num_of_DPT = rinfo->superTblInfo->insertRows;
int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1;
int num_of_tables = rinfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
......@@ -4842,7 +5198,8 @@ void *readMetric(void *sarg) {
}
t = getCurrentTime() - t;
fprintf(fp, "| Speed: %12.2f(per s) | Latency: %.4f(ms) |\n", num_of_tables * num_of_DPT / t, t * 1000);
fprintf(fp, "| Speed: %12.2f(per s) | Latency: %.4f(ms) |\n",
num_of_tables * num_of_DPT / t, t * 1000);
printf("select %10s took %.6f second(s)\n\n", aggreFunc[j], t);
taos_free_result(pSql);
......@@ -4956,7 +5313,7 @@ void *superQueryProcess(void *sarg) {
while (1) {
if (g_queryInfo.superQueryInfo.rate && (et - st) < (int64_t)g_queryInfo.superQueryInfo.rate*1000) {
taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id);
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to);
}
st = taosGetTimestampUs();
......@@ -5020,13 +5377,14 @@ static void *subQueryProcess(void *sarg) {
int64_t st = 0;
int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000;
while (1) {
if (g_queryInfo.subQueryInfo.rate && (et - st) < (int64_t)g_queryInfo.subQueryInfo.rate*1000) {
if (g_queryInfo.subQueryInfo.rate
&& (et - st) < (int64_t)g_queryInfo.subQueryInfo.rate*1000) {
taosMsleep(g_queryInfo.subQueryInfo.rate*1000 - (et - st)); // ms
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id);
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to);
}
st = taosGetTimestampUs();
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
for (int i = winfo->start_table_from; i <= winfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.subQueryInfo.sqlCount; j++) {
memset(sqlstr,0,sizeof(sqlstr));
replaceSubTblName(g_queryInfo.subQueryInfo.sql[j], sqlstr, i);
......@@ -5042,8 +5400,8 @@ static void *subQueryProcess(void *sarg) {
et = taosGetTimestampUs();
printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n",
taosGetSelfPthreadId(),
winfo->start_table_id,
winfo->end_table_id,
winfo->start_table_from,
winfo->end_table_to,
(double)(et - st)/1000000.0);
}
return NULL;
......@@ -5081,7 +5439,8 @@ static int queryTestProcess() {
pthread_t *pids = NULL;
threadInfo *infos = NULL;
//==== create sub threads for query from specify table
if (g_queryInfo.superQueryInfo.sqlCount > 0 && g_queryInfo.superQueryInfo.concurrent > 0) {
if (g_queryInfo.superQueryInfo.sqlCount > 0
&& g_queryInfo.superQueryInfo.concurrent > 0) {
pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t));
infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo));
......@@ -5139,14 +5498,15 @@ static int queryTestProcess() {
b = ntables % threads;
}
int last = 0;
int startFrom = 0;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infosOfSub + i;
t_info->threadID = i;
t_info->start_table_id = last;
t_info->end_table_id = i < b ? last + a : last + a - 1;
last = t_info->end_table_id + 1;
t_info->start_table_from = startFrom;
t_info->ntables = i<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1;
t_info->taos = taos;
pthread_create(pidsOfSub + i, NULL, subQueryProcess, t_info);
}
......@@ -5222,7 +5582,7 @@ void *subSubscribeProcess(void *sarg) {
do {
//if (g_queryInfo.superQueryInfo.rate && (et - st) < g_queryInfo.superQueryInfo.rate*1000) {
// taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms
// //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id);
// //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to);
//}
//st = taosGetTimestampMs();
......@@ -5288,7 +5648,7 @@ void *superSubscribeProcess(void *sarg) {
do {
//if (g_queryInfo.superQueryInfo.rate && (et - st) < g_queryInfo.superQueryInfo.rate*1000) {
// taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms
// //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id);
// //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to);
//}
//st = taosGetTimestampMs();
......@@ -5417,13 +5777,15 @@ static int subscribeTestProcess() {
b = ntables % threads;
}
int last = 0;
int startFrom = 0;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infosOfSub + i;
t_info->threadID = i;
t_info->start_table_id = last;
t_info->end_table_id = i < b ? last + a : last + a - 1;
t_info->start_table_from = startFrom;
t_info->ntables = i<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1;
t_info->taos = taos;
pthread_create(pidsOfSub + i, NULL, subSubscribeProcess, t_info);
}
......@@ -5665,14 +6027,14 @@ void querySqlFile(TAOS* taos, char* sqlFile)
}
static void testMetaFile() {
if (INSERT_MODE == g_args.test_mode) {
if (INSERT_TEST == g_args.test_mode) {
if (g_Dbs.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_Dbs.cfgDir);
insertTestProcess();
} else if (QUERY_MODE == g_args.test_mode) {
} else if (QUERY_TEST == g_args.test_mode) {
if (g_queryInfo.cfgDir[0])
taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir);
queryTestProcess();
} else if (SUBSCRIBE_MODE == g_args.test_mode) {
} else if (SUBSCRIBE_TEST == g_args.test_mode) {
if (g_queryInfo.cfgDir[0])
taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir);
subscribeTestProcess();
......@@ -5689,16 +6051,18 @@ static void queryResult() {
pthread_t read_id;
threadInfo *rInfo = malloc(sizeof(threadInfo));
rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
rInfo->start_table_id = 0;
rInfo->start_table_from = 0;
//rInfo->do_aggreFunc = g_Dbs.do_aggreFunc;
if (g_args.use_metric) {
rInfo->end_table_id = g_Dbs.db[0].superTbls[0].childTblCount - 1;
rInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount;
rInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
strcpy(rInfo->tb_prefix,
g_Dbs.db[0].superTbls[0].childTblPrefix);
} else {
rInfo->end_table_id = g_args.num_of_tables -1;
rInfo->ntables = g_args.num_of_tables;
rInfo->end_table_to = g_args.num_of_tables -1;
strcpy(rInfo->tb_prefix, g_args.tb_prefix);
}
......@@ -5729,7 +6093,7 @@ static void queryResult() {
static void testCmdLine() {
g_args.test_mode = INSERT_MODE;
g_args.test_mode = INSERT_TEST;
insertTestProcess();
if (g_Dbs.insert_only)
......
......@@ -231,12 +231,13 @@ python3 test.py -f query/queryInterval.py
python3 test.py -f query/queryFillTest.py
# tools
python3 test.py -f tools/lowaTest.py
python3 test.py -f tools/taosdemoTest.py
python3 test.py -f tools/taosdemoTestWithoutMetric.py
python3 test.py -f tools/taosdemoTestWithJson.py
python3 test.py -f tools/taosdemoTestLimitOffset.py
python3 test.py -f tools/taosdumpTest.py
python3 test.py -f tools/taosdemoTest2.py
python3 test.py -f tools/taosdemoTestSampleData.py
# subscribe
python3 test.py -f subscribe/singlemeter.py
......@@ -291,6 +292,5 @@ python3 ./test.py -f insert/boundary2.py
python3 ./test.py -f alter/alter_debugFlag.py
python3 ./test.py -f query/queryBetweenAnd.py
python3 ./test.py -f tag_lite/alter_tag.py
python3 test.py -f tools/taosdemoTestSampleData.py
#======================p4-end===============
\ No newline at end of file
#======================p4-end===============
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 5000,
"rows_per_tbl": 50,
"num_of_records_per_req": 100,
"max_sql_len": 1024000,
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes",
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"keep": 365,
"minRows": 100,
"maxRows": 4096,
"comp":2,
"walLevel":1,
"cachelast":0,
"quorum":1,
"fsync":3000,
"update": 0
},
"super_tables": [{
"name": "stb",
"child_table_exists":"no",
"childtable_count": 9,
"childtable_prefix": "stb_",
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 250,
"multi_thread_write_one_tbl": "no",
"rows_per_tbl": 80,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": [{"type": "INT"}],
"tags": [{"type": "INT", "count":1}]
}]
}]
}
......@@ -44,7 +44,6 @@
"childtable_offset": 0,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"rows_per_tbl": 100,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,
......
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 100,
"max_sql_len": 1024000,
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes",
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"keep": 365,
"minRows": 100,
"maxRows": 4096,
"comp":2,
"walLevel":1,
"cachelast":0,
"quorum":1,
"fsync":3000,
"update": 0
},
"super_tables": [{
"name": "stb",
"child_table_exists":"no",
"childtable_count": 100,
"childtable_prefix": "stb_",
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 1000,
"childtable_limit": 1,
"childtable_offset": 50,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": [{"type": "INT"}],
"tags": [{"type": "TINYINT", "count":1}]
}]
}]
}
{
"filetype":"insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 1,
"databases": [{
"dbinfo": {
"name": "db01",
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"databases": [{
"dbinfo": {
"name": "db01",
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"update": 0,
"maxtablesPerVnode": 1000
},
"super_tables": [{
"name": "stb01",
"childtable_count": 100,
"childtable_prefix": "stb01_",
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_rate": 0,
"insert_rows": 1000,
"timestamp_step": 1000,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "/home/data/sample.csv",
"tags_file": "",
"columns": [{
"type": "SMALLINT"
}, {
"type": "BOOL"
}, {
"type": "BINARY",
"len": 6
}],
"tags": [{
"type": "INT"
},{
"type": "BINARY",
"len": 4
}]
}]
}]
"maxtablesPerVnode": 1000
},
"super_tables": [{
"name": "stb01",
"childtable_count": 100,
"childtable_prefix": "stb01_",
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_rate": 0,
"insert_rows": 1000,
"timestamp_step": 1000,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "/home/data/sample.csv",
"tags_file": "",
"columns": [{
"type": "SMALLINT"
}, {
"type": "BOOL"
}, {
"type": "BINARY",
"len": 6
}],
"tags": [{
"type": "INT"
},{
"type": "BINARY",
"len": 4
}]
}]
}]
}
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.numberOfTables = 10000
self.numberOfRecords = 100
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def run(self):
tdSql.prepare()
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/"
os.system("%staosdemo -f tools/insert-interlace.json" % binPath)
tdSql.execute("use db")
tdSql.query("select count(tbname) from db.stb")
tdSql.checkData(0, 0, 100)
tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 33000)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -68,6 +68,15 @@ class TDTestCase:
tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 20000)
os.system("%staosdemo -f tools/insert-tblimit1-tboffset.json" % binPath)
tdSql.execute("reset query cache")
tdSql.execute("use db")
tdSql.query("select count(tbname) from db.stb")
tdSql.checkData(0, 0, 100)
tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 1000)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册