提交 bea87b34 编写于 作者: weixin_48148422's avatar weixin_48148422

subscribe: fix bugs found in test

上级 a7eaa8ad
...@@ -3558,7 +3558,6 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { ...@@ -3558,7 +3558,6 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
p += sizeof(TSKEY); p += sizeof(TSKEY);
tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key); tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
} }
tscSaveSubscriptionProgress(pSql->pSubscription);
} }
pRes->row = 0; pRes->row = 0;
......
...@@ -49,7 +49,11 @@ typedef struct SSub { ...@@ -49,7 +49,11 @@ typedef struct SSub {
static int tscCompareSubscriptionProgress(const void* a, const void* b) { static int tscCompareSubscriptionProgress(const void* a, const void* b) {
return ((const SSubscriptionProgress*)a)->uid - ((const SSubscriptionProgress*)b)->uid; const SSubscriptionProgress* x = (const SSubscriptionProgress*)a;
const SSubscriptionProgress* y = (const SSubscriptionProgress*)b;
if (x->uid > y->uid) return 1;
if (x->uid < y->uid) return -1;
return 0;
} }
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid) { TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid) {
...@@ -175,7 +179,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { ...@@ -175,7 +179,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
if (!UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { if (!UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta; SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta;
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
numOfMeters += pVnodeSidList->numOfSids; numOfMeters += pVnodeSidList->numOfSids;
} }
} }
...@@ -195,7 +199,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { ...@@ -195,7 +199,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta; SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta;
numOfMeters = 0; numOfMeters = 0;
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) { for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) {
SMeterSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, j); SMeterSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, j);
int64_t uid = pMeterInfo->uid; int64_t uid = pMeterInfo->uid;
...@@ -344,6 +348,8 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -344,6 +348,8 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSub *pSub = (SSub *)tsub; SSub *pSub = (SSub *)tsub;
if (pSub == NULL) return NULL; if (pSub == NULL) return NULL;
tscSaveSubscriptionProgress(pSub);
SSqlObj* pSql = pSub->pSql; SSqlObj* pSql = pSub->pSql;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
...@@ -355,27 +361,36 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -355,27 +361,36 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
} }
} }
if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) { for (int retry = 0; retry < 3; retry++) {
tscTrace("begin meter synchronization"); if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) {
char* sqlstr = pSql->sqlstr; tscTrace("begin meter synchronization");
pSql->sqlstr = NULL; char* sqlstr = pSql->sqlstr;
taos_free_result_imp(pSql, 0); pSql->sqlstr = NULL;
pSql->sqlstr = sqlstr; taos_free_result_imp(pSql, 0);
taosClearDataCache(tscCacheHandle); pSql->sqlstr = sqlstr;
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; taosClearDataCache(tscCacheHandle);
tscTrace("meter synchronization completed"); if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
} else { tscTrace("meter synchronization completed");
uint16_t type = pSql->cmd.type; } else {
taos_free_result_imp(pSql, 1); uint16_t type = pSql->cmd.type;
pRes->numOfRows = 1; taos_free_result_imp(pSql, 1);
pRes->numOfTotal = 0; pRes->numOfRows = 1;
pRes->qhandle = 0; pRes->numOfTotal = 0;
pSql->thandle = NULL; pRes->qhandle = 0;
pSql->cmd.command = TSDB_SQL_SELECT; pSql->thandle = NULL;
pSql->cmd.type = type; pSql->cmd.command = TSDB_SQL_SELECT;
pSql->cmd.type = type;
}
tscDoQuery(pSql);
if (pRes->code != TSDB_CODE_NOT_ACTIVE_TABLE) {
break;
}
// meter was removed, make sync time zero, so that next retry will
// do synchronization first
pSub->lastSyncTime = 0;
} }
tscDoQuery(pSql);
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
tscError("failed to query data, error code=%d", pRes->code); tscError("failed to query data, error code=%d", pRes->code);
tscRemoveFromSqlList(pSql); tscRemoveFromSqlList(pSql);
...@@ -394,7 +409,9 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { ...@@ -394,7 +409,9 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
taosTmrStop(pSub->pTimer); taosTmrStop(pSub->pTimer);
} }
if (!keepProgress) { if (keepProgress) {
tscSaveSubscriptionProgress(pSub);
} else {
char path[256]; char path[256];
sprintf(path, "%s/subscribe/%s", dataDir, pSub->topic); sprintf(path, "%s/subscribe/%s", dataDir, pSub->topic);
remove(path); remove(path);
......
...@@ -7,33 +7,30 @@ ...@@ -7,33 +7,30 @@
#include <taos.h> // include TDengine header file #include <taos.h> // include TDengine header file
#include <unistd.h> #include <unistd.h>
void print_result(TAOS_RES* res) { void print_result(TAOS_RES* res, int blockFetch) {
TAOS_ROW row = NULL; TAOS_ROW row = NULL;
int num_fields = taos_num_fields(res); int num_fields = taos_num_fields(res);
TAOS_FIELD* fields = taos_fetch_fields(res); TAOS_FIELD* fields = taos_fetch_fields(res);
#if 0 if (blockFetch) {
int nRows = taos_fetch_block(res, &row);
int nRows = taos_fetch_block(res, &row); for (int i = 0; i < nRows; i++) {
for (int i = 0; i < nRows; i++) { char temp[256];
char temp[256]; taos_print_row(temp, row + i, fields, num_fields);
taos_print_row(temp, row + i, fields, num_fields); puts(temp);
puts(temp); }
} } else {
while ((row = taos_fetch_row(res))) {
#else char temp[256];
taos_print_row(temp, row, fields, num_fields);
while ((row = taos_fetch_row(res))) { puts(temp);
char temp[256]; }
taos_print_row(temp, row, fields, num_fields);
puts(temp);
} }
#endif
} }
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
print_result(res); print_result(res, *(int*)param);
} }
...@@ -50,11 +47,12 @@ void check_row_count(int line, TAOS_RES* res, int expected) { ...@@ -50,11 +47,12 @@ void check_row_count(int line, TAOS_RES* res, int expected) {
} }
} }
void run_test(TAOS* taos) { void run_test(TAOS* taos) {
taos_query(taos, "drop database test;"); taos_query(taos, "drop database test;");
usleep(100000); usleep(100000);
taos_query(taos, "create database test;"); taos_query(taos, "create database test tables 5;");
usleep(100000); usleep(100000);
taos_query(taos, "use test;"); taos_query(taos, "use test;");
usleep(100000); usleep(100000);
...@@ -71,12 +69,19 @@ void run_test(TAOS* taos) { ...@@ -71,12 +69,19 @@ void run_test(TAOS* taos) {
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');"); taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');"); taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');"); taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');");
taos_query(taos, "insert into t3 using meters tags('tianjin', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
taos_query(taos, "insert into t4 using meters tags('wuhan', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
taos_query(taos, "insert into t5 using meters tags('jinan', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
taos_query(taos, "insert into t6 using meters tags('haikou', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
taos_query(taos, "insert into t7 using meters tags('nanjing', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
taos_query(taos, "insert into t8 using meters tags('lanzhou', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
taos_query(taos, "insert into t9 using meters tags('tokyo', 0) values('2020-01-01 00:01:02.000', 0, 'japan');");
// super tables subscription // super tables subscription
TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
TAOS_RES* res = taos_consume(tsub); TAOS_RES* res = taos_consume(tsub);
check_row_count(__LINE__, res, 11); check_row_count(__LINE__, res, 18);
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 0); check_row_count(__LINE__, res, 0);
...@@ -90,18 +95,24 @@ void run_test(TAOS* taos) { ...@@ -90,18 +95,24 @@ void run_test(TAOS* taos) {
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 1); check_row_count(__LINE__, res, 1);
// keep progress information and continue previous subscription // keep progress information and restart subscription
taos_unsubscribe(tsub, 1); taos_unsubscribe(tsub, 1);
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.000', 0, 'china');"); taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.000', 0, 'china');");
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0); tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 15); check_row_count(__LINE__, res, 22);
// keep progress information and continue previous subscription
taos_unsubscribe(tsub, 1);
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
// don't keep progress information and continue previous subscription // don't keep progress information and continue previous subscription
taos_unsubscribe(tsub, 0); taos_unsubscribe(tsub, 0);
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 15); check_row_count(__LINE__, res, 22);
// single meter subscription // single meter subscription
...@@ -132,8 +143,7 @@ int main(int argc, char *argv[]) { ...@@ -132,8 +143,7 @@ int main(int argc, char *argv[]) {
const char* passwd = "taosdata"; const char* passwd = "taosdata";
const char* sql = "select * from meters;"; const char* sql = "select * from meters;";
const char* topic = "test-multiple"; const char* topic = "test-multiple";
int async = 1, restart = 0, keep = 1, test = 0; int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;
TAOS_SUB* tsub = NULL;
for (int i = 1; i < argc; i++) { for (int i = 1; i < argc; i++) {
if (strncmp(argv[i], "-h=", 3) == 0) { if (strncmp(argv[i], "-h=", 3) == 0) {
...@@ -174,6 +184,10 @@ int main(int argc, char *argv[]) { ...@@ -174,6 +184,10 @@ int main(int argc, char *argv[]) {
test = 1; test = 1;
continue; continue;
} }
if (strcmp(argv[i], "-block-fetch") == 0) {
blockFetch = 1;
continue;
}
} }
// init TAOS // init TAOS
...@@ -191,9 +205,12 @@ int main(int argc, char *argv[]) { ...@@ -191,9 +205,12 @@ int main(int argc, char *argv[]) {
exit(0); exit(0);
} }
TAOS_SUB* tsub = NULL;
if (async) { if (async) {
tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, 1000); // create an asynchronized subscription, the callback function will be called every 1s
tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
} else { } else {
// create an synchronized subscription, need to call 'taos_consume' manually
tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0); tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
} }
...@@ -206,8 +223,13 @@ int main(int argc, char *argv[]) { ...@@ -206,8 +223,13 @@ int main(int argc, char *argv[]) {
getchar(); getchar();
} else while(1) { } else while(1) {
TAOS_RES* res = taos_consume(tsub); TAOS_RES* res = taos_consume(tsub);
print_result(res); if (res == NULL) {
getchar(); printf("failed to consume data.");
break;
} else {
print_result(res, blockFetch);
getchar();
}
} }
taos_unsubscribe(tsub, keep); taos_unsubscribe(tsub, keep);
...@@ -215,4 +237,3 @@ int main(int argc, char *argv[]) { ...@@ -215,4 +237,3 @@ int main(int argc, char *argv[]) {
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册