未验证 提交 c89ada70 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-3524] <fix>: taosdemo query multithreading workaround. (#5587)

Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 ccbb062c
...@@ -5418,6 +5418,22 @@ static int insertTestProcess() { ...@@ -5418,6 +5418,22 @@ static int insertTestProcess() {
static void *superQueryProcess(void *sarg) { static void *superQueryProcess(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
if (winfo->taos == NULL) {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
NULL,
g_queryInfo.port);
if (taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
winfo->threadID, taos_errstr(NULL));
return NULL;
} else {
winfo->taos = taos;
}
}
//char sqlStr[MAX_TB_NAME_SIZE*2]; //char sqlStr[MAX_TB_NAME_SIZE*2];
//sprintf(sqlStr, "use %s", g_queryInfo.dbName); //sprintf(sqlStr, "use %s", g_queryInfo.dbName);
//queryDB(winfo->taos, sqlStr); //queryDB(winfo->taos, sqlStr);
...@@ -5493,6 +5509,23 @@ static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { ...@@ -5493,6 +5509,23 @@ static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
static void *subQueryProcess(void *sarg) { static void *subQueryProcess(void *sarg) {
char sqlstr[1024]; char sqlstr[1024];
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
if (winfo->taos == NULL) {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
NULL,
g_queryInfo.port);
if (taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
winfo->threadID, taos_errstr(NULL));
return NULL;
} else {
winfo->taos = taos;
}
}
int64_t st = 0; int64_t st = 0;
int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000; int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000;
int queryTimes = g_args.query_times; int queryTimes = g_args.query_times;
...@@ -5534,12 +5567,12 @@ static int queryTestProcess() { ...@@ -5534,12 +5567,12 @@ static int queryTestProcess() {
setupForAnsiEscape(); setupForAnsiEscape();
printfQueryMeta(); printfQueryMeta();
resetAfterAnsiEscape(); resetAfterAnsiEscape();
TAOS * taos = NULL; TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host, taos = taos_connect(g_queryInfo.host,
g_queryInfo.user, g_queryInfo.user,
g_queryInfo.password, g_queryInfo.password,
NULL, NULL,
g_queryInfo.port); g_queryInfo.port);
if (taos == NULL) { if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n", errorPrint( "Failed to connect to TDengine, reason:%s\n",
...@@ -5554,14 +5587,14 @@ static int queryTestProcess() { ...@@ -5554,14 +5587,14 @@ static int queryTestProcess() {
&g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.subQueryInfo.childTblName,
&g_queryInfo.subQueryInfo.childTblCount); &g_queryInfo.subQueryInfo.childTblCount);
} }
if (!g_args.answer_yes) { if (!g_args.answer_yes) {
printf("Press enter key to continue\n\n"); printf("Press enter key to continue\n\n");
(void)getchar(); (void)getchar();
} }
printfQuerySystemInfo(taos); printfQuerySystemInfo(taos);
pthread_t *pids = NULL; pthread_t *pids = NULL;
threadInfo *infos = NULL; threadInfo *infos = NULL;
//==== create sub threads for query from specify table //==== create sub threads for query from specify table
...@@ -5585,28 +5618,30 @@ static int queryTestProcess() { ...@@ -5585,28 +5618,30 @@ static int queryTestProcess() {
t_info->threadID = i; t_info->threadID = i;
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
t_info->taos = taos;
char sqlStr[MAX_TB_NAME_SIZE*2]; char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName); sprintf(sqlStr, "use %s", g_queryInfo.dbName);
verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE, false)) { if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(taos);
free(infos); free(infos);
free(pids); free(pids);
errorPrint( "use database %s failed!\n\n", errorPrint( "use database %s failed!\n\n",
g_queryInfo.dbName); g_queryInfo.dbName);
return -1; return -1;
} }
} else {
t_info->taos = NULL;
} }
t_info->taos = NULL;// TODO: workaround to use separate taos connection;
pthread_create(pids + i, NULL, superQueryProcess, t_info); pthread_create(pids + i, NULL, superQueryProcess, t_info);
} }
}else { } else {
g_queryInfo.superQueryInfo.concurrent = 0; g_queryInfo.superQueryInfo.concurrent = 0;
} }
taos_close(taos);
pthread_t *pidsOfSub = NULL; pthread_t *pidsOfSub = NULL;
threadInfo *infosOfSub = NULL; threadInfo *infosOfSub = NULL;
//==== create sub threads for query from all sub table of the super table //==== create sub threads for query from all sub table of the super table
...@@ -5614,7 +5649,6 @@ static int queryTestProcess() { ...@@ -5614,7 +5649,6 @@ static int queryTestProcess() {
&& (g_queryInfo.subQueryInfo.threadCnt > 0)) { && (g_queryInfo.subQueryInfo.threadCnt > 0)) {
pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t)); pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t));
if (NULL == pidsOfSub) { if (NULL == pidsOfSub) {
taos_close(taos);
free(infos); free(infos);
free(pids); free(pids);
...@@ -5623,7 +5657,6 @@ static int queryTestProcess() { ...@@ -5623,7 +5657,6 @@ static int queryTestProcess() {
infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo)); infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo));
if (NULL == infosOfSub) { if (NULL == infosOfSub) {
taos_close(taos);
free(pidsOfSub); free(pidsOfSub);
free(infos); free(infos);
free(pids); free(pids);
...@@ -5653,7 +5686,7 @@ static int queryTestProcess() { ...@@ -5653,7 +5686,7 @@ static int queryTestProcess() {
t_info->ntables = i<b?a+1:a; t_info->ntables = i<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1; t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1; startFrom = t_info->end_table_to + 1;
t_info->taos = taos; t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, subQueryProcess, t_info); pthread_create(pidsOfSub + i, NULL, subQueryProcess, t_info);
} }
...@@ -5676,7 +5709,7 @@ static int queryTestProcess() { ...@@ -5676,7 +5709,7 @@ static int queryTestProcess() {
tmfree((char*)pidsOfSub); tmfree((char*)pidsOfSub);
tmfree((char*)infosOfSub); tmfree((char*)infosOfSub);
taos_close(taos); // taos_close(taos);// TODO: workaround to use separate taos connection;
return 0; return 0;
} }
...@@ -5717,10 +5750,27 @@ static void *subSubscribeProcess(void *sarg) { ...@@ -5717,10 +5750,27 @@ static void *subSubscribeProcess(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
char subSqlstr[1024]; char subSqlstr[1024];
if (winfo->taos == NULL) {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
g_queryInfo.dbName,
g_queryInfo.port);
if (taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
winfo->threadID, taos_errstr(NULL));
return NULL;
} else {
winfo->taos = taos;
}
}
char sqlStr[MAX_TB_NAME_SIZE*2]; char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName); sprintf(sqlStr, "use %s", g_queryInfo.dbName);
debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)){ if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(winfo->taos);
return NULL; return NULL;
} }
...@@ -5742,8 +5792,10 @@ static void *subSubscribeProcess(void *sarg) { ...@@ -5742,8 +5792,10 @@ static void *subSubscribeProcess(void *sarg) {
if (g_queryInfo.subQueryInfo.result[i][0] != 0) { if (g_queryInfo.subQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d", g_queryInfo.subQueryInfo.result[i], winfo->threadID); sprintf(tmpFile, "%s-%d", g_queryInfo.subQueryInfo.result[i], winfo->threadID);
} }
g_queryInfo.subQueryInfo.tsub[i] = subscribeImpl(winfo->taos, subSqlstr, topic, tmpFile); g_queryInfo.subQueryInfo.tsub[i] = subscribeImpl(
winfo->taos, subSqlstr, topic, tmpFile);
if (NULL == g_queryInfo.subQueryInfo.tsub[i]) { if (NULL == g_queryInfo.subQueryInfo.tsub[i]) {
taos_close(winfo->taos);
return NULL; return NULL;
} }
} }
...@@ -5777,16 +5829,35 @@ static void *subSubscribeProcess(void *sarg) { ...@@ -5777,16 +5829,35 @@ static void *subSubscribeProcess(void *sarg) {
taos_unsubscribe(g_queryInfo.subQueryInfo.tsub[i], taos_unsubscribe(g_queryInfo.subQueryInfo.tsub[i],
g_queryInfo.subQueryInfo.subscribeKeepProgress); g_queryInfo.subQueryInfo.subscribeKeepProgress);
} }
taos_close(winfo->taos);
return NULL; return NULL;
} }
static void *superSubscribeProcess(void *sarg) { static void *superSubscribeProcess(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
if (winfo->taos == NULL) {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
g_queryInfo.dbName,
g_queryInfo.port);
if (taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
winfo->threadID, taos_errstr(NULL));
return NULL;
} else {
winfo->taos = taos;
}
}
char sqlStr[MAX_TB_NAME_SIZE*2]; char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName); sprintf(sqlStr, "use %s", g_queryInfo.dbName);
debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)) { if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(winfo->taos);
return NULL; return NULL;
} }
...@@ -5812,6 +5883,7 @@ static void *superSubscribeProcess(void *sarg) { ...@@ -5812,6 +5883,7 @@ static void *superSubscribeProcess(void *sarg) {
g_queryInfo.superQueryInfo.sql[i], g_queryInfo.superQueryInfo.sql[i],
topic, tmpFile); topic, tmpFile);
if (NULL == g_queryInfo.superQueryInfo.tsub[i]) { if (NULL == g_queryInfo.superQueryInfo.tsub[i]) {
taos_close(winfo->taos);
return NULL; return NULL;
} }
} }
...@@ -5844,6 +5916,8 @@ static void *superSubscribeProcess(void *sarg) { ...@@ -5844,6 +5916,8 @@ static void *superSubscribeProcess(void *sarg) {
taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i],
g_queryInfo.superQueryInfo.subscribeKeepProgress); g_queryInfo.superQueryInfo.subscribeKeepProgress);
} }
taos_close(winfo->taos);
return NULL; return NULL;
} }
...@@ -5851,10 +5925,10 @@ static int subscribeTestProcess() { ...@@ -5851,10 +5925,10 @@ static int subscribeTestProcess() {
setupForAnsiEscape(); setupForAnsiEscape();
printfQueryMeta(); printfQueryMeta();
resetAfterAnsiEscape(); resetAfterAnsiEscape();
if (!g_args.answer_yes) { if (!g_args.answer_yes) {
printf("Press enter key to continue\n\n"); printf("Press enter key to continue\n\n");
(void)getchar(); (void) getchar();
} }
TAOS * taos = NULL; TAOS * taos = NULL;
...@@ -5877,6 +5951,8 @@ static int subscribeTestProcess() { ...@@ -5877,6 +5951,8 @@ static int subscribeTestProcess() {
&g_queryInfo.subQueryInfo.childTblCount); &g_queryInfo.subQueryInfo.childTblCount);
} }
taos_close(taos); // TODO: workaround to use separate taos connection;
pthread_t *pids = NULL; pthread_t *pids = NULL;
threadInfo *infos = NULL; threadInfo *infos = NULL;
//==== create sub threads for query from super table //==== create sub threads for query from super table
...@@ -5892,18 +5968,17 @@ static int subscribeTestProcess() { ...@@ -5892,18 +5968,17 @@ static int subscribeTestProcess() {
infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo)); infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) { if ((NULL == pids) || (NULL == infos)) {
errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__);
taos_close(taos);
exit(-1); exit(-1);
} }
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
threadInfo *t_info = infos + i; threadInfo *t_info = infos + i;
t_info->threadID = i; t_info->threadID = i;
t_info->taos = taos; t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pids + i, NULL, superSubscribeProcess, t_info); pthread_create(pids + i, NULL, superSubscribeProcess, t_info);
} }
//==== create sub threads for query from sub table //==== create sub threads for query from sub table
pthread_t *pidsOfSub = NULL; pthread_t *pidsOfSub = NULL;
threadInfo *infosOfSub = NULL; threadInfo *infosOfSub = NULL;
if ((g_queryInfo.subQueryInfo.sqlCount > 0) if ((g_queryInfo.subQueryInfo.sqlCount > 0)
...@@ -5913,8 +5988,9 @@ static int subscribeTestProcess() { ...@@ -5913,8 +5988,9 @@ static int subscribeTestProcess() {
infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt *
sizeof(threadInfo)); sizeof(threadInfo));
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
printf("malloc failed for create threads\n"); errorPrint("%s() LN%d, malloc failed for create threads\n",
taos_close(taos); __func__, __LINE__);
// taos_close(taos);
exit(-1); exit(-1);
} }
...@@ -5941,7 +6017,7 @@ static int subscribeTestProcess() { ...@@ -5941,7 +6017,7 @@ static int subscribeTestProcess() {
t_info->ntables = i<b?a+1:a; t_info->ntables = i<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1; t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1; startFrom = t_info->end_table_to + 1;
t_info->taos = taos; t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, subSubscribeProcess, t_info); pthread_create(pidsOfSub + i, NULL, subSubscribeProcess, t_info);
} }
g_queryInfo.subQueryInfo.threadCnt = threads; g_queryInfo.subQueryInfo.threadCnt = threads;
...@@ -5949,7 +6025,7 @@ static int subscribeTestProcess() { ...@@ -5949,7 +6025,7 @@ static int subscribeTestProcess() {
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
pthread_join(pids[i], NULL); pthread_join(pids[i], NULL);
} }
tmfree((char*)pids); tmfree((char*)pids);
tmfree((char*)infos); tmfree((char*)infos);
...@@ -5960,7 +6036,7 @@ static int subscribeTestProcess() { ...@@ -5960,7 +6036,7 @@ static int subscribeTestProcess() {
tmfree((char*)pidsOfSub); tmfree((char*)pidsOfSub);
tmfree((char*)infosOfSub); tmfree((char*)infosOfSub);
taos_close(taos); // taos_close(taos);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册