diff --git a/example/src/tmq.c b/example/src/tmq.c index 26e5ea82c19a3f0ac90689a4b4f1362caa193ec6..094fd94bfc00a137073dae63a41e4754fb922ced 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -66,7 +66,7 @@ int32_t init_env() { } int32_t create_topic() { - printf("create topic"); + printf("create topic\n"); TAOS_RES* pRes; TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { @@ -91,6 +91,10 @@ int32_t create_topic() { return 0; } +void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) { + printf("commit %d\n", resp); +} + tmq_t* build_consumer() { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -103,6 +107,7 @@ tmq_t* build_consumer() { tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "group.id", "tg2"); + tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print); tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); return tmq; } @@ -144,7 +149,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { } void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { - static const int MIN_COMMIT_COUNT = 1000; + static const int MIN_COMMIT_COUNT = 1; int msg_count = 0; tmq_resp_err_t err; @@ -214,6 +219,6 @@ int main(int argc, char* argv[]) { tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); /*perf_loop(tmq, topic_list);*/ - basic_consume_loop(tmq, topic_list); - /*sync_consume_loop(tmq, topic_list);*/ + /*basic_consume_loop(tmq, topic_list);*/ + sync_consume_loop(tmq, topic_list); } diff --git a/include/client/taos.h b/include/client/taos.h index a960d379375acca491c7ec1b4b0541e3dae3489e..2c8135c8ffaf5e920e76544f5a0796df6d5d0eda 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -198,8 +198,8 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLi /* --------------------------TMQ INTERFACE------------------------------- */ enum tmq_resp_err_t { + TMQ_RESP_ERR__FAIL = -1, TMQ_RESP_ERR__SUCCESS = 0, - TMQ_RESP_ERR__FAIL = 1, }; typedef enum tmq_resp_err_t tmq_resp_err_t; @@ -226,7 +226,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list); #if 0 DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq); -DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics); +DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics); #endif DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); @@ -238,6 +238,7 @@ DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t * #if 0 DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); #endif +DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset); /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ enum tmq_conf_res_t { diff --git a/include/common/common.h b/include/common/common.h index fd5b6717abb9d7bd1cf3b35e8eda56c3194a523f..8fa2d03d6d3f79a220b6c10a38070a8a42026155 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -16,7 +16,6 @@ #ifndef TDENGINE_COMMON_H #define TDENGINE_COMMON_H - #ifdef __cplusplus extern "C" { #endif @@ -43,14 +42,16 @@ extern "C" { // int16_t bytes; // } SSchema; -#define TMQ_REQ_TYPE_COMMIT_ONLY 0 -#define TMQ_REQ_TYPE_CONSUME_ONLY 1 -#define TMQ_REQ_TYPE_CONSUME_AND_COMMIT 2 +enum { + TMQ_CONF__RESET_OFFSET__LATEST = -1, + TMQ_CONF__RESET_OFFSET__EARLIEAST = -2, + TMQ_CONF__RESET_OFFSET__NONE = -3, +}; typedef struct { uint32_t numOfTables; - SArray *pGroupList; - SHashObj *map; // speedup acquire the tableQueryInfo by table uid + SArray* pGroupList; + SHashObj* map; // speedup acquire the tableQueryInfo by table uid } STableGroupInfo; typedef struct SColumnDataAgg { @@ -79,14 +80,14 @@ typedef struct SConstantItem { // info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); typedef struct SSDataBlock { - SColumnDataAgg *pBlockAgg; - SArray *pDataBlock; // SArray - SArray *pConstantList; // SArray, it is a constant/tags value of the corresponding result value. - SDataBlockInfo info; + SColumnDataAgg* pBlockAgg; + SArray* pDataBlock; // SArray + SArray* pConstantList; // SArray, it is a constant/tags value of the corresponding result value. + SDataBlockInfo info; } SSDataBlock; typedef struct SVarColAttr { - int32_t *offset; // start position for each entry in the list + int32_t* offset; // start position for each entry in the list uint32_t length; // used buffer size that contain the valid data uint32_t allocLen; // allocated buffer size } SVarColAttr; @@ -94,11 +95,11 @@ typedef struct SVarColAttr { // pBlockAgg->numOfNull == info.rows, all data are null // pBlockAgg->numOfNull == 0, no data are null. typedef struct SColumnInfoData { - SColumnInfo info; // TODO filter info needs to be removed - bool hasNull;// if current column data has null value. - char *pData; // the corresponding block data in memory + SColumnInfo info; // TODO filter info needs to be removed + bool hasNull; // if current column data has null value. + char* pData; // the corresponding block data in memory union { - char *nullbitmap; // bitmap, one bit for each item in the list + char* nullbitmap; // bitmap, one bit for each item in the list SVarColAttr varmeta; }; } SColumnInfoData; @@ -149,7 +150,6 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp int32_t tlen = 0; int32_t sz = 0; tlen += taosEncodeFixedI64(buf, pRsp->consumerId); - tlen += taosEncodeFixedI64(buf, pRsp->committedOffset); tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum); @@ -170,7 +170,6 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) { int32_t sz; buf = taosDecodeFixedI64(buf, &pRsp->consumerId); - buf = taosDecodeFixedI64(buf, &pRsp->committedOffset); buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum); @@ -250,11 +249,11 @@ typedef struct SSqlExpr { char token[TSDB_COL_NAME_LEN]; // original token SSchema resSchema; - int32_t numOfCols; - SColumn* pColumns; // data columns that are required by query - int32_t interBytes; // inter result buffer size - int16_t numOfParams; // argument value of each function - SVariant param[3]; // parameters are not more than 3 + int32_t numOfCols; + SColumn* pColumns; // data columns that are required by query + int32_t interBytes; // inter result buffer size + int16_t numOfParams; // argument value of each function + SVariant param[3]; // parameters are not more than 3 } SSqlExpr; typedef struct SExprInfo { @@ -271,7 +270,7 @@ typedef struct SSessionWindow { SColumn col; } SSessionWindow; -#define QUERY_ASC_FORWARD_STEP 1 +#define QUERY_ASC_FORWARD_STEP 1 #define QUERY_DESC_FORWARD_STEP -1 #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5164fb6ccd2323764ac3aec259559ee572fbde4f..d92efd163d03b5fa26e8939c22d6d7c0611542f3 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -422,8 +422,8 @@ typedef struct { } SColumnInfo; typedef struct { - uint64_t uid; - TSKEY key; // last accessed ts, for subscription + int64_t uid; + TSKEY key; // last accessed ts, for subscription } STableIdInfo; typedef struct STimeWindow { @@ -554,8 +554,8 @@ int32_t tSerializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq); int32_t tDeserializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq); typedef struct { - char db[TSDB_DB_FNAME_LEN]; - uint64_t uid; + char db[TSDB_DB_FNAME_LEN]; + int64_t uid; } SDropDbRsp; int32_t tSerializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp); @@ -570,12 +570,12 @@ int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); int32_t tDeserializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); typedef struct { - char db[TSDB_DB_FNAME_LEN]; - uint64_t uid; - int32_t vgVersion; - int32_t vgNum; - int8_t hashMethod; - SArray* pVgroupInfos; // Array of SVgroupInfo + char db[TSDB_DB_FNAME_LEN]; + int64_t uid; + int32_t vgVersion; + int32_t vgNum; + int8_t hashMethod; + SArray* pVgroupInfos; // Array of SVgroupInfo } SUseDbRsp; int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp); @@ -725,7 +725,7 @@ typedef struct { int32_t vgId; int32_t dnodeId; char db[TSDB_DB_FNAME_LEN]; - uint64_t dbUid; + int64_t dbUid; int32_t vgVersion; int32_t cacheBlockSize; int32_t totalBlocks; @@ -753,10 +753,10 @@ int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pR int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); typedef struct { - int32_t vgId; - int32_t dnodeId; - uint64_t dbUid; - char db[TSDB_DB_FNAME_LEN]; + int32_t vgId; + int32_t dnodeId; + int64_t dbUid; + char db[TSDB_DB_FNAME_LEN]; } SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq; int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq); @@ -796,7 +796,7 @@ typedef struct { char tbName[TSDB_TABLE_NAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN]; - uint64_t dbId; + int64_t dbId; int32_t numOfTags; int32_t numOfColumns; int8_t precision; @@ -804,8 +804,8 @@ typedef struct { int8_t update; int32_t sversion; int32_t tversion; - uint64_t suid; - uint64_t tuid; + int64_t suid; + int64_t tuid; int32_t vgId; SSchema* pSchemas; } STableMetaRsp; @@ -1268,7 +1268,7 @@ typedef struct { typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; - uint64_t tuid; + int64_t tuid; int32_t sverson; int32_t execLen; char* executor; @@ -1279,11 +1279,11 @@ typedef struct { typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; - uint64_t tuid; + int64_t tuid; } SDDropTopicReq; typedef struct SVCreateTbReq { - uint64_t ver; // use a general definition + int64_t ver; // use a general definition char* name; uint32_t ttl; uint32_t keep; @@ -1314,8 +1314,8 @@ int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); typedef struct { - uint64_t ver; // use a general definition - SArray* pArray; + int64_t ver; // use a general definition + SArray* pArray; } SVCreateTbBatchReq; typedef struct { @@ -1325,7 +1325,7 @@ int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq); void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq); typedef struct { - uint64_t ver; + int64_t ver; char* name; uint8_t type; tb_uid_t suid; @@ -1760,35 +1760,19 @@ typedef struct { char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqOffset; -typedef struct { - int32_t vgId; - SArray* offsets; // SArray -} SMqVgOffsets; - typedef struct { int32_t num; SMqOffset* offsets; -} SMqCMResetOffsetReq; +} SMqCMCommitOffsetReq; typedef struct { int32_t reserved; -} SMqCMResetOffsetRsp; - -typedef struct { - int64_t leftForVer; - SMqVgOffsets offsets; -} SMqMVResetOffsetReq; - -typedef struct { - int32_t reserved; -} SMqMVResetOffsetRsp; +} SMqCMCommitOffsetRsp; int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset); int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset); -int32_t tEncodeSMqCMResetOffsetReq(SCoder* encoder, const SMqCMResetOffsetReq* pReq); -int32_t tDecodeSMqCMResetOffsetReq(SCoder* decoder, SMqCMResetOffsetReq* pReq); -int32_t tEncodeSMqMVResetOffsetReq(SCoder* encoder, const SMqMVResetOffsetReq* pReq); -int32_t tDecodeSMqMVResetOffsetReq(SCoder* decoder, SMqMVResetOffsetReq* pReq); +int32_t tEncodeSMqCMCommitOffsetReq(SCoder* encoder, const SMqCMCommitOffsetReq* pReq); +int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq); typedef struct { uint32_t nCols; @@ -1870,7 +1854,6 @@ typedef struct { typedef struct { int64_t consumerId; SSchemaWrapper* schemas; - int64_t committedOffset; int64_t reqOffset; int64_t rspOffset; int32_t skipLogNum; @@ -1881,22 +1864,18 @@ typedef struct { // one req for one vg+topic typedef struct { SMsgHead head; - // 0: commit only, current offset - // 1: consume only, poll next offset - // 2: commit current and consume next offset - int32_t reqType; - int64_t reqId; int64_t consumerId; int64_t blockingTime; char cgroup[TSDB_CONSUMER_GROUP_LEN]; - int64_t offset; + int64_t currentOffset; char topic[TSDB_TOPIC_FNAME_LEN]; } SMqConsumeReq; typedef struct { int32_t vgId; + int64_t offset; SEpSet epSet; } SMqSubVgEp; @@ -1917,12 +1896,14 @@ static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taos static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgEp->vgId); + tlen += taosEncodeFixedI64(buf, pVgEp->offset); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); return tlen; } static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { buf = taosDecodeFixedI32(buf, &pVgEp->vgId); + buf = taosDecodeFixedI64(buf, &pVgEp->offset); buf = taosDecodeSEpSet(buf, &pVgEp->epSet); return buf; } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 1a63ea73a597d215abb3a8f0b43922da67bcb647..b20d6de5561c6b58fe0ebca799e3fe19ccfa4950 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -143,10 +143,10 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) - TD_DEF_MSG_TYPE(TDMT_MND_RESET_OFFSET, "mnode-reset-offset", SMqCMResetOffsetReq, SMqCMResetOffsetRsp) TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp) // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 1e967a6d2bc7e08a846c6619f94352f6e05c2e5e..bf48a8523c427a10801b0cccaf1046d08c9cc07f 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -113,14 +113,15 @@ typedef enum { SDB_USER = 7, SDB_AUTH = 8, SDB_ACCT = 9, - SDB_SUBSCRIBE = 10, - SDB_CONSUMER = 11, - SDB_TOPIC = 12, - SDB_VGROUP = 13, - SDB_STB = 14, - SDB_DB = 15, - SDB_FUNC = 16, - SDB_MAX = 17 + SDB_OFFSET = 10, + SDB_SUBSCRIBE = 11, + SDB_CONSUMER = 12, + SDB_TOPIC = 13, + SDB_VGROUP = 14, + SDB_STB = 15, + SDB_DB = 16, + SDB_FUNC = 17, + SDB_MAX = 18 } ESdbType; typedef struct SSdb SSdb; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 92028b08378af35926d7c9215ecc95fb5c63b5f3..b947142958e555ffe523de2db4e6be9fd030907c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -20,6 +20,8 @@ extern "C" { #endif +// clang-format off + #define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code)))) #define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code)) @@ -260,6 +262,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7) #define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E8) #define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9) +#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03EA) #define TSDB_CODE_MND_MQ_PLACEHOLDER TAOS_DEF_ERROR_CODE(0, 0x03F0) // dnode diff --git a/include/util/tdef.h b/include/util/tdef.h index a4c333a4d296d4130016d39aad6cd1550dde8ca1..407d0ccd9e76fa62900bb62e2122d706507be9a7 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -24,373 +24,370 @@ extern "C" { #define TSDB__packed -#define TSKEY int64_t -#define TSKEY_MIN INT64_MIN -#define TSKEY_MAX (INT64_MAX - 1) +#define TSKEY int64_t +#define TSKEY_MIN INT64_MIN +#define TSKEY_MAX (INT64_MAX - 1) #define TSKEY_INITIAL_VAL TSKEY_MIN // Bytes for each type. extern const int32_t TYPE_BYTES[15]; // TODO: replace and remove code below -#define CHAR_BYTES sizeof(char) -#define SHORT_BYTES sizeof(int16_t) -#define INT_BYTES sizeof(int32_t) -#define LONG_BYTES sizeof(int64_t) -#define FLOAT_BYTES sizeof(float) -#define DOUBLE_BYTES sizeof(double) -#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*) -#define TSDB_KEYSIZE sizeof(TSKEY) -#define TSDB_NCHAR_SIZE sizeof(int32_t) +#define CHAR_BYTES sizeof(char) +#define SHORT_BYTES sizeof(int16_t) +#define INT_BYTES sizeof(int32_t) +#define LONG_BYTES sizeof(int64_t) +#define FLOAT_BYTES sizeof(float) +#define DOUBLE_BYTES sizeof(double) +#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*) +#define TSDB_KEYSIZE sizeof(TSKEY) +#define TSDB_NCHAR_SIZE sizeof(int32_t) // NULL definition -#define TSDB_DATA_BOOL_NULL 0x02 -#define TSDB_DATA_TINYINT_NULL 0x80 -#define TSDB_DATA_SMALLINT_NULL 0x8000 -#define TSDB_DATA_INT_NULL 0x80000000L -#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L -#define TSDB_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL - -#define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN -#define TSDB_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN -#define TSDB_DATA_NCHAR_NULL 0xFFFFFFFF -#define TSDB_DATA_BINARY_NULL 0xFF - -#define TSDB_DATA_UTINYINT_NULL 0xFF -#define TSDB_DATA_USMALLINT_NULL 0xFFFF -#define TSDB_DATA_UINT_NULL 0xFFFFFFFF -#define TSDB_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL - -#define TSDB_DATA_NULL_STR "NULL" -#define TSDB_DATA_NULL_STR_L "null" - -#define TSDB_NETTEST_USER "nettestinternal" -#define TSDB_DEFAULT_USER "root" +#define TSDB_DATA_BOOL_NULL 0x02 +#define TSDB_DATA_TINYINT_NULL 0x80 +#define TSDB_DATA_SMALLINT_NULL 0x8000 +#define TSDB_DATA_INT_NULL 0x80000000L +#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L +#define TSDB_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL + +#define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN +#define TSDB_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN +#define TSDB_DATA_NCHAR_NULL 0xFFFFFFFF +#define TSDB_DATA_BINARY_NULL 0xFF + +#define TSDB_DATA_UTINYINT_NULL 0xFF +#define TSDB_DATA_USMALLINT_NULL 0xFFFF +#define TSDB_DATA_UINT_NULL 0xFFFFFFFF +#define TSDB_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL + +#define TSDB_DATA_NULL_STR "NULL" +#define TSDB_DATA_NULL_STR_L "null" + +#define TSDB_NETTEST_USER "nettestinternal" +#define TSDB_DEFAULT_USER "root" #ifdef _TD_POWER_ -#define TSDB_DEFAULT_PASS "powerdb" +#define TSDB_DEFAULT_PASS "powerdb" #elif (_TD_TQ_ == true) -#define TSDB_DEFAULT_PASS "tqueue" +#define TSDB_DEFAULT_PASS "tqueue" #elif (_TD_PRO_ == true) -#define TSDB_DEFAULT_PASS "prodb" +#define TSDB_DEFAULT_PASS "prodb" #else -#define TSDB_DEFAULT_PASS "taosdata" +#define TSDB_DEFAULT_PASS "taosdata" #endif -#define SHELL_MAX_PASSWORD_LEN 20 +#define SHELL_MAX_PASSWORD_LEN 20 -#define TSDB_TRUE 1 -#define TSDB_FALSE 0 -#define TSDB_OK 0 +#define TSDB_TRUE 1 +#define TSDB_FALSE 0 +#define TSDB_OK 0 #define TSDB_ERR -1 #define TS_PATH_DELIMITER "." #define TS_ESCAPE_CHAR '`' -#define TSDB_TIME_PRECISION_MILLI 0 -#define TSDB_TIME_PRECISION_MICRO 1 -#define TSDB_TIME_PRECISION_NANO 2 +#define TSDB_TIME_PRECISION_MILLI 0 +#define TSDB_TIME_PRECISION_MICRO 1 +#define TSDB_TIME_PRECISION_NANO 2 #define TSDB_TIME_PRECISION_MILLI_STR "ms" #define TSDB_TIME_PRECISION_MICRO_STR "us" #define TSDB_TIME_PRECISION_NANO_STR "ns" -#define TSDB_TICK_PER_SECOND(precision) ((int64_t)((precision)==TSDB_TIME_PRECISION_MILLI ? 1e3L : ((precision)==TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L))) +#define TSDB_TICK_PER_SECOND(precision) \ + ((int64_t)((precision) == TSDB_TIME_PRECISION_MILLI ? 1e3L \ + : ((precision) == TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L))) #define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member) -#define T_APPEND_MEMBER(dst, ptr, type, member) \ -do {\ - memcpy((void *)(dst), (void *)(&((ptr)->member)), T_MEMBER_SIZE(type, member));\ - dst = (void *)((char *)(dst) + T_MEMBER_SIZE(type, member));\ -} while(0) -#define T_READ_MEMBER(src, type, target) \ -do { \ - (target) = *(type *)(src); \ - (src) = (void *)((char *)src + sizeof(type));\ -} while(0) - +#define T_APPEND_MEMBER(dst, ptr, type, member) \ + do { \ + memcpy((void *)(dst), (void *)(&((ptr)->member)), T_MEMBER_SIZE(type, member)); \ + dst = (void *)((char *)(dst) + T_MEMBER_SIZE(type, member)); \ + } while (0) +#define T_READ_MEMBER(src, type, target) \ + do { \ + (target) = *(type *)(src); \ + (src) = (void *)((char *)src + sizeof(type)); \ + } while (0) // TODO: check if below is necessary -#define TSDB_RELATION_INVALID 0 -#define TSDB_RELATION_LESS 1 -#define TSDB_RELATION_GREATER 2 -#define TSDB_RELATION_EQUAL 3 -#define TSDB_RELATION_LESS_EQUAL 4 +#define TSDB_RELATION_INVALID 0 +#define TSDB_RELATION_LESS 1 +#define TSDB_RELATION_GREATER 2 +#define TSDB_RELATION_EQUAL 3 +#define TSDB_RELATION_LESS_EQUAL 4 #define TSDB_RELATION_GREATER_EQUAL 5 -#define TSDB_RELATION_NOT_EQUAL 6 -#define TSDB_RELATION_LIKE 7 -#define TSDB_RELATION_NOT_LIKE 8 -#define TSDB_RELATION_ISNULL 9 -#define TSDB_RELATION_NOTNULL 10 -#define TSDB_RELATION_IN 11 -#define TSDB_RELATION_NOT_IN 12 - -#define TSDB_RELATION_AND 13 -#define TSDB_RELATION_OR 14 -#define TSDB_RELATION_NOT 15 - -#define TSDB_RELATION_MATCH 16 -#define TSDB_RELATION_NMATCH 17 - -#define TSDB_BINARY_OP_ADD 4000 -#define TSDB_BINARY_OP_SUBTRACT 4001 -#define TSDB_BINARY_OP_MULTIPLY 4002 -#define TSDB_BINARY_OP_DIVIDE 4003 -#define TSDB_BINARY_OP_REMAINDER 4004 -#define TSDB_BINARY_OP_CONCAT 4005 - -#define FUNCTION_CEIL 4500 -#define FUNCTION_FLOOR 4501 -#define FUNCTION_ABS 4502 -#define FUNCTION_ROUND 4503 - -#define FUNCTION_LENGTH 4800 -#define FUNCTION_CONCAT 4801 -#define FUNCTION_LTRIM 4802 -#define FUNCTION_RTRIM 4803 - -#define IS_RELATION_OPTR(op) (((op) >= TSDB_RELATION_LESS) && ((op) < TSDB_RELATION_IN)) +#define TSDB_RELATION_NOT_EQUAL 6 +#define TSDB_RELATION_LIKE 7 +#define TSDB_RELATION_NOT_LIKE 8 +#define TSDB_RELATION_ISNULL 9 +#define TSDB_RELATION_NOTNULL 10 +#define TSDB_RELATION_IN 11 +#define TSDB_RELATION_NOT_IN 12 + +#define TSDB_RELATION_AND 13 +#define TSDB_RELATION_OR 14 +#define TSDB_RELATION_NOT 15 + +#define TSDB_RELATION_MATCH 16 +#define TSDB_RELATION_NMATCH 17 + +#define TSDB_BINARY_OP_ADD 4000 +#define TSDB_BINARY_OP_SUBTRACT 4001 +#define TSDB_BINARY_OP_MULTIPLY 4002 +#define TSDB_BINARY_OP_DIVIDE 4003 +#define TSDB_BINARY_OP_REMAINDER 4004 +#define TSDB_BINARY_OP_CONCAT 4005 + +#define FUNCTION_CEIL 4500 +#define FUNCTION_FLOOR 4501 +#define FUNCTION_ABS 4502 +#define FUNCTION_ROUND 4503 + +#define FUNCTION_LENGTH 4800 +#define FUNCTION_CONCAT 4801 +#define FUNCTION_LTRIM 4802 +#define FUNCTION_RTRIM 4803 + +#define IS_RELATION_OPTR(op) (((op) >= TSDB_RELATION_LESS) && ((op) < TSDB_RELATION_IN)) #define IS_ARITHMETIC_OPTR(op) (((op) >= TSDB_BINARY_OP_ADD) && ((op) <= TSDB_BINARY_OP_REMAINDER)) -#define TSDB_NAME_DELIMITER_LEN 1 +#define TSDB_NAME_DELIMITER_LEN 1 -#define TSDB_UNI_LEN 24 -#define TSDB_USER_LEN TSDB_UNI_LEN +#define TSDB_UNI_LEN 24 +#define TSDB_USER_LEN TSDB_UNI_LEN // ACCOUNT is a 32 bit positive integer // this is the length of its string representation, including the terminator zero -#define TSDB_ACCT_ID_LEN 11 - -#define TSDB_MAX_COLUMNS 4096 -#define TSDB_MIN_COLUMNS 2 //PRIMARY COLUMN(timestamp) + other columns - -#define TSDB_NODE_NAME_LEN 64 -#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string -#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string -#define TSDB_DB_NAME_LEN 65 -#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN) - -#define TSDB_FUNC_NAME_LEN 65 -#define TSDB_FUNC_COMMENT_LEN 4096 -#define TSDB_FUNC_CODE_LEN (65535 - 512) -#define TSDB_FUNC_BUF_SIZE 512 -#define TSDB_FUNC_TYPE_SCALAR 1 -#define TSDB_FUNC_TYPE_AGGREGATE 2 -#define TSDB_FUNC_MAX_RETRIEVE 1024 - -#define TSDB_TYPE_STR_MAX_LEN 32 -#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) -#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN -#define TSDB_CONSUMER_GROUP_LEN 192 -#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2) -#define TSDB_COL_NAME_LEN 65 -#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 -#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE -#define TSDB_MAX_SQL_SHOW_LEN 1024 -#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024u) // sql length should be less than 1mb - -#define TSDB_APP_NAME_LEN TSDB_UNI_LEN -#define TSDB_STB_COMMENT_LEN 1024 - /** - * In some scenarios uint16_t (0~65535) is used to store the row len. - * - Firstly, we use 65531(65535 - 4), as the SDataRow/SKVRow contains 4 bits header. - * - Secondly, if all cols are VarDataT type except primary key, we need 4 bits to store the offset, thus - * the final value is 65531-(4096-1)*4 = 49151. - */ -#define TSDB_MAX_BYTES_PER_ROW 49151 -#define TSDB_MAX_TAGS_LEN 16384 -#define TSDB_MAX_TAGS 128 -#define TSDB_MAX_TAG_CONDITIONS 1024 - -#define TSDB_AUTH_LEN 16 -#define TSDB_PASSWORD_LEN 32 -#define TSDB_USET_PASSWORD_LEN 129 -#define TSDB_VERSION_LEN 12 -#define TSDB_LABEL_LEN 8 - -#define TSDB_CLUSTER_ID_LEN 40 -#define TSDB_FQDN_LEN 128 -#define TSDB_EP_LEN (TSDB_FQDN_LEN + 6) -#define TSDB_IPv4ADDR_LEN 16 -#define TSDB_FILENAME_LEN 128 -#define TSDB_SHOW_SQL_LEN 512 -#define TSDB_SHOW_SUBQUERY_LEN 1000 -#define TSDB_SLOW_QUERY_SQL_LEN 512 - -#define TSDB_TRANS_STAGE_LEN 12 -#define TSDB_TRANS_TYPE_LEN 16 -#define TSDB_TRANS_ERROR_LEN 64 - -#define TSDB_STEP_NAME_LEN 32 -#define TSDB_STEP_DESC_LEN 128 - -#define TSDB_ERROR_MSG_LEN 1024 -#define TSDB_DNODE_CONFIG_LEN 128 -#define TSDB_DNODE_VALUE_LEN 256 - -#define TSDB_MQTT_HOSTNAME_LEN 64 -#define TSDB_MQTT_PORT_LEN 8 -#define TSDB_MQTT_USER_LEN 24 -#define TSDB_MQTT_PASS_LEN 24 -#define TSDB_MQTT_TOPIC_LEN 64 -#define TSDB_MQTT_CLIENT_ID_LEN 32 - -#define TSDB_DB_TYPE_DEFAULT 0 -#define TSDB_DB_TYPE_TOPIC 1 - -#define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE +#define TSDB_ACCT_ID_LEN 11 + +#define TSDB_MAX_COLUMNS 4096 +#define TSDB_MIN_COLUMNS 2 // PRIMARY COLUMN(timestamp) + other columns + +#define TSDB_NODE_NAME_LEN 64 +#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string +#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string +#define TSDB_DB_NAME_LEN 65 +#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN) + +#define TSDB_FUNC_NAME_LEN 65 +#define TSDB_FUNC_COMMENT_LEN 4096 +#define TSDB_FUNC_CODE_LEN (65535 - 512) +#define TSDB_FUNC_BUF_SIZE 512 +#define TSDB_FUNC_TYPE_SCALAR 1 +#define TSDB_FUNC_TYPE_AGGREGATE 2 +#define TSDB_FUNC_MAX_RETRIEVE 1024 + +#define TSDB_TYPE_STR_MAX_LEN 32 +#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) +#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN +#define TSDB_CONSUMER_GROUP_LEN 193 +#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2) +#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20) +#define TSDB_COL_NAME_LEN 65 +#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 +#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE +#define TSDB_MAX_SQL_SHOW_LEN 1024 +#define TSDB_MAX_ALLOWED_SQL_LEN (1 * 1024 * 1024u) // sql length should be less than 1mb + +#define TSDB_APP_NAME_LEN TSDB_UNI_LEN +#define TSDB_STB_COMMENT_LEN 1024 +/** + * In some scenarios uint16_t (0~65535) is used to store the row len. + * - Firstly, we use 65531(65535 - 4), as the SDataRow/SKVRow contains 4 bits header. + * - Secondly, if all cols are VarDataT type except primary key, we need 4 bits to store the offset, thus + * the final value is 65531-(4096-1)*4 = 49151. + */ +#define TSDB_MAX_BYTES_PER_ROW 49151 +#define TSDB_MAX_TAGS_LEN 16384 +#define TSDB_MAX_TAGS 128 +#define TSDB_MAX_TAG_CONDITIONS 1024 + +#define TSDB_AUTH_LEN 16 +#define TSDB_PASSWORD_LEN 32 +#define TSDB_USET_PASSWORD_LEN 129 +#define TSDB_VERSION_LEN 12 +#define TSDB_LABEL_LEN 8 + +#define TSDB_CLUSTER_ID_LEN 40 +#define TSDB_FQDN_LEN 128 +#define TSDB_EP_LEN (TSDB_FQDN_LEN + 6) +#define TSDB_IPv4ADDR_LEN 16 +#define TSDB_FILENAME_LEN 128 +#define TSDB_SHOW_SQL_LEN 512 +#define TSDB_SHOW_SUBQUERY_LEN 1000 +#define TSDB_SLOW_QUERY_SQL_LEN 512 + +#define TSDB_TRANS_STAGE_LEN 12 +#define TSDB_TRANS_TYPE_LEN 16 +#define TSDB_TRANS_ERROR_LEN 64 + +#define TSDB_STEP_NAME_LEN 32 +#define TSDB_STEP_DESC_LEN 128 + +#define TSDB_ERROR_MSG_LEN 1024 +#define TSDB_DNODE_CONFIG_LEN 128 +#define TSDB_DNODE_VALUE_LEN 256 + +#define TSDB_MQTT_HOSTNAME_LEN 64 +#define TSDB_MQTT_PORT_LEN 8 +#define TSDB_MQTT_USER_LEN 24 +#define TSDB_MQTT_PASS_LEN 24 +#define TSDB_MQTT_TOPIC_LEN 64 +#define TSDB_MQTT_CLIENT_ID_LEN 32 + +#define TSDB_DB_TYPE_DEFAULT 0 +#define TSDB_DB_TYPE_TOPIC 1 + +#define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE #define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE -#define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value -#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth +#define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value +#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth #define TSDB_CQ_SQL_SIZE 1024 #define TSDB_MIN_VNODES 16 #define TSDB_MAX_VNODES 512 #define TSDB_MIN_VNODES_PER_DB 1 #define TSDB_MAX_VNODES_PER_DB 4096 -#define TSDB_DEFAULT_VN_PER_DB 2 +#define TSDB_DEFAULT_VN_PER_DB 2 -#define TSDB_DNODE_ROLE_ANY 0 -#define TSDB_DNODE_ROLE_MGMT 1 -#define TSDB_DNODE_ROLE_VNODE 2 +#define TSDB_DNODE_ROLE_ANY 0 +#define TSDB_DNODE_ROLE_MGMT 1 +#define TSDB_DNODE_ROLE_VNODE 2 -#define TSDB_MAX_REPLICA 5 +#define TSDB_MAX_REPLICA 5 -#define TSDB_TBNAME_COLUMN_INDEX (-1) -#define TSDB_UD_COLUMN_INDEX (-1000) -#define TSDB_RES_COL_ID (-5000) +#define TSDB_TBNAME_COLUMN_INDEX (-1) +#define TSDB_UD_COLUMN_INDEX (-1000) +#define TSDB_RES_COL_ID (-5000) -#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta - -#define TSDB_MIN_CACHE_BLOCK_SIZE 1 -#define TSDB_MAX_CACHE_BLOCK_SIZE 128 // 128MB for each vnode -#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16 +#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta -#define TSDB_MIN_TOTAL_BLOCKS 3 -#define TSDB_MAX_TOTAL_BLOCKS 10000 -#define TSDB_DEFAULT_TOTAL_BLOCKS 6 +#define TSDB_MIN_CACHE_BLOCK_SIZE 1 +#define TSDB_MAX_CACHE_BLOCK_SIZE 128 // 128MB for each vnode +#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16 -#define TSDB_MIN_DAYS_PER_FILE 1 -#define TSDB_MAX_DAYS_PER_FILE 3650 -#define TSDB_DEFAULT_DAYS_PER_FILE 10 +#define TSDB_MIN_TOTAL_BLOCKS 3 +#define TSDB_MAX_TOTAL_BLOCKS 10000 +#define TSDB_DEFAULT_TOTAL_BLOCKS 6 -#define TSDB_MIN_KEEP 1 // data in db to be reserved. -#define TSDB_MAX_KEEP 365000 // data in db to be reserved. -#define TSDB_DEFAULT_KEEP 3650 // ten years +#define TSDB_MIN_DAYS_PER_FILE 1 +#define TSDB_MAX_DAYS_PER_FILE 3650 +#define TSDB_DEFAULT_DAYS_PER_FILE 10 -#define TSDB_MIN_MIN_ROW_FBLOCK 10 -#define TSDB_MAX_MIN_ROW_FBLOCK 1000 -#define TSDB_DEFAULT_MIN_ROW_FBLOCK 100 +#define TSDB_MIN_KEEP 1 // data in db to be reserved. +#define TSDB_MAX_KEEP 365000 // data in db to be reserved. +#define TSDB_DEFAULT_KEEP 3650 // ten years -#define TSDB_MIN_MAX_ROW_FBLOCK 200 -#define TSDB_MAX_MAX_ROW_FBLOCK 10000 -#define TSDB_DEFAULT_MAX_ROW_FBLOCK 4096 +#define TSDB_MIN_MIN_ROW_FBLOCK 10 +#define TSDB_MAX_MIN_ROW_FBLOCK 1000 +#define TSDB_DEFAULT_MIN_ROW_FBLOCK 100 -#define TSDB_MIN_COMMIT_TIME 30 -#define TSDB_MAX_COMMIT_TIME 40960 -#define TSDB_DEFAULT_COMMIT_TIME 3600 +#define TSDB_MIN_MAX_ROW_FBLOCK 200 +#define TSDB_MAX_MAX_ROW_FBLOCK 10000 +#define TSDB_DEFAULT_MAX_ROW_FBLOCK 4096 -#define TSDB_MIN_FSYNC_PERIOD 0 -#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond -#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second +#define TSDB_MIN_COMMIT_TIME 30 +#define TSDB_MAX_COMMIT_TIME 40960 +#define TSDB_DEFAULT_COMMIT_TIME 3600 -#define TSDB_MIN_WAL_LEVEL 0 -#define TSDB_MAX_WAL_LEVEL 2 -#define TSDB_DEFAULT_WAL_LEVEL 1 +#define TSDB_MIN_FSYNC_PERIOD 0 +#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond +#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second -#define TSDB_MIN_PRECISION TSDB_TIME_PRECISION_MILLI -#define TSDB_MAX_PRECISION TSDB_TIME_PRECISION_NANO -#define TSDB_DEFAULT_PRECISION TSDB_TIME_PRECISION_MILLI +#define TSDB_MIN_WAL_LEVEL 0 +#define TSDB_MAX_WAL_LEVEL 2 +#define TSDB_DEFAULT_WAL_LEVEL 1 -#define TSDB_MIN_COMP_LEVEL 0 -#define TSDB_MAX_COMP_LEVEL 2 -#define TSDB_DEFAULT_COMP_LEVEL 2 +#define TSDB_MIN_PRECISION TSDB_TIME_PRECISION_MILLI +#define TSDB_MAX_PRECISION TSDB_TIME_PRECISION_NANO +#define TSDB_DEFAULT_PRECISION TSDB_TIME_PRECISION_MILLI -#define TSDB_MIN_DB_REPLICA_OPTION 1 -#define TSDB_MAX_DB_REPLICA_OPTION 3 -#define TSDB_DEFAULT_DB_REPLICA_OPTION 1 +#define TSDB_MIN_COMP_LEVEL 0 +#define TSDB_MAX_COMP_LEVEL 2 +#define TSDB_DEFAULT_COMP_LEVEL 2 -#define TSDB_MIN_DB_QUORUM_OPTION 1 -#define TSDB_MAX_DB_QUORUM_OPTION 2 -#define TSDB_DEFAULT_DB_QUORUM_OPTION 1 +#define TSDB_MIN_DB_REPLICA_OPTION 1 +#define TSDB_MAX_DB_REPLICA_OPTION 3 +#define TSDB_DEFAULT_DB_REPLICA_OPTION 1 -#define TSDB_MIN_DB_UPDATE 0 -#define TSDB_MAX_DB_UPDATE 2 -#define TSDB_DEFAULT_DB_UPDATE_OPTION 0 +#define TSDB_MIN_DB_QUORUM_OPTION 1 +#define TSDB_MAX_DB_QUORUM_OPTION 2 +#define TSDB_DEFAULT_DB_QUORUM_OPTION 1 -#define TSDB_MIN_DB_CACHE_LAST_ROW 0 -#define TSDB_MAX_DB_CACHE_LAST_ROW 3 -#define TSDB_DEFAULT_CACHE_LAST_ROW 0 +#define TSDB_MIN_DB_UPDATE 0 +#define TSDB_MAX_DB_UPDATE 2 +#define TSDB_DEFAULT_DB_UPDATE_OPTION 0 -#define TSDB_MAX_JOIN_TABLE_NUM 10 -#define TSDB_MAX_UNION_CLAUSE 5 +#define TSDB_MIN_DB_CACHE_LAST_ROW 0 +#define TSDB_MAX_DB_CACHE_LAST_ROW 3 +#define TSDB_DEFAULT_CACHE_LAST_ROW 0 -#define TSDB_MAX_FIELD_LEN 16384 -#define TSDB_MAX_BINARY_LEN (TSDB_MAX_FIELD_LEN-TSDB_KEYSIZE) // keep 16384 -#define TSDB_MAX_NCHAR_LEN (TSDB_MAX_FIELD_LEN-TSDB_KEYSIZE) // keep 16384 -#define PRIMARYKEY_TIMESTAMP_COL_ID 1 -#define COL_REACH_END(colId, maxColId) ((colId) > (maxColId)) +#define TSDB_MAX_JOIN_TABLE_NUM 10 +#define TSDB_MAX_UNION_CLAUSE 5 -#define TSDB_MAX_RPC_THREADS 5 +#define TSDB_MAX_FIELD_LEN 16384 +#define TSDB_MAX_BINARY_LEN (TSDB_MAX_FIELD_LEN - TSDB_KEYSIZE) // keep 16384 +#define TSDB_MAX_NCHAR_LEN (TSDB_MAX_FIELD_LEN - TSDB_KEYSIZE) // keep 16384 +#define PRIMARYKEY_TIMESTAMP_COL_ID 1 +#define COL_REACH_END(colId, maxColId) ((colId) > (maxColId)) -#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type -#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode +#define TSDB_MAX_RPC_THREADS 5 #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type #define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode -#define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default - - +#define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default /* * 1. ordinary sub query for select * from super_table * 2. all sqlobj generated by createSubqueryObj with this flag */ -#define TSDB_QUERY_TYPE_SUBQUERY 0x02u -#define TSDB_QUERY_TYPE_STABLE_SUBQUERY 0x04u // two-stage subquery for super table - -#define TSDB_QUERY_TYPE_TABLE_QUERY 0x08u // query ordinary table; below only apply to client side -#define TSDB_QUERY_TYPE_STABLE_QUERY 0x10u // query on super table -#define TSDB_QUERY_TYPE_JOIN_QUERY 0x20u // join query -#define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40u // select *,columns... query -#define TSDB_QUERY_TYPE_JOIN_SEC_STAGE 0x80u // join sub query at the second stage - -#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u -#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type -#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u -#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file -#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type -#define TSDB_QUERY_TYPE_NEST_SUBQUERY 0x1000u // nested sub query - -#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) -#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type)) -#define TSDB_QUERY_CLEAR_TYPE(x, _type) ((x) &= (~_type)) -#define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE) - -#define TSDB_ORDER_ASC 1 -#define TSDB_ORDER_DESC 2 - -#define TSDB_DEFAULT_CLUSTER_HASH_SIZE 1 -#define TSDB_DEFAULT_MNODES_HASH_SIZE 5 -#define TSDB_DEFAULT_DNODES_HASH_SIZE 10 -#define TSDB_DEFAULT_ACCOUNTS_HASH_SIZE 10 -#define TSDB_DEFAULT_USERS_HASH_SIZE 20 -#define TSDB_DEFAULT_DBS_HASH_SIZE 100 -#define TSDB_DEFAULT_VGROUPS_HASH_SIZE 100 -#define TSDB_DEFAULT_STABLES_HASH_SIZE 100 -#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000 - -#define TSDB_MAX_WAL_SIZE (1024*1024*3) - -#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P - -#define TFS_MAX_TIERS 3 +#define TSDB_QUERY_TYPE_SUBQUERY 0x02u +#define TSDB_QUERY_TYPE_STABLE_SUBQUERY 0x04u // two-stage subquery for super table + +#define TSDB_QUERY_TYPE_TABLE_QUERY 0x08u // query ordinary table; below only apply to client side +#define TSDB_QUERY_TYPE_STABLE_QUERY 0x10u // query on super table +#define TSDB_QUERY_TYPE_JOIN_QUERY 0x20u // join query +#define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40u // select *,columns... query +#define TSDB_QUERY_TYPE_JOIN_SEC_STAGE 0x80u // join sub query at the second stage + +#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u +#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type +#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u +#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file +#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type +#define TSDB_QUERY_TYPE_NEST_SUBQUERY 0x1000u // nested sub query + +#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) +#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type)) +#define TSDB_QUERY_CLEAR_TYPE(x, _type) ((x) &= (~_type)) +#define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE) + +#define TSDB_ORDER_ASC 1 +#define TSDB_ORDER_DESC 2 + +#define TSDB_DEFAULT_CLUSTER_HASH_SIZE 1 +#define TSDB_DEFAULT_MNODES_HASH_SIZE 5 +#define TSDB_DEFAULT_DNODES_HASH_SIZE 10 +#define TSDB_DEFAULT_ACCOUNTS_HASH_SIZE 10 +#define TSDB_DEFAULT_USERS_HASH_SIZE 20 +#define TSDB_DEFAULT_DBS_HASH_SIZE 100 +#define TSDB_DEFAULT_VGROUPS_HASH_SIZE 100 +#define TSDB_DEFAULT_STABLES_HASH_SIZE 100 +#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000 + +#define TSDB_MAX_WAL_SIZE (1024 * 1024 * 3) + +#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P + +#define TFS_MAX_TIERS 3 #define TFS_MAX_DISKS_PER_TIER 16 -#define TFS_MAX_DISKS (TFS_MAX_TIERS * TFS_MAX_DISKS_PER_TIER) -#define TFS_MIN_LEVEL 0 -#define TFS_MAX_LEVEL (TFS_MAX_TIERS - 1) -#define TFS_PRIMARY_LEVEL 0 -#define TFS_PRIMARY_ID 0 +#define TFS_MAX_DISKS (TFS_MAX_TIERS * TFS_MAX_DISKS_PER_TIER) +#define TFS_MIN_LEVEL 0 +#define TFS_MAX_LEVEL (TFS_MAX_TIERS - 1) +#define TFS_PRIMARY_LEVEL 0 +#define TFS_PRIMARY_ID 0 #define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024 enum { TRANS_STAT_INIT = 0, TRANS_STAT_EXECUTING, TRANS_STAT_EXECUTED, TRANS_STAT_ROLLBACKING, TRANS_STAT_ROLLBACKED }; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 229d3a9ec319472f3ead8f0a1a984d5a800ea842..e206b1a2bb9a953c9d87b9a65ef15662f85fc71d 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -45,22 +45,24 @@ struct tmq_topic_vgroup_list_t { }; struct tmq_conf_t { - char clientId[256]; - char groupId[256]; - bool auto_commit; + char clientId[256]; + char groupId[256]; + int8_t auto_commit; + int8_t resetOffset; + tmq_commit_cb* commit_cb; /*char* ip;*/ /*uint16_t port;*/ - tmq_commit_cb* commit_cb; }; struct tmq_t { // conf char groupId[256]; char clientId[256]; - bool autoCommit; + int8_t autoCommit; SRWLatch lock; int64_t consumerId; int32_t epoch; + int32_t resetOffsetCfg; int64_t status; tsem_t rspSem; STscObj* pTscObj; @@ -79,7 +81,6 @@ typedef struct { // statistics int64_t pollCnt; // offset - int64_t committedOffset; int64_t currentOffset; // connection info int32_t vgId; @@ -115,21 +116,17 @@ typedef struct { } SMqConsumeCbParam; typedef struct { - tmq_t* tmq; - SMqClientVg* pVg; - int32_t async; - tsem_t rspSem; -} SMqCommitCbParam; - -typedef struct { - tmq_t* tmq; + tmq_t* tmq; + /*SMqClientVg* pVg;*/ + int32_t async; tsem_t rspSem; tmq_resp_err_t rspErr; -} SMqResetOffsetParam; +} SMqCommitCbParam; tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); conf->auto_commit = false; + conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; return conf; } @@ -157,6 +154,20 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_INVALID; } } + if (strcmp(key, "auto.offset.reset") == 0) { + if (strcmp(value, "none") == 0) { + conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE; + return TMQ_CONF_OK; + } else if (strcmp(value, "earliest") == 0) { + conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; + return TMQ_CONF_OK; + } else if (strcmp(value, "latest") == 0) { + conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST; + return TMQ_CONF_OK; + } else { + return TMQ_CONF_INVALID; + } + } return TMQ_CONF_UNKNOWN; } @@ -190,14 +201,12 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { if (pParam->tmq->commit_cb) { pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL); } - if (!pParam->async) tsem_post(&pParam->rspSem); - return 0; -} - -int32_t tmqResetOffsetCb(void* param, const SDataBuf* pMsg, int32_t code) { - SMqResetOffsetParam* pParam = (SMqResetOffsetParam*)param; - pParam->rspErr = code; - tsem_post(&pParam->rspSem); + if (!pParam->async) + tsem_post(&pParam->rspSem); + else { + tsem_destroy(&pParam->rspSem); + free(param); + } return 0; } @@ -216,6 +225,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs strcpy(pTmq->groupId, conf->groupId); pTmq->autoCommit = conf->auto_commit; pTmq->commit_cb = conf->commit_cb; + pTmq->resetOffsetCfg = conf->resetOffset; tsem_init(&pTmq->rspSem, 0, 0); pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); @@ -223,18 +233,40 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs return pTmq; } -tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) { - SRequestObj* pRequest = NULL; +tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) { + // TODO: add read write lock + SRequestObj* pRequest = NULL; + tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS; // build msg // send to mnode - SMqCMResetOffsetReq req; - req.num = offsets->cnt; - req.offsets = (SMqOffset*)offsets->elems; + SMqCMCommitOffsetReq req; + SArray* pArray = NULL; + + if (offsets == NULL) { + pArray = taosArrayInit(0, sizeof(SMqOffset)); + for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + SMqOffset offset; + strcpy(offset.topicName, pTopic->topicName); + strcpy(offset.cgroup, tmq->groupId); + offset.vgId = pVg->vgId; + offset.offset = pVg->currentOffset; + taosArrayPush(pArray, &offset); + } + } + req.num = pArray->size; + req.offsets = pArray->pData; + } else { + req.num = offsets->cnt; + req.offsets = (SMqOffset*)offsets->elems; + } SCoder encoder; tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); - tEncodeSMqCMResetOffsetReq(&encoder, &req); + tEncodeSMqCMCommitOffsetReq(&encoder, &req); int32_t tlen = encoder.pos; void* buf = malloc(tlen); if (buf == NULL) { @@ -244,32 +276,41 @@ tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offse tCoderClear(&encoder); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER); - tEncodeSMqCMResetOffsetReq(&encoder, &req); + tEncodeSMqCMCommitOffsetReq(&encoder, &req); tCoderClear(&encoder); - pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_RESET_OFFSET); + pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET); if (pRequest == NULL) { tscError("failed to malloc request"); } - SMqResetOffsetParam param = {0}; - tsem_init(¶m.rspSem, 0, 0); - param.tmq = tmq; + SMqCommitCbParam* pParam = malloc(sizeof(SMqCommitCbParam)); + if (pParam == NULL) { + return -1; + } + pParam->tmq = tmq; + tsem_init(&pParam->rspSem, 0, 0); pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - sendInfo->param = ¶m; - sendInfo->fp = tmqResetOffsetCb; + sendInfo->param = pParam; + sendInfo->fp = tmqCommitCb; SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - tsem_wait(¶m.rspSem); - tsem_destroy(¶m.rspSem); + if (!async) { + tsem_wait(&pParam->rspSem); + resp = pParam->rspErr; + } - return param.rspErr; + if (pArray) { + taosArrayDestroy(pArray); + } + + return resp; } tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { @@ -641,8 +682,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { // clang-format off SMqClientVg clientVg = { .pollCnt = 0, - .committedOffset = -1, - .currentOffset = -1, + .currentOffset = pVgEp->offset, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet }; @@ -708,23 +748,51 @@ END: return 0; } -SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, int32_t type, SMqClientTopic* pTopic, - SMqClientVg* pVg) { +tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { + const SMqOffset* pOffset = &offset->offset; + if (strcmp(pOffset->cgroup, tmq->groupId) != 0) { + return TMQ_RESP_ERR__FAIL; + } + int32_t sz = taosArrayGetSize(tmq->clientTopics); + for (int32_t i = 0; i < sz; i++) { + SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i); + if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) { + int32_t vgSz = taosArrayGetSize(clientTopic->vgs); + for (int32_t j = 0; j < vgSz; j++) { + SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j); + if (pVg->vgId == pOffset->vgId) { + pVg->currentOffset = pOffset->offset; + return TMQ_RESP_ERR__SUCCESS; + } + } + } + } + return TMQ_RESP_ERR__FAIL; +} + +SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClientTopic* pTopic, SMqClientVg* pVg) { + int64_t reqOffset; + if (pVg->currentOffset >= 0) { + reqOffset = pVg->currentOffset; + } else { + if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) { + tscError("unable to poll since no committed offset but reset offset is set to none"); + return NULL; + } + reqOffset = tmq->resetOffsetCfg; + } + SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq)); if (pReq == NULL) { return NULL; } - pReq->reqType = type; + strcpy(pReq->topic, pTopic->topicName); - pReq->blockingTime = blocking_time; - pReq->consumerId = tmq->consumerId; strcpy(pReq->cgroup, tmq->groupId); - if (type == TMQ_REQ_TYPE_COMMIT_ONLY) { - pReq->offset = pVg->currentOffset; - } else { - pReq->offset = pVg->currentOffset + 1; - } + pReq->blockingTime = blocking_time; + pReq->consumerId = tmq->consumerId; + pReq->currentOffset = reqOffset; pReq->head.vgId = htonl(pVg->vgId); pReq->head.contLen = htonl(sizeof(SMqConsumeReq)); @@ -743,13 +811,13 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { if (taosArrayGetSize(tmq->clientTopics) == 0) { tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); - printf("over1\n"); + /*printf("over1\n");*/ usleep(blocking_time * 1000); return NULL; } SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); if (taosArrayGetSize(pTopic->vgs) == 0) { - printf("over2\n"); + /*printf("over2\n");*/ usleep(blocking_time * 1000); return NULL; } @@ -760,8 +828,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); /*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/ - int32_t reqType = tmq->autoCommit ? TMQ_REQ_TYPE_CONSUME_AND_COMMIT : TMQ_REQ_TYPE_CONSUME_ONLY; - SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, reqType, pTopic, pVg); + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, pTopic, pVg); if (pReq == NULL) { ASSERT(false); usleep(blocking_time * 1000); @@ -821,6 +888,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { /*return pRequest;*/ } +#if 0 tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { if (tmq_topic_vgroup_list != NULL) { // TODO @@ -831,7 +899,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, TMQ_REQ_TYPE_COMMIT_ONLY, pTopic, pVg); + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, pTopic, pVg); SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; @@ -858,6 +926,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v return 0; } +#endif void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 46feab779144f89272f977f58a7ae463d8063250..f07cc0b0b517cc61b49db3d5f48b9ee9f8f44e40 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -285,7 +285,7 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int32_t tlen = 0; - tlen += taosEncodeFixedU64(buf, pReq->ver); + tlen += taosEncodeFixedI64(buf, pReq->ver); tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeFixedU32(buf, pReq->ttl); tlen += taosEncodeFixedU32(buf, pReq->keep); @@ -330,7 +330,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { } void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { - buf = taosDecodeFixedU64(buf, &(pReq->ver)); + buf = taosDecodeFixedI64(buf, &(pReq->ver)); buf = taosDecodeString(buf, &(pReq->name)); buf = taosDecodeFixedU32(buf, &(pReq->ttl)); buf = taosDecodeFixedU32(buf, &(pReq->keep)); @@ -380,7 +380,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { int32_t tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) { int32_t tlen = 0; - tlen += taosEncodeFixedU64(buf, pReq->ver); + tlen += taosEncodeFixedI64(buf, pReq->ver); tlen += taosEncodeFixedU32(buf, taosArrayGetSize(pReq->pArray)); for (size_t i = 0; i < taosArrayGetSize(pReq->pArray); i++) { SVCreateTbReq *pCreateTbReq = taosArrayGet(pReq->pArray, i); @@ -393,7 +393,7 @@ int32_t tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) { void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) { uint32_t nsize = 0; - buf = taosDecodeFixedU64(buf, &pReq->ver); + buf = taosDecodeFixedI64(buf, &pReq->ver); buf = taosDecodeFixedU32(buf, &nsize); pReq->pArray = taosArrayInit(nsize, sizeof(SVCreateTbReq)); for (size_t i = 0; i < nsize; i++) { @@ -407,14 +407,14 @@ void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) { int32_t tSerializeSVDropTbReq(void **buf, SVDropTbReq *pReq) { int32_t tlen = 0; - tlen += taosEncodeFixedU64(buf, pReq->ver); + tlen += taosEncodeFixedI64(buf, pReq->ver); tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeFixedU8(buf, pReq->type); return tlen; } void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) { - buf = taosDecodeFixedU64(buf, &pReq->ver); + buf = taosDecodeFixedI64(buf, &pReq->ver); buf = taosDecodeString(buf, &pReq->name); buf = taosDecodeFixedU8(buf, &pReq->type); return buf; @@ -1393,7 +1393,7 @@ int32_t tSerializeSDropDbRsp(void *buf, int32_t bufLen, SDropDbRsp *pRsp) { if (tStartEncode(&encoder) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->db) < 0) return -1; - if (tEncodeU64(&encoder, pRsp->uid) < 0) return -1; + if (tEncodeI64(&encoder, pRsp->uid) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1407,7 +1407,7 @@ int32_t tDeserializeSDropDbRsp(void *buf, int32_t bufLen, SDropDbRsp *pRsp) { if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->db) < 0) return -1; - if (tDecodeU64(&decoder, &pRsp->uid) < 0) return -1; + if (tDecodeI64(&decoder, &pRsp->uid) < 0) return -1; tEndDecode(&decoder); tCoderClear(&decoder); @@ -1468,7 +1468,7 @@ int32_t tDeserializeSSyncDbReq(void *buf, int32_t bufLen, SSyncDbReq *pReq) { static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) { if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1; - if (tEncodeU64(pEncoder, pRsp->uid) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->uid) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->vgVersion) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->vgNum) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->hashMethod) < 0) return -1; @@ -1518,7 +1518,7 @@ int32_t tSerializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pRsp int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) { if (tDecodeCStrTo(pDecoder, pRsp->db) < 0) return -1; - if (tDecodeU64(pDecoder, &pRsp->uid) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->uid) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->vgVersion) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->vgNum) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->hashMethod) < 0) return -1; @@ -1661,7 +1661,7 @@ static int32_t tEncodeSTableMetaRsp(SCoder *pEncoder, STableMetaRsp *pRsp) { if (tEncodeCStr(pEncoder, pRsp->tbName) < 0) return -1; if (tEncodeCStr(pEncoder, pRsp->stbName) < 0) return -1; if (tEncodeCStr(pEncoder, pRsp->dbFName) < 0) return -1; - if (tEncodeU64(pEncoder, pRsp->dbId) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->dbId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->numOfTags) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->numOfColumns) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->precision) < 0) return -1; @@ -1669,8 +1669,8 @@ static int32_t tEncodeSTableMetaRsp(SCoder *pEncoder, STableMetaRsp *pRsp) { if (tEncodeI8(pEncoder, pRsp->update) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->sversion) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->tversion) < 0) return -1; - if (tEncodeU64(pEncoder, pRsp->suid) < 0) return -1; - if (tEncodeU64(pEncoder, pRsp->tuid) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->suid) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->tuid) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->vgId) < 0) return -1; for (int32_t i = 0; i < pRsp->numOfColumns + pRsp->numOfTags; ++i) { SSchema *pSchema = &pRsp->pSchemas[i]; @@ -1684,7 +1684,7 @@ static int32_t tDecodeSTableMetaRsp(SCoder *pDecoder, STableMetaRsp *pRsp) { if (tDecodeCStrTo(pDecoder, pRsp->tbName) < 0) return -1; if (tDecodeCStrTo(pDecoder, pRsp->stbName) < 0) return -1; if (tDecodeCStrTo(pDecoder, pRsp->dbFName) < 0) return -1; - if (tDecodeU64(pDecoder, &pRsp->dbId) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->dbId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->numOfTags) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->numOfColumns) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->precision) < 0) return -1; @@ -1692,8 +1692,8 @@ static int32_t tDecodeSTableMetaRsp(SCoder *pDecoder, STableMetaRsp *pRsp) { if (tDecodeI8(pDecoder, &pRsp->update) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->sversion) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->tversion) < 0) return -1; - if (tDecodeU64(pDecoder, &pRsp->suid) < 0) return -1; - if (tDecodeU64(pDecoder, &pRsp->tuid) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->suid) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->tuid) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1; int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns; @@ -2093,7 +2093,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; - if (tEncodeU64(&encoder, pReq->dbUid) < 0) return -1; + if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1; if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1; if (tEncodeI32(&encoder, pReq->cacheBlockSize) < 0) return -1; if (tEncodeI32(&encoder, pReq->totalBlocks) < 0) return -1; @@ -2133,7 +2133,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; - if (tDecodeU64(&decoder, &pReq->dbUid) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1; if (tDecodeI32(&decoder, &pReq->cacheBlockSize) < 0) return -1; if (tDecodeI32(&decoder, &pReq->totalBlocks) < 0) return -1; @@ -2171,7 +2171,7 @@ int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; - if (tEncodeU64(&encoder, pReq->dbUid) < 0) return -1; + if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; tEndEncode(&encoder); @@ -2187,7 +2187,7 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; - if (tDecodeU64(&decoder, &pReq->dbUid) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; tEndDecode(&decoder); @@ -2356,35 +2356,7 @@ int32_t tDecodeSMqOffset(SCoder *decoder, SMqOffset *pOffset) { return 0; } -int32_t tEncodeSMqVgOffsets(SCoder *encoder, const SMqVgOffsets *pOffsets) { - if (tStartEncode(encoder) < 0) return -1; - if (tEncodeI32(encoder, pOffsets->vgId) < 0) return -1; - int32_t sz = taosArrayGetSize(pOffsets->offsets); - if (tEncodeI32(encoder, sz) < 0) return -1; - for (int32_t i = 0; i < sz; i++) { - SMqOffset *offset = taosArrayGet(pOffsets->offsets, i); - if (tEncodeSMqOffset(encoder, offset) < 0) return -1; - } - tEndEncode(encoder); - return encoder->pos; -} - -int32_t tDecodeSMqVgOffsets(SCoder *decoder, SMqVgOffsets *pOffsets) { - int32_t sz; - if (tStartDecode(decoder) < 0) return -1; - if (tDecodeI32(decoder, &pOffsets->vgId) < 0) return -1; - if (tDecodeI32(decoder, &sz) < 0) return -1; - pOffsets->offsets = taosArrayInit(sz, sizeof(SMqOffset)); - for (int32_t i = 0; i < sz; i++) { - SMqOffset offset; - if (tDecodeSMqOffset(decoder, &offset) < 0) return -1; - taosArrayPush(pOffsets->offsets, &offset); - } - tEndDecode(decoder); - return 0; -} - -int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *pReq) { +int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq *pReq) { if (tStartEncode(encoder) < 0) return -1; if (tEncodeI32(encoder, pReq->num) < 0) return -1; for (int32_t i = 0; i < pReq->num; i++) { @@ -2394,7 +2366,7 @@ int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *p return encoder->pos; } -int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) { +int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) { if (tStartDecode(decoder) < 0) return -1; if (tDecodeI32(decoder, &pReq->num) < 0) return -1; pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder); @@ -2405,23 +2377,3 @@ int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) { tEndDecode(decoder); return 0; } - -#if 0 -int32_t tEncodeSMqMVResetOffsetReq(SCoder *encoder, const SMqMVResetOffsetReq *pReq) { - if (tEncodeI64(encoder, pReq->leftForVer) < 0) return -1; - for (int32_t i = 0; i < pReq->num; i++) { - tEncodeSMqOffset(encoder, &pReq->offsets[i]); - } - return encoder->pos; -} - -int32_t tDecodeSMqMVResetOffsetReq(SCoder *decoder, SMqMVResetOffsetReq *pReq) { - if (tDecodeI32(decoder, &pReq->num) < 0) return -1; - pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder); - if (pReq->offsets == NULL) return -1; - for (int32_t i = 0; i < pReq->num; i++) { - tDecodeSMqOffset(decoder, &pReq->offsets[i]); - } - return 0; -} -#endif diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 4e35baf9052bdd11fffee289e59f00255bf96634..0aae145d2f3c9ab67c3546bb7cfa3dab49a141b6 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -25,8 +25,8 @@ #include "dndMnode.h" #include "dndVnodes.h" -#define INTERNAL_USER "_dnd" -#define INTERNAL_CKEY "_key" +#define INTERNAL_USER "_dnd" +#define INTERNAL_CKEY "_key" #define INTERNAL_SECRET "_pwd" static void dndInitMsgFp(STransMgmt *pMgmt) { @@ -113,6 +113,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = dndProcessMnodeWriteMsg; /*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/ pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = dndProcessMnodeWriteMsg; @@ -155,7 +156,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { } static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { - SDnode * pDnode = parent; + SDnode *pDnode = parent; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pRsp->msgType; @@ -219,7 +220,7 @@ static void dndCleanupClient(SDnode *pDnode) { } static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { - SDnode * pDnode = param; + SDnode *pDnode = param; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pReq->msgType; @@ -313,7 +314,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char SAuthReq authReq = {0}; tstrncpy(authReq.user, user, TSDB_USER_LEN); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); - void * pReq = rpcMallocCont(contLen); + void *pReq = rpcMallocCont(contLen); tSerializeSAuthReq(pReq, contLen, &authReq); SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index e64191f7153c87301fa14d74e44cdb75b5f2cd89..bb2367ef728398ae049448afb42f544dccf71c90 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -123,6 +123,7 @@ typedef enum { TRN_TYPE_DROP_TOPIC = 1015, TRN_TYPE_SUBSCRIBE = 1016, TRN_TYPE_REBALANCE = 1017, + TRN_TYPE_COMMIT_OFFSET = 1018, TRN_TYPE_BASIC_SCOPE_END, TRN_TYPE_GLOBAL_SCOPE = 2000, TRN_TYPE_CREATE_DNODE = 2001, @@ -176,7 +177,7 @@ typedef struct { SArray* undoActions; int64_t createdTime; int64_t lastExecTime; - uint64_t dbUid; + int64_t dbUid; char dbname[TSDB_DB_FNAME_LEN]; char lastError[TSDB_TRANS_ERROR_LEN]; } STrans; @@ -304,16 +305,16 @@ typedef struct { } SDbCfg; typedef struct { - char name[TSDB_DB_FNAME_LEN]; - char acct[TSDB_USER_LEN]; - char createUser[TSDB_USER_LEN]; - int64_t createdTime; - int64_t updateTime; - uint64_t uid; - int32_t cfgVersion; - int32_t vgVersion; - int8_t hashMethod; // default is 1 - SDbCfg cfg; + char name[TSDB_DB_FNAME_LEN]; + char acct[TSDB_USER_LEN]; + char createUser[TSDB_USER_LEN]; + int64_t createdTime; + int64_t updateTime; + int64_t uid; + int32_t cfgVersion; + int32_t vgVersion; + int8_t hashMethod; // default is 1 + SDbCfg cfg; } SDbObj; typedef struct { @@ -346,8 +347,8 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t createdTime; int64_t updateTime; - uint64_t uid; - uint64_t dbUid; + int64_t uid; + int64_t dbUid; int32_t version; int32_t nextColId; int32_t numOfColumns; @@ -465,6 +466,24 @@ static FORCE_INLINE void tDeleteSMqSubConsumer(SMqSubConsumer* pSubConsumer) { } } +typedef struct { + char key[TSDB_PARTITION_KEY_LEN]; + int64_t offset; +} SMqOffsetObj; + +static FORCE_INLINE int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset) { + int32_t tlen = 0; + tlen += taosEncodeString(buf, pOffset->key); + tlen += taosEncodeFixedI64(buf, pOffset->offset); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset) { + buf = taosDecodeStringTo(buf, pOffset->key); + buf = taosDecodeFixedI64(buf, &pOffset->offset); + return buf; +} + typedef struct { char key[TSDB_SUBSCRIBE_KEY_LEN]; int32_t status; diff --git a/source/dnode/mnode/impl/inc/mndOffset.h b/source/dnode/mnode/impl/inc/mndOffset.h new file mode 100644 index 0000000000000000000000000000000000000000..e18eec973f87129570983ad4724df115c6e18276 --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndOffset.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_OFFSET_H_ +#define _TD_MND_OFFSET_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitOffset(SMnode *pMnode); +void mndCleanupOffset(SMnode *pMnode); + +SMqOffsetObj *mndAcquireOffset(SMnode *pMnode, const char *key); +void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset); + +SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset); +SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw); + +int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs); + +static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) { + return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName); +} + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_OFFSET_H_*/ diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c new file mode 100644 index 0000000000000000000000000000000000000000..ac9e99ebd43769559c47868275d84da2c4066001 --- /dev/null +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -0,0 +1,224 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mndOffset.h" +#include "mndAuth.h" +#include "mndDb.h" +#include "mndDnode.h" +#include "mndMnode.h" +#include "mndShow.h" +#include "mndStb.h" +#include "mndTrans.h" +#include "mndUser.h" +#include "mndVgroup.h" +#include "tname.h" + +#define MND_OFFSET_VER_NUMBER 1 +#define MND_OFFSET_RESERVE_SIZE 64 + +static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset); +static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset); +static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOffset, SMqOffsetObj *pNewOffset); +static int32_t mndProcessCommitOffsetReq(SMnodeMsg *pReq); + +int32_t mndInitOffset(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_OFFSET, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndOffsetActionEncode, + .decodeFp = (SdbDecodeFp)mndOffsetActionDecode, + .insertFp = (SdbInsertFp)mndOffsetActionInsert, + .updateFp = (SdbUpdateFp)mndOffsetActionUpdate, + .deleteFp = (SdbDeleteFp)mndOffsetActionDelete}; + + mndSetMsgHandle(pMnode, TDMT_MND_MQ_COMMIT_OFFSET, mndProcessCommitOffsetReq); + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupOffset(SMnode *pMnode) {} + +SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + void *buf = NULL; + int32_t tlen = tEncodeSMqOffsetObj(NULL, pOffset); + int32_t size = sizeof(int32_t) + tlen + MND_OFFSET_RESERVE_SIZE; + + SSdbRaw *pRaw = sdbAllocRaw(SDB_OFFSET, MND_OFFSET_VER_NUMBER, size); + if (pRaw == NULL) goto OFFSET_ENCODE_OVER; + + buf = malloc(tlen); + if (buf == NULL) goto OFFSET_ENCODE_OVER; + + void *abuf = buf; + tEncodeSMqOffsetObj(&abuf, pOffset); + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, tlen, OFFSET_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OFFSET_ENCODE_OVER); + SDB_SET_RESERVE(pRaw, dataPos, MND_OFFSET_RESERVE_SIZE, OFFSET_ENCODE_OVER); + SDB_SET_DATALEN(pRaw, dataPos, OFFSET_ENCODE_OVER); + + terrno = TSDB_CODE_SUCCESS; + +OFFSET_ENCODE_OVER: + tfree(buf); + if (terrno != TSDB_CODE_SUCCESS) { + mError("offset:%s, failed to encode to raw:%p since %s", pOffset->key, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("offset:%s, encode to raw:%p, row:%p", pOffset->key, pRaw, pOffset); + return pRaw; +} + +SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + void *buf = NULL; + + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto OFFSET_DECODE_OVER; + + if (sver != MND_OFFSET_VER_NUMBER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + goto OFFSET_DECODE_OVER; + } + + int32_t size = sizeof(SMqOffsetObj); + SSdbRow *pRow = sdbAllocRow(size); + if (pRow == NULL) goto OFFSET_DECODE_OVER; + + SMqOffsetObj *pOffset = sdbGetRowObj(pRow); + if (pOffset == NULL) goto OFFSET_DECODE_OVER; + + int32_t dataPos = 0; + int32_t tlen; + SDB_GET_INT32(pRaw, dataPos, &tlen, OFFSET_DECODE_OVER); + buf = malloc(tlen + 1); + if (buf == NULL) goto OFFSET_DECODE_OVER; + SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OFFSET_DECODE_OVER); + SDB_GET_RESERVE(pRaw, dataPos, MND_OFFSET_RESERVE_SIZE, OFFSET_DECODE_OVER); + + if (tDecodeSMqOffsetObj(buf, pOffset) == NULL) { + goto OFFSET_DECODE_OVER; + } + + terrno = TSDB_CODE_SUCCESS; + +OFFSET_DECODE_OVER: + tfree(buf); + if (terrno != TSDB_CODE_SUCCESS) { + mError("offset:%s, failed to decode from raw:%p since %s", pOffset->key, pRaw, terrstr()); + tfree(pRow); + return NULL; + } + + mTrace("offset:%s, decode from raw:%p, row:%p", pOffset->key, pRaw, pOffset); + return pRow; +} + +int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs) { + int32_t code = 0; + int32_t sz = taosArrayGetSize(vgs); + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp *pConsumerEp = taosArrayGet(vgs, i); + SMqOffsetObj offsetObj; + if (mndMakePartitionKey(offsetObj.key, cgroup, topicName, pConsumerEp->vgId) < 0) { + return -1; + } + offsetObj.offset = -1; + SSdbRaw *pOffsetRaw = mndOffsetActionEncode(&offsetObj); + if (pOffsetRaw == NULL) { + return -1; + } + sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY); + if (mndTransAppendRedolog(pTrans, pOffsetRaw) < 0) { + return -1; + } + } + return 0; +} + +static int32_t mndProcessCommitOffsetReq(SMnodeMsg *pMsg) { + char key[TSDB_PARTITION_KEY_LEN]; + + SMnode *pMnode = pMsg->pMnode; + char *msgStr = pMsg->rpcMsg.pCont; + SMqCMCommitOffsetReq commitOffsetReq; + SCoder decoder; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, msgStr, pMsg->rpcMsg.contLen, TD_DECODER); + + tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq); + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_COMMIT_OFFSET, &pMsg->rpcMsg); + + for (int32_t i = 0; i < commitOffsetReq.num; i++) { + SMqOffset *pOffset = &commitOffsetReq.offsets[i]; + if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) { + return -1; + } + SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key); + ASSERT(pOffsetObj); + pOffsetObj->offset = pOffset->offset; + SSdbRaw *pOffsetRaw = mndOffsetActionEncode(pOffsetObj); + sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY); + mndTransAppendRedolog(pTrans, pOffsetRaw); + mndReleaseOffset(pMnode, pOffsetObj); + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("mq-commit-offset-trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndTransDrop(pTrans); + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +} + +static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset) { + mTrace("offset:%s, perform insert action", pOffset->key); + return 0; +} + +static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset) { + mTrace("offset:%s, perform delete action", pOffset->key); + return 0; +} + +static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOldOffset, SMqOffsetObj *pNewOffset) { + mTrace("offset:%s, perform update action", pOldOffset->key); + return 0; +} + +SMqOffsetObj *mndAcquireOffset(SMnode *pMnode, const char *key) { + SSdb *pSdb = pMnode->pSdb; + SMqOffsetObj *pOffset = sdbAcquire(pSdb, SDB_OFFSET, key); + if (pOffset == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { + terrno = TSDB_CODE_MND_OFFSET_NOT_EXIST; + } + return pOffset; +} + +void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pOffset); +} + +static void mndCancelGetNextOffset(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index a5c4c41244e32d79d99b6072601267f4e923e4bb..2ea157fea4407259ba86a1060c7a3dc409e45b93 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -19,6 +19,7 @@ #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" +#include "mndOffset.h" #include "mndShow.h" #include "mndStb.h" #include "mndTopic.h" @@ -80,13 +81,13 @@ int32_t mndInitSubscribe(SMnode *pMnode) { return sdbSetTable(pMnode->pSdb, table); } -static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *consumerGroup) { +static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *cgroup) { SMqSubscribeObj *pSub = tNewSubscribeObj(); if (pSub == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - char *key = mndMakeSubscribeKey(consumerGroup, pTopic->name); + char *key = mndMakeSubscribeKey(cgroup, pTopic->name); if (key == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tDeleteSMqSubscribeObj(pSub); @@ -289,9 +290,15 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { strcpy(topicEp.topic, topicName); topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp)); for (int32_t k = 0; k < vgsz; k++) { + char offsetKey[TSDB_PARTITION_KEY_LEN]; SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k); - - SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId}; + SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId, .offset = -1}; + mndMakePartitionKey(offsetKey, pConsumer->cgroup, topicName, pConsumerEp->vgId); + SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey); + if (pOffsetObj != NULL) { + vgEp.offset = pOffsetObj->offset; + mndReleaseOffset(pMnode, pOffsetObj); + } taosArrayPush(topicEp.vgs, &vgEp); } taosArrayPush(rsp.topics, &topicEp); @@ -870,7 +877,7 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { SUB_ENCODE_OVER: tfree(buf); - if (terrno != 0) { + if (terrno != TSDB_CODE_SUCCESS) { mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr()); sdbFreeRaw(pRaw); return NULL; @@ -1085,6 +1092,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { mDebug("create new subscription by consumer %ld, group: %s, topic %s", consumerId, cgroup, newTopicName); pSub = mndCreateSubscription(pMnode, pTopic, cgroup); createSub = true; + + mndCreateOffset(pTrans, cgroup, newTopicName, pSub->unassignedVg); } SMqSubConsumer mqSubConsumer; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 64b4aa6dd702d6477a710b42bf3e9c82e1937d8c..5b5bf661e54940a65550e4661678efe966ebd126 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -23,6 +23,7 @@ #include "mndDnode.h" #include "mndFunc.h" #include "mndMnode.h" +#include "mndOffset.h" #include "mndProfile.h" #include "mndQnode.h" #include "mndShow.h" @@ -77,7 +78,7 @@ static void mndTransReExecute(void *param, void *tmrId) { SMnode *pMnode = param; if (mndIsMaster(pMnode)) { int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pReq, .contLen = contLen}; pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); } @@ -89,7 +90,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { SMnode *pMnode = param; if (mndIsMaster(pMnode)) { int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); } @@ -197,6 +198,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1; if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1; if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1; if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; @@ -440,7 +442,7 @@ void mndProcessMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; int32_t code = 0; tmsg_t msgType = pMsg->rpcMsg.msgType; - void * ahandle = pMsg->rpcMsg.ahandle; + void *ahandle = pMsg->rpcMsg.ahandle; bool isReq = (msgType & 1U); mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4dc46b37984ba4b5976ec45130fa7725b001a369..ac9dde35974227538b4c8aee7ba5d15ecfeee9bb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -207,9 +207,17 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { SMqConsumeReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; - int64_t fetchOffset = pReq->offset; + int64_t fetchOffset; /*int64_t blockingTime = pReq->blockingTime;*/ + if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { + fetchOffset = 0; + } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { + fetchOffset = walGetLastVer(pTq->pWal); + } else { + fetchOffset = pReq->currentOffset + 1; + } + SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); @@ -226,31 +234,9 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0); ASSERT(pConsumer->consumerId == consumerId); - if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) { - pTopic->committedOffset = pReq->offset; - /*printf("offset %ld committed\n", pTopic->committedOffset);*/ - pMsg->pCont = NULL; - pMsg->contLen = 0; - pMsg->code = 0; - rpcSendResponse(pMsg); - return 0; - } - - if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) { - if (pTopic->committedOffset < pReq->offset - 1) { - pTopic->committedOffset = pReq->offset - 1; - /*printf("offset %ld committed\n", pTopic->committedOffset);*/ - } - } - - rsp.committedOffset = pTopic->committedOffset; - rsp.reqOffset = pReq->offset; + rsp.reqOffset = pReq->currentOffset; rsp.skipLogNum = 0; - if (fetchOffset <= pTopic->committedOffset) { - fetchOffset = pTopic->committedOffset + 1; - } - SWalHead* pHead; while (1) { int8_t pos = fetchOffset % TQ_BUFFER_SIZE; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index ade280eecc85f9346331bf5a91a79c8afbdf1198..c3947da4591de26b9a4efbc6cb5b23204e8fbb49 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -22,14 +22,15 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); - // ser request version + // set request version void *pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int64_t ver = pVnode->state.processed++; taosEncodeFixedI64(&pBuf, ver); if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) { - /*ASSERT(false);*/ // TODO: handle error + /*ASSERT(false);*/ + vError("vnode:%d write wal error since %s", pVnode->vgId, terrstr()); } } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 18485249b39f47faf19e25d8079016af2acd213d..4078ee9291963724ee69989021552c089239bf01 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -35,7 +35,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) } else { SStreamBlockScanInfo* pInfo = pOperator->info; if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { - qError("submit msg error while set stream msg, %s" PRIx64, id); + qError("submit msg messed up when initing stream block, %s" PRIx64, id); return TSDB_CODE_QRY_APP_ERROR; } return TSDB_CODE_SUCCESS;