diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 81f48067d0ea6e60df0a9718cb90cf045885a370..b162be7739fc004210a9f221a63d12fcfda4430d 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -363,7 +363,7 @@ typedef struct SDbs_S { typedef struct SpecifiedQueryInfo_S { uint64_t queryInterval; // 0: unlimit > 0 loop/s - uint64_t concurrent; + uint32_t concurrent; uint64_t sqlCount; uint32_t asyncMode; // 0: sync, 1: async uint64_t subscribeInterval; // ms @@ -374,6 +374,9 @@ typedef struct SpecifiedQueryInfo_S { char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; int resubAfterConsume[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; + char topic[MAX_QUERY_SQL_COUNT][32]; + int consumed[MAX_QUERY_SQL_COUNT]; + TAOS_RES* res[MAX_QUERY_SQL_COUNT]; uint64_t totalQueried; } SpecifiedQueryInfo; @@ -418,7 +421,8 @@ typedef struct SThreadInfo_S { int threadID; char db_name[MAX_DB_NAME_SIZE+1]; uint32_t time_precision; - char fp[4096]; + char filePath[4096]; + FILE *fp; char tb_prefix[MAX_TB_NAME_SIZE]; uint64_t start_table_from; uint64_t end_table_to; @@ -451,6 +455,7 @@ typedef struct SThreadInfo_S { // seq of query or subscribe uint64_t querySeq; // sequence number of sql command + TAOS_SUB* tsub; } threadInfo; @@ -532,7 +537,7 @@ static int createDatabasesAndStables(); static void createChildTables(); static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet); static int postProceSql(char *host, struct sockaddr_in *pServAddr, - uint16_t port, char* sqlstr, char *resultFile); + uint16_t port, char* sqlstr, threadInfo *pThreadInfo); /* ************ Global variables ************ */ @@ -1112,24 +1117,22 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) { return 0; } -static void appendResultBufToFile(char *resultBuf, char *resultFile) +static void appendResultBufToFile(char *resultBuf, threadInfo *pThreadInfo) { - FILE *fp = NULL; - if (resultFile[0] != 0) { - fp = fopen(resultFile, "at"); - if (fp == NULL) { - errorPrint( - "%s() LN%d, failed to open result file: %s, result will not save to file\n", - __func__, __LINE__, resultFile); - return; - } - fprintf(fp, "%s", resultBuf); - tmfclose(fp); - } -} + pThreadInfo->fp = fopen(pThreadInfo->filePath, "at"); + if (pThreadInfo->fp == NULL) { + errorPrint( + "%s() LN%d, failed to open result file: %s, result will not save to file\n", + __func__, __LINE__, pThreadInfo->filePath); + return; + } + fprintf(pThreadInfo->fp, "%s", resultBuf); + tmfclose(pThreadInfo->fp); + pThreadInfo->fp = NULL; +} -static void appendResultToFile(TAOS_RES *res, char* resultFile) { +static void fetchResult(TAOS_RES *res, threadInfo* pThreadInfo) { TAOS_ROW row = NULL; int num_rows = 0; int num_fields = taos_field_count(res); @@ -1147,10 +1150,11 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) { // fetch the records row by row while((row = taos_fetch_row(res))) { - if (totalLen >= 100*1024*1024 - 32000) { - appendResultBufToFile(databuf, resultFile); - totalLen = 0; - memset(databuf, 0, 100*1024*1024); + if ((strlen(pThreadInfo->filePath) > 0) + && (totalLen >= 100*1024*1024 - 32000)) { + appendResultBufToFile(databuf, pThreadInfo); + totalLen = 0; + memset(databuf, 0, 100*1024*1024); } num_rows++; int len = taos_print_row(temp, row, fields, num_fields); @@ -1161,8 +1165,10 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) { } verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n", - __func__, __LINE__, databuf, resultFile); - appendResultBufToFile(databuf, resultFile); + __func__, __LINE__, databuf, pThreadInfo->filePath); + if (strlen(pThreadInfo->filePath) > 0) { + appendResultBufToFile(databuf, pThreadInfo); + } free(databuf); } @@ -1178,16 +1184,14 @@ static void selectAndGetResult( return; } - if ((strlen(pThreadInfo->fp))) { - appendResultToFile(res, pThreadInfo->fp); - } + fetchResult(res, pThreadInfo); taos_free_result(res); } else if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) { int retCode = postProceSql( g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port, command, - pThreadInfo->fp); + pThreadInfo); if (0 != retCode) { printf("====restful return fail, threadID[%d]\n", pThreadInfo->threadID); } @@ -1720,7 +1724,7 @@ static void printfQueryMeta() { printf("query interval: \033[33m%"PRIu64" ms\033[0m\n", g_queryInfo.specifiedQueryInfo.queryInterval); printf("top query times:\033[33m%"PRIu64"\033[0m\n", g_args.query_times); - printf("concurrent: \033[33m%"PRIu64"\033[0m\n", + printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.concurrent); printf("mod: \033[33m%s\033[0m\n", (g_queryInfo.specifiedQueryInfo.asyncMode)?"async":"sync"); @@ -2017,13 +2021,13 @@ static void printfQuerySystemInfo(TAOS * taos) { // show variables res = taos_query(taos, "show variables;"); - //appendResultToFile(res, filename); + //fetchResult(res, filename); xDumpResultToFile(filename, res); // show dnodes res = taos_query(taos, "show dnodes;"); xDumpResultToFile(filename, res); - //appendResultToFile(res, filename); + //fetchResult(res, filename); // show databases res = taos_query(taos, "show databases;"); @@ -2059,7 +2063,7 @@ static void printfQuerySystemInfo(TAOS * taos) { } static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port, - char* sqlstr, char *resultFile) + char* sqlstr, threadInfo *pThreadInfo) { char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s"; @@ -2195,8 +2199,8 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port response_buf[RESP_BUF_LEN - 1] = '\0'; printf("Response:\n%s\n", response_buf); - if (resultFile) { - appendResultBufToFile(response_buf, resultFile); + if (strlen(pThreadInfo->filePath) > 0) { + appendResultBufToFile(response_buf, pThreadInfo); } free(request_buf); @@ -2959,18 +2963,18 @@ static int startMultiThreadCreateChildTable( b = ntables % threads; for (int64_t i = 0; i < threads; i++) { - threadInfo *t_info = infos + i; - t_info->threadID = i; - tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE); - t_info->superTblInfo = superTblInfo; + threadInfo *pThreadInfo = infos + i; + pThreadInfo->threadID = i; + tstrncpy(pThreadInfo->db_name, db_name, MAX_DB_NAME_SIZE); + pThreadInfo->superTblInfo = superTblInfo; verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name); - t_info->taos = taos_connect( + pThreadInfo->taos = taos_connect( g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); - if (t_info->taos == NULL) { + if (pThreadInfo->taos == NULL) { errorPrint( "%s() LN%d, Failed to connect to TDengine, reason:%s\n", __func__, __LINE__, taos_errstr(NULL)); free(pids); @@ -2978,14 +2982,14 @@ static int startMultiThreadCreateChildTable( return -1; } - t_info->start_table_from = startFrom; - t_info->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; - startFrom = t_info->end_table_to + 1; - t_info->use_metric = true; - t_info->cols = cols; - t_info->minDelay = UINT64_MAX; - pthread_create(pids + i, NULL, createTable, t_info); + pThreadInfo->start_table_from = startFrom; + pThreadInfo->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; + startFrom = pThreadInfo->end_table_to + 1; + pThreadInfo->use_metric = true; + pThreadInfo->cols = cols; + pThreadInfo->minDelay = UINT64_MAX; + pthread_create(pids + i, NULL, createTable, pThreadInfo); } for (int i = 0; i < threads; i++) { @@ -2993,8 +2997,8 @@ static int startMultiThreadCreateChildTable( } for (int i = 0; i < threads; i++) { - threadInfo *t_info = infos + i; - taos_close(t_info->taos); + threadInfo *pThreadInfo = infos + i; + taos_close(pThreadInfo->taos); } free(pids); @@ -4199,7 +4203,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { if (concurrent && concurrent->type == cJSON_Number) { if (concurrent->valueint <= 0) { errorPrint( - "%s() LN%d, query sqlCount %"PRIu64" or concurrent %"PRIu64" is not correct.\n", + "%s() LN%d, query sqlCount %"PRIu64" or concurrent %d is not correct.\n", __func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount, g_queryInfo.specifiedQueryInfo.concurrent); @@ -4266,24 +4270,28 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } // sqls - cJSON* superSqls = cJSON_GetObjectItem(specifiedQuery, "sqls"); - if (!superSqls) { + cJSON* specifiedSqls = cJSON_GetObjectItem(specifiedQuery, "sqls"); + if (!specifiedSqls) { g_queryInfo.specifiedQueryInfo.sqlCount = 0; - } else if (superSqls->type != cJSON_Array) { + } else if (specifiedSqls->type != cJSON_Array) { errorPrint("%s() LN%d, failed to read json, super sqls not found\n", __func__, __LINE__); goto PARSE_OVER; } else { - int superSqlSize = cJSON_GetArraySize(superSqls); - if (superSqlSize > MAX_QUERY_SQL_COUNT) { - errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n", - __func__, __LINE__, MAX_QUERY_SQL_COUNT); + int superSqlSize = cJSON_GetArraySize(specifiedSqls); + if (superSqlSize * g_queryInfo.specifiedQueryInfo.concurrent + > MAX_QUERY_SQL_COUNT) { + errorPrint("%s() LN%d, failed to read json, query sql(%d) * concurrent(%d) overflow, max is %d\n", + __func__, __LINE__, + superSqlSize, + g_queryInfo.specifiedQueryInfo.concurrent, + MAX_QUERY_SQL_COUNT); goto PARSE_OVER; } g_queryInfo.specifiedQueryInfo.sqlCount = superSqlSize; for (int j = 0; j < superSqlSize; ++j) { - cJSON* sql = cJSON_GetArrayItem(superSqls, j); + cJSON* sql = cJSON_GetArrayItem(specifiedSqls, j); if (sql == NULL) continue; cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql"); @@ -4459,16 +4467,16 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { g_queryInfo.superQueryInfo.resubAfterConsume = 1; } - // sqls - cJSON* subsqls = cJSON_GetObjectItem(superQuery, "sqls"); - if (!subsqls) { + // supert table sqls + cJSON* superSqls = cJSON_GetObjectItem(superQuery, "sqls"); + if (!superSqls) { g_queryInfo.superQueryInfo.sqlCount = 0; - } else if (subsqls->type != cJSON_Array) { + } else if (superSqls->type != cJSON_Array) { errorPrint("%s() LN%d: failed to read json, super sqls not found\n", __func__, __LINE__); goto PARSE_OVER; } else { - int superSqlSize = cJSON_GetArraySize(subsqls); + int superSqlSize = cJSON_GetArraySize(superSqls); if (superSqlSize > MAX_QUERY_SQL_COUNT) { errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n", __func__, __LINE__, MAX_QUERY_SQL_COUNT); @@ -4477,7 +4485,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { g_queryInfo.superQueryInfo.sqlCount = superSqlSize; for (int j = 0; j < superSqlSize; ++j) { - cJSON* sql = cJSON_GetArrayItem(subsqls, j); + cJSON* sql = cJSON_GetArrayItem(superSqls, j); if (sql == NULL) continue; cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql"); @@ -5823,49 +5831,49 @@ static void startMultiThreadInsertData(int threads, char* db_name, } for (int i = 0; i < threads; i++) { - threadInfo *t_info = infos + i; - t_info->threadID = i; - tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE); - t_info->time_precision = timePrec; - t_info->superTblInfo = superTblInfo; + threadInfo *pThreadInfo = infos + i; + pThreadInfo->threadID = i; + tstrncpy(pThreadInfo->db_name, db_name, MAX_DB_NAME_SIZE); + pThreadInfo->time_precision = timePrec; + pThreadInfo->superTblInfo = superTblInfo; - t_info->start_time = start_time; - t_info->minDelay = UINT64_MAX; + pThreadInfo->start_time = start_time; + pThreadInfo->minDelay = UINT64_MAX; if ((NULL == superTblInfo) || (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) { - //t_info->taos = taos; - t_info->taos = taos_connect( + //pThreadInfo->taos = taos; + pThreadInfo->taos = taos_connect( g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); - if (NULL == t_info->taos) { + if (NULL == pThreadInfo->taos) { errorPrint( "connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL)); exit(-1); } } else { - t_info->taos = NULL; + pThreadInfo->taos = NULL; } /* if ((NULL == superTblInfo) || (0 == superTblInfo->multiThreadWriteOneTbl)) { */ - t_info->start_table_from = startFrom; - t_info->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; - startFrom = t_info->end_table_to + 1; + pThreadInfo->start_table_from = startFrom; + pThreadInfo->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; + startFrom = pThreadInfo->end_table_to + 1; /* } else { - t_info->start_table_from = 0; - t_info->ntables = superTblInfo->childTblCount; - t_info->start_time = t_info->start_time + rand_int() % 10000 - rand_tinyint(); + pThreadInfo->start_table_from = 0; + pThreadInfo->ntables = superTblInfo->childTblCount; + pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint(); } */ - tsem_init(&(t_info->lock_sem), 0, 0); + tsem_init(&(pThreadInfo->lock_sem), 0, 0); if (ASYNC_MODE == g_Dbs.asyncMode) { - pthread_create(pids + i, NULL, asyncWrite, t_info); + pthread_create(pids + i, NULL, asyncWrite, pThreadInfo); } else { - pthread_create(pids + i, NULL, syncWrite, t_info); + pthread_create(pids + i, NULL, syncWrite, pThreadInfo); } } @@ -5880,27 +5888,27 @@ static void startMultiThreadInsertData(int threads, char* db_name, double avgDelay = 0; for (int i = 0; i < threads; i++) { - threadInfo *t_info = infos + i; + threadInfo *pThreadInfo = infos + i; - tsem_destroy(&(t_info->lock_sem)); - taos_close(t_info->taos); + tsem_destroy(&(pThreadInfo->lock_sem)); + taos_close(pThreadInfo->taos); debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n", __func__, __LINE__, - t_info->threadID, t_info->totalInsertRows, - t_info->totalAffectedRows); + pThreadInfo->threadID, pThreadInfo->totalInsertRows, + pThreadInfo->totalAffectedRows); if (superTblInfo) { - superTblInfo->totalAffectedRows += t_info->totalAffectedRows; - superTblInfo->totalInsertRows += t_info->totalInsertRows; + superTblInfo->totalAffectedRows += pThreadInfo->totalAffectedRows; + superTblInfo->totalInsertRows += pThreadInfo->totalInsertRows; } else { - g_args.totalAffectedRows += t_info->totalAffectedRows; - g_args.totalInsertRows += t_info->totalInsertRows; + g_args.totalAffectedRows += pThreadInfo->totalAffectedRows; + g_args.totalInsertRows += pThreadInfo->totalInsertRows; } - totalDelay += t_info->totalDelay; - cntDelay += t_info->cntDelay; - if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; - if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; + totalDelay += pThreadInfo->totalDelay; + cntDelay += pThreadInfo->cntDelay; + if (pThreadInfo->maxDelay > maxDelay) maxDelay = pThreadInfo->maxDelay; + if (pThreadInfo->minDelay < minDelay) minDelay = pThreadInfo->minDelay; } cntDelay -= 1; @@ -5956,26 +5964,26 @@ static void startMultiThreadInsertData(int threads, char* db_name, static void *readTable(void *sarg) { #if 1 - threadInfo *rinfo = (threadInfo *)sarg; - TAOS *taos = rinfo->taos; + threadInfo *pThreadInfo = (threadInfo *)sarg; + TAOS *taos = pThreadInfo->taos; char command[BUFFER_SIZE] = "\0"; - uint64_t sTime = rinfo->start_time; - char *tb_prefix = rinfo->tb_prefix; - FILE *fp = fopen(rinfo->fp, "a"); + uint64_t sTime = pThreadInfo->start_time; + char *tb_prefix = pThreadInfo->tb_prefix; + FILE *fp = fopen(pThreadInfo->filePath, "a"); if (NULL == fp) { - errorPrint( "fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno)); + errorPrint( "fopen %s fail, reason:%s.\n", pThreadInfo->filePath, strerror(errno)); return NULL; } int64_t num_of_DPT; -/* if (rinfo->superTblInfo) { - num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; +/* if (pThreadInfo->superTblInfo) { + num_of_DPT = pThreadInfo->superTblInfo->insertRows; // nrecords_per_table; } else { */ num_of_DPT = g_args.num_of_DPT; // } - int64_t num_of_tables = rinfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1; + int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1; int64_t totalData = num_of_DPT * num_of_tables; bool do_aggreFunc = g_Dbs.do_aggreFunc; @@ -6028,17 +6036,17 @@ static void *readTable(void *sarg) { static void *readMetric(void *sarg) { #if 1 - threadInfo *rinfo = (threadInfo *)sarg; - TAOS *taos = rinfo->taos; + threadInfo *pThreadInfo = (threadInfo *)sarg; + TAOS *taos = pThreadInfo->taos; char command[BUFFER_SIZE] = "\0"; - FILE *fp = fopen(rinfo->fp, "a"); + FILE *fp = fopen(pThreadInfo->filePath, "a"); if (NULL == fp) { - printf("fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno)); + printf("fopen %s fail, reason:%s.\n", pThreadInfo->filePath, strerror(errno)); return NULL; } - int64_t num_of_DPT = rinfo->superTblInfo->insertRows; - int64_t num_of_tables = rinfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1; + int64_t num_of_DPT = pThreadInfo->superTblInfo->insertRows; + int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1; int64_t totalData = num_of_DPT * num_of_tables; bool do_aggreFunc = g_Dbs.do_aggreFunc; @@ -6237,8 +6245,8 @@ static void *specifiedTableQuery(void *sarg) { uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs(); - if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { - sprintf(pThreadInfo->fp, "%s-%d", + if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq] != NULL) { + sprintf(pThreadInfo->filePath, "%s-%d", g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); } @@ -6338,8 +6346,8 @@ static void *superTableQuery(void *sarg) { for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { memset(sqlstr,0,sizeof(sqlstr)); replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i); - if (g_queryInfo.superQueryInfo.result[j][0] != 0) { - sprintf(pThreadInfo->fp, "%s-%d", + if (g_queryInfo.superQueryInfo.result[j] != NULL) { + sprintf(pThreadInfo->filePath, "%s-%d", g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID); } @@ -6429,9 +6437,9 @@ static int queryTestProcess() { for (uint64_t i = 0; i < nSqlCount; i++) { for (int j = 0; j < nConcurrent; j++) { uint64_t seq = i * nConcurrent + j; - threadInfo *t_info = infos + seq; - t_info->threadID = seq; - t_info->querySeq = i; + threadInfo *pThreadInfo = infos + seq; + pThreadInfo->threadID = seq; + pThreadInfo->querySeq = i; if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { @@ -6448,10 +6456,10 @@ static int queryTestProcess() { } } - t_info->taos = NULL;// TODO: workaround to use separate taos connection; + pThreadInfo->taos = NULL;// TODO: workaround to use separate taos connection; pthread_create(pids + seq, NULL, specifiedTableQuery, - t_info); + pThreadInfo); } } } else { @@ -6491,15 +6499,15 @@ static int queryTestProcess() { uint64_t startFrom = 0; for (int i = 0; i < threads; i++) { - threadInfo *t_info = infosOfSub + i; - t_info->threadID = i; + threadInfo *pThreadInfo = infosOfSub + i; + pThreadInfo->threadID = i; - t_info->start_table_from = startFrom; - t_info->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; - startFrom = t_info->end_table_to + 1; - t_info->taos = NULL; // TODO: workaround to use separate taos connection; - pthread_create(pidsOfSub + i, NULL, superTableQuery, t_info); + pThreadInfo->start_table_from = startFrom; + pThreadInfo->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; + startFrom = pThreadInfo->end_table_to + 1; + pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection; + pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo); } g_queryInfo.superQueryInfo.threadCnt = threads; @@ -6546,7 +6554,7 @@ static void stable_sub_callback( } if (param) - appendResultToFile(res, ((threadInfo *)param)->fp); + fetchResult(res, (threadInfo *)param); // tao_unscribe() will free result. } @@ -6559,7 +6567,7 @@ static void specified_sub_callback( } if (param) - appendResultToFile(res, ((threadInfo *)param)->fp); + fetchResult(res, (threadInfo *)param); // tao_unscribe() will free result. } @@ -6613,18 +6621,15 @@ static void *superSubscribe(void *sarg) { } if (pThreadInfo->taos == NULL) { - TAOS * taos = NULL; - taos = taos_connect(g_queryInfo.host, + pThreadInfo->taos = taos_connect(g_queryInfo.host, g_queryInfo.user, g_queryInfo.password, g_queryInfo.dbName, g_queryInfo.port); - if (taos == NULL) { + if (pThreadInfo->taos == NULL) { errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", pThreadInfo->threadID, taos_errstr(NULL)); return NULL; - } else { - pThreadInfo->taos = taos; } } @@ -6654,7 +6659,7 @@ static void *superSubscribe(void *sarg) { g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq], subSqlstr, i); if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { - sprintf(pThreadInfo->fp, "%s-%d", + sprintf(pThreadInfo->filePath, "%s-%d", g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); } @@ -6697,16 +6702,16 @@ static void *superSubscribe(void *sarg) { if (res) { if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { - sprintf(pThreadInfo->fp, "%s-%d", + sprintf(pThreadInfo->filePath, "%s-%d", g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); - appendResultToFile(res, pThreadInfo->fp); + fetchResult(res, pThreadInfo); } if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { - sprintf(pThreadInfo->fp, "%s-%d", + sprintf(pThreadInfo->filePath, "%s-%d", g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); - appendResultToFile(res, pThreadInfo->fp); + fetchResult(res, pThreadInfo); } consumed[tsubSeq] ++; @@ -6747,21 +6752,18 @@ static void *superSubscribe(void *sarg) { static void *specifiedSubscribe(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; - TAOS_SUB* tsub = NULL; +// TAOS_SUB* tsub = NULL; if (pThreadInfo->taos == NULL) { - TAOS * taos = NULL; - taos = taos_connect(g_queryInfo.host, + pThreadInfo->taos = taos_connect(g_queryInfo.host, g_queryInfo.user, g_queryInfo.password, g_queryInfo.dbName, g_queryInfo.port); - if (taos == NULL) { + if (pThreadInfo->taos == NULL) { errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", pThreadInfo->threadID, taos_errstr(NULL)); return NULL; - } else { - pThreadInfo->taos = taos; } } @@ -6773,69 +6775,70 @@ static void *specifiedSubscribe(void *sarg) { return NULL; } - char topic[32] = {0}; - sprintf(topic, "taosdemo-subscribe-%"PRIu64"", pThreadInfo->querySeq); - if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { - sprintf(pThreadInfo->fp, "%s-%d", + sprintf(g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID], + "taosdemo-subscribe-%"PRIu64"-%d", + pThreadInfo->querySeq, + pThreadInfo->threadID); + if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq] != NULL) { + sprintf(pThreadInfo->filePath, "%s-%d", g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); } - tsub = subscribeImpl( - SPECIFIED_CLASS, pThreadInfo, + g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = subscribeImpl( + SPECIFIED_CLASS, pThreadInfo, g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], - topic, + g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID], g_queryInfo.specifiedQueryInfo.subscribeRestart, g_queryInfo.specifiedQueryInfo.subscribeInterval); - if (NULL == tsub) { + if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) { taos_close(pThreadInfo->taos); return NULL; } // start loop to consume result - TAOS_RES* res = NULL; - - int consumed; + g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0; while(1) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { continue; } - res = taos_consume(tsub); - if (res) { + g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume( + g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]); + if (g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]) { if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { - sprintf(pThreadInfo->fp, "%s-%d", + sprintf(pThreadInfo->filePath, "%s-%d", g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); - appendResultToFile(res, pThreadInfo->fp); + fetchResult(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID], pThreadInfo); } - consumed ++; + g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++; if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress) - && (consumed >= + && (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] >= g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) { printf("keepProgress:%d, resub specified query: %"PRIu64"\n", g_queryInfo.specifiedQueryInfo.subscribeKeepProgress, pThreadInfo->querySeq); - consumed = 0; - taos_unsubscribe(tsub, + g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0; + taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID], g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); - tsub = subscribeImpl( + g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = subscribeImpl( SPECIFIED_CLASS, pThreadInfo, g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], - topic, + g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID], g_queryInfo.specifiedQueryInfo.subscribeRestart, g_queryInfo.specifiedQueryInfo.subscribeInterval); - if (NULL == tsub) { + if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) { taos_close(pThreadInfo->taos); return NULL; } } } } - taos_free_result(res); - taos_unsubscribe(tsub, 0); + taos_free_result(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]); + taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->querySeq], 0); taos_close(pThreadInfo->taos); return NULL; @@ -6905,18 +6908,18 @@ static int subscribeTestProcess() { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) { uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j; - threadInfo *t_info = infos + seq; - t_info->threadID = seq; - t_info->querySeq = i; - t_info->taos = NULL; // TODO: workaround to use separate taos connection; - pthread_create(pids + seq, NULL, specifiedSubscribe, t_info); + threadInfo *pThreadInfo = infos + seq; + pThreadInfo->threadID = seq; + pThreadInfo->querySeq = i; + pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection; + pthread_create(pids + seq, NULL, specifiedSubscribe, pThreadInfo); } } } //==== create threads for super table query if (g_queryInfo.superQueryInfo.sqlCount <= 0) { - printf("%s() LN%d, super table query sqlCount %"PRIu64".\n", + debugPrint("%s() LN%d, super table query sqlCount %"PRIu64".\n", __func__, __LINE__, g_queryInfo.superQueryInfo.sqlCount); } else { @@ -6955,17 +6958,17 @@ static int subscribeTestProcess() { uint64_t startFrom = 0; for (int j = 0; j < threads; j++) { uint64_t seq = i * threads + j; - threadInfo *t_info = infosOfStable + seq; - t_info->threadID = seq; - t_info->querySeq = i; - - t_info->start_table_from = startFrom; - t_info->ntables = jend_table_to = jend_table_to + 1; - t_info->taos = NULL; // TODO: workaround to use separate taos connection; + threadInfo *pThreadInfo = infosOfStable + seq; + pThreadInfo->threadID = seq; + pThreadInfo->querySeq = i; + + pThreadInfo->start_table_from = startFrom; + pThreadInfo->ntables = jend_table_to = jend_table_to + 1; + pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection; pthread_create(pidsOfStable + seq, - NULL, superSubscribe, t_info); + NULL, superSubscribe, pThreadInfo); } } @@ -7244,47 +7247,47 @@ static void queryResult() { // query data pthread_t read_id; - threadInfo *rInfo = malloc(sizeof(threadInfo)); - assert(rInfo); - rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000 - rInfo->start_table_from = 0; + threadInfo *pThreadInfo = malloc(sizeof(threadInfo)); + assert(pThreadInfo); + pThreadInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000 + pThreadInfo->start_table_from = 0; - //rInfo->do_aggreFunc = g_Dbs.do_aggreFunc; + //pThreadInfo->do_aggreFunc = g_Dbs.do_aggreFunc; if (g_args.use_metric) { - rInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount; - rInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1; - rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0]; - tstrncpy(rInfo->tb_prefix, + pThreadInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount; + pThreadInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1; + pThreadInfo->superTblInfo = &g_Dbs.db[0].superTbls[0]; + tstrncpy(pThreadInfo->tb_prefix, g_Dbs.db[0].superTbls[0].childTblPrefix, MAX_TB_NAME_SIZE); } else { - rInfo->ntables = g_args.num_of_tables; - rInfo->end_table_to = g_args.num_of_tables -1; - tstrncpy(rInfo->tb_prefix, g_args.tb_prefix, MAX_TB_NAME_SIZE); + pThreadInfo->ntables = g_args.num_of_tables; + pThreadInfo->end_table_to = g_args.num_of_tables -1; + tstrncpy(pThreadInfo->tb_prefix, g_args.tb_prefix, MAX_TB_NAME_SIZE); } - rInfo->taos = taos_connect( + pThreadInfo->taos = taos_connect( g_Dbs.host, g_Dbs.user, g_Dbs.password, g_Dbs.db[0].dbName, g_Dbs.port); - if (rInfo->taos == NULL) { + if (pThreadInfo->taos == NULL) { errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); - free(rInfo); + free(pThreadInfo); exit(-1); } - tstrncpy(rInfo->fp, g_Dbs.resultFile, MAX_FILE_NAME_LEN); + tstrncpy(pThreadInfo->filePath, g_Dbs.resultFile, MAX_FILE_NAME_LEN); if (!g_Dbs.use_metric) { - pthread_create(&read_id, NULL, readTable, rInfo); + pthread_create(&read_id, NULL, readTable, pThreadInfo); } else { - pthread_create(&read_id, NULL, readMetric, rInfo); + pthread_create(&read_id, NULL, readMetric, pThreadInfo); } pthread_join(read_id, NULL); - taos_close(rInfo->taos); - free(rInfo); + taos_close(pThreadInfo->taos); + free(pThreadInfo); } static void testCmdLine() { diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py index 95507f0142036f36f9a5249a48800bd481513844..1cd65c1dde4e9c2027a82730cf4fc12290048bbe 100644 --- a/tests/pytest/crash_gen/service_manager.py +++ b/tests/pytest/crash_gen/service_manager.py @@ -22,7 +22,7 @@ from queue import Queue, Empty from .shared.config import Config from .shared.db import DbTarget, DbConn from .shared.misc import Logging, Helper, CrashGenError, Status, Progress, Dice -from .shared.types import DirPath +from .shared.types import DirPath, IpcStream # from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status # from crash_gen.db import DbConn, DbTarget @@ -177,13 +177,12 @@ quorum 2 return "127.0.0.1" def getServiceCmdLine(self): # to start the instance - cmdLine = [] if Config.getConfig().track_memory_leaks: Logging.info("Invoking VALGRIND on service...") - cmdLine = ['valgrind', '--leak-check=yes'] - # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control - cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() - return cmdLine + return ['exec /usr/bin/valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()] + else: + # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control + return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() def _getDnodes(self, dbc): dbc.query("show dnodes") @@ -281,16 +280,16 @@ class TdeSubProcess: return '[TdeSubProc: pid = {}, status = {}]'.format( self.getPid(), self.getStatus() ) - def getStdOut(self) -> BinaryIO : + def getIpcStdOut(self) -> IpcStream : if self._popen.universal_newlines : # alias of text_mode raise CrashGenError("We need binary mode for STDOUT IPC") # Logging.info("Type of stdout is: {}".format(type(self._popen.stdout))) - return typing.cast(BinaryIO, self._popen.stdout) + return typing.cast(IpcStream, self._popen.stdout) - def getStdErr(self) -> BinaryIO : + def getIpcStdErr(self) -> IpcStream : if self._popen.universal_newlines : # alias of text_mode raise CrashGenError("We need binary mode for STDERR IPC") - return typing.cast(BinaryIO, self._popen.stderr) + return typing.cast(IpcStream, self._popen.stderr) # Now it's always running, since we matched the life cycle # def isRunning(self): @@ -301,11 +300,6 @@ class TdeSubProcess: def _start(self, cmdLine) -> Popen : ON_POSIX = 'posix' in sys.builtin_module_names - - # Sanity check - # if self.subProcess: # already there - # raise RuntimeError("Corrupt process state") - # Prepare environment variables for coverage information # Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment @@ -314,9 +308,8 @@ class TdeSubProcess: # print(myEnv) # print("Starting TDengine with env: ", myEnv.items()) - # print("Starting TDengine via Shell: {}".format(cmdLineStr)) + print("Starting TDengine: {}".format(cmdLine)) - # useShell = True # Needed to pass environments into it return Popen( ' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine, shell=True, # Always use shell, since we need to pass ENV vars @@ -732,19 +725,19 @@ class ServiceManagerThread: self._ipcQueue = Queue() # type: Queue self._thread = threading.Thread( # First thread captures server OUTPUT target=self.svcOutputReader, - args=(subProc.getStdOut(), self._ipcQueue, logDir)) + args=(subProc.getIpcStdOut(), self._ipcQueue, logDir)) self._thread.daemon = True # thread dies with the program self._thread.start() time.sleep(0.01) if not self._thread.is_alive(): # What happened? - Logging.info("Failed to started process to monitor STDOUT") + Logging.info("Failed to start process to monitor STDOUT") self.stop() raise CrashGenError("Failed to start thread to monitor STDOUT") Logging.info("Successfully started process to monitor STDOUT") self._thread2 = threading.Thread( # 2nd thread captures server ERRORs target=self.svcErrorReader, - args=(subProc.getStdErr(), self._ipcQueue, logDir)) + args=(subProc.getIpcStdErr(), self._ipcQueue, logDir)) self._thread2.daemon = True # thread dies with the program self._thread2.start() time.sleep(0.01) @@ -887,14 +880,19 @@ class ServiceManagerThread: print("\nNon-UTF8 server output: {}\n".format(bChunk.decode('cp437'))) return None - def _textChunkGenerator(self, streamIn: BinaryIO, logDir: str, logFile: str + def _textChunkGenerator(self, streamIn: IpcStream, logDir: str, logFile: str ) -> Generator[TextChunk, None, None]: ''' - Take an input stream with binary data, produced a generator of decoded - "text chunks", and also save the original binary data in a log file. + Take an input stream with binary data (likely from Popen), produced a generator of decoded + "text chunks". + + Side effect: it also save the original binary data in a log file. ''' os.makedirs(logDir, exist_ok=True) logF = open(os.path.join(logDir, logFile), 'wb') + if logF is None: + Logging.error("Failed to open log file (binary write): {}/{}".format(logDir, logFile)) + return for bChunk in iter(streamIn.readline, b''): logF.write(bChunk) # Write to log file immediately tChunk = self._decodeBinaryChunk(bChunk) # decode @@ -902,14 +900,14 @@ class ServiceManagerThread: yield tChunk # TODO: split into actual text lines # At the end... - streamIn.close() # Close the stream - logF.close() # Close the output file + streamIn.close() # Close the incoming stream + logF.close() # Close the log file - def svcOutputReader(self, stdOut: BinaryIO, queue, logDir: str): + def svcOutputReader(self, ipcStdOut: IpcStream, queue, logDir: str): ''' The infinite routine that processes the STDOUT stream for the sub process being managed. - :param stdOut: the IO stream object used to fetch the data from + :param ipcStdOut: the IO stream object used to fetch the data from :param queue: the queue where we dump the roughly parsed chunk-by-chunk text data :param logDir: where we should dump a verbatim output file ''' @@ -917,7 +915,7 @@ class ServiceManagerThread: # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python # print("This is the svcOutput Reader...") # stdOut.readline() # Skip the first output? TODO: remove? - for tChunk in self._textChunkGenerator(stdOut, logDir, 'stdout.log') : + for tChunk in self._textChunkGenerator(ipcStdOut, logDir, 'stdout.log') : queue.put(tChunk) # tChunk garanteed not to be None self._printProgress("_i") @@ -940,12 +938,12 @@ class ServiceManagerThread: Logging.info("EOF found TDengine STDOUT, marking the process as terminated") self.setStatus(Status.STATUS_STOPPED) - def svcErrorReader(self, stdErr: BinaryIO, queue, logDir: str): + def svcErrorReader(self, ipcStdErr: IpcStream, queue, logDir: str): # os.makedirs(logDir, exist_ok=True) # logFile = os.path.join(logDir,'stderr.log') # fErr = open(logFile, 'wb') # for line in iter(err.readline, b''): - for tChunk in self._textChunkGenerator(stdErr, logDir, 'stderr.log') : + for tChunk in self._textChunkGenerator(ipcStdErr, logDir, 'stderr.log') : queue.put(tChunk) # tChunk garanteed not to be None # fErr.write(line) Logging.info("TDengine STDERR: {}".format(tChunk)) diff --git a/tests/pytest/crash_gen/shared/types.py b/tests/pytest/crash_gen/shared/types.py index 814a8219178d1c01ee0291447939984bdf81df9d..42fd2a1617cf729e4f23fc61a685027f738bc4a3 100644 --- a/tests/pytest/crash_gen/shared/types.py +++ b/tests/pytest/crash_gen/shared/types.py @@ -1,4 +1,4 @@ -from typing import Any, List, Dict, NewType +from typing import Any, BinaryIO, List, Dict, NewType from enum import Enum DirPath = NewType('DirPath', str) @@ -26,3 +26,5 @@ class TdDataType(Enum): TdColumns = Dict[str, TdDataType] TdTags = Dict[str, TdDataType] + +IpcStream = NewType('IpcStream', BinaryIO) \ No newline at end of file