未验证 提交 055919a7 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 3992 taosdemo subscribe (#6062)

* [TD-3902]<fix>: taosdemo subscribe.

* [TD-3992]<fix>: taosdemo subscribe.

refactor sync/async mode.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 8a7fc6fc
...@@ -68,12 +68,6 @@ enum TEST_MODE { ...@@ -68,12 +68,6 @@ enum TEST_MODE {
INVAID_TEST INVAID_TEST
}; };
enum QUERY_MODE {
SYNC_QUERY_MODE, // 0
ASYNC_QUERY_MODE, // 1
INVALID_MODE
};
#define MAX_SQL_SIZE 65536 #define MAX_SQL_SIZE 65536
#define BUFFER_SIZE (65536*2) #define BUFFER_SIZE (65536*2)
#define COND_BUF_LEN BUFFER_SIZE - 30 #define COND_BUF_LEN BUFFER_SIZE - 30
...@@ -119,8 +113,8 @@ typedef enum TALBE_EXISTS_EN { ...@@ -119,8 +113,8 @@ typedef enum TALBE_EXISTS_EN {
} TALBE_EXISTS_EN; } TALBE_EXISTS_EN;
enum MODE { enum MODE {
SYNC, SYNC_MODE,
ASYNC, ASYNC_MODE,
MODE_BUT MODE_BUT
}; };
...@@ -206,7 +200,7 @@ typedef struct SArguments_S { ...@@ -206,7 +200,7 @@ typedef struct SArguments_S {
bool verbose_print; bool verbose_print;
bool performance_print; bool performance_print;
char * output_file; char * output_file;
uint32_t query_mode; bool async_mode;
char * datatype[MAX_NUM_DATATYPE + 1]; char * datatype[MAX_NUM_DATATYPE + 1];
uint32_t len_of_binary; uint32_t len_of_binary;
uint32_t num_of_CPR; uint32_t num_of_CPR;
...@@ -344,7 +338,7 @@ typedef struct SDbs_S { ...@@ -344,7 +338,7 @@ typedef struct SDbs_S {
bool use_metric; bool use_metric;
bool insert_only; bool insert_only;
bool do_aggreFunc; bool do_aggreFunc;
bool queryMode; bool asyncMode;
uint32_t threadCount; uint32_t threadCount;
uint32_t threadCountByCreateTbl; uint32_t threadCountByCreateTbl;
...@@ -361,7 +355,7 @@ typedef struct SpecifiedQueryInfo_S { ...@@ -361,7 +355,7 @@ typedef struct SpecifiedQueryInfo_S {
uint64_t queryInterval; // 0: unlimit > 0 loop/s uint64_t queryInterval; // 0: unlimit > 0 loop/s
uint64_t concurrent; uint64_t concurrent;
uint64_t sqlCount; uint64_t sqlCount;
uint32_t mode; // 0: sync, 1: async uint32_t asyncMode; // 0: sync, 1: async
uint64_t subscribeInterval; // ms uint64_t subscribeInterval; // ms
uint64_t queryTimes; uint64_t queryTimes;
int subscribeRestart; int subscribeRestart;
...@@ -376,7 +370,7 @@ typedef struct SuperQueryInfo_S { ...@@ -376,7 +370,7 @@ typedef struct SuperQueryInfo_S {
char sTblName[MAX_TB_NAME_SIZE+1]; char sTblName[MAX_TB_NAME_SIZE+1];
uint64_t queryInterval; // 0: unlimit > 0 loop/s uint64_t queryInterval; // 0: unlimit > 0 loop/s
uint32_t threadCnt; uint32_t threadCnt;
uint32_t mode; // 0: sync, 1: async uint32_t asyncMode; // 0: sync, 1: async
uint64_t subscribeInterval; // ms uint64_t subscribeInterval; // ms
int subscribeRestart; int subscribeRestart;
int subscribeKeepProgress; int subscribeKeepProgress;
...@@ -774,49 +768,48 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -774,49 +768,48 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
} }
arguments->sqlFile = argv[++i]; arguments->sqlFile = argv[++i];
} else if (strcmp(argv[i], "-q") == 0) { } else if (strcmp(argv[i], "-q") == 0) {
if ((argc == i+1) if ((argc == i+1) ||
|| (!isStringNumber(argv[i+1]))) { (!isStringNumber(argv[i+1]))) {
printHelp(); printHelp();
errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, 1: ASYNC. Default is SYNC.\n"); errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, 1: ASYNC. Default is SYNC.\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
arguments->query_mode = atoi(argv[++i]); arguments->async_mode = atoi(argv[++i]);
} else if (strcmp(argv[i], "-T") == 0) { } else if (strcmp(argv[i], "-T") == 0) {
if ((argc == i+1) if ((argc == i+1) ||
|| (!isStringNumber(argv[i+1]))) { (!isStringNumber(argv[i+1]))) {
printHelp(); printHelp();
errorPrint("%s", "\n\t-T need a number following!\n"); errorPrint("%s", "\n\t-T need a number following!\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
arguments->num_of_threads = atoi(argv[++i]); arguments->num_of_threads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") == 0) { } else if (strcmp(argv[i], "-i") == 0) {
if ((argc == i+1) if ((argc == i+1) ||
|| (!isStringNumber(argv[i+1]))) { (!isStringNumber(argv[i+1]))) {
printHelp(); printHelp();
errorPrint("%s", "\n\t-i need a number following!\n"); errorPrint("%s", "\n\t-i need a number following!\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
arguments->insert_interval = atoi(argv[++i]); arguments->insert_interval = atoi(argv[++i]);
} else if (strcmp(argv[i], "-qt") == 0) { } else if (strcmp(argv[i], "-qt") == 0) {
if ((argc == i+1) if ((argc == i+1) ||
|| (!isStringNumber(argv[i+1])) (!isStringNumber(argv[i+1]))) {
|| (atoi(argv[i+1]) <= 0)) {
printHelp(); printHelp();
errorPrint("%s", "\n\t-qt need a valid (>0) number following!\n"); errorPrint("%s", "\n\t-qt need a number following!\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
arguments->query_times = atoi(argv[++i]); arguments->query_times = atoi(argv[++i]);
} else if (strcmp(argv[i], "-B") == 0) { } else if (strcmp(argv[i], "-B") == 0) {
if ((argc == i+1) if ((argc == i+1) ||
|| (!isStringNumber(argv[i+1]))) { (!isStringNumber(argv[i+1]))) {
printHelp(); printHelp();
errorPrint("%s", "\n\t-B need a number following!\n"); errorPrint("%s", "\n\t-B need a number following!\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
arguments->interlace_rows = atoi(argv[++i]); arguments->interlace_rows = atoi(argv[++i]);
} else if (strcmp(argv[i], "-r") == 0) { } else if (strcmp(argv[i], "-r") == 0) {
if ((argc == i+1) if ((argc == i+1) ||
|| (!isStringNumber(argv[i+1]))) { (!isStringNumber(argv[i+1]))) {
printHelp(); printHelp();
errorPrint("%s", "\n\t-r need a number following!\n"); errorPrint("%s", "\n\t-r need a number following!\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
...@@ -1076,7 +1069,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) { ...@@ -1076,7 +1069,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
if (code != 0) { if (code != 0) {
if (!quiet) { if (!quiet) {
debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command); debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
errorPrint("Failed to execute %s, reason: %s\n", command, taos_errstr(res)); errorPrint("Failed to run %s, reason: %s\n", command, taos_errstr(res));
} }
taos_free_result(res); taos_free_result(res);
//taos_close(taos); //taos_close(taos);
...@@ -1672,7 +1665,7 @@ static void printfQueryMeta() { ...@@ -1672,7 +1665,7 @@ static void printfQueryMeta() {
printf("concurrent: \033[33m%"PRIu64"\033[0m\n", printf("concurrent: \033[33m%"PRIu64"\033[0m\n",
g_queryInfo.specifiedQueryInfo.concurrent); g_queryInfo.specifiedQueryInfo.concurrent);
printf("mod: \033[33m%s\033[0m\n", printf("mod: \033[33m%s\033[0m\n",
(g_queryInfo.specifiedQueryInfo.mode)?"async":"sync"); (g_queryInfo.specifiedQueryInfo.asyncMode)?"async":"sync");
printf("interval: \033[33m%"PRIu64"\033[0m\n", printf("interval: \033[33m%"PRIu64"\033[0m\n",
g_queryInfo.specifiedQueryInfo.subscribeInterval); g_queryInfo.specifiedQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n", printf("restart: \033[33m%d\033[0m\n",
...@@ -1704,7 +1697,7 @@ static void printfQueryMeta() { ...@@ -1704,7 +1697,7 @@ static void printfQueryMeta() {
g_queryInfo.superQueryInfo.queryTimes); g_queryInfo.superQueryInfo.queryTimes);
printf("mod: \033[33m%s\033[0m\n", printf("mod: \033[33m%s\033[0m\n",
(g_queryInfo.superQueryInfo.mode)?"async":"sync"); (g_queryInfo.superQueryInfo.asyncMode)?"async":"sync");
printf("interval: \033[33m%"PRIu64"\033[0m\n", printf("interval: \033[33m%"PRIu64"\033[0m\n",
g_queryInfo.superQueryInfo.subscribeInterval); g_queryInfo.superQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n", printf("restart: \033[33m%d\033[0m\n",
...@@ -4072,9 +4065,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4072,9 +4065,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* gQueryTimes = cJSON_GetObjectItem(root, "query_times"); cJSON* gQueryTimes = cJSON_GetObjectItem(root, "query_times");
if (gQueryTimes && gQueryTimes->type == cJSON_Number) { if (gQueryTimes && gQueryTimes->type == cJSON_Number) {
if (gQueryTimes->valueint <= 0) { if (gQueryTimes->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n", errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__, gQueryTimes->valueint); __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
g_args.query_times = gQueryTimes->valueint; g_args.query_times = gQueryTimes->valueint;
...@@ -4123,9 +4116,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4123,9 +4116,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery, cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery,
"query_times"); "query_times");
if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) { if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) {
if (specifiedQueryTimes->valueint <= 0) { if (specifiedQueryTimes->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n", errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__, specifiedQueryTimes->valueint); __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -4152,20 +4145,20 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4152,20 +4145,20 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.specifiedQueryInfo.concurrent = 1; g_queryInfo.specifiedQueryInfo.concurrent = 1;
} }
cJSON* mode = cJSON_GetObjectItem(specifiedQuery, "mode"); cJSON* specifiedAsyncMode = cJSON_GetObjectItem(specifiedQuery, "mode");
if (mode && mode->type == cJSON_String if (specifiedAsyncMode && specifiedAsyncMode->type == cJSON_String
&& mode->valuestring != NULL) { && specifiedAsyncMode->valuestring != NULL) {
if (0 == strcmp("sync", mode->valuestring)) { if (0 == strcmp("sync", specifiedAsyncMode->valuestring)) {
g_queryInfo.specifiedQueryInfo.mode = SYNC_QUERY_MODE; g_queryInfo.specifiedQueryInfo.asyncMode = SYNC_MODE;
} else if (0 == strcmp("async", mode->valuestring)) { } else if (0 == strcmp("async", specifiedAsyncMode->valuestring)) {
g_queryInfo.specifiedQueryInfo.mode = ASYNC_QUERY_MODE; g_queryInfo.specifiedQueryInfo.asyncMode = ASYNC_MODE;
} else { } else {
errorPrint("%s() LN%d, failed to read json, query mode input error\n", errorPrint("%s() LN%d, failed to read json, async mode input error\n",
__func__, __LINE__); __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
g_queryInfo.specifiedQueryInfo.mode = SYNC_QUERY_MODE; g_queryInfo.specifiedQueryInfo.asyncMode = SYNC_MODE;
} }
cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval"); cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval");
...@@ -4267,9 +4260,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4267,9 +4260,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* superQueryTimes = cJSON_GetObjectItem(superQuery, "query_times"); cJSON* superQueryTimes = cJSON_GetObjectItem(superQuery, "query_times");
if (superQueryTimes && superQueryTimes->type == cJSON_Number) { if (superQueryTimes && superQueryTimes->type == cJSON_Number) {
if (superQueryTimes->valueint <= 0) { if (superQueryTimes->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n", errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__, superQueryTimes->valueint); __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
g_queryInfo.superQueryInfo.queryTimes = superQueryTimes->valueint; g_queryInfo.superQueryInfo.queryTimes = superQueryTimes->valueint;
...@@ -4312,20 +4305,20 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4312,20 +4305,20 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* submode = cJSON_GetObjectItem(superQuery, "mode"); cJSON* superAsyncMode = cJSON_GetObjectItem(superQuery, "mode");
if (submode && submode->type == cJSON_String if (superAsyncMode && superAsyncMode->type == cJSON_String
&& submode->valuestring != NULL) { && superAsyncMode->valuestring != NULL) {
if (0 == strcmp("sync", submode->valuestring)) { if (0 == strcmp("sync", superAsyncMode->valuestring)) {
g_queryInfo.superQueryInfo.mode = SYNC_QUERY_MODE; g_queryInfo.superQueryInfo.asyncMode = SYNC_MODE;
} else if (0 == strcmp("async", submode->valuestring)) { } else if (0 == strcmp("async", superAsyncMode->valuestring)) {
g_queryInfo.superQueryInfo.mode = ASYNC_QUERY_MODE; g_queryInfo.superQueryInfo.asyncMode = ASYNC_MODE;
} else { } else {
errorPrint("%s() LN%d, failed to read json, query mode input error\n", errorPrint("%s() LN%d, failed to read json, async mode input error\n",
__func__, __LINE__); __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
g_queryInfo.superQueryInfo.mode = SYNC_QUERY_MODE; g_queryInfo.superQueryInfo.asyncMode = SYNC_MODE;
} }
cJSON* superInterval = cJSON_GetObjectItem(superQuery, "interval"); cJSON* superInterval = cJSON_GetObjectItem(superQuery, "interval");
...@@ -5233,13 +5226,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5233,13 +5226,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
startTs = taosGetTimestampMs(); startTs = taosGetTimestampMs();
if (recOfBatch == 0) {
errorPrint("[%d] %s() LN%d try inserting records of batch is %"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__,
recOfBatch);
errorPrint("%s\n", "\tPlease check if the batch or the buffer length is proper value!\n");
goto free_of_interlace;
}
int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch); int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch);
endTs = taosGetTimestampMs(); endTs = taosGetTimestampMs();
...@@ -5780,10 +5766,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5780,10 +5766,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
} }
*/ */
tsem_init(&(t_info->lock_sem), 0, 0); tsem_init(&(t_info->lock_sem), 0, 0);
if (SYNC == g_Dbs.queryMode) { if (ASYNC_MODE == g_Dbs.asyncMode) {
pthread_create(pids + i, NULL, syncWrite, t_info);
} else {
pthread_create(pids + i, NULL, asyncWrite, t_info); pthread_create(pids + i, NULL, asyncWrite, t_info);
} else {
pthread_create(pids + i, NULL, syncWrite, t_info);
} }
} }
...@@ -6469,7 +6455,7 @@ static TAOS_SUB* subscribeImpl( ...@@ -6469,7 +6455,7 @@ static TAOS_SUB* subscribeImpl(
TAOS *taos, char *sql, char* topic, char* resultFileName) { TAOS *taos, char *sql, char* topic, char* resultFileName) {
TAOS_SUB* tsub = NULL; TAOS_SUB* tsub = NULL;
if (ASYNC_QUERY_MODE == g_queryInfo.specifiedQueryInfo.mode) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
tsub = taos_subscribe(taos, tsub = taos_subscribe(taos,
g_queryInfo.specifiedQueryInfo.subscribeRestart, g_queryInfo.specifiedQueryInfo.subscribeRestart,
topic, sql, subscribe_callback, (void*)resultFileName, topic, sql, subscribe_callback, (void*)resultFileName,
...@@ -6554,7 +6540,7 @@ static void *superSubscribe(void *sarg) { ...@@ -6554,7 +6540,7 @@ static void *superSubscribe(void *sarg) {
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while(1) { while(1) {
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
if (ASYNC_QUERY_MODE == g_queryInfo.superQueryInfo.mode) { if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
continue; continue;
} }
...@@ -6643,7 +6629,7 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6643,7 +6629,7 @@ static void *specifiedSubscribe(void *sarg) {
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while(1) { while(1) {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (ASYNC_QUERY_MODE == g_queryInfo.specifiedQueryInfo.mode) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
continue; continue;
} }
...@@ -6866,7 +6852,7 @@ static void setParaFromArg(){ ...@@ -6866,7 +6852,7 @@ static void setParaFromArg(){
g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables; g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables;
g_Dbs.threadCount = g_args.num_of_threads; g_Dbs.threadCount = g_args.num_of_threads;
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads; g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
g_Dbs.queryMode = g_args.query_mode; g_Dbs.asyncMode = g_args.async_mode;
g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL; g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL;
g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS; g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册