diff --git a/.gitignore b/.gitignore index f0067d26ba465752e541331b0ed945a4caf5ca28..b9b5341b066f719d9d2d6e8a248e17de83a1b47d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,8 @@ build/ compile_commands.json .cache .ycm_extra_conf.py +.tasks +.vimspector.json .vscode/ .idea/ cmake-build-debug/ diff --git a/example/src/tmq.c b/example/src/tmq.c index c1c6677b0287e39055506dc8edb1afdceed779ac..31bfe4197f793504d33aee357772842f49e5e8e5 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -16,6 +16,7 @@ #include #include #include +#include #include "taos.h" static int running = 1; @@ -65,7 +66,7 @@ int32_t init_env() { taos_free_result(pRes); - char* sql = "select * from st1"; + const char* sql = "select * from st1"; pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); /*if (taos_errno(pRes) != 0) {*/ /*printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));*/ @@ -112,14 +113,20 @@ void basic_consume_loop(tmq_t *tmq, printf("subscribe err\n"); return; } - + int32_t cnt = 0; + clock_t startTime = clock(); while (running) { - tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500); - if (tmq) { - msg_process(tmqmessage); + tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 0); + if (tmqmessage) { + cnt++; + /*msg_process(tmqmessage);*/ tmq_message_destroy(tmqmessage); + } else { + break; } } + clock_t endTime = clock(); + printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC); err = tmq_consumer_close(tmq); if (err) @@ -163,6 +170,6 @@ int main() { code = init_env(); tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); - /*basic_consume_loop(tmq, topic_list);*/ - sync_consume_loop(tmq, topic_list); + basic_consume_loop(tmq, topic_list); + /*sync_consume_loop(tmq, topic_list);*/ } diff --git a/include/client/taos.h b/include/client/taos.h index 7c5cc56b209c813847d635c783541f8aa4ce0b31..a8627a43da97c16a5912ebf4b1222f741cb28569 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -213,7 +213,7 @@ typedef struct tmq_message_t tmq_message_t; typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param)); DLL_EXPORT tmq_list_t *tmq_list_new(); -DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, char *); +DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); diff --git a/include/os/osEndian.h b/include/os/osEndian.h index e573ba0a751589c009f9e848d7352f0e7a090ea7..1f52ece535139da70c3563bb1cb13acd16c6346d 100644 --- a/include/os/osEndian.h +++ b/include/os/osEndian.h @@ -30,4 +30,4 @@ static const int32_t endian_test_var = 1; } #endif -#endif /*_TD_OS_ENDIAN_H_*/ \ No newline at end of file +#endif /*_TD_OS_ENDIAN_H_*/ diff --git a/include/os/osMath.h b/include/os/osMath.h index 948fbbf665f2b2a64d43431a0fb3c9f09b2b4451..3fe46d557ed348a934a32097f1aca74ba4b2e550 100644 --- a/include/os/osMath.h +++ b/include/os/osMath.h @@ -36,25 +36,25 @@ extern "C" { #else - #define TSWAP(a, b, c) \ - do { \ - typeof(a) __tmp = (a); \ - (a) = (b); \ - (b) = __tmp; \ + #define TSWAP(a, b, c) \ + do { \ + __typeof(a) __tmp = (a); \ + (a) = (b); \ + (b) = __tmp; \ } while (0) - #define TMAX(a, b) \ - ({ \ - typeof(a) __a = (a); \ - typeof(b) __b = (b); \ - (__a > __b) ? __a : __b; \ + #define TMAX(a, b) \ + ({ \ + __typeof(a) __a = (a); \ + __typeof(b) __b = (b); \ + (__a > __b) ? __a : __b; \ }) - #define TMIN(a, b) \ - ({ \ - typeof(a) __a = (a); \ - typeof(b) __b = (b); \ - (__a < __b) ? __a : __b; \ + #define TMIN(a, b) \ + ({ \ + __typeof(a) __a = (a); \ + __typeof(b) __b = (b); \ + (__a < __b) ? __a : __b; \ }) #endif diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index facc8c721683a3cbda062176a5f6861ab8ad8ead..e57901ed2ed8a4fe030c405d93e913fc237cb00c 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE + #include "clientInt.h" #include "clientLog.h" #include "parser.h" @@ -146,7 +148,7 @@ tmq_list_t* tmq_list_new() { return ptr; } -int32_t tmq_list_append(tmq_list_t* ptr, char* src) { +int32_t tmq_list_append(tmq_list_t* ptr, const char* src) { if (ptr->cnt >= ptr->tot - 1) return -1; ptr->elems[ptr->cnt] = strdup(src); ptr->cnt++; @@ -366,7 +368,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i .igExists = 1, .physicalPlan = (char*)pStr, .sql = (char*)sql, - .logicalPlan = "no logic plan", + .logicalPlan = (char*)"no logic plan", }; int tlen = tSerializeSCMCreateTopicReq(NULL, &req); @@ -512,7 +514,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { return -1; } tDecodeSMqConsumeRsp(pMsg->pData, pRsp); - /*printf("rsp %ld %ld\n", pRsp->committedOffset, pRsp->rspOffset);*/ + /*printf("rsp %ld %ld %d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ if (pRsp->numOfTopics == 0) { /*printf("no data\n");*/ free(pRsp); diff --git a/source/common/src/ttypes.c b/source/common/src/ttypes.c index 73f755bf1a7c8e545c178a61c65e1f8f24049e16..1cab413c3f650d53bfcc1475225d75ebc402a61b 100644 --- a/source/common/src/ttypes.c +++ b/source/common/src/ttypes.c @@ -630,14 +630,6 @@ void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type) { } } -#define TSWAP(a, b, c) \ - do { \ - typeof(a) __tmp = (a); \ - (a) = (b); \ - (b) = __tmp; \ - } while (0) - - void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf) { switch (type) { case TSDB_DATA_TYPE_INT: diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 6fd61baee387814b2550c0523a46c6b8caaabc51..fd960eb63eda1d34071b01b88ba1fad66171fb45 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -14,6 +14,7 @@ */ #define _DEFAULT_SOURCE + #include "mndConsumer.h" #include "mndDb.h" #include "mndDnode.h" @@ -54,13 +55,14 @@ void mndCleanupConsumer(SMnode *pMnode) {} SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { terrno = TSDB_CODE_OUT_OF_MEMORY; + void* buf = NULL; int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer); int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); if (pRaw == NULL) goto CM_ENCODE_OVER; - void *buf = malloc(tlen); + buf = malloc(tlen); if (buf == NULL) goto CM_ENCODE_OVER; void *abuf = buf; @@ -88,6 +90,7 @@ CM_ENCODE_OVER: SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + void* buf = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER; @@ -106,7 +109,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { int32_t dataPos = 0; int32_t len; SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER); - void *buf = malloc(len); + buf = malloc(len); if (buf == NULL) goto CM_DECODE_OVER; SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index bf138d335be2fd6331345a6302aa845e2ea1f00c..938a3eb89e61946447341855364e94f7e066715e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -12,6 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "mndSubscribe.h" #include "mndConsumer.h" @@ -372,13 +373,14 @@ void mndCleanupSubscribe(SMnode *pMnode) {} static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { terrno = TSDB_CODE_OUT_OF_MEMORY; + void* buf = NULL; int32_t tlen = tEncodeSubscribeObj(NULL, pSub); int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size); if (pRaw == NULL) goto SUB_ENCODE_OVER; - void *buf = malloc(tlen); + buf = malloc(tlen); if (buf == NULL) goto SUB_ENCODE_OVER; void *abuf = buf; @@ -406,6 +408,7 @@ SUB_ENCODE_OVER: static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + void* buf = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER; @@ -425,7 +428,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { int32_t dataPos = 0; int32_t tlen; SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER); - void *buf = malloc(tlen + 1); + buf = malloc(tlen + 1); if (buf == NULL) goto SUB_DECODE_OVER; SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER); diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index a9ba825a29af02a01d6c67d84a08c3814134a2d3..cefe9eff7209f8e7100a8f29b198324ed46679fa 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -16,9 +16,9 @@ #ifndef _TD_TQ_INT_H_ #define _TD_TQ_INT_H_ -#include "tq.h" #include "meta.h" #include "tlog.h" +#include "tq.h" #include "trpc.h" #ifdef __cplusplus extern "C" { @@ -26,29 +26,48 @@ extern "C" { extern int32_t tqDebugFlag; -#define tqFatal(...) { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); }} -#define tqError(...) { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); }} -#define tqWarn(...) { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", 255, __VA_ARGS__); }} -#define tqInfo(...) { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", 255, __VA_ARGS__); }} -#define tqDebug(...) { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }} -#define tqTrace(...) { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }} - -// create persistent storage for meta info such as consuming offset -// return value > 0: cgId -// return value <= 0: error code -// int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqBufferHandle** handle); -// create ring buffer in memory and load consuming offset -// int tqOpenTCGroup(STQ*, const char* topic, int cgId); -// destroy ring buffer and persist consuming offset -// int tqCloseTCGroup(STQ*, const char* topic, int cgId); -// delete persistent storage for meta info -// int tqDropTCGroup(STQ*, const char* topic, int cgId); - -//int tqSerializeGroup(const STqGroup*, STqSerializedHead**); -//const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup); -int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**); -const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**); +#define tqFatal(...) \ + { \ + if (tqDebugFlag & DEBUG_FATAL) { \ + taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); \ + } \ + } +#define tqError(...) \ + { \ + if (tqDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); \ + } \ + } +#define tqWarn(...) \ + { \ + if (tqDebugFlag & DEBUG_WARN) { \ + taosPrintLog("TQ WARN ", 255, __VA_ARGS__); \ + } \ + } +#define tqInfo(...) \ + { \ + if (tqDebugFlag & DEBUG_INFO) { \ + taosPrintLog("TQ ", 255, __VA_ARGS__); \ + } \ + } +#define tqDebug(...) \ + { \ + if (tqDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); \ + } \ + } +#define tqTrace(...) \ + { \ + if (tqDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); \ + } \ + } + +int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**); +const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**); + static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; } + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4e52ecd8585240f26ebff9e417889eb22efd5e98..a625980505cd7c4dd21f802ad5b7978800d2e88e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -12,19 +12,12 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "tcompare.h" #include "tqInt.h" #include "tqMetaStore.h" -// static -// read next version data -// -// send to fetch queue -// -// handle management message -// - int tqInit() { int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1); if (old == 1) return 0; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 41fff4d228b86033f3bdbb0b9ba45e72aa62805d..1d9201f69dba28e92d86379355f6dcc3acb36606 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -170,7 +170,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { } if (pRead->pHead->head.version != ver) { - /*wError("unexpected wal log version: %ld, read request version:%ld", pRead->pHead->head.version, ver);*/ + wError("unexpected wal log version: %ld, read request version:%ld", pRead->pHead->head.version, ver); pRead->curVersion = -1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; @@ -178,7 +178,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { code = walValidBodyCksum(pRead->pHead); if (code != 0) { - /*wError("unexpected wal log version: checksum not passed");*/ + wError("unexpected wal log version: checksum not passed"); pRead->curVersion = -1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index 04efbef5b341ce79ef1782996691a00b27d5f057..8999646f6a7ad8b61b53f51e67d60610c0ee2028 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE + #include "os.h" #include "osString.h" diff --git a/source/util/src/tbuffer.c b/source/util/src/tbuffer.c index ddd283ae0f84776203a0906913d32168fcf85775..0456d6a2eecca33a9509767dda27406bdf887e54 100644 --- a/source/util/src/tbuffer.c +++ b/source/util/src/tbuffer.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE + #include "tbuffer.h" #include "exception.h" #include "os.h" diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 5ce69f1c2e5f36688a437c25b49c53b23494e30a..12aa77214f6109e9a33082c428a7bb7722a62835 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -19,7 +19,6 @@ #include "tnote.h" #include "tutil.h" #include "ulog.h" -//#include "zlib.h" #define MAX_LOGLINE_SIZE (1000) #define MAX_LOGLINE_BUFFER_SIZE (MAX_LOGLINE_SIZE + 10) diff --git a/source/util/src/tpagedfile.c b/source/util/src/tpagedfile.c index 3373d0987600e8396a123370ac184e9efbcd3def..3cdba580d4dfaeb70455ca898db5ad06edeecf39 100644 --- a/source/util/src/tpagedfile.c +++ b/source/util/src/tpagedfile.c @@ -1,3 +1,20 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE + #include "tpagedfile.h" #include "thash.h" #include "stddef.h"