未验证 提交 21c0341b 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 4296 taosdemo sub keepprogress for develop (#6222)

* [TD-4296]<fix>: taosdemo sub keepProgress.

* make print sqlcount quiet.

* [TD-4296]<fix>: taosdemo subscribe keepprogress.

specified query topic thread safe.

* cherry pick from master branch.

* cherry pick from master branch.

* fix compile warning for mac.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 d1f15a82
......@@ -375,7 +375,7 @@ typedef struct SDbs_S {
typedef struct SpecifiedQueryInfo_S {
uint64_t queryInterval; // 0: unlimit > 0 loop/s
uint64_t concurrent;
uint32_t concurrent;
uint64_t sqlCount;
uint32_t asyncMode; // 0: sync, 1: async
uint64_t subscribeInterval; // ms
......@@ -386,6 +386,9 @@ typedef struct SpecifiedQueryInfo_S {
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char topic[MAX_QUERY_SQL_COUNT][32];
int consumed[MAX_QUERY_SQL_COUNT];
TAOS_RES* res[MAX_QUERY_SQL_COUNT];
uint64_t totalQueried;
} SpecifiedQueryInfo;
......@@ -431,7 +434,8 @@ typedef struct SThreadInfo_S {
int threadID;
char db_name[MAX_DB_NAME_SIZE+1];
uint32_t time_precision;
char fp[4096];
char filePath[4096];
FILE *fp;
char tb_prefix[MAX_TB_NAME_SIZE];
uint64_t start_table_from;
uint64_t end_table_to;
......@@ -546,7 +550,7 @@ static int createDatabasesAndStables();
static void createChildTables();
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
static int postProceSql(char *host, struct sockaddr_in *pServAddr,
uint16_t port, char* sqlstr, char *resultFile);
uint16_t port, char* sqlstr, threadInfo *pThreadInfo);
/* ************ Global variables ************ */
......@@ -1128,7 +1132,8 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
if (code != 0) {
if (!quiet) {
debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
errorPrint("Failed to execute %s, reason: %s\n", command, taos_errstr(res));
errorPrint("Failed to execute %s, command length: %d, reason: %s\n",
command, (int)strlen(command), taos_errstr(res));
}
taos_free_result(res);
//taos_close(taos);
......@@ -1145,22 +1150,22 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
return 0;
}
static void appendResultBufToFile(char *resultBuf, char *resultFile)
static void appendResultBufToFile(char *resultBuf, threadInfo *pThreadInfo)
{
FILE *fp = NULL;
fp = fopen(resultFile, "at");
if (fp == NULL) {
pThreadInfo->fp = fopen(pThreadInfo->filePath, "at");
if (pThreadInfo->fp == NULL) {
errorPrint(
"%s() LN%d, failed to open result file: %s, result will not save to file\n",
__func__, __LINE__, resultFile);
__func__, __LINE__, pThreadInfo->filePath);
return;
}
fprintf(fp, "%s", resultBuf);
tmfclose(fp);
fprintf(pThreadInfo->fp, "%s", resultBuf);
tmfclose(pThreadInfo->fp);
pThreadInfo->fp = NULL;
}
static void fetchResult(TAOS_RES *res, char* resultFile) {
static void fetchResult(TAOS_RES *res, threadInfo* pThreadInfo) {
TAOS_ROW row = NULL;
int num_rows = 0;
int num_fields = taos_field_count(res);
......@@ -1178,9 +1183,9 @@ static void fetchResult(TAOS_RES *res, char* resultFile) {
// fetch the records row by row
while((row = taos_fetch_row(res))) {
if ((resultFile) && (strlen(resultFile) > 0)
if ((strlen(pThreadInfo->filePath) > 0)
&& (totalLen >= 100*1024*1024 - 32000)) {
appendResultBufToFile(databuf, resultFile);
appendResultBufToFile(databuf, pThreadInfo);
totalLen = 0;
memset(databuf, 0, 100*1024*1024);
}
......@@ -1193,9 +1198,9 @@ static void fetchResult(TAOS_RES *res, char* resultFile) {
}
verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n",
__func__, __LINE__, databuf, resultFile);
if ((resultFile) && (strlen(resultFile) > 0)) {
appendResultBufToFile(databuf, resultFile);
__func__, __LINE__, databuf, pThreadInfo->filePath);
if (strlen(pThreadInfo->filePath) > 0) {
appendResultBufToFile(databuf, pThreadInfo);
}
free(databuf);
}
......@@ -1212,14 +1217,14 @@ static void selectAndGetResult(
return;
}
fetchResult(res, pThreadInfo->fp);
fetchResult(res, pThreadInfo);
taos_free_result(res);
} else if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
int retCode = postProceSql(
g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port,
command,
pThreadInfo->fp);
pThreadInfo);
if (0 != retCode) {
printf("====restful return fail, threadID[%d]\n", pThreadInfo->threadID);
}
......@@ -1756,7 +1761,7 @@ static void printfQueryMeta() {
printf("query interval: \033[33m%"PRIu64" ms\033[0m\n",
g_queryInfo.specifiedQueryInfo.queryInterval);
printf("top query times:\033[33m%"PRIu64"\033[0m\n", g_args.query_times);
printf("concurrent: \033[33m%"PRIu64"\033[0m\n",
printf("concurrent: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.concurrent);
printf("mod: \033[33m%s\033[0m\n",
(g_queryInfo.specifiedQueryInfo.asyncMode)?"async":"sync");
......@@ -2095,7 +2100,7 @@ static void printfQuerySystemInfo(TAOS * taos) {
}
static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port,
char* sqlstr, char *resultFile)
char* sqlstr, threadInfo *pThreadInfo)
{
char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s";
......@@ -2231,8 +2236,8 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
response_buf[RESP_BUF_LEN - 1] = '\0';
printf("Response:\n%s\n", response_buf);
if ((resultFile) && (strlen(resultFile) > 0)) {
appendResultBufToFile(response_buf, resultFile);
if (strlen(pThreadInfo->filePath) > 0) {
appendResultBufToFile(response_buf, pThreadInfo);
}
free(request_buf);
......@@ -2994,19 +2999,19 @@ static int startMultiThreadCreateChildTable(
int64_t b = 0;
b = ntables % threads;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
t_info->superTblInfo = superTblInfo;
for (int64_t i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
pThreadInfo->threadID = i;
tstrncpy(pThreadInfo->db_name, db_name, MAX_DB_NAME_SIZE);
pThreadInfo->superTblInfo = superTblInfo;
verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name);
t_info->taos = taos_connect(
pThreadInfo->taos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
db_name,
g_Dbs.port);
if (t_info->taos == NULL) {
if (pThreadInfo->taos == NULL) {
errorPrint( "%s() LN%d, Failed to connect to TDengine, reason:%s\n",
__func__, __LINE__, taos_errstr(NULL));
free(pids);
......@@ -3014,14 +3019,14 @@ static int startMultiThreadCreateChildTable(
return -1;
}
t_info->start_table_from = startFrom;
t_info->ntables = i<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1;
t_info->use_metric = true;
t_info->cols = cols;
t_info->minDelay = UINT64_MAX;
pthread_create(pids + i, NULL, createTable, t_info);
pThreadInfo->start_table_from = startFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->use_metric = true;
pThreadInfo->cols = cols;
pThreadInfo->minDelay = UINT64_MAX;
pthread_create(pids + i, NULL, createTable, pThreadInfo);
}
for (int i = 0; i < threads; i++) {
......@@ -3029,8 +3034,8 @@ static int startMultiThreadCreateChildTable(
}
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
taos_close(t_info->taos);
threadInfo *pThreadInfo = infos + i;
taos_close(pThreadInfo->taos);
}
free(pids);
......@@ -4246,7 +4251,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (concurrent && concurrent->type == cJSON_Number) {
if (concurrent->valueint <= 0) {
errorPrint(
"%s() LN%d, query sqlCount %"PRIu64" or concurrent %"PRIu64" is not correct.\n",
"%s() LN%d, query sqlCount %"PRIu64" or concurrent %d is not correct.\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount,
g_queryInfo.specifiedQueryInfo.concurrent);
......@@ -4313,24 +4318,28 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
}
// sqls
cJSON* superSqls = cJSON_GetObjectItem(specifiedQuery, "sqls");
if (!superSqls) {
cJSON* specifiedSqls = cJSON_GetObjectItem(specifiedQuery, "sqls");
if (!specifiedSqls) {
g_queryInfo.specifiedQueryInfo.sqlCount = 0;
} else if (superSqls->type != cJSON_Array) {
} else if (specifiedSqls->type != cJSON_Array) {
errorPrint("%s() LN%d, failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(superSqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
int superSqlSize = cJSON_GetArraySize(specifiedSqls);
if (superSqlSize * g_queryInfo.specifiedQueryInfo.concurrent
> MAX_QUERY_SQL_COUNT) {
errorPrint("%s() LN%d, failed to read json, query sql(%d) * concurrent(%d) overflow, max is %d\n",
__func__, __LINE__,
superSqlSize,
g_queryInfo.specifiedQueryInfo.concurrent,
MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
g_queryInfo.specifiedQueryInfo.sqlCount = superSqlSize;
for (int j = 0; j < superSqlSize; ++j) {
cJSON* sql = cJSON_GetArrayItem(superSqls, j);
cJSON* sql = cJSON_GetArrayItem(specifiedSqls, j);
if (sql == NULL) continue;
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
......@@ -4506,16 +4515,16 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.superQueryInfo.resubAfterConsume = 1;
}
// sqls
cJSON* subsqls = cJSON_GetObjectItem(superQuery, "sqls");
if (!subsqls) {
// supert table sqls
cJSON* superSqls = cJSON_GetObjectItem(superQuery, "sqls");
if (!superSqls) {
g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (subsqls->type != cJSON_Array) {
} else if (superSqls->type != cJSON_Array) {
errorPrint("%s() LN%d: failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(subsqls);
int superSqlSize = cJSON_GetArraySize(superSqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
......@@ -4524,7 +4533,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.superQueryInfo.sqlCount = superSqlSize;
for (int j = 0; j < superSqlSize; ++j) {
cJSON* sql = cJSON_GetArrayItem(subsqls, j);
cJSON* sql = cJSON_GetArrayItem(superSqls, j);
if (sql == NULL) continue;
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
......@@ -5970,22 +5979,22 @@ static void startMultiThreadInsertData(int threads, char* db_name,
memset(infos, 0, threads * sizeof(threadInfo));
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
t_info->time_precision = timePrec;
t_info->superTblInfo = superTblInfo;
threadInfo *pThreadInfo = infos + i;
pThreadInfo->threadID = i;
tstrncpy(pThreadInfo->db_name, db_name, MAX_DB_NAME_SIZE);
pThreadInfo->time_precision = timePrec;
pThreadInfo->superTblInfo = superTblInfo;
t_info->start_time = start_time;
t_info->minDelay = UINT64_MAX;
pThreadInfo->start_time = start_time;
pThreadInfo->minDelay = UINT64_MAX;
if ((NULL == superTblInfo) ||
(superTblInfo->insertMode != REST_IFACE)) {
//t_info->taos = taos;
t_info->taos = taos_connect(
//pThreadInfo->taos = taos;
pThreadInfo->taos = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == t_info->taos) {
if (NULL == pThreadInfo->taos) {
errorPrint(
"%s() LN%d, connect to server fail from insert sub thread, reason: %s\n",
__func__, __LINE__,
......@@ -5995,8 +6004,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
if ((superTblInfo) && (superTblInfo->insertMode == STMT_IFACE)) {
t_info->stmt = taos_stmt_init(t_info->taos);
if (NULL == t_info->stmt) {
pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
if (NULL == pThreadInfo->stmt) {
errorPrint(
"%s() LN%d, failed init stmt, reason: %s\n",
__func__, __LINE__,
......@@ -6007,27 +6016,27 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
}
} else {
t_info->taos = NULL;
pThreadInfo->taos = NULL;
}
/* if ((NULL == superTblInfo)
|| (0 == superTblInfo->multiThreadWriteOneTbl)) {
*/
t_info->start_table_from = startFrom;
t_info->ntables = i<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1;
pThreadInfo->start_table_from = startFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
/* } else {
t_info->start_table_from = 0;
t_info->ntables = superTblInfo->childTblCount;
t_info->start_time = t_info->start_time + rand_int() % 10000 - rand_tinyint();
pThreadInfo->start_table_from = 0;
pThreadInfo->ntables = superTblInfo->childTblCount;
pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint();
}
*/
tsem_init(&(t_info->lock_sem), 0, 0);
tsem_init(&(pThreadInfo->lock_sem), 0, 0);
if (ASYNC_MODE == g_Dbs.asyncMode) {
pthread_create(pids + i, NULL, asyncWrite, t_info);
pthread_create(pids + i, NULL, asyncWrite, pThreadInfo);
} else {
pthread_create(pids + i, NULL, syncWrite, t_info);
pthread_create(pids + i, NULL, syncWrite, pThreadInfo);
}
}
......@@ -6042,31 +6051,32 @@ static void startMultiThreadInsertData(int threads, char* db_name,
double avgDelay = 0;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
threadInfo *pThreadInfo = infos + i;
tsem_destroy(&(t_info->lock_sem));
tsem_destroy(&(pThreadInfo->lock_sem));
if (t_info->stmt) {
taos_stmt_close(t_info->stmt);
if (pThreadInfo->stmt) {
taos_stmt_close(pThreadInfo->stmt);
}
taos_close(t_info->taos);
tsem_destroy(&(pThreadInfo->lock_sem));
taos_close(pThreadInfo->taos);
debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n",
__func__, __LINE__,
t_info->threadID, t_info->totalInsertRows,
t_info->totalAffectedRows);
pThreadInfo->threadID, pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
if (superTblInfo) {
superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
superTblInfo->totalInsertRows += t_info->totalInsertRows;
superTblInfo->totalAffectedRows += pThreadInfo->totalAffectedRows;
superTblInfo->totalInsertRows += pThreadInfo->totalInsertRows;
} else {
g_args.totalAffectedRows += t_info->totalAffectedRows;
g_args.totalInsertRows += t_info->totalInsertRows;
g_args.totalAffectedRows += pThreadInfo->totalAffectedRows;
g_args.totalInsertRows += pThreadInfo->totalInsertRows;
}
totalDelay += t_info->totalDelay;
cntDelay += t_info->cntDelay;
if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay;
if (t_info->minDelay < minDelay) minDelay = t_info->minDelay;
totalDelay += pThreadInfo->totalDelay;
cntDelay += pThreadInfo->cntDelay;
if (pThreadInfo->maxDelay > maxDelay) maxDelay = pThreadInfo->maxDelay;
if (pThreadInfo->minDelay < minDelay) minDelay = pThreadInfo->minDelay;
}
cntDelay -= 1;
......@@ -6122,26 +6132,26 @@ static void startMultiThreadInsertData(int threads, char* db_name,
static void *readTable(void *sarg) {
#if 1
threadInfo *rinfo = (threadInfo *)sarg;
TAOS *taos = rinfo->taos;
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS *taos = pThreadInfo->taos;
char command[BUFFER_SIZE] = "\0";
uint64_t sTime = rinfo->start_time;
char *tb_prefix = rinfo->tb_prefix;
FILE *fp = fopen(rinfo->fp, "a");
uint64_t sTime = pThreadInfo->start_time;
char *tb_prefix = pThreadInfo->tb_prefix;
FILE *fp = fopen(pThreadInfo->filePath, "a");
if (NULL == fp) {
errorPrint( "fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno));
errorPrint( "fopen %s fail, reason:%s.\n", pThreadInfo->filePath, strerror(errno));
return NULL;
}
int64_t num_of_DPT;
/* if (rinfo->superTblInfo) {
num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table;
/* if (pThreadInfo->superTblInfo) {
num_of_DPT = pThreadInfo->superTblInfo->insertRows; // nrecords_per_table;
} else {
*/
num_of_DPT = g_args.num_of_DPT;
// }
int64_t num_of_tables = rinfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
......@@ -6194,17 +6204,17 @@ static void *readTable(void *sarg) {
static void *readMetric(void *sarg) {
#if 1
threadInfo *rinfo = (threadInfo *)sarg;
TAOS *taos = rinfo->taos;
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS *taos = pThreadInfo->taos;
char command[BUFFER_SIZE] = "\0";
FILE *fp = fopen(rinfo->fp, "a");
FILE *fp = fopen(pThreadInfo->filePath, "a");
if (NULL == fp) {
printf("fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno));
printf("fopen %s fail, reason:%s.\n", pThreadInfo->filePath, strerror(errno));
return NULL;
}
int64_t num_of_DPT = rinfo->superTblInfo->insertRows;
int64_t num_of_tables = rinfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t num_of_DPT = pThreadInfo->superTblInfo->insertRows;
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
......@@ -6403,8 +6413,8 @@ static void *specifiedTableQuery(void *sarg) {
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq] != NULL) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
......@@ -6504,8 +6514,8 @@ static void *superTableQuery(void *sarg) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
memset(sqlstr,0,sizeof(sqlstr));
replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i);
if (g_queryInfo.superQueryInfo.result[j][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
if (g_queryInfo.superQueryInfo.result[j] != NULL) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[j],
pThreadInfo->threadID);
}
......@@ -6595,9 +6605,9 @@ static int queryTestProcess() {
for (uint64_t i = 0; i < nSqlCount; i++) {
for (int j = 0; j < nConcurrent; j++) {
uint64_t seq = i * nConcurrent + j;
threadInfo *t_info = infos + seq;
t_info->threadID = seq;
t_info->querySeq = i;
threadInfo *pThreadInfo = infos + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
......@@ -6614,10 +6624,10 @@ static int queryTestProcess() {
}
}
t_info->taos = NULL;// TODO: workaround to use separate taos connection;
pThreadInfo->taos = NULL;// TODO: workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedTableQuery,
t_info);
pThreadInfo);
}
}
} else {
......@@ -6657,15 +6667,15 @@ static int queryTestProcess() {
uint64_t startFrom = 0;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infosOfSub + i;
t_info->threadID = i;
threadInfo *pThreadInfo = infosOfSub + i;
pThreadInfo->threadID = i;
t_info->start_table_from = startFrom;
t_info->ntables = i<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1;
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, superTableQuery, t_info);
pThreadInfo->start_table_from = startFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
}
g_queryInfo.superQueryInfo.threadCnt = threads;
......@@ -6712,7 +6722,7 @@ static void stable_sub_callback(
}
if (param)
fetchResult(res, ((threadInfo *)param)->fp);
fetchResult(res, (threadInfo *)param);
// tao_unscribe() will free result.
}
......@@ -6725,7 +6735,7 @@ static void specified_sub_callback(
}
if (param)
fetchResult(res, ((threadInfo *)param)->fp);
fetchResult(res, (threadInfo *)param);
// tao_unscribe() will free result.
}
......@@ -6779,18 +6789,15 @@ static void *superSubscribe(void *sarg) {
}
if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
pThreadInfo->taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
g_queryInfo.dbName,
g_queryInfo.port);
if (taos == NULL) {
if (pThreadInfo->taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
return NULL;
} else {
pThreadInfo->taos = taos;
}
}
......@@ -6820,7 +6827,7 @@ static void *superSubscribe(void *sarg) {
g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq],
subSqlstr, i);
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
......@@ -6863,16 +6870,16 @@ static void *superSubscribe(void *sarg) {
if (res) {
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
fetchResult(res, pThreadInfo->fp);
fetchResult(res, pThreadInfo);
}
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
fetchResult(res, pThreadInfo->fp);
fetchResult(res, pThreadInfo);
}
consumed[tsubSeq] ++;
......@@ -6913,21 +6920,18 @@ static void *superSubscribe(void *sarg) {
static void *specifiedSubscribe(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS_SUB* tsub = NULL;
// TAOS_SUB* tsub = NULL;
if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
pThreadInfo->taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
g_queryInfo.dbName,
g_queryInfo.port);
if (taos == NULL) {
if (pThreadInfo->taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
return NULL;
} else {
pThreadInfo->taos = taos;
}
}
......@@ -6939,69 +6943,70 @@ static void *specifiedSubscribe(void *sarg) {
return NULL;
}
char topic[32] = {0};
sprintf(topic, "taosdemo-subscribe-%"PRIu64"", pThreadInfo->querySeq);
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
sprintf(g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
"taosdemo-subscribe-%"PRIu64"-%d",
pThreadInfo->querySeq,
pThreadInfo->threadID);
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq] != NULL) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
tsub = subscribeImpl(
SPECIFIED_CLASS, pThreadInfo,
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = subscribeImpl(
SPECIFIED_CLASS, pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
topic,
g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeRestart,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == tsub) {
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) {
taos_close(pThreadInfo->taos);
return NULL;
}
// start loop to consume result
TAOS_RES* res = NULL;
int consumed;
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0;
while(1) {
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
continue;
}
res = taos_consume(tsub);
if (res) {
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume(
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]);
if (g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]) {
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
fetchResult(res, pThreadInfo->fp);
fetchResult(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID], pThreadInfo);
}
consumed ++;
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++;
if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress)
&& (consumed >=
&& (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] >=
g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
printf("keepProgress:%d, resub specified query: %"PRIu64"\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress,
pThreadInfo->querySeq);
consumed = 0;
taos_unsubscribe(tsub,
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0;
taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
tsub = subscribeImpl(
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = subscribeImpl(
SPECIFIED_CLASS,
pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
topic,
g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeRestart,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == tsub) {
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
}
}
taos_free_result(res);
taos_unsubscribe(tsub, 0);
taos_free_result(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]);
taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->querySeq], 0);
taos_close(pThreadInfo->taos);
return NULL;
......@@ -7071,11 +7076,11 @@ static int subscribeTestProcess() {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) {
uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j;
threadInfo *t_info = infos + seq;
t_info->threadID = seq;
t_info->querySeq = i;
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedSubscribe, t_info);
threadInfo *pThreadInfo = infos + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedSubscribe, pThreadInfo);
}
}
}
......@@ -7121,17 +7126,17 @@ static int subscribeTestProcess() {
uint64_t startFrom = 0;
for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j;
threadInfo *t_info = infosOfStable + seq;
t_info->threadID = seq;
t_info->querySeq = i;
t_info->start_table_from = startFrom;
t_info->ntables = j<b?a+1:a;
t_info->end_table_to = j<b?startFrom+a:startFrom+a-1;
startFrom = t_info->end_table_to + 1;
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
threadInfo *pThreadInfo = infosOfStable + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
pThreadInfo->start_table_from = startFrom;
pThreadInfo->ntables = j<b?a+1:a;
pThreadInfo->end_table_to = j<b?startFrom+a:startFrom+a-1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfStable + seq,
NULL, superSubscribe, t_info);
NULL, superSubscribe, pThreadInfo);
}
}
......@@ -7410,47 +7415,47 @@ static void queryResult() {
// query data
pthread_t read_id;
threadInfo *rInfo = malloc(sizeof(threadInfo));
assert(rInfo);
rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
rInfo->start_table_from = 0;
threadInfo *pThreadInfo = malloc(sizeof(threadInfo));
assert(pThreadInfo);
pThreadInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
pThreadInfo->start_table_from = 0;
//rInfo->do_aggreFunc = g_Dbs.do_aggreFunc;
//pThreadInfo->do_aggreFunc = g_Dbs.do_aggreFunc;
if (g_args.use_metric) {
rInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount;
rInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
tstrncpy(rInfo->tb_prefix,
pThreadInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount;
pThreadInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
pThreadInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
tstrncpy(pThreadInfo->tb_prefix,
g_Dbs.db[0].superTbls[0].childTblPrefix, MAX_TB_NAME_SIZE);
} else {
rInfo->ntables = g_args.num_of_tables;
rInfo->end_table_to = g_args.num_of_tables -1;
tstrncpy(rInfo->tb_prefix, g_args.tb_prefix, MAX_TB_NAME_SIZE);
pThreadInfo->ntables = g_args.num_of_tables;
pThreadInfo->end_table_to = g_args.num_of_tables -1;
tstrncpy(pThreadInfo->tb_prefix, g_args.tb_prefix, MAX_TB_NAME_SIZE);
}
rInfo->taos = taos_connect(
pThreadInfo->taos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
g_Dbs.db[0].dbName,
g_Dbs.port);
if (rInfo->taos == NULL) {
if (pThreadInfo->taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
free(rInfo);
free(pThreadInfo);
exit(-1);
}
tstrncpy(rInfo->fp, g_Dbs.resultFile, MAX_FILE_NAME_LEN);
tstrncpy(pThreadInfo->filePath, g_Dbs.resultFile, MAX_FILE_NAME_LEN);
if (!g_Dbs.use_metric) {
pthread_create(&read_id, NULL, readTable, rInfo);
pthread_create(&read_id, NULL, readTable, pThreadInfo);
} else {
pthread_create(&read_id, NULL, readMetric, rInfo);
pthread_create(&read_id, NULL, readMetric, pThreadInfo);
}
pthread_join(read_id, NULL);
taos_close(rInfo->taos);
free(rInfo);
taos_close(pThreadInfo->taos);
free(pThreadInfo);
}
static void testCmdLine() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册