From c89ada70fd3869b8abb71e2692135626699aa3e7 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 26 Mar 2021 19:29:17 +0800 Subject: [PATCH] [TD-3524] : taosdemo query multithreading workaround. (#5587) Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 134 ++++++++++++++++++++++++++++-------- 1 file changed, 105 insertions(+), 29 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 1048d7b969..13b4d14f52 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -5418,6 +5418,22 @@ static int insertTestProcess() { static void *superQueryProcess(void *sarg) { threadInfo *winfo = (threadInfo *)sarg; + if (winfo->taos == NULL) { + TAOS * taos = NULL; + taos = taos_connect(g_queryInfo.host, + g_queryInfo.user, + g_queryInfo.password, + NULL, + g_queryInfo.port); + if (taos == NULL) { + errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", + winfo->threadID, taos_errstr(NULL)); + return NULL; + } else { + winfo->taos = taos; + } + } + //char sqlStr[MAX_TB_NAME_SIZE*2]; //sprintf(sqlStr, "use %s", g_queryInfo.dbName); //queryDB(winfo->taos, sqlStr); @@ -5493,6 +5509,23 @@ static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { static void *subQueryProcess(void *sarg) { char sqlstr[1024]; threadInfo *winfo = (threadInfo *)sarg; + + if (winfo->taos == NULL) { + TAOS * taos = NULL; + taos = taos_connect(g_queryInfo.host, + g_queryInfo.user, + g_queryInfo.password, + NULL, + g_queryInfo.port); + if (taos == NULL) { + errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", + winfo->threadID, taos_errstr(NULL)); + return NULL; + } else { + winfo->taos = taos; + } + } + int64_t st = 0; int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000; int queryTimes = g_args.query_times; @@ -5534,12 +5567,12 @@ static int queryTestProcess() { setupForAnsiEscape(); printfQueryMeta(); resetAfterAnsiEscape(); - - TAOS * taos = NULL; - taos = taos_connect(g_queryInfo.host, - g_queryInfo.user, - g_queryInfo.password, - NULL, + + TAOS * taos = NULL; + taos = taos_connect(g_queryInfo.host, + g_queryInfo.user, + g_queryInfo.password, + NULL, g_queryInfo.port); if (taos == NULL) { errorPrint( "Failed to connect to TDengine, reason:%s\n", @@ -5554,14 +5587,14 @@ static int queryTestProcess() { &g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.subQueryInfo.childTblCount); } - + if (!g_args.answer_yes) { printf("Press enter key to continue\n\n"); (void)getchar(); } printfQuerySystemInfo(taos); - + pthread_t *pids = NULL; threadInfo *infos = NULL; //==== create sub threads for query from specify table @@ -5585,28 +5618,30 @@ static int queryTestProcess() { t_info->threadID = i; if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { - t_info->taos = taos; char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); - if (0 != queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE, false)) { + if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) { + taos_close(taos); free(infos); free(pids); errorPrint( "use database %s failed!\n\n", g_queryInfo.dbName); return -1; } - } else { - t_info->taos = NULL; } + t_info->taos = NULL;// TODO: workaround to use separate taos connection; + pthread_create(pids + i, NULL, superQueryProcess, t_info); } - }else { + } else { g_queryInfo.superQueryInfo.concurrent = 0; } + taos_close(taos); + pthread_t *pidsOfSub = NULL; threadInfo *infosOfSub = NULL; //==== create sub threads for query from all sub table of the super table @@ -5614,7 +5649,6 @@ static int queryTestProcess() { && (g_queryInfo.subQueryInfo.threadCnt > 0)) { pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t)); if (NULL == pidsOfSub) { - taos_close(taos); free(infos); free(pids); @@ -5623,7 +5657,6 @@ static int queryTestProcess() { infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo)); if (NULL == infosOfSub) { - taos_close(taos); free(pidsOfSub); free(infos); free(pids); @@ -5653,7 +5686,7 @@ static int queryTestProcess() { t_info->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; startFrom = t_info->end_table_to + 1; - t_info->taos = taos; + t_info->taos = NULL; // TODO: workaround to use separate taos connection; pthread_create(pidsOfSub + i, NULL, subQueryProcess, t_info); } @@ -5676,7 +5709,7 @@ static int queryTestProcess() { tmfree((char*)pidsOfSub); tmfree((char*)infosOfSub); - taos_close(taos); +// taos_close(taos);// TODO: workaround to use separate taos connection; return 0; } @@ -5717,10 +5750,27 @@ static void *subSubscribeProcess(void *sarg) { threadInfo *winfo = (threadInfo *)sarg; char subSqlstr[1024]; + if (winfo->taos == NULL) { + TAOS * taos = NULL; + taos = taos_connect(g_queryInfo.host, + g_queryInfo.user, + g_queryInfo.password, + g_queryInfo.dbName, + g_queryInfo.port); + if (taos == NULL) { + errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", + winfo->threadID, taos_errstr(NULL)); + return NULL; + } else { + winfo->taos = taos; + } + } + char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); - if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)){ + if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)) { + taos_close(winfo->taos); return NULL; } @@ -5742,8 +5792,10 @@ static void *subSubscribeProcess(void *sarg) { if (g_queryInfo.subQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", g_queryInfo.subQueryInfo.result[i], winfo->threadID); } - g_queryInfo.subQueryInfo.tsub[i] = subscribeImpl(winfo->taos, subSqlstr, topic, tmpFile); + g_queryInfo.subQueryInfo.tsub[i] = subscribeImpl( + winfo->taos, subSqlstr, topic, tmpFile); if (NULL == g_queryInfo.subQueryInfo.tsub[i]) { + taos_close(winfo->taos); return NULL; } } @@ -5777,16 +5829,35 @@ static void *subSubscribeProcess(void *sarg) { taos_unsubscribe(g_queryInfo.subQueryInfo.tsub[i], g_queryInfo.subQueryInfo.subscribeKeepProgress); } + + taos_close(winfo->taos); return NULL; } static void *superSubscribeProcess(void *sarg) { threadInfo *winfo = (threadInfo *)sarg; + if (winfo->taos == NULL) { + TAOS * taos = NULL; + taos = taos_connect(g_queryInfo.host, + g_queryInfo.user, + g_queryInfo.password, + g_queryInfo.dbName, + g_queryInfo.port); + if (taos == NULL) { + errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", + winfo->threadID, taos_errstr(NULL)); + return NULL; + } else { + winfo->taos = taos; + } + } + char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)) { + taos_close(winfo->taos); return NULL; } @@ -5812,6 +5883,7 @@ static void *superSubscribeProcess(void *sarg) { g_queryInfo.superQueryInfo.sql[i], topic, tmpFile); if (NULL == g_queryInfo.superQueryInfo.tsub[i]) { + taos_close(winfo->taos); return NULL; } } @@ -5844,6 +5916,8 @@ static void *superSubscribeProcess(void *sarg) { taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress); } + + taos_close(winfo->taos); return NULL; } @@ -5851,10 +5925,10 @@ static int subscribeTestProcess() { setupForAnsiEscape(); printfQueryMeta(); resetAfterAnsiEscape(); - + if (!g_args.answer_yes) { printf("Press enter key to continue\n\n"); - (void)getchar(); + (void) getchar(); } TAOS * taos = NULL; @@ -5877,6 +5951,8 @@ static int subscribeTestProcess() { &g_queryInfo.subQueryInfo.childTblCount); } + taos_close(taos); // TODO: workaround to use separate taos connection; + pthread_t *pids = NULL; threadInfo *infos = NULL; //==== create sub threads for query from super table @@ -5892,18 +5968,17 @@ static int subscribeTestProcess() { infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); - taos_close(taos); exit(-1); } for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; - t_info->taos = taos; + t_info->taos = NULL; // TODO: workaround to use separate taos connection; pthread_create(pids + i, NULL, superSubscribeProcess, t_info); } - //==== create sub threads for query from sub table + //==== create sub threads for query from sub table pthread_t *pidsOfSub = NULL; threadInfo *infosOfSub = NULL; if ((g_queryInfo.subQueryInfo.sqlCount > 0) @@ -5913,8 +5988,9 @@ static int subscribeTestProcess() { infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo)); if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { - printf("malloc failed for create threads\n"); - taos_close(taos); + errorPrint("%s() LN%d, malloc failed for create threads\n", + __func__, __LINE__); + // taos_close(taos); exit(-1); } @@ -5941,7 +6017,7 @@ static int subscribeTestProcess() { t_info->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; startFrom = t_info->end_table_to + 1; - t_info->taos = taos; + t_info->taos = NULL; // TODO: workaround to use separate taos connection; pthread_create(pidsOfSub + i, NULL, subSubscribeProcess, t_info); } g_queryInfo.subQueryInfo.threadCnt = threads; @@ -5949,7 +6025,7 @@ static int subscribeTestProcess() { for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { pthread_join(pids[i], NULL); - } + } tmfree((char*)pids); tmfree((char*)infos); @@ -5960,7 +6036,7 @@ static int subscribeTestProcess() { tmfree((char*)pidsOfSub); tmfree((char*)infosOfSub); - taos_close(taos); +// taos_close(taos); return 0; } -- GitLab