提交 76ade609 编写于 作者: weixin_48148422's avatar weixin_48148422

tbase-916

上级 d7a8b0cb
...@@ -33,10 +33,10 @@ typedef struct SSubscriptionProgress { ...@@ -33,10 +33,10 @@ typedef struct SSubscriptionProgress {
} SSubscriptionProgress; } SSubscriptionProgress;
typedef struct SSub { typedef struct SSub {
void * signature;
char topic[32]; char topic[32];
int64_t lastSyncTime; int64_t lastSyncTime;
int64_t lastConsumeTime; int64_t lastConsumeTime;
void * signature;
TAOS * taos; TAOS * taos;
void * pTimer; void * pTimer;
SSqlObj * pSql; SSqlObj * pSql;
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <taos.h> // include TDengine header file #include <taos.h> // include TDengine header file
#include <unistd.h>
void print_result(TAOS_RES* res) { void print_result(TAOS_RES* res) {
TAOS_ROW row; TAOS_ROW row;
...@@ -24,13 +24,102 @@ void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { ...@@ -24,13 +24,102 @@ void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
} }
void check_row_count(int line, TAOS_RES* res, int expected) {
int actual = 0;
TAOS_ROW row;
while ((row = taos_fetch_row(res))) {
actual++;
}
if (actual != expected) {
printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, actual);
} else {
printf("line %d: %d rows consumed as expected\n", line, actual);
}
}
void run_test(TAOS* taos) {
taos_query(taos, "drop database test;");
usleep(100000);
taos_query(taos, "create database test;");
usleep(100000);
taos_query(taos, "use test;");
usleep(100000);
taos_query(taos, "create table meters(ts timestamp, a int, b binary(20)) tags(loc binary(20), area int);");
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:00:00.000', 0, 'china');");
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:01:00.000', 0, 'china');");
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:02:00.000', 0, 'china');");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:00:00.000', 0, 'china');");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:01:00.000', 0, 'china');");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:02:00.000', 0, 'china');");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.000', 0, 'china');");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:00:00.000', 0, 'UK');");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');");
// super tables subscription
TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
TAOS_RES* res = taos_consume(tsub);
check_row_count(__LINE__, res, 11);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.001', 0, 'UK');");
taos_query(taos, "insert into t1 using meters tags('london', 0) values('2020-01-01 00:03:00.001', 0, 'UK');");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 2);
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.002', 0, 'china');");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 1);
// keep progress information and continue previous subscription
taos_unsubscribe(tsub, 1);
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.000', 0, 'china');");
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 15);
// don't keep progress information and continue previous subscription
taos_unsubscribe(tsub, 0);
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 15);
// single meter subscription
taos_unsubscribe(tsub, 0);
tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 4);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.001', 0, 'china');");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 1);
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.002', 0, 'china');");
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:04:00.000', 0, 'china');");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 2);
taos_unsubscribe(tsub, 0);
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
const char* host = "127.0.0.1"; const char* host = "127.0.0.1";
const char* user = "root"; const char* user = "root";
const char* passwd = "taosdata"; const char* passwd = "taosdata";
const char* sql = "select * from meters;"; const char* sql = "select * from meters;";
const char* topic = "test-multiple"; const char* topic = "test-multiple";
int async = 1, restart = 0, keep = 1; int async = 1, restart = 0, keep = 1, test = 0;
TAOS_SUB* tsub = NULL; TAOS_SUB* tsub = NULL;
for (int i = 1; i < argc; i++) { for (int i = 1; i < argc; i++) {
...@@ -68,6 +157,10 @@ int main(int argc, char *argv[]) { ...@@ -68,6 +157,10 @@ int main(int argc, char *argv[]) {
topic = "test-custom"; topic = "test-custom";
continue; continue;
} }
if (strcmp(argv[i], "-test") == 0) {
test = 1;
continue;
}
} }
// init TAOS // init TAOS
...@@ -79,6 +172,12 @@ int main(int argc, char *argv[]) { ...@@ -79,6 +172,12 @@ int main(int argc, char *argv[]) {
exit(1); exit(1);
} }
if (test) {
run_test(taos);
taos_close(taos);
exit(0);
}
if (async) { if (async) {
tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, 1000); tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, 1000);
} else { } else {
...@@ -99,6 +198,7 @@ int main(int argc, char *argv[]) { ...@@ -99,6 +198,7 @@ int main(int argc, char *argv[]) {
} }
taos_unsubscribe(tsub, keep); taos_unsubscribe(tsub, keep);
taos_close(taos);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册