未验证 提交 3d7650d1 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-3902]<fix>: taosdemo subscribe. (#6028)

Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 8858087e
...@@ -1630,64 +1630,68 @@ static void printfQueryMeta() { ...@@ -1630,64 +1630,68 @@ static void printfQueryMeta() {
printf("database name: \033[33m%s\033[0m\n", g_queryInfo.dbName); printf("database name: \033[33m%s\033[0m\n", g_queryInfo.dbName);
printf("\n"); printf("\n");
printf("specified table query info: \n");
printf("query interval: \033[33m%"PRIu64" ms\033[0m\n", if ((SUBSCRIBE_TEST == g_args.test_mode) || (QUERY_TEST == g_args.test_mode)) {
g_queryInfo.specifiedQueryInfo.queryInterval); printf("specified table query info: \n");
printf("top query times:\033[33m%"PRIu64"\033[0m\n", g_args.query_times); printf("sqlCount: \033[33m%"PRIu64"\033[0m\n",
printf("concurrent: \033[33m%"PRIu64"\033[0m\n",
g_queryInfo.specifiedQueryInfo.concurrent);
printf("sqlCount: \033[33m%"PRIu64"\033[0m\n",
g_queryInfo.specifiedQueryInfo.sqlCount); g_queryInfo.specifiedQueryInfo.sqlCount);
printf("specified tbl query times:\n"); if (g_queryInfo.specifiedQueryInfo.sqlCount > 0) {
printf(" \033[33m%"PRIu64"\033[0m\n", printf("specified tbl query times:\n");
printf(" \033[33m%"PRIu64"\033[0m\n",
g_queryInfo.specifiedQueryInfo.queryTimes); g_queryInfo.specifiedQueryInfo.queryTimes);
printf("query interval: \033[33m%"PRIu64" ms\033[0m\n",
if (SUBSCRIBE_TEST == g_args.test_mode) { g_queryInfo.specifiedQueryInfo.queryInterval);
printf("mod: \033[33m%d\033[0m\n", printf("top query times:\033[33m%"PRIu64"\033[0m\n", g_args.query_times);
g_queryInfo.specifiedQueryInfo.mode); printf("concurrent: \033[33m%"PRIu64"\033[0m\n",
printf("interval: \033[33m%"PRIu64"\033[0m\n", g_queryInfo.specifiedQueryInfo.concurrent);
printf("mod: \033[33m%s\033[0m\n",
(g_queryInfo.specifiedQueryInfo.mode)?"async":"sync");
printf("interval: \033[33m%"PRIu64"\033[0m\n",
g_queryInfo.specifiedQueryInfo.subscribeInterval); g_queryInfo.specifiedQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n", printf("restart: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.subscribeRestart); g_queryInfo.specifiedQueryInfo.subscribeRestart);
printf("keepProgress: \033[33m%d\033[0m\n", printf("keepProgress: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
}
for (uint64_t i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (uint64_t i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
printf(" sql[%"PRIu64"]: \033[33m%s\033[0m\n", printf(" sql[%"PRIu64"]: \033[33m%s\033[0m\n",
i, g_queryInfo.specifiedQueryInfo.sql[i]); i, g_queryInfo.specifiedQueryInfo.sql[i]);
} }
printf("\n"); printf("\n");
printf("super table query info:\n"); }
printf("query interval: \033[33m%"PRIu64"\033[0m\n",
g_queryInfo.superQueryInfo.queryInterval); printf("super table query info:\n");
printf("threadCnt: \033[33m%d\033[0m\n", printf("sqlCount: \033[33m%"PRIu64"\033[0m\n",
g_queryInfo.superQueryInfo.threadCnt); g_queryInfo.superQueryInfo.sqlCount);
printf("childTblCount: \033[33m%"PRIu64"\033[0m\n",
g_queryInfo.superQueryInfo.childTblCount); if (g_queryInfo.superQueryInfo.sqlCount > 0) {
printf("stable name: \033[33m%s\033[0m\n", printf("query interval: \033[33m%"PRIu64"\033[0m\n",
g_queryInfo.superQueryInfo.sTblName); g_queryInfo.superQueryInfo.queryInterval);
printf("stb query times:\033[33m%"PRIu64"\033[0m\n", printf("threadCnt: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.queryTimes); g_queryInfo.superQueryInfo.threadCnt);
printf("childTblCount: \033[33m%"PRIu64"\033[0m\n",
if (SUBSCRIBE_TEST == g_args.test_mode) { g_queryInfo.superQueryInfo.childTblCount);
printf("mod: \033[33m%d\033[0m\n", printf("stable name: \033[33m%s\033[0m\n",
g_queryInfo.superQueryInfo.mode); g_queryInfo.superQueryInfo.sTblName);
printf("interval: \033[33m%"PRIu64"\033[0m\n", printf("stb query times:\033[33m%"PRIu64"\033[0m\n",
g_queryInfo.superQueryInfo.queryTimes);
printf("mod: \033[33m%s\033[0m\n",
(g_queryInfo.superQueryInfo.mode)?"async":"sync");
printf("interval: \033[33m%"PRIu64"\033[0m\n",
g_queryInfo.superQueryInfo.subscribeInterval); g_queryInfo.superQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n", printf("restart: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.subscribeRestart); g_queryInfo.superQueryInfo.subscribeRestart);
printf("keepProgress: \033[33m%d\033[0m\n", printf("keepProgress: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress); g_queryInfo.superQueryInfo.subscribeKeepProgress);
}
printf("sqlCount: \033[33m%"PRIu64"\033[0m\n", for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
g_queryInfo.superQueryInfo.sqlCount); printf(" sql[%d]: \033[33m%s\033[0m\n",
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { i, g_queryInfo.superQueryInfo.sql[i]);
printf(" sql[%d]: \033[33m%s\033[0m\n", }
i, g_queryInfo.superQueryInfo.sql[i]); printf("\n");
}
} }
printf("\n");
SHOW_PARSE_RESULT_END(); SHOW_PARSE_RESULT_END();
} }
...@@ -2847,7 +2851,7 @@ static void* createTable(void *sarg) ...@@ -2847,7 +2851,7 @@ static void* createTable(void *sarg)
} }
static int startMultiThreadCreateChildTable( static int startMultiThreadCreateChildTable(
char* cols, int threads, int64_t startFrom, int64_t ntables, char* cols, int threads, uint64_t startFrom, uint64_t ntables,
char* db_name, SSuperTable* superTblInfo) { char* db_name, SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t)); pthread_t *pids = malloc(threads * sizeof(pthread_t));
...@@ -2862,13 +2866,13 @@ static int startMultiThreadCreateChildTable( ...@@ -2862,13 +2866,13 @@ static int startMultiThreadCreateChildTable(
threads = 1; threads = 1;
} }
int64_t a = ntables / threads; uint64_t a = ntables / threads;
if (a < 1) { if (a < 1) {
threads = ntables; threads = ntables;
a = 1; a = 1;
} }
int64_t b = 0; uint64_t b = 0;
b = ntables % threads; b = ntables % threads;
for (int64_t i = 0; i < threads; i++) { for (int64_t i = 0; i < threads; i++) {
...@@ -4212,7 +4216,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4212,7 +4216,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} }
} }
// sub_table_query // super_table_query
cJSON *superQuery = cJSON_GetObjectItem(root, "super_table_query"); cJSON *superQuery = cJSON_GetObjectItem(root, "super_table_query");
if (!superQuery) { if (!superQuery) {
g_queryInfo.superQueryInfo.threadCnt = 1; g_queryInfo.superQueryInfo.threadCnt = 1;
...@@ -5679,13 +5683,13 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5679,13 +5683,13 @@ static void startMultiThreadInsertData(int threads, char* db_name,
taos_close(taos); taos_close(taos);
int a = ntables / threads; uint64_t a = ntables / threads;
if (a < 1) { if (a < 1) {
threads = ntables; threads = ntables;
a = 1; a = 1;
} }
int b = 0; uint64_t b = 0;
if (threads != 0) { if (threads != 0) {
b = ntables % threads; b = ntables % threads;
} }
...@@ -6380,7 +6384,7 @@ static int queryTestProcess() { ...@@ -6380,7 +6384,7 @@ static int queryTestProcess() {
b = ntables % threads; b = ntables % threads;
} }
int startFrom = 0; uint64_t startFrom = 0;
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
threadInfo *t_info = infosOfSub + i; threadInfo *t_info = infosOfSub + i;
t_info->threadID = i; t_info->threadID = i;
...@@ -6436,13 +6440,14 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c ...@@ -6436,13 +6440,14 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
} }
getResult(res, (char*)param); getResult(res, (char*)param);
taos_free_result(res); // tao_unscribe() will free result.
} }
static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultFileName) { static TAOS_SUB* subscribeImpl(
TAOS *taos, char *sql, char* topic, char* resultFileName) {
TAOS_SUB* tsub = NULL; TAOS_SUB* tsub = NULL;
if (g_queryInfo.specifiedQueryInfo.mode) { if (ASYNC_QUERY_MODE == g_queryInfo.specifiedQueryInfo.mode) {
tsub = taos_subscribe(taos, tsub = taos_subscribe(taos,
g_queryInfo.specifiedQueryInfo.subscribeRestart, g_queryInfo.specifiedQueryInfo.subscribeRestart,
topic, sql, subscribe_callback, (void*)resultFileName, topic, sql, subscribe_callback, (void*)resultFileName,
...@@ -6466,6 +6471,9 @@ static void *superSubscribe(void *sarg) { ...@@ -6466,6 +6471,9 @@ static void *superSubscribe(void *sarg) {
char subSqlstr[1024]; char subSqlstr[1024];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
if (g_queryInfo.superQueryInfo.sqlCount == 0)
return NULL;
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL; TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host, taos = taos_connect(g_queryInfo.host,
...@@ -6524,7 +6532,7 @@ static void *superSubscribe(void *sarg) { ...@@ -6524,7 +6532,7 @@ static void *superSubscribe(void *sarg) {
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while(1) { while(1) {
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
if (1 == g_queryInfo.superQueryInfo.mode) { if (ASYNC_QUERY_MODE == g_queryInfo.superQueryInfo.mode) {
continue; continue;
} }
...@@ -6554,6 +6562,9 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6554,6 +6562,9 @@ static void *specifiedSubscribe(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
if (g_queryInfo.specifiedQueryInfo.sqlCount == 0)
return NULL;
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL; TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host, taos = taos_connect(g_queryInfo.host,
...@@ -6591,7 +6602,7 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6591,7 +6602,7 @@ static void *specifiedSubscribe(void *sarg) {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
sprintf(topic, "taosdemo-subscribe-%d", i); sprintf(topic, "taosdemo-subscribe-%d", i);
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.superQueryInfo.result[i][0] != 0) { if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID);
} }
...@@ -6610,7 +6621,7 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6610,7 +6621,7 @@ static void *specifiedSubscribe(void *sarg) {
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while(1) { while(1) {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (SYNC_QUERY_MODE == g_queryInfo.specifiedQueryInfo.mode) { if (ASYNC_QUERY_MODE == g_queryInfo.specifiedQueryInfo.mode) {
continue; continue;
} }
...@@ -6710,21 +6721,21 @@ static int subscribeTestProcess() { ...@@ -6710,21 +6721,21 @@ static int subscribeTestProcess() {
exit(-1); exit(-1);
} }
int ntables = g_queryInfo.superQueryInfo.childTblCount; uint64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
int threads = g_queryInfo.superQueryInfo.threadCnt; int threads = g_queryInfo.superQueryInfo.threadCnt;
int a = ntables / threads; uint64_t a = ntables / threads;
if (a < 1) { if (a < 1) {
threads = ntables; threads = ntables;
a = 1; a = 1;
} }
int b = 0; uint64_t b = 0;
if (threads != 0) { if (threads != 0) {
b = ntables % threads; b = ntables % threads;
} }
int startFrom = 0; uint64_t startFrom = 0;
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
threadInfo *t_info = infosOfSub + i; threadInfo *t_info = infosOfSub + i;
t_info->threadID = i; t_info->threadID = i;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册