提交 a1891a77 编写于 作者: sangshuduo's avatar sangshuduo

Merge branch 'develop' into hotfix/sangshuduo/TD-3143-taosdemo-windows

...@@ -225,7 +225,13 @@ SHOW MNODES; ...@@ -225,7 +225,13 @@ SHOW MNODES;
## <a class="anchor" id="arbitrator"></a>Arbitrator的使用 ## <a class="anchor" id="arbitrator"></a>Arbitrator的使用
如果副本数为偶数,当一个vnode group里一半vnode不工作时,是无法从中选出master的。同理,一半mnode不工作时,是无法选出mnode的master的,因为存在“split brain”问题。为解决这个问题,TDengine引入了Arbitrator的概念。Arbitrator模拟一个vnode或mnode在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含Arbitrator在内,超过半数的vnode或mnode工作,那么该vnode group或mnode组就可以正常的提供数据插入或查询服务。比如对于副本数为2的情形,如果一个节点A离线,但另外一个节点B正常,而且能连接到Arbitrator,那么节点B就能正常工作。 如果副本数为偶数,当一个 vnode group 里一半 vnode 不工作时,是无法从中选出 master 的。同理,一半 mnode 不工作时,是无法选出 mnode 的 master 的,因为存在“split brain”问题。为解决这个问题,TDengine 引入了 Arbitrator 的概念。Arbitrator 模拟一个 vnode 或 mnode 在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含 Arbitrator 在内,超过半数的 vnode 或 mnode 工作,那么该 vnode group 或 mnode 组就可以正常的提供数据插入或查询服务。比如对于副本数为 2 的情形,如果一个节点 A 离线,但另外一个节点 B 正常,而且能连接到 Arbitrator,那么节点 B 就能正常工作。
TDengine提供一个执行程序,名为 tarbitrator,找任何一台Linux服务器运行它即可。请点击[安装包下载](https://www.taosdata.com/cn/all-downloads/),在TDengine Arbitrator Linux一节中,选择适合的版本下载并安装。该程序对系统资源几乎没有要求,只需要保证有网络连接即可。该应用的命令行参数`-p`可以指定其对外服务的端口号,缺省是6042。配置每个taosd实例时,可以在配置文件taos.cfg里将参数arbitrator设置为Arbitrator的End Point。如果该参数配置了,当副本数为偶数时,系统将自动连接配置的Arbitrator。如果副本数为奇数,即使配置了Arbitrator,系统也不会去建立连接。 总之,在目前版本下,TDengine 建议在双副本环境要配置 Arbitrator,以提升系统的可用性。
Arbitrator 的执行程序名为 tarbitrator。该程序对系统资源几乎没有要求,只需要保证有网络连接,找任何一台 Linux 服务器运行它即可。以下简要描述安装配置的步骤:
1. 请点击 [安装包下载](https://www.taosdata.com/cn/all-downloads/),在 TDengine Arbitrator Linux 一节中,选择合适的版本下载并安装。
2. 该应用的命令行参数 `-p` 可以指定其对外服务的端口号,缺省是 6042。
3. 修改每个 taosd 实例的配置文件,在 taos.cfg 里将参数 arbitrator 设置为 tarbitrator 程序所对应的 End Point。(如果该参数配置了,当副本数为偶数时,系统将自动连接配置的 Arbitrator。如果副本数为奇数,即使配置了 Arbitrator,系统也不会去建立连接。)
4. 在配置文件中配置了的 Arbitrator,会出现在 `SHOW DNODES;` 指令的返回结果中,对应的 role 列的值会是“arb”。
...@@ -198,6 +198,14 @@ void dnodeCleanupVnodes() { ...@@ -198,6 +198,14 @@ void dnodeCleanupVnodes() {
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
if (pMsg->code != TSDB_CODE_SUCCESS) { if (pMsg->code != TSDB_CODE_SUCCESS) {
dError("status rsp is received, error:%s", tstrerror(pMsg->code)); dError("status rsp is received, error:%s", tstrerror(pMsg->code));
if (pMsg->code == TSDB_CODE_MND_DNODE_NOT_EXIST) {
char clusterId[TSDB_CLUSTER_ID_LEN];
dnodeGetClusterId(clusterId);
if (clusterId[0] != '\0') {
dError("exit zombie dropped dnode");
exit(EXIT_FAILURE);
}
}
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
return; return;
} }
......
...@@ -187,6 +187,7 @@ typedef struct SArguments_S { ...@@ -187,6 +187,7 @@ typedef struct SArguments_S {
bool insert_only; bool insert_only;
bool answer_yes; bool answer_yes;
bool debug_print; bool debug_print;
bool verbose_print;
char * output_file; char * output_file;
int mode; int mode;
char * datatype[MAX_NUM_DATATYPE + 1]; char * datatype[MAX_NUM_DATATYPE + 1];
...@@ -491,6 +492,7 @@ SArguments g_args = { ...@@ -491,6 +492,7 @@ SArguments g_args = {
false, // use_metric false, // use_metric
false, // insert_only false, // insert_only
false, // debug_print false, // debug_print
false, // verbose_print
false, // answer_yes; false, // answer_yes;
"./output.txt", // output_file "./output.txt", // output_file
0, // mode : sync or async 0, // mode : sync or async
...@@ -528,7 +530,11 @@ static SQueryMetaInfo g_queryInfo; ...@@ -528,7 +530,11 @@ static SQueryMetaInfo g_queryInfo;
static FILE * g_fpOfInsertResult = NULL; static FILE * g_fpOfInsertResult = NULL;
#define debugPrint(fmt, ...) \ #define debugPrint(fmt, ...) \
do { if (g_args.debug_print) fprintf(stderr, fmt, __VA_ARGS__); } while(0) do { if (g_args.debug_print || g_args.verbose_print) \
fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0)
#define verbosePrint(fmt, ...) \
do { if (g_args.verbose_print) fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0)
/////////////////////////////////////////////////// ///////////////////////////////////////////////////
void printHelp() { void printHelp() {
...@@ -693,6 +699,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -693,6 +699,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->answer_yes = true; arguments->answer_yes = true;
} else if (strcmp(argv[i], "-g") == 0) { } else if (strcmp(argv[i], "-g") == 0) {
arguments->debug_print = true; arguments->debug_print = true;
} else if (strcmp(argv[i], "-gg") == 0) {
arguments->verbose_print = true;
} else if (strcmp(argv[i], "-c") == 0) { } else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]); strcpy(configDir, argv[++i]);
} else if (strcmp(argv[i], "-O") == 0) { } else if (strcmp(argv[i], "-O") == 0) {
...@@ -750,7 +758,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -750,7 +758,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
printf("\n"); printf("\n");
} }
printf("# Insertion interval: %d\n", arguments->insert_interval); printf("# Insertion interval: %d\n", arguments->insert_interval);
printf("# Number of Columns per record: %d\n", arguments->num_of_RPR); printf("# Number of records per req: %d\n", arguments->num_of_RPR);
printf("# Number of Threads: %d\n", arguments->num_of_threads); printf("# Number of Threads: %d\n", arguments->num_of_threads);
printf("# Number of Tables: %d\n", arguments->num_of_tables); printf("# Number of Tables: %d\n", arguments->num_of_tables);
printf("# Number of Data per Table: %d\n", arguments->num_of_DPT); printf("# Number of Data per Table: %d\n", arguments->num_of_DPT);
...@@ -807,7 +815,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) { ...@@ -807,7 +815,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) {
} }
if (code != 0) { if (code != 0) {
debugPrint("DEBUG %s() LN%d - command: %s\n", __func__, __LINE__, command); debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res)); fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res));
taos_free_result(res); taos_free_result(res);
//taos_close(taos); //taos_close(taos);
...@@ -2057,7 +2065,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, ...@@ -2057,7 +2065,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
exit(-1); exit(-1);
} }
snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols); snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols);
debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable);
if (use_metric) { if (use_metric) {
char tags[STRING_LEN] = "\0"; char tags[STRING_LEN] = "\0";
...@@ -2110,13 +2118,13 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, ...@@ -2110,13 +2118,13 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
snprintf(command, BUFFER_SIZE, snprintf(command, BUFFER_SIZE,
"create table if not exists %s.%s (ts timestamp%s) tags %s", "create table if not exists %s.%s (ts timestamp%s) tags %s",
dbName, superTbls->sTblName, cols, tags); dbName, superTbls->sTblName, cols, tags);
debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, command); verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName); fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName);
return -1; return -1;
} }
debugPrint("DEBUG - create supertable %s success!\n\n", superTbls->sTblName); debugPrint("create supertable %s success!\n\n", superTbls->sTblName);
} }
return 0; return 0;
} }
...@@ -2135,7 +2143,7 @@ static int createDatabases() { ...@@ -2135,7 +2143,7 @@ static int createDatabases() {
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.db[i].drop) { if (g_Dbs.db[i].drop) {
sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName); sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName);
debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
taos_close(taos); taos_close(taos);
return -1; return -1;
...@@ -2203,7 +2211,7 @@ static int createDatabases() { ...@@ -2203,7 +2211,7 @@ static int createDatabases() {
"precision \'%s\';", g_Dbs.db[i].dbCfg.precision); "precision \'%s\';", g_Dbs.db[i].dbCfg.precision);
} }
debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); debugPrint("%s() %d command: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
taos_close(taos); taos_close(taos);
printf("\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName); printf("\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
...@@ -2211,11 +2219,11 @@ static int createDatabases() { ...@@ -2211,11 +2219,11 @@ static int createDatabases() {
} }
printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName); printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName);
debugPrint("DEBUG %s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount); debugPrint("%s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount);
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
// describe super table, if exists // describe super table, if exists
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName); sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName);
debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS; g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS;
ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric); ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric);
...@@ -2303,7 +2311,7 @@ static void* createTable(void *sarg) ...@@ -2303,7 +2311,7 @@ static void* createTable(void *sarg)
} }
len = 0; len = 0;
debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
free(buffer); free(buffer);
return NULL; return NULL;
...@@ -2318,7 +2326,7 @@ static void* createTable(void *sarg) ...@@ -2318,7 +2326,7 @@ static void* createTable(void *sarg)
} }
if (0 != len) { if (0 != len) {
debugPrint("DEBUG %s() %d buffer: %s\n", __func__, __LINE__, buffer); verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer);
(void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE); (void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE);
} }
...@@ -2356,7 +2364,7 @@ int startMultiThreadCreateChildTable( ...@@ -2356,7 +2364,7 @@ int startMultiThreadCreateChildTable(
t_info->threadID = i; t_info->threadID = i;
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE); tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
t_info->superTblInfo = superTblInfo; t_info->superTblInfo = superTblInfo;
debugPrint("DEBUG %s() %d db_name: %s\n", __func__, __LINE__, db_name); verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name);
t_info->taos = taos_connect( t_info->taos = taos_connect(
g_Dbs.host, g_Dbs.host,
g_Dbs.user, g_Dbs.user,
...@@ -2407,7 +2415,7 @@ static void createChildTables() { ...@@ -2407,7 +2415,7 @@ static void createChildTables() {
continue; continue;
} }
debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
startMultiThreadCreateChildTable( startMultiThreadCreateChildTable(
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
...@@ -2433,7 +2441,7 @@ static void createChildTables() { ...@@ -2433,7 +2441,7 @@ static void createChildTables() {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")"); len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")");
debugPrint("DEBUG - %s() LN%d: dbName: %s num of tb: %d schema: %s\n", __func__, __LINE__, verbosePrint("%s() LN%d: dbName: %s num of tb: %d schema: %s\n", __func__, __LINE__,
g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf); g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf);
startMultiThreadCreateChildTable( startMultiThreadCreateChildTable(
tblColsBuf, tblColsBuf,
...@@ -3295,7 +3303,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3295,7 +3303,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows"); cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows");
if (insertRows && insertRows->type == cJSON_Number) { if (insertRows && insertRows->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint; g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint;
...@@ -3658,7 +3665,7 @@ PARSE_OVER: ...@@ -3658,7 +3665,7 @@ PARSE_OVER:
} }
static bool getInfoFromJsonFile(char* file) { static bool getInfoFromJsonFile(char* file) {
debugPrint("DEBUG - %s %d %s\n", __func__, __LINE__, file); debugPrint("%s %d %s\n", __func__, __LINE__, file);
FILE *fp = fopen(file, "r"); FILE *fp = fopen(file, "r");
if (!fp) { if (!fp) {
...@@ -3817,6 +3824,7 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* ...@@ -3817,6 +3824,7 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable*
return (-1); return (-1);
} }
} }
dataLen -= 2; dataLen -= 2;
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")");
...@@ -3847,7 +3855,6 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -3847,7 +3855,6 @@ static void syncWriteForNumberOfTblInOneSql(
} }
uint64_t time_counter = winfo->start_time; uint64_t time_counter = winfo->start_time;
int64_t tmp_time;
int sampleUsePos; int sampleUsePos;
int64_t st = 0; int64_t st = 0;
...@@ -3855,6 +3862,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -3855,6 +3862,7 @@ static void syncWriteForNumberOfTblInOneSql(
for (int i = 0; i < superTblInfo->insertRows;) { for (int i = 0; i < superTblInfo->insertRows;) {
int32_t tbl_id = 0; int32_t tbl_id = 0;
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) {
int64_t tmp_time = 0;
int inserted = i; int inserted = i;
for (int k = 0; k < g_args.num_of_RPR;) { for (int k = 0; k < g_args.num_of_RPR;) {
...@@ -3991,7 +3999,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -3991,7 +3999,7 @@ static void syncWriteForNumberOfTblInOneSql(
send_to_server: send_to_server:
if (g_args.insert_interval && (g_args.insert_interval > (et - st))) { if (g_args.insert_interval && (g_args.insert_interval > (et - st))) {
int sleep_time = g_args.insert_interval - (et -st); int sleep_time = g_args.insert_interval - (et -st);
debugPrint("DEBUG sleep: %d ms\n", sleep_time); printf("sleep: %d ms specified by insert_interval\n", sleep_time);
taosMsleep(sleep_time); // ms taosMsleep(sleep_time); // ms
} }
...@@ -4008,7 +4016,7 @@ send_to_server: ...@@ -4008,7 +4016,7 @@ send_to_server:
int64_t endTs; int64_t endTs;
startTs = taosGetTimestampUs(); startTs = taosGetTimestampUs();
debugPrint("DEBUG %s() LN%d buff: %s\n", __func__, __LINE__, buffer); debugPrint("%s() LN%d buff: %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec( int affectedRows = queryDbExec(
winfo->taos, buffer, INSERT_TYPE); winfo->taos, buffer, INSERT_TYPE);
...@@ -4021,16 +4029,16 @@ send_to_server: ...@@ -4021,16 +4029,16 @@ send_to_server:
if (delay < winfo->minDelay) winfo->minDelay = delay; if (delay < winfo->minDelay) winfo->minDelay = delay;
winfo->cntDelay++; winfo->cntDelay++;
winfo->totalDelay += delay; winfo->totalDelay += delay;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
winfo->totalAffectedRows += affectedRows;
} }
totalAffectedRows += affectedRows;
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
winfo->threadID, winfo->threadID,
totalRowsInserted, winfo->totalRowsInserted,
totalAffectedRows); winfo->totalAffectedRows);
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
//int64_t t2 = taosGetTimestampMs(); //int64_t t2 = taosGetTimestampMs();
...@@ -4161,15 +4169,18 @@ static void* syncWrite(void *sarg) { ...@@ -4161,15 +4169,18 @@ static void* syncWrite(void *sarg) {
uint64_t st = 0; uint64_t st = 0;
uint64_t et = 0; uint64_t et = 0;
for (int i = 0; i < g_args.num_of_DPT;) { winfo->totalRowsInserted = 0;
winfo->totalAffectedRows = 0;
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
int inserted = i; for (int i = 0; i < g_args.num_of_DPT;) {
int tblInserted = i;
int64_t tmp_time = time_counter; int64_t tmp_time = time_counter;
char *pstr = buffer; char *pstr = buffer;
pstr += sprintf(pstr, pstr += sprintf(pstr,
"insert into %s.%s%d values", "insert into %s.%s%d values ",
winfo->db_name, g_args.tb_prefix, tID); winfo->db_name, g_args.tb_prefix, tID);
int k; int k;
for (k = 0; k < g_args.num_of_RPR;) { for (k = 0; k < g_args.num_of_RPR;) {
...@@ -4193,13 +4204,15 @@ static void* syncWrite(void *sarg) { ...@@ -4193,13 +4204,15 @@ static void* syncWrite(void *sarg) {
} }
pstr += sprintf(pstr, " %s", data); pstr += sprintf(pstr, " %s", data);
inserted++; tblInserted++;
k++; k++;
i++;
if (inserted >= g_args.num_of_DPT) if (tblInserted >= g_args.num_of_DPT)
break; break;
} }
winfo->totalRowsInserted += k;
/* puts(buffer); */ /* puts(buffer); */
int64_t startTs; int64_t startTs;
int64_t endTs; int64_t endTs;
...@@ -4208,16 +4221,17 @@ static void* syncWrite(void *sarg) { ...@@ -4208,16 +4221,17 @@ static void* syncWrite(void *sarg) {
if (i > 0 && g_args.insert_interval if (i > 0 && g_args.insert_interval
&& (g_args.insert_interval > (et - st) )) { && (g_args.insert_interval > (et - st) )) {
int sleep_time = g_args.insert_interval - (et -st); int sleep_time = g_args.insert_interval - (et -st);
debugPrint("DEBUG sleep: %d ms\n", sleep_time); printf("sleep: %d ms specified by insert_interval\n", sleep_time);
taosMsleep(sleep_time); // ms taosMsleep(sleep_time); // ms
} }
if (g_args.insert_interval) { if (g_args.insert_interval) {
st = taosGetTimestampMs(); st = taosGetTimestampMs();
} }
debugPrint("DEBUG - %s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec(winfo->taos, buffer, 1); int affectedRows = queryDbExec(winfo->taos, buffer, 1);
verbosePrint("%s() LN%d: affectedRows:%d\n", __func__, __LINE__, affectedRows);
if (0 <= affectedRows){ if (0 <= affectedRows){
endTs = taosGetTimestampUs(); endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs; int64_t delay = endTs - startTs;
...@@ -4227,27 +4241,31 @@ static void* syncWrite(void *sarg) { ...@@ -4227,27 +4241,31 @@ static void* syncWrite(void *sarg) {
winfo->minDelay = delay; winfo->minDelay = delay;
winfo->cntDelay++; winfo->cntDelay++;
winfo->totalDelay += delay; winfo->totalDelay += delay;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; winfo->totalAffectedRows += affectedRows;
winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
} }
verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64"\n", __func__, __LINE__, winfo->totalAffectedRows);
if (g_args.insert_interval) { if (g_args.insert_interval) {
et = taosGetTimestampMs(); et = taosGetTimestampMs();
} }
if (tID == winfo->end_table_id) { if (tblInserted >= g_args.num_of_DPT) {
i = inserted; break;
time_counter = tmp_time;
}
} }
} // num_of_DPT
} // tId
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
winfo->threadID,
winfo->totalRowsInserted,
winfo->totalAffectedRows);
}
return NULL; return NULL;
} }
static void* syncWriteWithStb(void *sarg) { static void* syncWriteWithStb(void *sarg) {
uint64_t totalRowsInserted = 0;
uint64_t totalAffectedRows = 0;
uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
...@@ -4303,24 +4321,41 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4303,24 +4321,41 @@ static void* syncWriteWithStb(void *sarg) {
return NULL; return NULL;
} }
int64_t time_counter = winfo->start_time;
uint64_t st = 0; uint64_t st = 0;
uint64_t et = 0; uint64_t et = 0;
debugPrint("DEBUG - %s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); winfo->totalRowsInserted = 0;
winfo->totalAffectedRows = 0;
int sampleUsePos;
debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows);
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id;
tID++) {
int64_t start_time = winfo->start_time;
for (int i = 0; i < superTblInfo->insertRows;) { for (int i = 0; i < superTblInfo->insertRows;) {
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { int64_t tblInserted = i;
uint64_t inserted = i;
uint64_t tmp_time = time_counter; if (i > 0 && g_args.insert_interval
&& (g_args.insert_interval > (et - st) )) {
int sleep_time = g_args.insert_interval - (et -st);
printf("sleep: %d ms specified by insert_interval\n", sleep_time);
taosMsleep(sleep_time); // ms
}
if (g_args.insert_interval) {
st = taosGetTimestampMs();
}
sampleUsePos = samplePos;
verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR);
int sampleUsePos = samplePos;
int k = 0;
debugPrint("DEBUG - %s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR);
for (k = 0; k < g_args.num_of_RPR;) {
int len = 0;
memset(buffer, 0, superTblInfo->maxSqlLen); memset(buffer, 0, superTblInfo->maxSqlLen);
int len = 0;
char *pstr = buffer; char *pstr = buffer;
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
...@@ -4361,12 +4396,14 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4361,12 +4396,14 @@ static void* syncWriteWithStb(void *sarg) {
tID); tID);
} }
int k;
for (k = 0; k < g_args.num_of_RPR;) {
int retLen = 0; int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
retLen = getRowDataFromSample( retLen = getRowDataFromSample(
pstr + len, pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
tmp_time += superTblInfo->timeStampStep, start_time + superTblInfo->timeStampStep * i,
superTblInfo, superTblInfo,
&sampleUsePos, &sampleUsePos,
fp, fp,
...@@ -4378,54 +4415,44 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4378,54 +4415,44 @@ static void* syncWriteWithStb(void *sarg) {
int rand_num = rand_tinyint() % 100; int rand_num = rand_tinyint() % 100;
if (0 != superTblInfo->disorderRatio if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) { && rand_num < superTblInfo->disorderRatio) {
int64_t d = tmp_time - rand() % superTblInfo->disorderRange; int64_t d = start_time - rand() % superTblInfo->disorderRange;
retLen = generateRowData( retLen = generateRowData(
pstr + len, pstr + len,
superTblInfo->maxSqlLen - len, d, superTblInfo->maxSqlLen - len,
d,
superTblInfo); superTblInfo);
//printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d); //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d);
} else { } else {
retLen = generateRowData( retLen = generateRowData(
pstr + len, pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
tmp_time += superTblInfo->timeStampStep, start_time + superTblInfo->timeStampStep * i,
superTblInfo); superTblInfo);
} }
if (retLen < 0) { if (retLen < 0) {
goto free_and_statistics_2; goto free_and_statistics_2;
} }
} }
/* len += retLen;
*/ len += retLen;
inserted++; verbosePrint("%s() LN%d retLen=%d len=%d k=%d buffer=%s\n", __func__, __LINE__, retLen, len, k, buffer);
tblInserted++;
k++; k++;
totalRowsInserted++; i++;
if (inserted > superTblInfo->insertRows) if (tblInserted >= superTblInfo->insertRows)
break;
/* if (inserted >= superTblInfo->insertRows
|| (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128))
break; break;
*/
if (i > 0 && g_args.insert_interval
&& (g_args.insert_interval > (et - st) )) {
int sleep_time = g_args.insert_interval - (et -st);
debugPrint("DEBUG sleep: %d ms\n", sleep_time);
taosMsleep(sleep_time); // ms
} }
if (g_args.insert_interval) { winfo->totalRowsInserted += k;
st = taosGetTimestampMs();
}
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
//printf("===== sql: %s \n\n", buffer);
//int64_t t1 = taosGetTimestampMs();
int64_t startTs; int64_t startTs;
int64_t endTs; int64_t endTs;
startTs = taosGetTimestampUs(); startTs = taosGetTimestampUs();
debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
if (0 > affectedRows){ if (0 > affectedRows){
...@@ -4437,25 +4464,19 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4437,25 +4464,19 @@ static void* syncWriteWithStb(void *sarg) {
if (delay < winfo->minDelay) winfo->minDelay = delay; if (delay < winfo->minDelay) winfo->minDelay = delay;
winfo->cntDelay++; winfo->cntDelay++;
winfo->totalDelay += delay; winfo->totalDelay += delay;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
} }
totalAffectedRows += affectedRows; winfo->totalAffectedRows += affectedRows;
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
winfo->threadID, winfo->threadID,
totalRowsInserted, winfo->totalRowsInserted,
totalAffectedRows); winfo->totalAffectedRows);
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
//int64_t t2 = taosGetTimestampMs();
//printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0);
} else { } else {
//int64_t t1 = taosGetTimestampMs();
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
//int64_t t2 = taosGetTimestampMs();
//printf("http insert sql return, Spent %ld ms \n", t2 - t1);
if (0 != retCode) { if (0 != retCode) {
printf("========restful return fail, threadID[%d]\n", winfo->threadID); printf("========restful return fail, threadID[%d]\n", winfo->threadID);
...@@ -4465,21 +4486,10 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4465,21 +4486,10 @@ static void* syncWriteWithStb(void *sarg) {
if (g_args.insert_interval) { if (g_args.insert_interval) {
et = taosGetTimestampMs(); et = taosGetTimestampMs();
} }
/*
if (loop_cnt) { if (tblInserted >= superTblInfo->insertRows)
loop_cnt--;
if ((1 == loop_cnt) && (0 != nrecords_last_req)) {
nrecords_cur_req = nrecords_last_req;
} else if (0 == loop_cnt){
nrecords_cur_req = nrecords_no_last_req;
loop_cnt = loop_cnt_orig;
break;
}
} else {
break; break;
} } // num_of_DPT
*/
}
if (tID == winfo->end_table_id) { if (tID == winfo->end_table_id) {
if (0 == strncasecmp( if (0 == strncasecmp(
...@@ -4487,26 +4497,19 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4487,26 +4497,19 @@ static void* syncWriteWithStb(void *sarg) {
samplePos = sampleUsePos; samplePos = sampleUsePos;
} }
i = inserted;
time_counter = tmp_time;
}
} }
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
} } // tID
free_and_statistics_2: free_and_statistics_2:
tmfree(buffer); tmfree(buffer);
tmfree(sampleDataBuf); tmfree(sampleDataBuf);
tmfclose(fp); tmfclose(fp);
winfo->totalRowsInserted = totalRowsInserted;
winfo->totalAffectedRows = totalAffectedRows;
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
winfo->threadID, winfo->threadID,
totalRowsInserted, winfo->totalRowsInserted,
totalAffectedRows); winfo->totalAffectedRows);
return NULL; return NULL;
} }
...@@ -4524,7 +4527,8 @@ void callBack(void *param, TAOS_RES *res, int code) { ...@@ -4524,7 +4527,8 @@ void callBack(void *param, TAOS_RES *res, int code) {
char *data = calloc(1, MAX_DATA_SIZE); char *data = calloc(1, MAX_DATA_SIZE);
char *pstr = buffer; char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id); pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id);
if (winfo->counter >= winfo->superTblInfo->insertRows) { // if (winfo->counter >= winfo->superTblInfo->insertRows) {
if (winfo->counter >= g_args.num_of_RPR) {
winfo->start_table_id++; winfo->start_table_id++;
winfo->counter = 0; winfo->counter = 0;
} }
...@@ -4715,13 +4719,13 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, ...@@ -4715,13 +4719,13 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
if (superTblInfo) { if (superTblInfo) {
superTblInfo->totalAffectedRows += t_info->totalAffectedRows; superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
superTblInfo->totalRowsInserted += t_info->totalRowsInserted; superTblInfo->totalRowsInserted += t_info->totalRowsInserted;
}
totalDelay += t_info->totalDelay; totalDelay += t_info->totalDelay;
cntDelay += t_info->cntDelay; cntDelay += t_info->cntDelay;
if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay;
if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; if (t_info->minDelay < minDelay) minDelay = t_info->minDelay;
} }
}
cntDelay -= 1; cntDelay -= 1;
if (cntDelay == 0) cntDelay = 1; if (cntDelay == 0) cntDelay = 1;
...@@ -4769,11 +4773,12 @@ void *readTable(void *sarg) { ...@@ -4769,11 +4773,12 @@ void *readTable(void *sarg) {
} }
int num_of_DPT; int num_of_DPT;
if (rinfo->superTblInfo) { /* if (rinfo->superTblInfo) {
num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table;
} else { } else {
*/
num_of_DPT = g_args.num_of_DPT; num_of_DPT = g_args.num_of_DPT;
} // }
int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1;
int totalData = num_of_DPT * num_of_tables; int totalData = num_of_DPT * num_of_tables;
...@@ -4907,7 +4912,7 @@ int insertTestProcess() { ...@@ -4907,7 +4912,7 @@ int insertTestProcess() {
if (ret == -1) if (ret == -1)
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
debugPrint("DEBUG - %d result file: %s\n", __LINE__, g_Dbs.resultFile); debugPrint("%d result file: %s\n", __LINE__, g_Dbs.resultFile);
g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a"); g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a");
if (NULL == g_fpOfInsertResult) { if (NULL == g_fpOfInsertResult) {
fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile); fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile);
...@@ -5063,7 +5068,7 @@ void *subQueryProcess(void *sarg) { ...@@ -5063,7 +5068,7 @@ void *subQueryProcess(void *sarg) {
int64_t st = 0; int64_t st = 0;
int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000; int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000;
while (1) { while (1) {
if (g_queryInfo.subQueryInfo.rate && (et - st) < g_queryInfo.subQueryInfo.rate*1000) { if (g_queryInfo.subQueryInfo.rate && (et - st) < (int64_t)g_queryInfo.subQueryInfo.rate*1000) {
taosMsleep(g_queryInfo.subQueryInfo.rate*1000 - (et - st)); // ms taosMsleep(g_queryInfo.subQueryInfo.rate*1000 - (et - st)); // ms
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id);
} }
...@@ -5143,7 +5148,7 @@ static int queryTestProcess() { ...@@ -5143,7 +5148,7 @@ static int queryTestProcess() {
char sqlStr[MAX_TB_NAME_SIZE*2]; char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName); sprintf(sqlStr, "use %s", g_queryInfo.dbName);
debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
(void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE); (void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE);
} else { } else {
t_info->taos = NULL; t_info->taos = NULL;
...@@ -5254,7 +5259,7 @@ void *subSubscribeProcess(void *sarg) { ...@@ -5254,7 +5259,7 @@ void *subSubscribeProcess(void *sarg) {
char sqlStr[MAX_TB_NAME_SIZE*2]; char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName); sprintf(sqlStr, "use %s", g_queryInfo.dbName);
debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)){ if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)){
return NULL; return NULL;
} }
...@@ -5320,7 +5325,7 @@ void *superSubscribeProcess(void *sarg) { ...@@ -5320,7 +5325,7 @@ void *superSubscribeProcess(void *sarg) {
char sqlStr[MAX_TB_NAME_SIZE*2]; char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName); sprintf(sqlStr, "use %s", g_queryInfo.dbName);
debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)) { if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)) {
return NULL; return NULL;
} }
...@@ -5685,8 +5690,15 @@ void querySqlFile(TAOS* taos, char* sqlFile) ...@@ -5685,8 +5690,15 @@ void querySqlFile(TAOS* taos, char* sqlFile)
} }
memcpy(cmd + cmd_len, line, read_len); memcpy(cmd + cmd_len, line, read_len);
debugPrint("DEBUG %s() LN%d cmd: %s\n", __func__, __LINE__, cmd); verbosePrint("%s() LN%d cmd: %s\n", __func__, __LINE__, cmd);
queryDbExec(taos, cmd, NO_INSERT_TYPE); queryDbExec(taos, cmd, NO_INSERT_TYPE);
if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE)) {
printf("queryDbExec %s failed!\n", cmd);
tmfree(cmd);
tmfree(line);
tmfclose(fp);
return;
}
memset(cmd, 0, MAX_SQL_SIZE); memset(cmd, 0, MAX_SQL_SIZE);
cmd_len = 0; cmd_len = 0;
} }
...@@ -5773,7 +5785,7 @@ static void testCmdLine() { ...@@ -5773,7 +5785,7 @@ static void testCmdLine() {
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
parse_args(argc, argv, &g_args); parse_args(argc, argv, &g_args);
debugPrint("DEBUG - meta file: %s\n", g_args.metaFile); debugPrint("meta file: %s\n", g_args.metaFile);
if (g_args.metaFile) { if (g_args.metaFile) {
initOfInsertMeta(); initOfInsertMeta();
......
...@@ -35,16 +35,19 @@ import os ...@@ -35,16 +35,19 @@ import os
import signal import signal
import traceback import traceback
import resource import resource
from guppy import hpy # from guppy import hpy
import gc import gc
from crash_gen.service_manager import ServiceManager, TdeInstance from crash_gen.service_manager import ServiceManager, TdeInstance
from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress
from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager
import crash_gen.settings
import taos import taos
import requests import requests
crash_gen.settings.init()
# Require Python 3 # Require Python 3
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
raise Exception("Must be using Python 3") raise Exception("Must be using Python 3")
...@@ -259,6 +262,7 @@ class ThreadCoordinator: ...@@ -259,6 +262,7 @@ class ThreadCoordinator:
self._execStats = ExecutionStats() self._execStats = ExecutionStats()
self._runStatus = Status.STATUS_RUNNING self._runStatus = Status.STATUS_RUNNING
self._initDbs() self._initDbs()
self._stepStartTime = None # Track how long it takes to execute each step
def getTaskExecutor(self): def getTaskExecutor(self):
return self._te return self._te
...@@ -394,6 +398,10 @@ class ThreadCoordinator: ...@@ -394,6 +398,10 @@ class ThreadCoordinator:
try: try:
self._syncAtBarrier() # For now just cross the barrier self._syncAtBarrier() # For now just cross the barrier
Progress.emit(Progress.END_THREAD_STEP) Progress.emit(Progress.END_THREAD_STEP)
if self._stepStartTime :
stepExecTime = time.time() - self._stepStartTime
Progress.emitStr('{:.3f}s/{}'.format(stepExecTime, DbConnNative.totalRequests))
DbConnNative.resetTotalRequests() # reset to zero
except threading.BrokenBarrierError as err: except threading.BrokenBarrierError as err:
self._execStats.registerFailure("Aborted due to worker thread timeout") self._execStats.registerFailure("Aborted due to worker thread timeout")
Logging.error("\n") Logging.error("\n")
...@@ -433,6 +441,7 @@ class ThreadCoordinator: ...@@ -433,6 +441,7 @@ class ThreadCoordinator:
# Then we move on to the next step # Then we move on to the next step
Progress.emit(Progress.BEGIN_THREAD_STEP) Progress.emit(Progress.BEGIN_THREAD_STEP)
self._stepStartTime = time.time()
self._releaseAllWorkerThreads(transitionFailed) self._releaseAllWorkerThreads(transitionFailed)
if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate" if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate"
...@@ -691,7 +700,7 @@ class AnyState: ...@@ -691,7 +700,7 @@ class AnyState:
def canDropDb(self): def canDropDb(self):
# If user requests to run up to a number of DBs, # If user requests to run up to a number of DBs,
# we'd then not do drop_db operations any more # we'd then not do drop_db operations any more
if gConfig.max_dbs > 0 : if gConfig.max_dbs > 0 or gConfig.use_shadow_db :
return False return False
return self._info[self.CAN_DROP_DB] return self._info[self.CAN_DROP_DB]
...@@ -699,6 +708,8 @@ class AnyState: ...@@ -699,6 +708,8 @@ class AnyState:
return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE] return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]
def canDropFixedSuperTable(self): def canDropFixedSuperTable(self):
if gConfig.use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table
return False
return self._info[self.CAN_DROP_FIXED_SUPER_TABLE] return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]
def canAddData(self): def canAddData(self):
...@@ -1037,7 +1048,7 @@ class Database: ...@@ -1037,7 +1048,7 @@ class Database:
_clsLock = threading.Lock() # class wide lock _clsLock = threading.Lock() # class wide lock
_lastInt = 101 # next one is initial integer _lastInt = 101 # next one is initial integer
_lastTick = 0 _lastTick = 0
_lastLaggingTick = 0 # lagging tick, for unsequenced insersions _lastLaggingTick = 0 # lagging tick, for out-of-sequence (oos) data insertions
def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc
self._dbNum = dbNum # we assign a number to databases, for our testing purpose self._dbNum = dbNum # we assign a number to databases, for our testing purpose
...@@ -1093,21 +1104,24 @@ class Database: ...@@ -1093,21 +1104,24 @@ class Database:
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
t4 = datetime.datetime.fromtimestamp( t4 = datetime.datetime.fromtimestamp(
t3.timestamp() + elSec2) # see explanation above t3.timestamp() + elSec2) # see explanation above
Logging.debug("Setting up TICKS to start from: {}".format(t4)) Logging.info("Setting up TICKS to start from: {}".format(t4))
return t4 return t4
@classmethod @classmethod
def getNextTick(cls): def getNextTick(cls):
'''
Fetch a timestamp tick, with some random factor, may not be unique.
'''
with cls._clsLock: # prevent duplicate tick with cls._clsLock: # prevent duplicate tick
if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized
# 10k at 1/20 chance, should be enough to avoid overlaps # 10k at 1/20 chance, should be enough to avoid overlaps
tick = cls.setupLastTick() tick = cls.setupLastTick()
cls._lastTick = tick cls._lastTick = tick
cls._lastLaggingTick = tick + datetime.timedelta(0, -10000) cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast
# if : # should be quite a bit into the future # if : # should be quite a bit into the future
if Dice.throw(20) == 0: # 1 in 20 chance, return lagging tick if gConfig.mix_oos_data and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick
cls._lastLaggingTick += datetime.timedelta(0, 1) # Go back in time 100 seconds cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence
return cls._lastLaggingTick return cls._lastLaggingTick
else: # regular else: # regular
# add one second to it # add one second to it
...@@ -1334,7 +1348,8 @@ class Task(): ...@@ -1334,7 +1348,8 @@ class Task():
elif self._isErrAcceptable(errno2, err.__str__()): elif self._isErrAcceptable(errno2, err.__str__()):
self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format( self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, wt.getDbConn().getLastSql())) errno2, err, wt.getDbConn().getLastSql()))
print("_", end="", flush=True) # print("_", end="", flush=True)
Progress.emit(Progress.ACCEPTABLE_ERROR)
self._err = err self._err = err
else: # not an acceptable error else: # not an acceptable error
errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format( errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format(
...@@ -1563,8 +1578,11 @@ class TaskCreateDb(StateTransitionTask): ...@@ -1563,8 +1578,11 @@ class TaskCreateDb(StateTransitionTask):
# numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N # numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N
numReplica = gConfig.max_replicas # fixed, always numReplica = gConfig.max_replicas # fixed, always
repStr = "replica {}".format(numReplica) repStr = "replica {}".format(numReplica)
self.execWtSql(wt, "create database {} {}" updatePostfix = "update 1" if gConfig.verify_data else "" # allow update only when "verify data" is active
.format(self._db.getName(), repStr) ) dbName = self._db.getName()
self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) )
if dbName == "db_0" and gConfig.use_shadow_db:
self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) )
class TaskDropDb(StateTransitionTask): class TaskDropDb(StateTransitionTask):
@classmethod @classmethod
...@@ -1774,7 +1792,7 @@ class TdSuperTable: ...@@ -1774,7 +1792,7 @@ class TdSuperTable:
]) # TODO: add more from 'top' ]) # TODO: add more from 'top'
if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?! # if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049)
sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName()) sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
if Dice.throw(3) == 0: # 1 in X chance if Dice.throw(3) == 0: # 1 in X chance
sql = sql + ' GROUP BY color' sql = sql + ' GROUP BY color'
...@@ -1988,7 +2006,7 @@ class TaskAddData(StateTransitionTask): ...@@ -1988,7 +2006,7 @@ class TaskAddData(StateTransitionTask):
numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
fullTableName = db.getName() + '.' + regTableName fullTableName = db.getName() + '.' + regTableName
sql = "insert into {} values ".format(fullTableName) sql = "INSERT INTO {} VALUES ".format(fullTableName)
for j in range(numRecords): # number of records per table for j in range(numRecords): # number of records per table
nextInt = db.getNextInt() nextInt = db.getNextInt()
nextTick = db.getNextTick() nextTick = db.getNextTick()
...@@ -2016,12 +2034,24 @@ class TaskAddData(StateTransitionTask): ...@@ -2016,12 +2034,24 @@ class TaskAddData(StateTransitionTask):
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
try: try:
sql = "insert into {} values ('{}', {}, '{}');".format( # removed: tags ('{}', {}) sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {})
fullTableName, fullTableName,
# ds.getFixedSuperTableName(), # ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(), # ds.getNextBinary(), ds.getNextFloat(),
nextTick, nextInt, nextColor) nextTick, nextInt, nextColor)
dbc.execute(sql) dbc.execute(sql)
# Quick hack, attach an update statement here. TODO: create an "update" task
if (not gConfig.use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
nextInt = db.getNextInt()
nextColor = db.getNextColor()
sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
fullTableName,
nextTick, nextInt, nextColor)
# sql = "UPDATE {} set speed={}, color='{}' WHERE ts='{}'".format(
# fullTableName, db.getNextInt(), db.getNextColor(), nextTick)
dbc.execute(sql)
except: # Any exception at all except: # Any exception at all
if gConfig.verify_data: if gConfig.verify_data:
self.unlockTable(fullTableName) self.unlockTable(fullTableName)
...@@ -2070,7 +2100,8 @@ class TaskAddData(StateTransitionTask): ...@@ -2070,7 +2100,8 @@ class TaskAddData(StateTransitionTask):
random.shuffle(tblSeq) # now we have random sequence random.shuffle(tblSeq) # now we have random sequence
for i in tblSeq: for i in tblSeq:
if (i in self.activeTable): # wow already active if (i in self.activeTable): # wow already active
print("x", end="", flush=True) # concurrent insertion # print("x", end="", flush=True) # concurrent insertion
Progress.emit(Progress.CONCURRENT_INSERTION)
else: else:
self.activeTable.add(i) # marking it active self.activeTable.add(i) # marking it active
...@@ -2373,6 +2404,11 @@ class MainExec: ...@@ -2373,6 +2404,11 @@ class MainExec:
'--larger-data', '--larger-data',
action='store_true', action='store_true',
help='Write larger amount of data during write operations (default: false)') help='Write larger amount of data during write operations (default: false)')
parser.add_argument(
'-m',
'--mix-oos-data',
action='store_false',
help='Mix out-of-sequence data into the test data stream (default: true)')
parser.add_argument( parser.add_argument(
'-n', '-n',
'--dynamic-db-table-names', '--dynamic-db-table-names',
...@@ -2414,6 +2450,11 @@ class MainExec: ...@@ -2414,6 +2450,11 @@ class MainExec:
'--verify-data', '--verify-data',
action='store_true', action='store_true',
help='Verify data written in a number of places by reading back (default: false)') help='Verify data written in a number of places by reading back (default: false)')
parser.add_argument(
'-w',
'--use-shadow-db',
action='store_true',
help='Use a shaddow database to verify data integrity (default: false)')
parser.add_argument( parser.add_argument(
'-x', '-x',
'--continue-on-exception', '--continue-on-exception',
...@@ -2422,6 +2463,11 @@ class MainExec: ...@@ -2422,6 +2463,11 @@ class MainExec:
global gConfig global gConfig
gConfig = parser.parse_args() gConfig = parser.parse_args()
crash_gen.settings.gConfig = gConfig # TODO: fix this hack, consolidate this global var
# Sanity check for arguments
if gConfig.use_shadow_db and gConfig.max_dbs>1 :
raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1")
Logging.clsInit(gConfig) Logging.clsInit(gConfig)
......
...@@ -18,6 +18,8 @@ import datetime ...@@ -18,6 +18,8 @@ import datetime
import traceback import traceback
# from .service_manager import TdeInstance # from .service_manager import TdeInstance
import crash_gen.settings
class DbConn: class DbConn:
TYPE_NATIVE = "native-c" TYPE_NATIVE = "native-c"
TYPE_REST = "rest-api" TYPE_REST = "rest-api"
...@@ -257,6 +259,27 @@ class MyTDSql: ...@@ -257,6 +259,27 @@ class MyTDSql:
cls.longestQuery = sql cls.longestQuery = sql
cls.longestQueryTime = queryTime cls.longestQueryTime = queryTime
cls.lqStartTime = startTime cls.lqStartTime = startTime
# Now write to the shadow database
if crash_gen.settings.gConfig.use_shadow_db:
if sql[:11] == "INSERT INTO":
if sql[:16] == "INSERT INTO db_0":
sql2 = "INSERT INTO db_s" + sql[16:]
self._cursor.execute(sql2)
else:
raise CrashGenError("Did not find db_0 in INSERT statement: {}".format(sql))
else: # not an insert statement
pass
if sql[:12] == "CREATE TABLE":
if sql[:17] == "CREATE TABLE db_0":
sql2 = sql.replace('db_0', 'db_s')
self._cursor.execute(sql2)
else:
raise CrashGenError("Did not find db_0 in CREATE TABLE statement: {}".format(sql))
else: # not an insert statement
pass
return ret return ret
def query(self, sql): def query(self, sql):
...@@ -302,6 +325,7 @@ class DbConnNative(DbConn): ...@@ -302,6 +325,7 @@ class DbConnNative(DbConn):
_lock = threading.Lock() _lock = threading.Lock()
# _connInfoDisplayed = False # TODO: find another way to display this # _connInfoDisplayed = False # TODO: find another way to display this
totalConnections = 0 # Not private totalConnections = 0 # Not private
totalRequests = 0
def __init__(self, dbTarget): def __init__(self, dbTarget):
super().__init__(dbTarget) super().__init__(dbTarget)
...@@ -309,6 +333,11 @@ class DbConnNative(DbConn): ...@@ -309,6 +333,11 @@ class DbConnNative(DbConn):
self._conn = None self._conn = None
# self._cursor = None # self._cursor = None
@classmethod
def resetTotalRequests(cls):
with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
cls.totalRequests = 0
def openByType(self): # Open connection def openByType(self): # Open connection
# global gContainer # global gContainer
# tInst = tInst or gContainer.defTdeInstance # set up in ClientManager, type: TdeInstance # tInst = tInst or gContainer.defTdeInstance # set up in ClientManager, type: TdeInstance
...@@ -356,6 +385,8 @@ class DbConnNative(DbConn): ...@@ -356,6 +385,8 @@ class DbConnNative(DbConn):
Logging.debug("[SQL] Executing SQL: {}".format(sql)) Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql self._lastSql = sql
nRows = self._tdSql.execute(sql) nRows = self._tdSql.execute(sql)
cls = self.__class__
cls.totalRequests += 1
Logging.debug( Logging.debug(
"[SQL] Execution Result, nRows = {}, SQL = {}".format( "[SQL] Execution Result, nRows = {}, SQL = {}".format(
nRows, sql)) nRows, sql))
...@@ -369,6 +400,8 @@ class DbConnNative(DbConn): ...@@ -369,6 +400,8 @@ class DbConnNative(DbConn):
Logging.debug("[SQL] Executing SQL: {}".format(sql)) Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql self._lastSql = sql
nRows = self._tdSql.query(sql) nRows = self._tdSql.query(sql)
cls = self.__class__
cls.totalRequests += 1
Logging.debug( Logging.debug(
"[SQL] Query Result, nRows = {}, SQL = {}".format( "[SQL] Query Result, nRows = {}, SQL = {}".format(
nRows, sql)) nRows, sql))
......
...@@ -176,11 +176,13 @@ class Progress: ...@@ -176,11 +176,13 @@ class Progress:
SERVICE_START_NAP = 7 SERVICE_START_NAP = 7
CREATE_TABLE_ATTEMPT = 8 CREATE_TABLE_ATTEMPT = 8
QUERY_GROUP_BY = 9 QUERY_GROUP_BY = 9
CONCURRENT_INSERTION = 10
ACCEPTABLE_ERROR = 11
tokens = { tokens = {
STEP_BOUNDARY: '.', STEP_BOUNDARY: '.',
BEGIN_THREAD_STEP: '[', BEGIN_THREAD_STEP: ' [',
END_THREAD_STEP: '] ', END_THREAD_STEP: ']',
SERVICE_HEART_BEAT: '.Y.', SERVICE_HEART_BEAT: '.Y.',
SERVICE_RECONNECT_START: '<r.', SERVICE_RECONNECT_START: '<r.',
SERVICE_RECONNECT_SUCCESS: '.r>', SERVICE_RECONNECT_SUCCESS: '.r>',
...@@ -188,8 +190,14 @@ class Progress: ...@@ -188,8 +190,14 @@ class Progress:
SERVICE_START_NAP: '_zz', SERVICE_START_NAP: '_zz',
CREATE_TABLE_ATTEMPT: 'c', CREATE_TABLE_ATTEMPT: 'c',
QUERY_GROUP_BY: 'g', QUERY_GROUP_BY: 'g',
CONCURRENT_INSERTION: 'x',
ACCEPTABLE_ERROR: '_',
} }
@classmethod @classmethod
def emit(cls, token): def emit(cls, token):
print(cls.tokens[token], end="", flush=True) print(cls.tokens[token], end="", flush=True)
@classmethod
def emitStr(cls, str):
print('({})'.format(str), end="", flush=True)
from __future__ import annotations
import argparse
gConfig: argparse.Namespace
def init():
global gConfig
gConfig = []
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册