From 577cf92fdf3a6fa96ed3594145abae561c6bc7bd Mon Sep 17 00:00:00 2001 From: localvar Date: Tue, 12 May 2020 16:49:43 +0800 Subject: [PATCH] single table subscribe is done --- src/query/src/queryExecutor.c | 14 ++++++- tests/examples/c/subscribe.c | 73 +++++++++++++++++++++-------------- 2 files changed, 56 insertions(+), 31 deletions(-) diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 8bd2052ef8..24f0f53ca5 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -4185,6 +4185,12 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo); } else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) { + if(pQInfo->groupInfo.numOfTables == 1) { + SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); + SGroupItem* pItem = taosArrayGet(pa, 0); + cond.twindow = pItem->info->win; + } + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo); } } @@ -4903,6 +4909,12 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, pQuery->current->lastKey, pQuery->window.ekey); + } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + STableIdInfo tidInfo; + tidInfo.uid = pQuery->current->id.uid; + tidInfo.tid = pQuery->current->id.tid; + tidInfo.key = pQuery->current->lastKey; + taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); } if (!isTSCompQuery(pQuery)) { @@ -5195,7 +5207,6 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->key = htobe64(pTableIdInfo->key); -printf("createTableIdList: uid = %ld, key = %ld\n", pTableIdInfo->uid, pTableIdInfo->key); taosArrayPush(*pTableIdList, pTableIdInfo); pMsg += sizeof(STableIdInfo); } @@ -5759,7 +5770,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, // not a problem at present because we only use their 1st int64_t field STableIdInfo* pTableId = taosArraySearch( pTableIdList, compareTableIdInfo, &id ); if (pTableId != NULL ) { - printf("create QInfoImpl: %ld %ld\n", pTableId->uid, pTableId->key); window.skey = pTableId->key; } else { window.skey = INT64_MIN; diff --git a/tests/examples/c/subscribe.c b/tests/examples/c/subscribe.c index 0bf93f6f2d..f9acf2bb10 100644 --- a/tests/examples/c/subscribe.c +++ b/tests/examples/c/subscribe.c @@ -56,32 +56,46 @@ void run_test(TAOS* taos) { taos_query(taos, "drop database if exists test;"); usleep(100000); - taos_query(taos, "create database test tables 5;"); + //taos_query(taos, "create database test tables 5;"); + taos_query(taos, "create database test;"); usleep(100000); taos_query(taos, "use test;"); + usleep(100000); - taos_query(taos, "create table meters(ts timestamp, a int, b binary(20)) tags(loc binary(20), area int);"); - - taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:00:00.000', 0, 'china');"); - taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:01:00.000', 0, 'china');"); - taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:02:00.000', 0, 'china');"); - taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:00:00.000', 0, 'china');"); - taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:01:00.000', 0, 'china');"); - taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:02:00.000', 0, 'china');"); - taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.000', 0, 'china');"); - taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:00:00.000', 0, 'UK');"); - taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');"); - taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');"); - taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');"); - taos_query(taos, "insert into t3 using meters tags('tianjin', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); - taos_query(taos, "insert into t4 using meters tags('wuhan', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); - taos_query(taos, "insert into t5 using meters tags('jinan', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); - taos_query(taos, "insert into t6 using meters tags('haikou', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); - taos_query(taos, "insert into t7 using meters tags('nanjing', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); - taos_query(taos, "insert into t8 using meters tags('lanzhou', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); - taos_query(taos, "insert into t9 using meters tags('tokyo', 0) values('2020-01-01 00:01:02.000', 0, 'japan');"); + taos_query(taos, "create table meters(ts timestamp, a int) tags(area int);"); + + taos_query(taos, "create table t0 using meters tags(0);"); + taos_query(taos, "create table t1 using meters tags(1);"); + taos_query(taos, "create table t2 using meters tags(2);"); + taos_query(taos, "create table t3 using meters tags(3);"); + taos_query(taos, "create table t4 using meters tags(4);"); + taos_query(taos, "create table t5 using meters tags(5);"); + taos_query(taos, "create table t6 using meters tags(6);"); + taos_query(taos, "create table t7 using meters tags(7);"); + taos_query(taos, "create table t8 using meters tags(8);"); + taos_query(taos, "create table t9 using meters tags(9);"); + + taos_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);"); + taos_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);"); + taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);"); + taos_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);"); + taos_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);"); + taos_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);"); + taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);"); + taos_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);"); + taos_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);"); + taos_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);"); + taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);"); + taos_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);"); + taos_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);"); + taos_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);"); + taos_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);"); + taos_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);"); + taos_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);"); + taos_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);"); // super tables subscription + usleep(1000000); TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); TAOS_RES* res = taos_consume(tsub); @@ -90,23 +104,23 @@ void run_test(TAOS* taos) { res = taos_consume(tsub); check_row_count(__LINE__, res, 0); - taos_query(taos, "insert into t0 values('2020-01-01 00:03:00.000', 0, 'china');"); - taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0, 'china');"); + taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);"); + taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);"); res = taos_consume(tsub); check_row_count(__LINE__, res, 2); - taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0, 'UK');"); - taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0, 'UK');"); + taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);"); + taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);"); res = taos_consume(tsub); check_row_count(__LINE__, res, 2); - taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0, 'china');"); + taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);"); res = taos_consume(tsub); check_row_count(__LINE__, res, 1); // keep progress information and restart subscription taos_unsubscribe(tsub, 1); - taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0, 'china');"); + taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);"); tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0); res = taos_consume(tsub); check_row_count(__LINE__, res, 24); @@ -133,7 +147,7 @@ void run_test(TAOS* taos) { res = taos_consume(tsub); check_row_count(__LINE__, res, 0); - taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0, 'china');"); + taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);"); res = taos_consume(tsub); check_row_count(__LINE__, res, 1); @@ -197,7 +211,7 @@ int main(int argc, char *argv[]) { // init TAOS taos_init(); - TAOS* taos = taos_connect(host, user, passwd, "test", 0); + TAOS* taos = taos_connect(host, user, passwd, "", 0); if (taos == NULL) { printf("failed to connect to db, reason:%s\n", taos_errstr(taos)); exit(1); @@ -209,6 +223,7 @@ int main(int argc, char *argv[]) { exit(0); } + taos_query(taos, "use test;"); TAOS_SUB* tsub = NULL; if (async) { // create an asynchronized subscription, the callback function will be called every 1s -- GitLab