提交 b2e30242 编写于 作者: L lichuang

Merge branch 'develop' into feature/TD-4034

...@@ -58,7 +58,12 @@ cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_pat ...@@ -58,7 +58,12 @@ cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_pat
cp ${compile_dir}/../src/inc/taos.h ${pkg_dir}${install_home_path}/include cp ${compile_dir}/../src/inc/taos.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../src/inc/taoserror.h ${pkg_dir}${install_home_path}/include cp ${compile_dir}/../src/inc/taoserror.h ${pkg_dir}${install_home_path}/include
cp -r ${top_dir}/tests/examples/* ${pkg_dir}${install_home_path}/examples cp -r ${top_dir}/tests/examples/* ${pkg_dir}${install_home_path}/examples
cp -r ${top_dir}/src/connector/grafanaplugin/dist ${pkg_dir}${install_home_path}/connector/grafanaplugin if [ -d "${top_dir}/src/connector/grafanaplugin/dist" ]; then
cp -r ${top_dir}/src/connector/grafanaplugin/dist ${pkg_dir}${install_home_path}/connector/grafanaplugin
else
echo "grafanaplugin bundled directory not found!"
exit 1
fi
cp -r ${top_dir}/src/connector/python ${pkg_dir}${install_home_path}/connector cp -r ${top_dir}/src/connector/python ${pkg_dir}${install_home_path}/connector
cp -r ${top_dir}/src/connector/go ${pkg_dir}${install_home_path}/connector cp -r ${top_dir}/src/connector/go ${pkg_dir}${install_home_path}/connector
cp -r ${top_dir}/src/connector/nodejs ${pkg_dir}${install_home_path}/connector cp -r ${top_dir}/src/connector/nodejs ${pkg_dir}${install_home_path}/connector
......
...@@ -66,7 +66,12 @@ cp %{_compiledir}/build/bin/taosdump %{buildroot}%{homepath}/bin ...@@ -66,7 +66,12 @@ cp %{_compiledir}/build/bin/taosdump %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver
cp %{_compiledir}/../src/inc/taos.h %{buildroot}%{homepath}/include cp %{_compiledir}/../src/inc/taos.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../src/inc/taoserror.h %{buildroot}%{homepath}/include cp %{_compiledir}/../src/inc/taoserror.h %{buildroot}%{homepath}/include
cp -r %{_compiledir}/../src/connector/grafanaplugin/dist %{buildroot}%{homepath}/connector/grafanaplugin if [ -d %{_compiledir}/../src/connector/grafanaplugin/dist ]; then
cp -r %{_compiledir}/../src/connector/grafanaplugin/dist %{buildroot}%{homepath}/connector/grafanaplugin
else
echo grafanaplugin bundled directory not found!
exit 1
fi
cp -r %{_compiledir}/../src/connector/python %{buildroot}%{homepath}/connector cp -r %{_compiledir}/../src/connector/python %{buildroot}%{homepath}/connector
cp -r %{_compiledir}/../src/connector/go %{buildroot}%{homepath}/connector cp -r %{_compiledir}/../src/connector/go %{buildroot}%{homepath}/connector
cp -r %{_compiledir}/../src/connector/nodejs %{buildroot}%{homepath}/connector cp -r %{_compiledir}/../src/connector/nodejs %{buildroot}%{homepath}/connector
......
...@@ -243,9 +243,17 @@ function install_data() { ...@@ -243,9 +243,17 @@ function install_data() {
} }
function install_connector() { function install_connector() {
if [ -d "${source_dir}/src/connector/grafanaplugin/dist" ]; then
${csudo} cp -rf ${source_dir}/src/connector/grafanaplugin/dist ${install_main_dir}/connector/grafanaplugin ${csudo} cp -rf ${source_dir}/src/connector/grafanaplugin/dist ${install_main_dir}/connector/grafanaplugin
else
echo "WARNING: grafanaplugin bundled dir not found, please check if want to use it!"
fi
if find ${source_dir}/src/connector/go -mindepth 1 -maxdepth 1 | read; then
${csudo} cp -r ${source_dir}/src/connector/go ${install_main_dir}/connector
else
echo "WARNING: go connector not found, please check if want to use it!"
fi
${csudo} cp -rf ${source_dir}/src/connector/python ${install_main_dir}/connector ${csudo} cp -rf ${source_dir}/src/connector/python ${install_main_dir}/connector
${csudo} cp -rf ${source_dir}/src/connector/go ${install_main_dir}/connector
${csudo} cp ${binary_dir}/build/lib/*.jar ${install_main_dir}/connector &> /dev/null && ${csudo} chmod 777 ${install_main_dir}/connector/*.jar || echo &> /dev/null ${csudo} cp ${binary_dir}/build/lib/*.jar ${install_main_dir}/connector &> /dev/null && ${csudo} chmod 777 ${install_main_dir}/connector/*.jar || echo &> /dev/null
} }
......
...@@ -117,9 +117,17 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then ...@@ -117,9 +117,17 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
cp ${build_dir}/lib/*.jar ${install_dir}/connector ||: cp ${build_dir}/lib/*.jar ${install_dir}/connector ||:
fi fi
if [ -d "${connector_dir}/grafanaplugin/dist" ]; then
cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin
cp -r ${connector_dir}/python ${install_dir}/connector/ else
echo "WARNING: grafanaplugin bundled dir not found, please check if want to use it!"
fi
if find ${connector_dir}/go -mindepth 1 -maxdepth 1 | read; then
cp -r ${connector_dir}/go ${install_dir}/connector cp -r ${connector_dir}/go ${install_dir}/connector
else
echo "WARNING: go connector not found, please check if want to use it!"
fi
cp -r ${connector_dir}/python ${install_dir}/connector
cp -r ${connector_dir}/nodejs ${install_dir}/connector cp -r ${connector_dir}/nodejs ${install_dir}/connector
fi fi
# Copy release note # Copy release note
......
...@@ -144,9 +144,17 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then ...@@ -144,9 +144,17 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
cp ${build_dir}/lib/*.jar ${install_dir}/connector ||: cp ${build_dir}/lib/*.jar ${install_dir}/connector ||:
fi fi
if [ -d "${connector_dir}/grafanaplugin/dist" ]; then
cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin
cp -r ${connector_dir}/python ${install_dir}/connector/ else
echo "WARNING: grafanaplugin bunlded dir not found, please check if want to use it!"
fi
if find ${connector_dir}/go -mindepth 1 -maxdepth 1 | read; then
cp -r ${connector_dir}/go ${install_dir}/connector cp -r ${connector_dir}/go ${install_dir}/connector
else
echo "WARNING: go connector not found, please check if want to use it!"
fi
cp -r ${connector_dir}/python ${install_dir}/connector
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/cinterface.py sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/cinterface.py
......
...@@ -131,9 +131,17 @@ connector_dir="${code_dir}/connector" ...@@ -131,9 +131,17 @@ connector_dir="${code_dir}/connector"
mkdir -p ${install_dir}/connector mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
cp ${build_dir}/lib/*.jar ${install_dir}/connector ||: cp ${build_dir}/lib/*.jar ${install_dir}/connector ||:
if [ -d "${connector_dir}/grafanaplugin/dist" ]; then
cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin
cp -r ${connector_dir}/python ${install_dir}/connector/ else
echo "WARNING: grafanaplugin bundled dir not found, please check if you want to use it!"
fi
if find ${connector_dir}/go -mindepth 1 -maxdepth 1 | read; then
cp -r ${connector_dir}/go ${install_dir}/connector cp -r ${connector_dir}/go ${install_dir}/connector
else
echo "WARNING: go connector not found, please check if want to use it!"
fi
cp -r ${connector_dir}/python ${install_dir}/connector
cp -r ${connector_dir}/nodejs ${install_dir}/connector cp -r ${connector_dir}/nodejs ${install_dir}/connector
fi fi
# Copy release note # Copy release note
......
...@@ -166,9 +166,18 @@ connector_dir="${code_dir}/connector" ...@@ -166,9 +166,18 @@ connector_dir="${code_dir}/connector"
mkdir -p ${install_dir}/connector mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
cp ${build_dir}/lib/*.jar ${install_dir}/connector ||: cp ${build_dir}/lib/*.jar ${install_dir}/connector ||:
if [ -d "${connector_dir}/grafanaplugin/dist" ]; then
cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin
cp -r ${connector_dir}/python ${install_dir}/connector/ else
echo "WARNING: grafanaplugin bundled dir not found, please check if want to use it!"
fi
if find ${connector_dir}/go -mindepth 1 -maxdepth 1 | read; then
cp -r ${connector_dir}/go ${install_dir}/connector cp -r ${connector_dir}/go ${install_dir}/connector
else
echo "WARNING: go connector not found, please check if want to use it!"
fi
cp -r ${connector_dir}/python ${install_dir}/connector/
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/cinterface.py sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/cinterface.py
......
...@@ -5,13 +5,33 @@ ...@@ -5,13 +5,33 @@
"port": 6030, "port": 6030,
"user": "root", "user": "root",
"password": "taosdata", "password": "taosdata",
"databases": "dbx", "databases": "test",
"specified_table_query": "specified_table_query": {
{"concurrent":1, "mode":"sync", "interval":5000, "restart":"yes", "keepProgress":"yes", "concurrent": 1,
"sqls": [{"sql": "select avg(col1) from stb01 where col1 > 1;", "result": "./subscribe_res0.txt"}] "mode": "sync",
"interval": 1000,
"restart": "yes",
"keepProgress": "yes",
"resubAfterConsume": 10,
"sqls": [
{
"sql": "select avg(col1) from meters where col1 > 1;",
"result": "./subscribe_res0.txt"
}
]
}, },
"super_table_query": "super_table_query": {
{"stblname": "stb", "threads":1, "mode":"sync", "interval":10000, "restart":"yes", "keepProgress":"yes", "stblname": "meters",
"sqls": [{"sql": "select col1 from xxxx where col1 > 10;", "result": "./subscribe_res1.txt"}] "threads": 1,
"mode": "sync",
"interval": 1000,
"restart": "yes",
"keepProgress": "yes",
"sqls": [
{
"sql": "select col1 from xxxx where col1 > 10;",
"result": "./subscribe_res1.txt"
}
]
} }
} }
...@@ -372,6 +372,7 @@ typedef struct SpecifiedQueryInfo_S { ...@@ -372,6 +372,7 @@ typedef struct SpecifiedQueryInfo_S {
int subscribeKeepProgress; int subscribeKeepProgress;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
uint64_t totalQueried; uint64_t totalQueried;
} SpecifiedQueryInfo; } SpecifiedQueryInfo;
...@@ -390,6 +391,7 @@ typedef struct SuperQueryInfo_S { ...@@ -390,6 +391,7 @@ typedef struct SuperQueryInfo_S {
uint64_t sqlCount; uint64_t sqlCount;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char* childTblName; char* childTblName;
...@@ -4287,6 +4289,18 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4287,6 +4289,18 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} }
tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
cJSON* resubAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume");
if (resubAfterConsume
&& resubAfterConsume->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j]
= resubAfterConsume->valueint;
} else if (!resubAfterConsume) {
//printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = 1;
}
cJSON *result = cJSON_GetObjectItem(sql, "result"); cJSON *result = cJSON_GetObjectItem(sql, "result");
if (NULL != result && result->type == cJSON_String && result->valuestring != NULL) { if (NULL != result && result->type == cJSON_String && result->valuestring != NULL) {
tstrncpy(g_queryInfo.specifiedQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN); tstrncpy(g_queryInfo.specifiedQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN);
...@@ -4456,6 +4470,18 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4456,6 +4470,18 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
MAX_QUERY_SQL_LENGTH); MAX_QUERY_SQL_LENGTH);
cJSON* superResubAfterConsume =
cJSON_GetObjectItem(sql, "resubAfterConsume");
if (superResubAfterConsume
&& superResubAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.resubAfterConsume[j] =
superResubAfterConsume->valueint;
} else if (!superResubAfterConsume) {
//printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.superQueryInfo.resubAfterConsume[j] = 1;
}
cJSON *result = cJSON_GetObjectItem(sql, "result"); cJSON *result = cJSON_GetObjectItem(sql, "result");
if (result != NULL && result->type == cJSON_String if (result != NULL && result->type == cJSON_String
&& result->valuestring != NULL){ && result->valuestring != NULL){
...@@ -6546,17 +6572,18 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c ...@@ -6546,17 +6572,18 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
} }
static TAOS_SUB* subscribeImpl( static TAOS_SUB* subscribeImpl(
TAOS *taos, char *sql, char* topic, char* resultFileName) { TAOS *taos, char *sql, char* topic, bool restart,
char* resultFileName) {
TAOS_SUB* tsub = NULL; TAOS_SUB* tsub = NULL;
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
tsub = taos_subscribe(taos, tsub = taos_subscribe(taos,
g_queryInfo.specifiedQueryInfo.subscribeRestart, restart,
topic, sql, subscribe_callback, (void*)resultFileName, topic, sql, subscribe_callback, (void*)resultFileName,
g_queryInfo.specifiedQueryInfo.subscribeInterval); g_queryInfo.specifiedQueryInfo.subscribeInterval);
} else { } else {
tsub = taos_subscribe(taos, tsub = taos_subscribe(taos,
g_queryInfo.specifiedQueryInfo.subscribeRestart, restart,
topic, sql, NULL, NULL, 0); topic, sql, NULL, NULL, 0);
} }
...@@ -6610,6 +6637,7 @@ static void *superSubscribe(void *sarg) { ...@@ -6610,6 +6637,7 @@ static void *superSubscribe(void *sarg) {
return NULL; return NULL;
} }
uint64_t subSeq;
char topic[32] = {0}; char topic[32] = {0};
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
...@@ -6623,10 +6651,12 @@ static void *superSubscribe(void *sarg) { ...@@ -6623,10 +6651,12 @@ static void *superSubscribe(void *sarg) {
g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID); g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID);
} }
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n",
__func__, __LINE__, subSeq, subSqlstr); __func__, __LINE__, subSeq, subSqlstr);
tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, tmpFile); tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[subSeq]) { if (NULL == tsub[subSeq]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
...@@ -6634,6 +6664,10 @@ static void *superSubscribe(void *sarg) { ...@@ -6634,6 +6664,10 @@ static void *superSubscribe(void *sarg) {
} }
} }
int consumed[MAX_QUERY_SQL_COUNT];
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++)
consumed[i] = 0;
// start loop to consume result // start loop to consume result
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while(1) { while(1) {
...@@ -6643,7 +6677,7 @@ static void *superSubscribe(void *sarg) { ...@@ -6643,7 +6677,7 @@ static void *superSubscribe(void *sarg) {
continue; continue;
} }
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
taosMsleep(100); // ms taosMsleep(100); // ms
res = taos_consume(tsub[subSeq]); res = taos_consume(tsub[subSeq]);
if (res) { if (res) {
...@@ -6654,6 +6688,28 @@ static void *superSubscribe(void *sarg) { ...@@ -6654,6 +6688,28 @@ static void *superSubscribe(void *sarg) {
pThreadInfo->threadID); pThreadInfo->threadID);
appendResultToFile(res, tmpFile); appendResultToFile(res, tmpFile);
} }
consumed[j] ++;
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
&& (consumed[j] >=
g_queryInfo.superQueryInfo.resubAfterConsume[j])) {
printf("keepProgress:%d, resub super table query: %d\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress, j);
taos_unsubscribe(tsub[subSeq],
g_queryInfo.superQueryInfo.subscribeKeepProgress);
consumed[j]= 0;
subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n",
__func__, __LINE__, subSeq, subSqlstr);
tsub[subSeq] = subscribeImpl(
pThreadInfo->taos, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[subSeq]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
} }
} }
} }
...@@ -6663,9 +6719,8 @@ static void *superSubscribe(void *sarg) { ...@@ -6663,9 +6719,8 @@ static void *superSubscribe(void *sarg) {
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
taos_unsubscribe(tsub[subSeq], taos_unsubscribe(tsub[subSeq], 0);
g_queryInfo.superQueryInfo.subscribeKeepProgress);
} }
} }
...@@ -6713,14 +6768,22 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6713,14 +6768,22 @@ static void *specifiedSubscribe(void *sarg) {
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID);
} }
tsub[i] = subscribeImpl(pThreadInfo->taos, tsub[i] = subscribeImpl(pThreadInfo->taos,
g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile); g_queryInfo.specifiedQueryInfo.sql[i], topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[i]) { if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
} }
// start loop to consume result // start loop to consume result
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
int consumed[MAX_QUERY_SQL_COUNT];
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++)
consumed[i] = 0;
while(1) { while(1) {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
...@@ -6736,14 +6799,32 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6736,14 +6799,32 @@ static void *specifiedSubscribe(void *sarg) {
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID);
appendResultToFile(res, tmpFile); appendResultToFile(res, tmpFile);
} }
consumed[i] ++;
if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress)
&& (consumed[i] >=
g_queryInfo.specifiedQueryInfo.resubAfterConsume[i])) {
printf("keepProgress:%d, resub specified query: %d\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress, i);
consumed[i] = 0;
taos_unsubscribe(tsub[i],
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
tsub[i] = subscribeImpl(pThreadInfo->taos,
g_queryInfo.specifiedQueryInfo.sql[i], topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
} }
} }
} }
taos_free_result(res); taos_free_result(res);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
taos_unsubscribe(tsub[i], taos_unsubscribe(tsub[i], 0);
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
} }
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册