diff --git a/documentation20/cn/10.cluster/docs.md b/documentation20/cn/10.cluster/docs.md index 6d7d68fe1b5adc3fd7896c4070a4979398553cb2..7b4073a8836da7b14a50afee5c243d0916bc9270 100644 --- a/documentation20/cn/10.cluster/docs.md +++ b/documentation20/cn/10.cluster/docs.md @@ -225,7 +225,13 @@ SHOW MNODES; ## Arbitrator的使用 -如果副本数为偶数,当一个vnode group里一半vnode不工作时,是无法从中选出master的。同理,一半mnode不工作时,是无法选出mnode的master的,因为存在“split brain”问题。为解决这个问题,TDengine引入了Arbitrator的概念。Arbitrator模拟一个vnode或mnode在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含Arbitrator在内,超过半数的vnode或mnode工作,那么该vnode group或mnode组就可以正常的提供数据插入或查询服务。比如对于副本数为2的情形,如果一个节点A离线,但另外一个节点B正常,而且能连接到Arbitrator,那么节点B就能正常工作。 +如果副本数为偶数,当一个 vnode group 里一半 vnode 不工作时,是无法从中选出 master 的。同理,一半 mnode 不工作时,是无法选出 mnode 的 master 的,因为存在“split brain”问题。为解决这个问题,TDengine 引入了 Arbitrator 的概念。Arbitrator 模拟一个 vnode 或 mnode 在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含 Arbitrator 在内,超过半数的 vnode 或 mnode 工作,那么该 vnode group 或 mnode 组就可以正常的提供数据插入或查询服务。比如对于副本数为 2 的情形,如果一个节点 A 离线,但另外一个节点 B 正常,而且能连接到 Arbitrator,那么节点 B 就能正常工作。 -TDengine提供一个执行程序,名为 tarbitrator,找任何一台Linux服务器运行它即可。请点击[安装包下载](https://www.taosdata.com/cn/all-downloads/),在TDengine Arbitrator Linux一节中,选择适合的版本下载并安装。该程序对系统资源几乎没有要求,只需要保证有网络连接即可。该应用的命令行参数`-p`可以指定其对外服务的端口号,缺省是6042。配置每个taosd实例时,可以在配置文件taos.cfg里将参数arbitrator设置为Arbitrator的End Point。如果该参数配置了,当副本数为偶数时,系统将自动连接配置的Arbitrator。如果副本数为奇数,即使配置了Arbitrator,系统也不会去建立连接。 +总之,在目前版本下,TDengine 建议在双副本环境要配置 Arbitrator,以提升系统的可用性。 + +Arbitrator 的执行程序名为 tarbitrator。该程序对系统资源几乎没有要求,只需要保证有网络连接,找任何一台 Linux 服务器运行它即可。以下简要描述安装配置的步骤: +1. 请点击 [安装包下载](https://www.taosdata.com/cn/all-downloads/),在 TDengine Arbitrator Linux 一节中,选择合适的版本下载并安装。 +2. 该应用的命令行参数 `-p` 可以指定其对外服务的端口号,缺省是 6042。 +3. 修改每个 taosd 实例的配置文件,在 taos.cfg 里将参数 arbitrator 设置为 tarbitrator 程序所对应的 End Point。(如果该参数配置了,当副本数为偶数时,系统将自动连接配置的 Arbitrator。如果副本数为奇数,即使配置了 Arbitrator,系统也不会去建立连接。) +4. 在配置文件中配置了的 Arbitrator,会出现在 `SHOW DNODES;` 指令的返回结果中,对应的 role 列的值会是“arb”。 diff --git a/src/dnode/src/dnodeVnodes.c b/src/dnode/src/dnodeVnodes.c index 9f32541612c48d9d68cbbbb0799a55ca2c2e5838..d00314fcbc806ab89b9f71a3a28880c7878f4579 100644 --- a/src/dnode/src/dnodeVnodes.c +++ b/src/dnode/src/dnodeVnodes.c @@ -198,6 +198,14 @@ void dnodeCleanupVnodes() { static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { if (pMsg->code != TSDB_CODE_SUCCESS) { dError("status rsp is received, error:%s", tstrerror(pMsg->code)); + if (pMsg->code == TSDB_CODE_MND_DNODE_NOT_EXIST) { + char clusterId[TSDB_CLUSTER_ID_LEN]; + dnodeGetClusterId(clusterId); + if (clusterId[0] != '\0') { + dError("exit zombie dropped dnode"); + exit(EXIT_FAILURE); + } + } taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); return; } diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 11ed37e4cbc746159c186271b5483d6e17ae660b..2711447cdc781f90e2b806f7ebd863d870cfb699 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -187,6 +187,7 @@ typedef struct SArguments_S { bool insert_only; bool answer_yes; bool debug_print; + bool verbose_print; char * output_file; int mode; char * datatype[MAX_NUM_DATATYPE + 1]; @@ -491,6 +492,7 @@ SArguments g_args = { false, // use_metric false, // insert_only false, // debug_print + false, // verbose_print false, // answer_yes; "./output.txt", // output_file 0, // mode : sync or async @@ -528,7 +530,11 @@ static SQueryMetaInfo g_queryInfo; static FILE * g_fpOfInsertResult = NULL; #define debugPrint(fmt, ...) \ - do { if (g_args.debug_print) fprintf(stderr, fmt, __VA_ARGS__); } while(0) + do { if (g_args.debug_print || g_args.verbose_print) \ + fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0) +#define verbosePrint(fmt, ...) \ + do { if (g_args.verbose_print) fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0) + /////////////////////////////////////////////////// void printHelp() { @@ -693,6 +699,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->answer_yes = true; } else if (strcmp(argv[i], "-g") == 0) { arguments->debug_print = true; + } else if (strcmp(argv[i], "-gg") == 0) { + arguments->verbose_print = true; } else if (strcmp(argv[i], "-c") == 0) { strcpy(configDir, argv[++i]); } else if (strcmp(argv[i], "-O") == 0) { @@ -750,7 +758,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { printf("\n"); } printf("# Insertion interval: %d\n", arguments->insert_interval); - printf("# Number of Columns per record: %d\n", arguments->num_of_RPR); + printf("# Number of records per req: %d\n", arguments->num_of_RPR); printf("# Number of Threads: %d\n", arguments->num_of_threads); printf("# Number of Tables: %d\n", arguments->num_of_tables); printf("# Number of Data per Table: %d\n", arguments->num_of_DPT); @@ -807,7 +815,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) { } if (code != 0) { - debugPrint("DEBUG %s() LN%d - command: %s\n", __func__, __LINE__, command); + debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command); fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res)); taos_free_result(res); //taos_close(taos); @@ -2057,7 +2065,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, exit(-1); } snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols); - debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); + verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); if (use_metric) { char tags[STRING_LEN] = "\0"; @@ -2110,13 +2118,13 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s (ts timestamp%s) tags %s", dbName, superTbls->sTblName, cols, tags); - debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, command); + verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName); return -1; } - debugPrint("DEBUG - create supertable %s success!\n\n", superTbls->sTblName); + debugPrint("create supertable %s success!\n\n", superTbls->sTblName); } return 0; } @@ -2135,7 +2143,7 @@ static int createDatabases() { for (int i = 0; i < g_Dbs.dbCount; i++) { if (g_Dbs.db[i].drop) { sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName); - debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); + verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { taos_close(taos); return -1; @@ -2203,7 +2211,7 @@ static int createDatabases() { "precision \'%s\';", g_Dbs.db[i].dbCfg.precision); } - debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); + debugPrint("%s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { taos_close(taos); printf("\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName); @@ -2211,11 +2219,11 @@ static int createDatabases() { } printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName); - debugPrint("DEBUG %s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount); + debugPrint("%s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount); for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { // describe super table, if exists sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName); - debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); + verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS; ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric); @@ -2303,7 +2311,7 @@ static void* createTable(void *sarg) } len = 0; - debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ free(buffer); return NULL; @@ -2318,7 +2326,7 @@ static void* createTable(void *sarg) } if (0 != len) { - debugPrint("DEBUG %s() %d buffer: %s\n", __func__, __LINE__, buffer); + verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer); (void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE); } @@ -2356,7 +2364,7 @@ int startMultiThreadCreateChildTable( t_info->threadID = i; tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE); t_info->superTblInfo = superTblInfo; - debugPrint("DEBUG %s() %d db_name: %s\n", __func__, __LINE__, db_name); + verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name); t_info->taos = taos_connect( g_Dbs.host, g_Dbs.user, @@ -2407,7 +2415,7 @@ static void createChildTables() { continue; } - debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, + verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); startMultiThreadCreateChildTable( g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, @@ -2433,7 +2441,7 @@ static void createChildTables() { len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")"); - debugPrint("DEBUG - %s() LN%d: dbName: %s num of tb: %d schema: %s\n", __func__, __LINE__, + verbosePrint("%s() LN%d: dbName: %s num of tb: %d schema: %s\n", __func__, __LINE__, g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf); startMultiThreadCreateChildTable( tblColsBuf, @@ -3294,7 +3302,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("failed to read json, disorderRange not found"); goto PARSE_OVER; } - cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows"); if (insertRows && insertRows->type == cJSON_Number) { @@ -3658,7 +3665,7 @@ PARSE_OVER: } static bool getInfoFromJsonFile(char* file) { - debugPrint("DEBUG - %s %d %s\n", __func__, __LINE__, file); + debugPrint("%s %d %s\n", __func__, __LINE__, file); FILE *fp = fopen(file, "r"); if (!fp) { @@ -3817,6 +3824,7 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* return (-1); } } + dataLen -= 2; dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); @@ -3847,7 +3855,6 @@ static void syncWriteForNumberOfTblInOneSql( } uint64_t time_counter = winfo->start_time; - int64_t tmp_time; int sampleUsePos; int64_t st = 0; @@ -3855,6 +3862,7 @@ static void syncWriteForNumberOfTblInOneSql( for (int i = 0; i < superTblInfo->insertRows;) { int32_t tbl_id = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { + int64_t tmp_time = 0; int inserted = i; for (int k = 0; k < g_args.num_of_RPR;) { @@ -3884,7 +3892,7 @@ static void syncWriteForNumberOfTblInOneSql( if (0 == len) { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - "insert into %s.%s%d using %s.%s tags %s values ", + "insert into %s.%s%d using %s.%s tags %s values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id, @@ -3894,7 +3902,7 @@ static void syncWriteForNumberOfTblInOneSql( } else { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - " %s.%s%d using %s.%s tags %s values ", + " %s.%s%d using %s.%s tags %s values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id, @@ -3907,13 +3915,13 @@ static void syncWriteForNumberOfTblInOneSql( if (0 == len) { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - "insert into %s.%s values ", + "insert into %s.%s values ", winfo->db_name, superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); } else { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - " %s.%s values ", + " %s.%s values ", winfo->db_name, superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); } @@ -3921,14 +3929,14 @@ static void syncWriteForNumberOfTblInOneSql( if (0 == len) { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - "insert into %s.%s%d values ", + "insert into %s.%s%d values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id); } else { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - " %s.%s%d values ", + " %s.%s%d values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id); @@ -3963,7 +3971,7 @@ static void syncWriteForNumberOfTblInOneSql( } else { retLen = generateRowData(pstr + len, superTblInfo->maxSqlLen - len, - tmp_time += superTblInfo->timeStampStep, + tmp_time += superTblInfo->timeStampStep, superTblInfo); } if (retLen < 0) { @@ -3991,7 +3999,7 @@ static void syncWriteForNumberOfTblInOneSql( send_to_server: if (g_args.insert_interval && (g_args.insert_interval > (et - st))) { int sleep_time = g_args.insert_interval - (et -st); - debugPrint("DEBUG sleep: %d ms\n", sleep_time); + printf("sleep: %d ms specified by insert_interval\n", sleep_time); taosMsleep(sleep_time); // ms } @@ -4008,7 +4016,7 @@ send_to_server: int64_t endTs; startTs = taosGetTimestampUs(); - debugPrint("DEBUG %s() LN%d buff: %s\n", __func__, __LINE__, buffer); + debugPrint("%s() LN%d buff: %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec( winfo->taos, buffer, INSERT_TYPE); @@ -4021,16 +4029,16 @@ send_to_server: if (delay < winfo->minDelay) winfo->minDelay = delay; winfo->cntDelay++; winfo->totalDelay += delay; - //winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; + winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; + winfo->totalAffectedRows += affectedRows; } - totalAffectedRows += affectedRows; int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", winfo->threadID, - totalRowsInserted, - totalAffectedRows); + winfo->totalRowsInserted, + winfo->totalAffectedRows); lastPrintTime = currentPrintTime; } //int64_t t2 = taosGetTimestampMs(); @@ -4161,15 +4169,18 @@ static void* syncWrite(void *sarg) { uint64_t st = 0; uint64_t et = 0; - for (int i = 0; i < g_args.num_of_DPT;) { + winfo->totalRowsInserted = 0; + winfo->totalAffectedRows = 0; - for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - int inserted = i; + for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { + for (int i = 0; i < g_args.num_of_DPT;) { + + int tblInserted = i; int64_t tmp_time = time_counter; char *pstr = buffer; pstr += sprintf(pstr, - "insert into %s.%s%d values", + "insert into %s.%s%d values ", winfo->db_name, g_args.tb_prefix, tID); int k; for (k = 0; k < g_args.num_of_RPR;) { @@ -4193,31 +4204,34 @@ static void* syncWrite(void *sarg) { } pstr += sprintf(pstr, " %s", data); - inserted++; + tblInserted++; k++; + i++; - if (inserted >= g_args.num_of_DPT) + if (tblInserted >= g_args.num_of_DPT) break; } + winfo->totalRowsInserted += k; /* puts(buffer); */ int64_t startTs; int64_t endTs; startTs = taosGetTimestampUs(); //queryDB(winfo->taos, buffer); - if (i > 0 && g_args.insert_interval + if (i > 0 && g_args.insert_interval && (g_args.insert_interval > (et - st) )) { int sleep_time = g_args.insert_interval - (et -st); - debugPrint("DEBUG sleep: %d ms\n", sleep_time); + printf("sleep: %d ms specified by insert_interval\n", sleep_time); taosMsleep(sleep_time); // ms - } + } - if (g_args.insert_interval) { + if (g_args.insert_interval) { st = taosGetTimestampMs(); - } - debugPrint("DEBUG - %s() LN%d %s\n", __func__, __LINE__, buffer); + } + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, 1); - + + verbosePrint("%s() LN%d: affectedRows:%d\n", __func__, __LINE__, affectedRows); if (0 <= affectedRows){ endTs = taosGetTimestampUs(); int64_t delay = endTs - startTs; @@ -4227,27 +4241,31 @@ static void* syncWrite(void *sarg) { winfo->minDelay = delay; winfo->cntDelay++; winfo->totalDelay += delay; - //winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; + winfo->totalAffectedRows += affectedRows; + winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; } - if (g_args.insert_interval) { + verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64"\n", __func__, __LINE__, winfo->totalAffectedRows); + if (g_args.insert_interval) { et = taosGetTimestampMs(); - } + } - if (tID == winfo->end_table_id) { - i = inserted; - time_counter = tmp_time; + if (tblInserted >= g_args.num_of_DPT) { + break; } - } + } // num_of_DPT + } // tId + + printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", + winfo->threadID, + winfo->totalRowsInserted, + winfo->totalAffectedRows); - } return NULL; } static void* syncWriteWithStb(void *sarg) { - uint64_t totalRowsInserted = 0; - uint64_t totalAffectedRows = 0; uint64_t lastPrintTime = taosGetTimestampMs(); threadInfo *winfo = (threadInfo *)sarg; @@ -4303,27 +4321,44 @@ static void* syncWriteWithStb(void *sarg) { return NULL; } - int64_t time_counter = winfo->start_time; uint64_t st = 0; uint64_t et = 0; - debugPrint("DEBUG - %s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); + winfo->totalRowsInserted = 0; + winfo->totalAffectedRows = 0; - for (int i = 0; i < superTblInfo->insertRows;) { + int sampleUsePos; - for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - uint64_t inserted = i; - uint64_t tmp_time = time_counter; + debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); - int sampleUsePos = samplePos; - int k = 0; - debugPrint("DEBUG - %s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); - for (k = 0; k < g_args.num_of_RPR;) { - int len = 0; - memset(buffer, 0, superTblInfo->maxSqlLen); - char *pstr = buffer; + for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; + tID++) { + int64_t start_time = winfo->start_time; - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { + for (int i = 0; i < superTblInfo->insertRows;) { + + int64_t tblInserted = i; + + if (i > 0 && g_args.insert_interval + && (g_args.insert_interval > (et - st) )) { + int sleep_time = g_args.insert_interval - (et -st); + printf("sleep: %d ms specified by insert_interval\n", sleep_time); + taosMsleep(sleep_time); // ms + } + + if (g_args.insert_interval) { + st = taosGetTimestampMs(); + } + + sampleUsePos = samplePos; + verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); + + memset(buffer, 0, superTblInfo->maxSqlLen); + int len = 0; + + char *pstr = buffer; + + if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { char* tagsValBuf = NULL; if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagVaulesForStb(superTblInfo); @@ -4346,27 +4381,29 @@ static void* syncWriteWithStb(void *sarg) { superTblInfo->sTblName, tagsValBuf); tmfree(tagsValBuf); - } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { + } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s values", winfo->db_name, superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN); - } else { + } else { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s%d values", winfo->db_name, superTblInfo->childTblPrefix, tID); - } + } + int k; + for (k = 0; k < g_args.num_of_RPR;) { int retLen = 0; if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { retLen = getRowDataFromSample( pstr + len, superTblInfo->maxSqlLen - len, - tmp_time += superTblInfo->timeStampStep, + start_time + superTblInfo->timeStampStep * i, superTblInfo, &sampleUsePos, fp, @@ -4378,54 +4415,44 @@ static void* syncWriteWithStb(void *sarg) { int rand_num = rand_tinyint() % 100; if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) { - int64_t d = tmp_time - rand() % superTblInfo->disorderRange; + int64_t d = start_time - rand() % superTblInfo->disorderRange; retLen = generateRowData( pstr + len, - superTblInfo->maxSqlLen - len, d, + superTblInfo->maxSqlLen - len, + d, superTblInfo); //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d); } else { retLen = generateRowData( pstr + len, superTblInfo->maxSqlLen - len, - tmp_time += superTblInfo->timeStampStep, + start_time + superTblInfo->timeStampStep * i, superTblInfo); } if (retLen < 0) { goto free_and_statistics_2; } } -/* len += retLen; -*/ - inserted++; + + len += retLen; + verbosePrint("%s() LN%d retLen=%d len=%d k=%d buffer=%s\n", __func__, __LINE__, retLen, len, k, buffer); + + tblInserted++; k++; - totalRowsInserted++; - - if (inserted > superTblInfo->insertRows) - break; -/* if (inserted >= superTblInfo->insertRows - || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) - break; -*/ - if (i > 0 && g_args.insert_interval - && (g_args.insert_interval > (et - st) )) { - int sleep_time = g_args.insert_interval - (et -st); - debugPrint("DEBUG sleep: %d ms\n", sleep_time); - taosMsleep(sleep_time); // ms - } + i++; - if (g_args.insert_interval) { - st = taosGetTimestampMs(); - } + if (tblInserted >= superTblInfo->insertRows) + break; + } + + winfo->totalRowsInserted += k; - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { - //printf("===== sql: %s \n\n", buffer); - //int64_t t1 = taosGetTimestampMs(); + if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { int64_t startTs; int64_t endTs; startTs = taosGetTimestampUs(); - debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); if (0 > affectedRows){ @@ -4437,76 +4464,52 @@ static void* syncWriteWithStb(void *sarg) { if (delay < winfo->minDelay) winfo->minDelay = delay; winfo->cntDelay++; winfo->totalDelay += delay; - //winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; } - totalAffectedRows += affectedRows; + winfo->totalAffectedRows += affectedRows; int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", winfo->threadID, - totalRowsInserted, - totalAffectedRows); + winfo->totalRowsInserted, + winfo->totalAffectedRows); lastPrintTime = currentPrintTime; } - //int64_t t2 = taosGetTimestampMs(); - //printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0); - } else { - //int64_t t1 = taosGetTimestampMs(); + } else { int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); - //int64_t t2 = taosGetTimestampMs(); - //printf("http insert sql return, Spent %ld ms \n", t2 - t1); if (0 != retCode) { printf("========restful return fail, threadID[%d]\n", winfo->threadID); goto free_and_statistics_2; } - } - if (g_args.insert_interval) { - et = taosGetTimestampMs(); - } -/* - if (loop_cnt) { - loop_cnt--; - if ((1 == loop_cnt) && (0 != nrecords_last_req)) { - nrecords_cur_req = nrecords_last_req; - } else if (0 == loop_cnt){ - nrecords_cur_req = nrecords_no_last_req; - loop_cnt = loop_cnt_orig; - break; - } - } else { - break; - } - */ } + if (g_args.insert_interval) { + et = taosGetTimestampMs(); + } + + if (tblInserted >= superTblInfo->insertRows) + break; + } // num_of_DPT - if (tID == winfo->end_table_id) { + if (tID == winfo->end_table_id) { if (0 == strncasecmp( superTblInfo->dataSource, "sample", strlen("sample"))) { samplePos = sampleUsePos; } - i = inserted; - time_counter = tmp_time; - } } - //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); - } + } // tID free_and_statistics_2: tmfree(buffer); tmfree(sampleDataBuf); tmfclose(fp); - winfo->totalRowsInserted = totalRowsInserted; - winfo->totalAffectedRows = totalAffectedRows; - printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", winfo->threadID, - totalRowsInserted, - totalAffectedRows); + winfo->totalRowsInserted, + winfo->totalAffectedRows); return NULL; } @@ -4524,7 +4527,8 @@ void callBack(void *param, TAOS_RES *res, int code) { char *data = calloc(1, MAX_DATA_SIZE); char *pstr = buffer; pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id); - if (winfo->counter >= winfo->superTblInfo->insertRows) { +// if (winfo->counter >= winfo->superTblInfo->insertRows) { + if (winfo->counter >= g_args.num_of_RPR) { winfo->start_table_id++; winfo->counter = 0; } @@ -4715,12 +4719,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, if (superTblInfo) { superTblInfo->totalAffectedRows += t_info->totalAffectedRows; superTblInfo->totalRowsInserted += t_info->totalRowsInserted; - - totalDelay += t_info->totalDelay; - cntDelay += t_info->cntDelay; - if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; - if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; } + + totalDelay += t_info->totalDelay; + cntDelay += t_info->cntDelay; + if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; + if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; } cntDelay -= 1; @@ -4769,11 +4773,12 @@ void *readTable(void *sarg) { } int num_of_DPT; - if (rinfo->superTblInfo) { +/* if (rinfo->superTblInfo) { num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; } else { + */ num_of_DPT = g_args.num_of_DPT; - } +// } int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; @@ -4907,7 +4912,7 @@ int insertTestProcess() { if (ret == -1) exit(EXIT_FAILURE); - debugPrint("DEBUG - %d result file: %s\n", __LINE__, g_Dbs.resultFile); + debugPrint("%d result file: %s\n", __LINE__, g_Dbs.resultFile); g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a"); if (NULL == g_fpOfInsertResult) { fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile); @@ -5063,7 +5068,7 @@ void *subQueryProcess(void *sarg) { int64_t st = 0; int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000; while (1) { - if (g_queryInfo.subQueryInfo.rate && (et - st) < g_queryInfo.subQueryInfo.rate*1000) { + if (g_queryInfo.subQueryInfo.rate && (et - st) < (int64_t)g_queryInfo.subQueryInfo.rate*1000) { taosMsleep(g_queryInfo.subQueryInfo.rate*1000 - (et - st)); // ms //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); } @@ -5143,7 +5148,7 @@ static int queryTestProcess() { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); - debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); + verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); (void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE); } else { t_info->taos = NULL; @@ -5254,7 +5259,7 @@ void *subSubscribeProcess(void *sarg) { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); - debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); + debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)){ return NULL; } @@ -5320,7 +5325,7 @@ void *superSubscribeProcess(void *sarg) { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); - debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); + debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)) { return NULL; } @@ -5580,7 +5585,7 @@ void setParaFromArg(){ "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].timeStampStep = 10; - g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; + g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; g_Dbs.db[0].superTbls[0].columnCount = 0; @@ -5685,8 +5690,15 @@ void querySqlFile(TAOS* taos, char* sqlFile) } memcpy(cmd + cmd_len, line, read_len); - debugPrint("DEBUG %s() LN%d cmd: %s\n", __func__, __LINE__, cmd); + verbosePrint("%s() LN%d cmd: %s\n", __func__, __LINE__, cmd); queryDbExec(taos, cmd, NO_INSERT_TYPE); + if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE)) { + printf("queryDbExec %s failed!\n", cmd); + tmfree(cmd); + tmfree(line); + tmfclose(fp); + return; + } memset(cmd, 0, MAX_SQL_SIZE); cmd_len = 0; } @@ -5773,7 +5785,7 @@ static void testCmdLine() { int main(int argc, char *argv[]) { parse_args(argc, argv, &g_args); - debugPrint("DEBUG - meta file: %s\n", g_args.metaFile); + debugPrint("meta file: %s\n", g_args.metaFile); if (g_args.metaFile) { initOfInsertMeta(); diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index 309c0df9108f340bf96d73529ccf8bb49c1c9692..43506c68d51b134347df48080ef7f3223b379775 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -35,16 +35,19 @@ import os import signal import traceback import resource -from guppy import hpy +# from guppy import hpy import gc from crash_gen.service_manager import ServiceManager, TdeInstance from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager +import crash_gen.settings import taos import requests +crash_gen.settings.init() + # Require Python 3 if sys.version_info[0] < 3: raise Exception("Must be using Python 3") @@ -259,6 +262,7 @@ class ThreadCoordinator: self._execStats = ExecutionStats() self._runStatus = Status.STATUS_RUNNING self._initDbs() + self._stepStartTime = None # Track how long it takes to execute each step def getTaskExecutor(self): return self._te @@ -394,6 +398,10 @@ class ThreadCoordinator: try: self._syncAtBarrier() # For now just cross the barrier Progress.emit(Progress.END_THREAD_STEP) + if self._stepStartTime : + stepExecTime = time.time() - self._stepStartTime + Progress.emitStr('{:.3f}s/{}'.format(stepExecTime, DbConnNative.totalRequests)) + DbConnNative.resetTotalRequests() # reset to zero except threading.BrokenBarrierError as err: self._execStats.registerFailure("Aborted due to worker thread timeout") Logging.error("\n") @@ -433,6 +441,7 @@ class ThreadCoordinator: # Then we move on to the next step Progress.emit(Progress.BEGIN_THREAD_STEP) + self._stepStartTime = time.time() self._releaseAllWorkerThreads(transitionFailed) if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate" @@ -691,7 +700,7 @@ class AnyState: def canDropDb(self): # If user requests to run up to a number of DBs, # we'd then not do drop_db operations any more - if gConfig.max_dbs > 0 : + if gConfig.max_dbs > 0 or gConfig.use_shadow_db : return False return self._info[self.CAN_DROP_DB] @@ -699,6 +708,8 @@ class AnyState: return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE] def canDropFixedSuperTable(self): + if gConfig.use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table + return False return self._info[self.CAN_DROP_FIXED_SUPER_TABLE] def canAddData(self): @@ -1037,7 +1048,7 @@ class Database: _clsLock = threading.Lock() # class wide lock _lastInt = 101 # next one is initial integer _lastTick = 0 - _lastLaggingTick = 0 # lagging tick, for unsequenced insersions + _lastLaggingTick = 0 # lagging tick, for out-of-sequence (oos) data insertions def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc self._dbNum = dbNum # we assign a number to databases, for our testing purpose @@ -1093,21 +1104,24 @@ class Database: t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years t4 = datetime.datetime.fromtimestamp( t3.timestamp() + elSec2) # see explanation above - Logging.debug("Setting up TICKS to start from: {}".format(t4)) + Logging.info("Setting up TICKS to start from: {}".format(t4)) return t4 @classmethod - def getNextTick(cls): + def getNextTick(cls): + ''' + Fetch a timestamp tick, with some random factor, may not be unique. + ''' with cls._clsLock: # prevent duplicate tick if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized # 10k at 1/20 chance, should be enough to avoid overlaps tick = cls.setupLastTick() cls._lastTick = tick - cls._lastLaggingTick = tick + datetime.timedelta(0, -10000) + cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast # if : # should be quite a bit into the future - if Dice.throw(20) == 0: # 1 in 20 chance, return lagging tick - cls._lastLaggingTick += datetime.timedelta(0, 1) # Go back in time 100 seconds + if gConfig.mix_oos_data and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick + cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence return cls._lastLaggingTick else: # regular # add one second to it @@ -1334,7 +1348,8 @@ class Task(): elif self._isErrAcceptable(errno2, err.__str__()): self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format( errno2, err, wt.getDbConn().getLastSql())) - print("_", end="", flush=True) + # print("_", end="", flush=True) + Progress.emit(Progress.ACCEPTABLE_ERROR) self._err = err else: # not an acceptable error errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format( @@ -1563,8 +1578,11 @@ class TaskCreateDb(StateTransitionTask): # numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N numReplica = gConfig.max_replicas # fixed, always repStr = "replica {}".format(numReplica) - self.execWtSql(wt, "create database {} {}" - .format(self._db.getName(), repStr) ) + updatePostfix = "update 1" if gConfig.verify_data else "" # allow update only when "verify data" is active + dbName = self._db.getName() + self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) ) + if dbName == "db_0" and gConfig.use_shadow_db: + self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) ) class TaskDropDb(StateTransitionTask): @classmethod @@ -1774,13 +1792,13 @@ class TdSuperTable: ]) # TODO: add more from 'top' - if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?! - sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName()) - if Dice.throw(3) == 0: # 1 in X chance - sql = sql + ' GROUP BY color' - Progress.emit(Progress.QUERY_GROUP_BY) - # Logging.info("Executing GROUP-BY query: " + sql) - ret.append(SqlQuery(sql)) + # if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049) + sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName()) + if Dice.throw(3) == 0: # 1 in X chance + sql = sql + ' GROUP BY color' + Progress.emit(Progress.QUERY_GROUP_BY) + # Logging.info("Executing GROUP-BY query: " + sql) + ret.append(SqlQuery(sql)) return ret @@ -1988,7 +2006,7 @@ class TaskAddData(StateTransitionTask): numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS fullTableName = db.getName() + '.' + regTableName - sql = "insert into {} values ".format(fullTableName) + sql = "INSERT INTO {} VALUES ".format(fullTableName) for j in range(numRecords): # number of records per table nextInt = db.getNextInt() nextTick = db.getNextTick() @@ -2016,12 +2034,24 @@ class TaskAddData(StateTransitionTask): # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written try: - sql = "insert into {} values ('{}', {}, '{}');".format( # removed: tags ('{}', {}) + sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {}) fullTableName, # ds.getFixedSuperTableName(), # ds.getNextBinary(), ds.getNextFloat(), nextTick, nextInt, nextColor) dbc.execute(sql) + + # Quick hack, attach an update statement here. TODO: create an "update" task + if (not gConfig.use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB + nextInt = db.getNextInt() + nextColor = db.getNextColor() + sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here + fullTableName, + nextTick, nextInt, nextColor) + # sql = "UPDATE {} set speed={}, color='{}' WHERE ts='{}'".format( + # fullTableName, db.getNextInt(), db.getNextColor(), nextTick) + dbc.execute(sql) + except: # Any exception at all if gConfig.verify_data: self.unlockTable(fullTableName) @@ -2070,7 +2100,8 @@ class TaskAddData(StateTransitionTask): random.shuffle(tblSeq) # now we have random sequence for i in tblSeq: if (i in self.activeTable): # wow already active - print("x", end="", flush=True) # concurrent insertion + # print("x", end="", flush=True) # concurrent insertion + Progress.emit(Progress.CONCURRENT_INSERTION) else: self.activeTable.add(i) # marking it active @@ -2373,6 +2404,11 @@ class MainExec: '--larger-data', action='store_true', help='Write larger amount of data during write operations (default: false)') + parser.add_argument( + '-m', + '--mix-oos-data', + action='store_false', + help='Mix out-of-sequence data into the test data stream (default: true)') parser.add_argument( '-n', '--dynamic-db-table-names', @@ -2414,6 +2450,11 @@ class MainExec: '--verify-data', action='store_true', help='Verify data written in a number of places by reading back (default: false)') + parser.add_argument( + '-w', + '--use-shadow-db', + action='store_true', + help='Use a shaddow database to verify data integrity (default: false)') parser.add_argument( '-x', '--continue-on-exception', @@ -2422,6 +2463,11 @@ class MainExec: global gConfig gConfig = parser.parse_args() + crash_gen.settings.gConfig = gConfig # TODO: fix this hack, consolidate this global var + + # Sanity check for arguments + if gConfig.use_shadow_db and gConfig.max_dbs>1 : + raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1") Logging.clsInit(gConfig) diff --git a/tests/pytest/crash_gen/db.py b/tests/pytest/crash_gen/db.py index e38692dbe1e5c33ffe162015e3e60630fd51fa38..62a369c41a7ed0d73ab847232a206c2cabb53d53 100644 --- a/tests/pytest/crash_gen/db.py +++ b/tests/pytest/crash_gen/db.py @@ -18,6 +18,8 @@ import datetime import traceback # from .service_manager import TdeInstance +import crash_gen.settings + class DbConn: TYPE_NATIVE = "native-c" TYPE_REST = "rest-api" @@ -244,7 +246,7 @@ class MyTDSql: self._conn.close() # TODO: very important, cursor close does NOT close DB connection! self._cursor.close() - def _execInternal(self, sql): + def _execInternal(self, sql): startTime = time.time() # Logging.debug("Executing SQL: " + sql) ret = self._cursor.execute(sql) @@ -257,6 +259,27 @@ class MyTDSql: cls.longestQuery = sql cls.longestQueryTime = queryTime cls.lqStartTime = startTime + + # Now write to the shadow database + if crash_gen.settings.gConfig.use_shadow_db: + if sql[:11] == "INSERT INTO": + if sql[:16] == "INSERT INTO db_0": + sql2 = "INSERT INTO db_s" + sql[16:] + self._cursor.execute(sql2) + else: + raise CrashGenError("Did not find db_0 in INSERT statement: {}".format(sql)) + else: # not an insert statement + pass + + if sql[:12] == "CREATE TABLE": + if sql[:17] == "CREATE TABLE db_0": + sql2 = sql.replace('db_0', 'db_s') + self._cursor.execute(sql2) + else: + raise CrashGenError("Did not find db_0 in CREATE TABLE statement: {}".format(sql)) + else: # not an insert statement + pass + return ret def query(self, sql): @@ -302,12 +325,18 @@ class DbConnNative(DbConn): _lock = threading.Lock() # _connInfoDisplayed = False # TODO: find another way to display this totalConnections = 0 # Not private + totalRequests = 0 def __init__(self, dbTarget): super().__init__(dbTarget) self._type = self.TYPE_NATIVE self._conn = None - # self._cursor = None + # self._cursor = None + + @classmethod + def resetTotalRequests(cls): + with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!! + cls.totalRequests = 0 def openByType(self): # Open connection # global gContainer @@ -356,6 +385,8 @@ class DbConnNative(DbConn): Logging.debug("[SQL] Executing SQL: {}".format(sql)) self._lastSql = sql nRows = self._tdSql.execute(sql) + cls = self.__class__ + cls.totalRequests += 1 Logging.debug( "[SQL] Execution Result, nRows = {}, SQL = {}".format( nRows, sql)) @@ -369,6 +400,8 @@ class DbConnNative(DbConn): Logging.debug("[SQL] Executing SQL: {}".format(sql)) self._lastSql = sql nRows = self._tdSql.query(sql) + cls = self.__class__ + cls.totalRequests += 1 Logging.debug( "[SQL] Query Result, nRows = {}, SQL = {}".format( nRows, sql)) diff --git a/tests/pytest/crash_gen/misc.py b/tests/pytest/crash_gen/misc.py index 6ea5691ce223eb1c14214d4b11c47cf85e29c795..9774ec5455961392d82ea2b4b59c0657b5704f9a 100644 --- a/tests/pytest/crash_gen/misc.py +++ b/tests/pytest/crash_gen/misc.py @@ -176,11 +176,13 @@ class Progress: SERVICE_START_NAP = 7 CREATE_TABLE_ATTEMPT = 8 QUERY_GROUP_BY = 9 + CONCURRENT_INSERTION = 10 + ACCEPTABLE_ERROR = 11 tokens = { STEP_BOUNDARY: '.', - BEGIN_THREAD_STEP: '[', - END_THREAD_STEP: '] ', + BEGIN_THREAD_STEP: ' [', + END_THREAD_STEP: ']', SERVICE_HEART_BEAT: '.Y.', SERVICE_RECONNECT_START: '', @@ -188,8 +190,14 @@ class Progress: SERVICE_START_NAP: '_zz', CREATE_TABLE_ATTEMPT: 'c', QUERY_GROUP_BY: 'g', + CONCURRENT_INSERTION: 'x', + ACCEPTABLE_ERROR: '_', } @classmethod def emit(cls, token): print(cls.tokens[token], end="", flush=True) + + @classmethod + def emitStr(cls, str): + print('({})'.format(str), end="", flush=True) diff --git a/tests/pytest/crash_gen/settings.py b/tests/pytest/crash_gen/settings.py new file mode 100644 index 0000000000000000000000000000000000000000..3c4c91e6e077c325c53d15918624db783957fc20 --- /dev/null +++ b/tests/pytest/crash_gen/settings.py @@ -0,0 +1,8 @@ +from __future__ import annotations +import argparse + +gConfig: argparse.Namespace + +def init(): + global gConfig + gConfig = [] \ No newline at end of file