From f2eac06a3389f1afcee9a5b0b49ca1954c936d8e Mon Sep 17 00:00:00 2001 From: localvar Date: Thu, 2 Jan 2020 17:52:45 +0800 Subject: [PATCH] TBASE-1423: single meter subscription --- src/system/detail/src/vnodeRead.c | 8 +++++++- src/system/detail/src/vnodeShell.c | 11 ++++++++++- tests/examples/c/makefile | 9 +++++---- tests/examples/c/subscribe.c | 9 +++++++-- 4 files changed, 29 insertions(+), 8 deletions(-) diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index 9612cc6eb6..be5cf5e6f6 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -630,7 +630,13 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE pQuery = &(pQInfo->query); dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo); - pQuery->skey = pQueryMsg->skey; + SMeterSidExtInfo** pSids = (SMeterSidExtInfo**)pQueryMsg->pSidExtInfo; + if (pSids != NULL && pSids[0]->key > 0) { + pQuery->skey = pSids[0]->key; + } else { + pQuery->skey = pQueryMsg->skey; + } + pQuery->ekey = pQueryMsg->ekey; pQuery->lastKey = pQuery->skey; diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index e527164df1..66cc16d9df 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -444,6 +444,8 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { int progressSize = 0; if (pQInfo->pMeterQuerySupporter != NULL) progressSize = pQInfo->pMeterQuerySupporter->numOfMeters * (sizeof(int64_t) + sizeof(TSKEY)) + sizeof(int32_t); + else if (pQInfo->pObj != NULL) + progressSize = sizeof(int64_t) + sizeof(TSKEY) + sizeof(int32_t); pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, progressSize + size + 100); if (pStart == NULL) { @@ -478,7 +480,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { // write the progress information of each meter to response // this is required by subscriptions - if (progressSize > 0) { + if (pQInfo->pMeterQuerySupporter != NULL) { *((int32_t*)pMsg) = htonl(pQInfo->pMeterQuerySupporter->numOfMeters); pMsg += sizeof(int32_t); for (int32_t i = 0; i < pQInfo->pMeterQuerySupporter->numOfMeters; i++) { @@ -487,6 +489,13 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { *((TSKEY*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->key); pMsg += sizeof(TSKEY); } + } else if (pQInfo->pObj != NULL) { + *((int32_t*)pMsg) = htonl(1); + pMsg += sizeof(int32_t); + *((int64_t*)pMsg) = htobe64(pQInfo->pObj->uid); + pMsg += sizeof(int64_t); + *((TSKEY*)pMsg) = htobe64(pQInfo->query.lastKey); + pMsg += sizeof(TSKEY); } msgLen = pMsg - pStart; diff --git a/tests/examples/c/makefile b/tests/examples/c/makefile index d68a734e95..0a4b8ee9d2 100644 --- a/tests/examples/c/makefile +++ b/tests/examples/c/makefile @@ -4,16 +4,17 @@ ROOT=./ TARGET=exe LFLAGS = '-Wl,-rpath,/usr/local/taos/driver/' -ltaos -lpthread -lm -lrt +#LFLAGS = '-Wl,-rpath,/home/zbm/project/td/debug/build/lib/' -L/home/zbm/project/td/debug/build/lib -ltaos -lpthread -lm -lrt CFLAGS = -O3 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion -Wno-char-subscripts -D_REENTRANT -Wno-format -D_REENTRANT -DLINUX -msse4.2 -Wno-unused-function -D_M_X64 \ -I/usr/local/taos/include -std=gnu99 all: $(TARGET) exe: -gcc $(CFLAGS) ./asyncdemo.c -o $(ROOT)/asyncdemo $(LFLAGS) -gcc $(CFLAGS) ./demo.c -o $(ROOT)/demo $(LFLAGS) -gcc $(CFLAGS) ./prepare.c -o $(ROOT)/prepare $(LFLAGS) -gcc $(CFLAGS) ./stream.c -o $(ROOT)/stream $(LFLAGS) + gcc $(CFLAGS) ./asyncdemo.c -o $(ROOT)/asyncdemo $(LFLAGS) + gcc $(CFLAGS) ./demo.c -o $(ROOT)/demo $(LFLAGS) + gcc $(CFLAGS) ./prepare.c -o $(ROOT)/prepare $(LFLAGS) + gcc $(CFLAGS) ./stream.c -o $(ROOT)/stream $(LFLAGS) gcc $(CFLAGS) ./subscribe.c -o $(ROOT)subscribe $(LFLAGS) clean: diff --git a/tests/examples/c/subscribe.c b/tests/examples/c/subscribe.c index 7c76e1088c..c81246bac6 100644 --- a/tests/examples/c/subscribe.c +++ b/tests/examples/c/subscribe.c @@ -28,6 +28,7 @@ int main(int argc, char *argv[]) { const char* host = "127.0.0.1"; const char* user = "root"; const char* passwd = "taosdata"; + const char* sql = "select * from meters;"; int async = 1, restart = 0; TAOS_SUB* tsub = NULL; @@ -52,6 +53,10 @@ int main(int argc, char *argv[]) { restart = 1; continue; } + if (strcmp(argv[i], "-single") == 0) { + sql = "select * from t0;"; + continue; + } } // init TAOS @@ -64,9 +69,9 @@ int main(int argc, char *argv[]) { } if (async) { - tsub = taos_subscribe("test", restart, taos, "select * from meters;", subscribe_callback, NULL, 1000); + tsub = taos_subscribe("test", restart, taos, sql, subscribe_callback, NULL, 1000); } else { - tsub = taos_subscribe("test", restart, taos, "select * from meters;", NULL, NULL, 0); + tsub = taos_subscribe("test", restart, taos, sql, NULL, NULL, 0); } if (tsub == NULL) { -- GitLab