From 517fb88329fea0ab20ca0b4a559096193d2ffa3a Mon Sep 17 00:00:00 2001 From: localvar Date: Tue, 24 Dec 2019 17:16:58 +0800 Subject: [PATCH] subscription (WIP) --- src/client/inc/tsclient.h | 2 + src/client/src/tscServer.c | 5 +- src/client/src/tscSql.c | 5 + src/client/src/tscSub.c | 275 ++++++++++++++++++++--------- src/client/src/tscUtil.c | 2 +- src/inc/taos.h | 9 +- src/inc/taosmsg.h | 2 + src/system/detail/src/vnodeShell.c | 1 + tests/examples/c/subscribe.c | 69 +++----- 9 files changed, 236 insertions(+), 134 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 6adf2f1be1..e91b05d104 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -334,6 +334,7 @@ typedef struct { int rspType; int rspLen; uint64_t qhandle; + int64_t uid; int64_t useconds; int64_t offset; // offset value from vnode during projection query of stable int row; @@ -380,6 +381,7 @@ typedef struct _sql_obj { uint32_t queryId; void * thandle; void * pStream; + void * pSubscription; char * sqlstr; char retry; char maxRetry; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index bdac9185eb..9a88ee9fc4 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1531,7 +1531,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg; pMeterInfo->sid = htonl(pMeterMeta->sid); pMeterInfo->uid = htobe64(pMeterMeta->uid); - + pMeterInfo->skey = tscGetSubscriptionProgress(pSql, pMeterMeta->uid); pMsg += sizeof(SMeterSidExtInfo); } else { SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); @@ -1542,6 +1542,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn pMeterInfo->sid = htonl(pQueryMeterInfo->sid); pMeterInfo->uid = htobe64(pQueryMeterInfo->uid); + pMeterInfo->skey = tscGetSubscriptionProgress(pSql, pMeterMeta->uid); pMsg += sizeof(SMeterSidExtInfo); @@ -3535,7 +3536,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { pRes->numOfRows = htonl(pRetrieve->numOfRows); pRes->precision = htons(pRetrieve->precision); pRes->offset = htobe64(pRetrieve->offset); - + pRes->uid = pRetrieve->uid; pRes->useconds = htobe64(pRetrieve->useconds); pRes->data = pRetrieve->data; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index c9d9050a29..404c3f947b 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -627,6 +627,11 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { rows = taos_fetch_row_impl(res); } + if (rows != NULL && pSql->pSubscription != NULL) { + TSKEY ts = *(TSKEY*)rows[pCmd->fieldsInfo.numOfOutputCols - 1]; + tscUpdateSubscriptionProgress(pMeterMetaInfo->pMeterMeta->uid, ts); + } + // check!!! if (rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { break; diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index f2e9395c68..af3a576377 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -22,125 +22,232 @@ #include "tsclient.h" #include "tsocket.h" #include "ttime.h" +#include "ttimer.h" #include "tutil.h" - -typedef struct { - void * signature; - char name[TSDB_METER_ID_LEN]; - int mseconds; - TSKEY lastKey; - uint64_t stime; - TAOS_FIELD fields[TSDB_MAX_COLUMNS]; - int numOfFields; - TAOS * taos; - TAOS_RES * result; +#include "tscUtil.h" + +typedef struct SSubscriptionProgress { + int64_t uid; + TSKEY key; +} SSubscriptionProgress; + +typedef struct SSub { + void * signature; + TAOS * taos; + void * pTimer; + SSqlObj * pSql; + int interval; + TAOS_SUBSCRIBE_CALLBACK fp; + void * param; + int numOfMeters; + SSubscriptionProgress * progress; } SSub; -TAOS_SUB *taos_subscribe(const char *host, const char *user, const char *pass, const char *db, const char *name, int64_t time, int mseconds) { - SSub *pSub; - pSub = (SSub *)malloc(sizeof(SSub)); - if (pSub == NULL) return NULL; - memset(pSub, 0, sizeof(SSub)); +static int tscCompareSubscriptionProgress(const void* a, const void* b) { + return ((const SSubscriptionProgress*)a)->uid - ((const SSubscriptionProgress*)b)->uid; +} - pSub->signature = pSub; - strcpy(pSub->name, name); - pSub->mseconds = mseconds; - pSub->lastKey = time; - if (pSub->lastKey == 0) { - pSub->lastKey = taosGetTimestampMs(); +TSKEY tscGetSubscriptionProgress(SSqlObj* pSql, int64_t uid) { + if( pSql == NULL || pSql->pSubscription == NULL) + return 0; + + SSub* pSub = (SSub*)pSql->pSubscription; + for (int s = 0, e = pSub->numOfMeters; s < e;) { + int m = (s + e) / 2; + SSubscriptionProgress* p = pSub->progress + m; + if (p->uid > uid) + e = m; + else if (p->uid < uid) + s = m + 1; + else + return p->key; } - taos_init(); - pSub->taos = taos_connect(host, user, pass, NULL, 0); - if (pSub->taos == NULL) { - tfree(pSub); - } else { - char qstr[256] = {0}; - sprintf(qstr, "use %s", db); - int res = taos_query(pSub->taos, qstr); - if (res != 0) { - tscError("failed to open DB:%s", db); - taos_close(pSub->taos); - tfree(pSub); - } else { - snprintf(qstr, tListLen(qstr), "select * from %s where _c0 > now+1000d", pSub->name); - if (taos_query(pSub->taos, qstr)) { - tscTrace("failed to select, reason:%s", taos_errstr(pSub->taos)); - taos_close(pSub->taos); - tfree(pSub); - return NULL; - } - pSub->result = taos_use_result(pSub->taos); - pSub->numOfFields = taos_num_fields(pSub->result); - memcpy(pSub->fields, taos_fetch_fields(pSub->result), sizeof(TAOS_FIELD) * pSub->numOfFields); + return 0; +} + +void tscUpdateSubscriptionProgress(SSqlObj* pSql, int64_t uid, TSKEY ts) { + if( pSql == NULL || pSql->pSubscription == NULL) + return; + + SSub* pSub = (SSub*)pSql->pSubscription; + for (int s = 0, e = pSub->numOfMeters; s < e;) { + int m = (s + e) / 2; + SSubscriptionProgress* p = pSub->progress + m; + if (p->uid > uid) + e = m; + else if (p->uid < uid) + s = m + 1; + else { + p->key = ts + 1; + break; } } +} + + +static SSub* tscCreateSubscription(STscObj* pObj, const char* sql) { + SSub* pSub = calloc(1, sizeof(SSub)); + if (pSub == NULL) { + globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY; + tscError("failed to allocate memory for subscription"); + return NULL; + } + + SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + if (pSql == NULL) { + globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY; + tscError("failed to allocate SSqlObj for subscription"); + goto failed; + } + pSql->signature = pSql; + pSql->pTscObj = pObj; + + char* sqlstr = (char*)malloc(strlen(sql) + 1); + if (sqlstr == NULL) { + tscError("failed to allocate sql string for subscription"); + goto failed; + } + strcpy(sqlstr, sql); + strtolower(sqlstr, sqlstr); + pSql->sqlstr = sqlstr; + + tsem_init(&pSql->rspSem, 0, 0); + tsem_init(&pSql->emptyRspSem, 0, 1); + + SSqlRes *pRes = &pSql->res; + pRes->numOfRows = 1; + pRes->numOfTotal = 0; + + pSql->pSubscription = pSub; + pSub->pSql = pSql; + pSub->signature = pSub; return pSub; + +failed: + if (sqlstr != NULL) { + free(sqlstr); + } + if (pSql != NULL) { + free(pSql); + } + free(pSub); + return NULL; } -TAOS_ROW taos_consume(TAOS_SUB *tsub) { - SSub * pSub = (SSub *)tsub; - TAOS_ROW row; - char qstr[256]; - if (pSub == NULL) return NULL; - if (pSub->signature != pSub) return NULL; - - while (1) { - if (pSub->result != NULL) { - row = taos_fetch_row(pSub->result); - if (row != NULL) { - pSub->lastKey = *((uint64_t *)row[0]); - return row; - } - - taos_free_result(pSub->result); - pSub->result = NULL; - uint64_t etime = taosGetTimestampMs(); - int64_t mseconds = pSub->mseconds - etime + pSub->stime; - if (mseconds < 0) mseconds = 0; - taosMsleep((int)mseconds); - } +static void tscProcessSubscribeTimer(void *handle, void *tmrId) { + SSub *pSub = (SSub *)handle; + if (pSub == NULL || pSub->pTimer != tmrId) return; - pSub->stime = taosGetTimestampMs(); + TAOS_RES* res = taos_consume(pSub); + if (res != NULL) { + pSub->fp(pSub->param, res, 0); + taos_free_result(res); + } + + taosTmrReset(tscProcessSubscribeTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer); +} - sprintf(qstr, "select * from %s where _c0 > %" PRId64 " order by _c0 asc", pSub->name, pSub->lastKey); - if (taos_query(pSub->taos, qstr)) { - tscTrace("failed to select, reason:%s", taos_errstr(pSub->taos)); - return NULL; - } - pSub->result = taos_use_result(pSub->taos); +TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) { + STscObj* pObj = (STscObj*)taos; + if (pObj == NULL || pObj->signature != pObj) { + globalCode = TSDB_CODE_DISCONNECTED; + tscError("connection disconnected"); + return NULL; + } + + SSub* pSub = tscCreateSubscription(pObj, sql); + if (pSub == NULL) { + return NULL; + } - if (pSub->result == NULL) { - tscTrace("failed to get result, reason:%s", taos_errstr(pSub->taos)); - return NULL; + int code = (uint8_t)tsParseSql(pSub->pSql, pObj->acctId, pObj->db, false); + if (code != TSDB_CODE_SUCCESS) { + taos_unsubscribe(pSub); + return NULL; + } + +// ??? if there's more than one vnode + SSqlCmd* pCmd = &pSub->pSql->cmd; + + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { + pSub->numOfMeters = 1; + pSub->progress = calloc(1, sizeof(SSubscriptionProgress)); + pSub->progress[0].uid = pMeterMetaInfo->pMeterMeta->uid; + } else { + SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta; + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); + pSub->numOfMeters = pVnodeSidList->numOfSids; + pSub->progress = calloc(pSub->numOfMeters, sizeof(SSubscriptionProgress)); + for (int32_t i = 0; i < pSub->numOfMeters; ++i) { + SMeterSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i); + pSub->progress[i].uid = pMeterInfo->uid; } + qsort(pSub->progress, pSub->numOfMeters, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); } - return NULL; + // timestamp must in the output column + SFieldInfo* pFieldInfo = &pCmd->fieldsInfo; + tscFieldInfoSetValue(pFieldInfo, pFieldInfo->numOfOutputCols, TSDB_DATA_TYPE_TIMESTAMP, "_c0", TSDB_KEYSIZE); + tscSqlExprInsertEmpty(pCmd, pFieldInfo->numOfOutputCols - 1, TSDB_FUNC_PRJ); + tscFieldInfoUpdateVisible(pFieldInfo, pFieldInfo->numOfOutputCols - 1, false); + tscFieldInfoCalOffset(pCmd); + + if (fp != NULL) { + pSub->fp = fp; + pSub->interval = interval; + pSub->param = param; + taosTmrReset(tscProcessSubscribeTimer, 0, pSub, tscTmr, &pSub->pTimer); + } + + return pSub; } -void taos_unsubscribe(TAOS_SUB *tsub) { +TAOS_RES *taos_consume(TAOS_SUB *tsub) { SSub *pSub = (SSub *)tsub; + if (pSub == NULL) return NULL; + + SSqlObj* pSql = pSub->pSql; + SSqlRes *pRes = &pSql->res; + + pRes->numOfRows = 1; + pRes->numOfTotal = 0; + pRes->qhandle = 0; + pSql->thandle = NULL; - if (pSub == NULL) return; - if (pSub->signature != pSub) return; + tscDoQuery(pSql); + if (pRes->code != TSDB_CODE_SUCCESS) { + return NULL; + } + return pSql; +} + +void taos_unsubscribe(TAOS_SUB *tsub) { + SSub *pSub = (SSub *)tsub; + if (pSub == NULL || pSub->signature != pSub) return; - taos_close(pSub->taos); + if (pSub->pTimer != NULL) { + taosTmrStop(pSub->pTimer); + } + tscFreeSqlObj(pSub->pSql); + free(pSub->progress); + memset(pSub, 0, sizeof(*pSub)); free(pSub); } int taos_subfields_count(TAOS_SUB *tsub) { SSub *pSub = (SSub *)tsub; - return pSub->numOfFields; + return taos_num_fields(pSub->pSql); } TAOS_FIELD *taos_fetch_subfields(TAOS_SUB *tsub) { SSub *pSub = (SSub *)tsub; - return pSub->fields; + return pSub->pSql->cmd.fieldsInfo.pFields; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index bc6c73aaae..699c055249 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -819,7 +819,7 @@ void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIE } void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visible) { - if (index < 0 || index > pFieldInfo->numOfOutputCols) { + if (index < 0 || index >= pFieldInfo->numOfOutputCols) { return; } diff --git a/src/inc/taos.h b/src/inc/taos.h index c56d0e86d7..e6eac00a7c 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -116,11 +116,10 @@ DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param); DLL_EXPORT void taos_fetch_row_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), void *param); -DLL_EXPORT TAOS_SUB *taos_subscribe(const char *host, const char *user, const char *pass, const char *db, const char *table, int64_t time, int mseconds); -DLL_EXPORT TAOS_ROW taos_consume(TAOS_SUB *tsub); -DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub); -DLL_EXPORT int taos_subfields_count(TAOS_SUB *tsub); -DLL_EXPORT TAOS_FIELD *taos_fetch_subfields(TAOS_SUB *tsub); +typedef void (*TAOS_SUBSCRIBE_CALLBACK)(void *param, TAOS_RES *res, int code); +DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval); +DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub); +DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub); DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), int64_t stime, void *param, void (*callback)(void *)); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 22b10eaa60..89807f0a19 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -490,6 +490,7 @@ typedef struct SColumnInfo { typedef struct SMeterSidExtInfo { int32_t sid; int64_t uid; + TSKEY skey; // start key for subscription char tags[]; } SMeterSidExtInfo; @@ -572,6 +573,7 @@ typedef struct { int16_t precision; int64_t offset; // updated offset value for multi-vnode projection query int64_t useconds; + int64_t uid; char data[]; } SRetrieveMeterRsp; diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index ce1cabe141..5177afc13a 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -456,6 +456,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { if (code == TSDB_CODE_SUCCESS) { pRsp->offset = htobe64(vnodeGetOffsetVal((void*)pRetrieve->qhandle)); pRsp->useconds = htobe64(((SQInfo *)(pRetrieve->qhandle))->useconds); + pRsp->uid = ((SQInfo *)(pRetrieve->qhandle))->pObj->uid; } else { pRsp->offset = 0; pRsp->useconds = 0; diff --git a/tests/examples/c/subscribe.c b/tests/examples/c/subscribe.c index 219fa133e0..f09018b1d2 100644 --- a/tests/examples/c/subscribe.c +++ b/tests/examples/c/subscribe.c @@ -1,18 +1,3 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - // sample code for TDengine subscribe/consume API // to compile: gcc -o subscribe subscribe.c -ltaos @@ -21,38 +6,38 @@ #include #include // include TDengine header file -int main(int argc, char *argv[]) -{ - TAOS_SUB *tsub; - TAOS_ROW row; - char dbname[64], table[64]; - char temp[256]; - - if ( argc == 1 ) { - printf("usage: %s server-ip db-name table-name \n", argv[0]); - exit(0); - } - - if ( argc >= 2 ) strcpy(dbname, argv[2]); - if ( argc >= 3 ) strcpy(table, argv[3]); +int main(int argc, char *argv[]) { + // init TAOS + taos_init(); - tsub = taos_subscribe(argv[1], "root", "taosdata", dbname, table, 0, 1000); - if ( tsub == NULL ) { - printf("failed to connet to db:%s\n", dbname); + TAOS* taos = taos_connect(argv[1], "root", "taosdata", "test", 0); + if (taos == NULL) { + printf("failed to connect to db, reason:%s\n", taos_errstr(taos)); exit(1); } - TAOS_FIELD *fields = taos_fetch_subfields(tsub); - int fcount = taos_subfields_count(tsub); - - printf("start to retrieve data\n"); - printf("please use other taos client, insert rows into %s.%s\n", dbname, table); - while ( 1 ) { - row = taos_consume(tsub); - if ( row == NULL ) break; + TAOS_SUB* tsub = taos_subscribe(taos, "select * from meters;", NULL, NULL, 0); + if ( tsub == NULL ) { + printf("failed to create subscription.\n"); + exit(0); + } - taos_print_row(temp, row, fields, fcount); - printf("%s\n", temp); + for( int i = 0; i < 3; i++ ) { + TAOS_RES* res = taos_consume(tsub); + TAOS_ROW row; + int rows = 0; + int num_fields = taos_subfields_count(tsub); + TAOS_FIELD *fields = taos_fetch_fields(res); + char temp[256]; + + // fetch the records row by row + while ((row = taos_fetch_row(res))) { + rows++; + taos_print_row(temp, row, fields, num_fields); + printf("%s\n", temp); + } + + printf("\n"); } taos_unsubscribe(tsub); -- GitLab