提交 48283e42 编写于 作者: L Liu Jicong

new offset management

上级 69fecab7
...@@ -66,7 +66,7 @@ int32_t init_env() { ...@@ -66,7 +66,7 @@ int32_t init_env() {
} }
int32_t create_topic() { int32_t create_topic() {
printf("create topic"); printf("create topic\n");
TAOS_RES* pRes; TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) { if (pConn == NULL) {
...@@ -91,6 +91,10 @@ int32_t create_topic() { ...@@ -91,6 +91,10 @@ int32_t create_topic() {
return 0; return 0;
} }
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
printf("commit %d\n", resp);
}
tmq_t* build_consumer() { tmq_t* build_consumer() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
...@@ -103,6 +107,7 @@ tmq_t* build_consumer() { ...@@ -103,6 +107,7 @@ tmq_t* build_consumer() {
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2"); tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
return tmq; return tmq;
} }
...@@ -144,7 +149,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -144,7 +149,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
} }
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1000; static const int MIN_COMMIT_COUNT = 1;
int msg_count = 0; int msg_count = 0;
tmq_resp_err_t err; tmq_resp_err_t err;
...@@ -214,6 +219,6 @@ int main(int argc, char* argv[]) { ...@@ -214,6 +219,6 @@ int main(int argc, char* argv[]) {
tmq_t* tmq = build_consumer(); tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list(); tmq_list_t* topic_list = build_topic_list();
/*perf_loop(tmq, topic_list);*/ /*perf_loop(tmq, topic_list);*/
basic_consume_loop(tmq, topic_list); /*basic_consume_loop(tmq, topic_list);*/
/*sync_consume_loop(tmq, topic_list);*/ sync_consume_loop(tmq, topic_list);
} }
...@@ -198,8 +198,8 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLi ...@@ -198,8 +198,8 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLi
/* --------------------------TMQ INTERFACE------------------------------- */ /* --------------------------TMQ INTERFACE------------------------------- */
enum tmq_resp_err_t { enum tmq_resp_err_t {
TMQ_RESP_ERR__FAIL = -1,
TMQ_RESP_ERR__SUCCESS = 0, TMQ_RESP_ERR__SUCCESS = 0,
TMQ_RESP_ERR__FAIL = 1,
}; };
typedef enum tmq_resp_err_t tmq_resp_err_t; typedef enum tmq_resp_err_t tmq_resp_err_t;
...@@ -226,7 +226,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); ...@@ -226,7 +226,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list); DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list);
#if 0 #if 0
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq); DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics);
#endif #endif
DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
...@@ -238,6 +238,7 @@ DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t * ...@@ -238,6 +238,7 @@ DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *
#if 0 #if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
#endif #endif
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
enum tmq_conf_res_t { enum tmq_conf_res_t {
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
#ifndef TDENGINE_COMMON_H #ifndef TDENGINE_COMMON_H
#define TDENGINE_COMMON_H #define TDENGINE_COMMON_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
...@@ -43,14 +42,16 @@ extern "C" { ...@@ -43,14 +42,16 @@ extern "C" {
// int16_t bytes; // int16_t bytes;
// } SSchema; // } SSchema;
#define TMQ_REQ_TYPE_COMMIT_ONLY 0 enum {
#define TMQ_REQ_TYPE_CONSUME_ONLY 1 TMQ_CONF__RESET_OFFSET__LATEST = -1,
#define TMQ_REQ_TYPE_CONSUME_AND_COMMIT 2 TMQ_CONF__RESET_OFFSET__EARLIEAST = -2,
TMQ_CONF__RESET_OFFSET__NONE = -3,
};
typedef struct { typedef struct {
uint32_t numOfTables; uint32_t numOfTables;
SArray *pGroupList; SArray* pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid SHashObj* map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo; } STableGroupInfo;
typedef struct SColumnDataAgg { typedef struct SColumnDataAgg {
...@@ -79,14 +80,14 @@ typedef struct SConstantItem { ...@@ -79,14 +80,14 @@ typedef struct SConstantItem {
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); // info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef struct SSDataBlock { typedef struct SSDataBlock {
SColumnDataAgg *pBlockAgg; SColumnDataAgg* pBlockAgg;
SArray *pDataBlock; // SArray<SColumnInfoData> SArray* pDataBlock; // SArray<SColumnInfoData>
SArray *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value. SArray* pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
SDataBlockInfo info; SDataBlockInfo info;
} SSDataBlock; } SSDataBlock;
typedef struct SVarColAttr { typedef struct SVarColAttr {
int32_t *offset; // start position for each entry in the list int32_t* offset; // start position for each entry in the list
uint32_t length; // used buffer size that contain the valid data uint32_t length; // used buffer size that contain the valid data
uint32_t allocLen; // allocated buffer size uint32_t allocLen; // allocated buffer size
} SVarColAttr; } SVarColAttr;
...@@ -94,11 +95,11 @@ typedef struct SVarColAttr { ...@@ -94,11 +95,11 @@ typedef struct SVarColAttr {
// pBlockAgg->numOfNull == info.rows, all data are null // pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null. // pBlockAgg->numOfNull == 0, no data are null.
typedef struct SColumnInfoData { typedef struct SColumnInfoData {
SColumnInfo info; // TODO filter info needs to be removed SColumnInfo info; // TODO filter info needs to be removed
bool hasNull;// if current column data has null value. bool hasNull; // if current column data has null value.
char *pData; // the corresponding block data in memory char* pData; // the corresponding block data in memory
union { union {
char *nullbitmap; // bitmap, one bit for each item in the list char* nullbitmap; // bitmap, one bit for each item in the list
SVarColAttr varmeta; SVarColAttr varmeta;
}; };
} SColumnInfoData; } SColumnInfoData;
...@@ -149,7 +150,6 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp ...@@ -149,7 +150,6 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
int32_t tlen = 0; int32_t tlen = 0;
int32_t sz = 0; int32_t sz = 0;
tlen += taosEncodeFixedI64(buf, pRsp->consumerId); tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
tlen += taosEncodeFixedI64(buf, pRsp->committedOffset);
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum); tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
...@@ -170,7 +170,6 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp ...@@ -170,7 +170,6 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) { static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) {
int32_t sz; int32_t sz;
buf = taosDecodeFixedI64(buf, &pRsp->consumerId); buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
buf = taosDecodeFixedI64(buf, &pRsp->committedOffset);
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum); buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
...@@ -250,11 +249,11 @@ typedef struct SSqlExpr { ...@@ -250,11 +249,11 @@ typedef struct SSqlExpr {
char token[TSDB_COL_NAME_LEN]; // original token char token[TSDB_COL_NAME_LEN]; // original token
SSchema resSchema; SSchema resSchema;
int32_t numOfCols; int32_t numOfCols;
SColumn* pColumns; // data columns that are required by query SColumn* pColumns; // data columns that are required by query
int32_t interBytes; // inter result buffer size int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function int16_t numOfParams; // argument value of each function
SVariant param[3]; // parameters are not more than 3 SVariant param[3]; // parameters are not more than 3
} SSqlExpr; } SSqlExpr;
typedef struct SExprInfo { typedef struct SExprInfo {
...@@ -271,7 +270,7 @@ typedef struct SSessionWindow { ...@@ -271,7 +270,7 @@ typedef struct SSessionWindow {
SColumn col; SColumn col;
} SSessionWindow; } SSessionWindow;
#define QUERY_ASC_FORWARD_STEP 1 #define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1 #define QUERY_DESC_FORWARD_STEP -1
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
......
...@@ -422,8 +422,8 @@ typedef struct { ...@@ -422,8 +422,8 @@ typedef struct {
} SColumnInfo; } SColumnInfo;
typedef struct { typedef struct {
uint64_t uid; int64_t uid;
TSKEY key; // last accessed ts, for subscription TSKEY key; // last accessed ts, for subscription
} STableIdInfo; } STableIdInfo;
typedef struct STimeWindow { typedef struct STimeWindow {
...@@ -554,8 +554,8 @@ int32_t tSerializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq); ...@@ -554,8 +554,8 @@ int32_t tSerializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);
int32_t tDeserializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq); int32_t tDeserializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);
typedef struct { typedef struct {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
uint64_t uid; int64_t uid;
} SDropDbRsp; } SDropDbRsp;
int32_t tSerializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp); int32_t tSerializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp);
...@@ -570,12 +570,12 @@ int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); ...@@ -570,12 +570,12 @@ int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
int32_t tDeserializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); int32_t tDeserializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
typedef struct { typedef struct {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
uint64_t uid; int64_t uid;
int32_t vgVersion; int32_t vgVersion;
int32_t vgNum; int32_t vgNum;
int8_t hashMethod; int8_t hashMethod;
SArray* pVgroupInfos; // Array of SVgroupInfo SArray* pVgroupInfos; // Array of SVgroupInfo
} SUseDbRsp; } SUseDbRsp;
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp); int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
...@@ -725,7 +725,7 @@ typedef struct { ...@@ -725,7 +725,7 @@ typedef struct {
int32_t vgId; int32_t vgId;
int32_t dnodeId; int32_t dnodeId;
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
uint64_t dbUid; int64_t dbUid;
int32_t vgVersion; int32_t vgVersion;
int32_t cacheBlockSize; int32_t cacheBlockSize;
int32_t totalBlocks; int32_t totalBlocks;
...@@ -753,10 +753,10 @@ int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pR ...@@ -753,10 +753,10 @@ int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pR
int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int32_t dnodeId; int32_t dnodeId;
uint64_t dbUid; int64_t dbUid;
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
} SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq; } SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq;
int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq); int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
...@@ -796,7 +796,7 @@ typedef struct { ...@@ -796,7 +796,7 @@ typedef struct {
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
uint64_t dbId; int64_t dbId;
int32_t numOfTags; int32_t numOfTags;
int32_t numOfColumns; int32_t numOfColumns;
int8_t precision; int8_t precision;
...@@ -804,8 +804,8 @@ typedef struct { ...@@ -804,8 +804,8 @@ typedef struct {
int8_t update; int8_t update;
int32_t sversion; int32_t sversion;
int32_t tversion; int32_t tversion;
uint64_t suid; int64_t suid;
uint64_t tuid; int64_t tuid;
int32_t vgId; int32_t vgId;
SSchema* pSchemas; SSchema* pSchemas;
} STableMetaRsp; } STableMetaRsp;
...@@ -1268,7 +1268,7 @@ typedef struct { ...@@ -1268,7 +1268,7 @@ typedef struct {
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
uint64_t tuid; int64_t tuid;
int32_t sverson; int32_t sverson;
int32_t execLen; int32_t execLen;
char* executor; char* executor;
...@@ -1279,11 +1279,11 @@ typedef struct { ...@@ -1279,11 +1279,11 @@ typedef struct {
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
uint64_t tuid; int64_t tuid;
} SDDropTopicReq; } SDDropTopicReq;
typedef struct SVCreateTbReq { typedef struct SVCreateTbReq {
uint64_t ver; // use a general definition int64_t ver; // use a general definition
char* name; char* name;
uint32_t ttl; uint32_t ttl;
uint32_t keep; uint32_t keep;
...@@ -1314,8 +1314,8 @@ int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); ...@@ -1314,8 +1314,8 @@ int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
typedef struct { typedef struct {
uint64_t ver; // use a general definition int64_t ver; // use a general definition
SArray* pArray; SArray* pArray;
} SVCreateTbBatchReq; } SVCreateTbBatchReq;
typedef struct { typedef struct {
...@@ -1325,7 +1325,7 @@ int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq); ...@@ -1325,7 +1325,7 @@ int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq); void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq);
typedef struct { typedef struct {
uint64_t ver; int64_t ver;
char* name; char* name;
uint8_t type; uint8_t type;
tb_uid_t suid; tb_uid_t suid;
...@@ -1760,35 +1760,19 @@ typedef struct { ...@@ -1760,35 +1760,19 @@ typedef struct {
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqOffset; } SMqOffset;
typedef struct {
int32_t vgId;
SArray* offsets; // SArray<SMqOffset>
} SMqVgOffsets;
typedef struct { typedef struct {
int32_t num; int32_t num;
SMqOffset* offsets; SMqOffset* offsets;
} SMqCMResetOffsetReq; } SMqCMCommitOffsetReq;
typedef struct { typedef struct {
int32_t reserved; int32_t reserved;
} SMqCMResetOffsetRsp; } SMqCMCommitOffsetRsp;
typedef struct {
int64_t leftForVer;
SMqVgOffsets offsets;
} SMqMVResetOffsetReq;
typedef struct {
int32_t reserved;
} SMqMVResetOffsetRsp;
int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset); int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset);
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset); int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMResetOffsetReq(SCoder* encoder, const SMqCMResetOffsetReq* pReq); int32_t tEncodeSMqCMCommitOffsetReq(SCoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMResetOffsetReq(SCoder* decoder, SMqCMResetOffsetReq* pReq); int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq);
int32_t tEncodeSMqMVResetOffsetReq(SCoder* encoder, const SMqMVResetOffsetReq* pReq);
int32_t tDecodeSMqMVResetOffsetReq(SCoder* decoder, SMqMVResetOffsetReq* pReq);
typedef struct { typedef struct {
uint32_t nCols; uint32_t nCols;
...@@ -1870,7 +1854,6 @@ typedef struct { ...@@ -1870,7 +1854,6 @@ typedef struct {
typedef struct { typedef struct {
int64_t consumerId; int64_t consumerId;
SSchemaWrapper* schemas; SSchemaWrapper* schemas;
int64_t committedOffset;
int64_t reqOffset; int64_t reqOffset;
int64_t rspOffset; int64_t rspOffset;
int32_t skipLogNum; int32_t skipLogNum;
...@@ -1881,22 +1864,18 @@ typedef struct { ...@@ -1881,22 +1864,18 @@ typedef struct {
// one req for one vg+topic // one req for one vg+topic
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
// 0: commit only, current offset
// 1: consume only, poll next offset
// 2: commit current and consume next offset
int32_t reqType;
int64_t reqId;
int64_t consumerId; int64_t consumerId;
int64_t blockingTime; int64_t blockingTime;
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
int64_t offset; int64_t currentOffset;
char topic[TSDB_TOPIC_FNAME_LEN]; char topic[TSDB_TOPIC_FNAME_LEN];
} SMqConsumeReq; } SMqConsumeReq;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int64_t offset;
SEpSet epSet; SEpSet epSet;
} SMqSubVgEp; } SMqSubVgEp;
...@@ -1917,12 +1896,14 @@ static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taos ...@@ -1917,12 +1896,14 @@ static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taos
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId); tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
tlen += taosEncodeFixedI64(buf, pVgEp->offset);
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen; return tlen;
} }
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
buf = taosDecodeFixedI32(buf, &pVgEp->vgId); buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
buf = taosDecodeFixedI64(buf, &pVgEp->offset);
buf = taosDecodeSEpSet(buf, &pVgEp->epSet); buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
return buf; return buf;
} }
......
...@@ -143,10 +143,10 @@ enum { ...@@ -143,10 +143,10 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_RESET_OFFSET, "mnode-reset-offset", SMqCMResetOffsetReq, SMqCMResetOffsetRsp)
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
// Requests handled by VNODE // Requests handled by VNODE
TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG)
......
...@@ -113,14 +113,15 @@ typedef enum { ...@@ -113,14 +113,15 @@ typedef enum {
SDB_USER = 7, SDB_USER = 7,
SDB_AUTH = 8, SDB_AUTH = 8,
SDB_ACCT = 9, SDB_ACCT = 9,
SDB_SUBSCRIBE = 10, SDB_OFFSET = 10,
SDB_CONSUMER = 11, SDB_SUBSCRIBE = 11,
SDB_TOPIC = 12, SDB_CONSUMER = 12,
SDB_VGROUP = 13, SDB_TOPIC = 13,
SDB_STB = 14, SDB_VGROUP = 14,
SDB_DB = 15, SDB_STB = 15,
SDB_FUNC = 16, SDB_DB = 16,
SDB_MAX = 17 SDB_FUNC = 17,
SDB_MAX = 18
} ESdbType; } ESdbType;
typedef struct SSdb SSdb; typedef struct SSdb SSdb;
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
extern "C" { extern "C" {
#endif #endif
// clang-format off
#define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code)))) #define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code))))
#define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code)) #define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code))
...@@ -260,6 +262,7 @@ int32_t* taosGetErrno(); ...@@ -260,6 +262,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7) #define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7)
#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E8) #define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E8)
#define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9) #define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9)
#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03EA)
#define TSDB_CODE_MND_MQ_PLACEHOLDER TAOS_DEF_ERROR_CODE(0, 0x03F0) #define TSDB_CODE_MND_MQ_PLACEHOLDER TAOS_DEF_ERROR_CODE(0, 0x03F0)
// dnode // dnode
......
...@@ -24,373 +24,370 @@ extern "C" { ...@@ -24,373 +24,370 @@ extern "C" {
#define TSDB__packed #define TSDB__packed
#define TSKEY int64_t #define TSKEY int64_t
#define TSKEY_MIN INT64_MIN #define TSKEY_MIN INT64_MIN
#define TSKEY_MAX (INT64_MAX - 1) #define TSKEY_MAX (INT64_MAX - 1)
#define TSKEY_INITIAL_VAL TSKEY_MIN #define TSKEY_INITIAL_VAL TSKEY_MIN
// Bytes for each type. // Bytes for each type.
extern const int32_t TYPE_BYTES[15]; extern const int32_t TYPE_BYTES[15];
// TODO: replace and remove code below // TODO: replace and remove code below
#define CHAR_BYTES sizeof(char) #define CHAR_BYTES sizeof(char)
#define SHORT_BYTES sizeof(int16_t) #define SHORT_BYTES sizeof(int16_t)
#define INT_BYTES sizeof(int32_t) #define INT_BYTES sizeof(int32_t)
#define LONG_BYTES sizeof(int64_t) #define LONG_BYTES sizeof(int64_t)
#define FLOAT_BYTES sizeof(float) #define FLOAT_BYTES sizeof(float)
#define DOUBLE_BYTES sizeof(double) #define DOUBLE_BYTES sizeof(double)
#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*) #define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*)
#define TSDB_KEYSIZE sizeof(TSKEY) #define TSDB_KEYSIZE sizeof(TSKEY)
#define TSDB_NCHAR_SIZE sizeof(int32_t) #define TSDB_NCHAR_SIZE sizeof(int32_t)
// NULL definition // NULL definition
#define TSDB_DATA_BOOL_NULL 0x02 #define TSDB_DATA_BOOL_NULL 0x02
#define TSDB_DATA_TINYINT_NULL 0x80 #define TSDB_DATA_TINYINT_NULL 0x80
#define TSDB_DATA_SMALLINT_NULL 0x8000 #define TSDB_DATA_SMALLINT_NULL 0x8000
#define TSDB_DATA_INT_NULL 0x80000000L #define TSDB_DATA_INT_NULL 0x80000000L
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L #define TSDB_DATA_BIGINT_NULL 0x8000000000000000L
#define TSDB_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL #define TSDB_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
#define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN #define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
#define TSDB_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN #define TSDB_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
#define TSDB_DATA_NCHAR_NULL 0xFFFFFFFF #define TSDB_DATA_NCHAR_NULL 0xFFFFFFFF
#define TSDB_DATA_BINARY_NULL 0xFF #define TSDB_DATA_BINARY_NULL 0xFF
#define TSDB_DATA_UTINYINT_NULL 0xFF #define TSDB_DATA_UTINYINT_NULL 0xFF
#define TSDB_DATA_USMALLINT_NULL 0xFFFF #define TSDB_DATA_USMALLINT_NULL 0xFFFF
#define TSDB_DATA_UINT_NULL 0xFFFFFFFF #define TSDB_DATA_UINT_NULL 0xFFFFFFFF
#define TSDB_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL #define TSDB_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL
#define TSDB_DATA_NULL_STR "NULL" #define TSDB_DATA_NULL_STR "NULL"
#define TSDB_DATA_NULL_STR_L "null" #define TSDB_DATA_NULL_STR_L "null"
#define TSDB_NETTEST_USER "nettestinternal" #define TSDB_NETTEST_USER "nettestinternal"
#define TSDB_DEFAULT_USER "root" #define TSDB_DEFAULT_USER "root"
#ifdef _TD_POWER_ #ifdef _TD_POWER_
#define TSDB_DEFAULT_PASS "powerdb" #define TSDB_DEFAULT_PASS "powerdb"
#elif (_TD_TQ_ == true) #elif (_TD_TQ_ == true)
#define TSDB_DEFAULT_PASS "tqueue" #define TSDB_DEFAULT_PASS "tqueue"
#elif (_TD_PRO_ == true) #elif (_TD_PRO_ == true)
#define TSDB_DEFAULT_PASS "prodb" #define TSDB_DEFAULT_PASS "prodb"
#else #else
#define TSDB_DEFAULT_PASS "taosdata" #define TSDB_DEFAULT_PASS "taosdata"
#endif #endif
#define SHELL_MAX_PASSWORD_LEN 20 #define SHELL_MAX_PASSWORD_LEN 20
#define TSDB_TRUE 1 #define TSDB_TRUE 1
#define TSDB_FALSE 0 #define TSDB_FALSE 0
#define TSDB_OK 0 #define TSDB_OK 0
#define TSDB_ERR -1 #define TSDB_ERR -1
#define TS_PATH_DELIMITER "." #define TS_PATH_DELIMITER "."
#define TS_ESCAPE_CHAR '`' #define TS_ESCAPE_CHAR '`'
#define TSDB_TIME_PRECISION_MILLI 0 #define TSDB_TIME_PRECISION_MILLI 0
#define TSDB_TIME_PRECISION_MICRO 1 #define TSDB_TIME_PRECISION_MICRO 1
#define TSDB_TIME_PRECISION_NANO 2 #define TSDB_TIME_PRECISION_NANO 2
#define TSDB_TIME_PRECISION_MILLI_STR "ms" #define TSDB_TIME_PRECISION_MILLI_STR "ms"
#define TSDB_TIME_PRECISION_MICRO_STR "us" #define TSDB_TIME_PRECISION_MICRO_STR "us"
#define TSDB_TIME_PRECISION_NANO_STR "ns" #define TSDB_TIME_PRECISION_NANO_STR "ns"
#define TSDB_TICK_PER_SECOND(precision) ((int64_t)((precision)==TSDB_TIME_PRECISION_MILLI ? 1e3L : ((precision)==TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L))) #define TSDB_TICK_PER_SECOND(precision) \
((int64_t)((precision) == TSDB_TIME_PRECISION_MILLI ? 1e3L \
: ((precision) == TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L)))
#define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member) #define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member)
#define T_APPEND_MEMBER(dst, ptr, type, member) \ #define T_APPEND_MEMBER(dst, ptr, type, member) \
do {\ do { \
memcpy((void *)(dst), (void *)(&((ptr)->member)), T_MEMBER_SIZE(type, member));\ memcpy((void *)(dst), (void *)(&((ptr)->member)), T_MEMBER_SIZE(type, member)); \
dst = (void *)((char *)(dst) + T_MEMBER_SIZE(type, member));\ dst = (void *)((char *)(dst) + T_MEMBER_SIZE(type, member)); \
} while(0) } while (0)
#define T_READ_MEMBER(src, type, target) \ #define T_READ_MEMBER(src, type, target) \
do { \ do { \
(target) = *(type *)(src); \ (target) = *(type *)(src); \
(src) = (void *)((char *)src + sizeof(type));\ (src) = (void *)((char *)src + sizeof(type)); \
} while(0) } while (0)
// TODO: check if below is necessary // TODO: check if below is necessary
#define TSDB_RELATION_INVALID 0 #define TSDB_RELATION_INVALID 0
#define TSDB_RELATION_LESS 1 #define TSDB_RELATION_LESS 1
#define TSDB_RELATION_GREATER 2 #define TSDB_RELATION_GREATER 2
#define TSDB_RELATION_EQUAL 3 #define TSDB_RELATION_EQUAL 3
#define TSDB_RELATION_LESS_EQUAL 4 #define TSDB_RELATION_LESS_EQUAL 4
#define TSDB_RELATION_GREATER_EQUAL 5 #define TSDB_RELATION_GREATER_EQUAL 5
#define TSDB_RELATION_NOT_EQUAL 6 #define TSDB_RELATION_NOT_EQUAL 6
#define TSDB_RELATION_LIKE 7 #define TSDB_RELATION_LIKE 7
#define TSDB_RELATION_NOT_LIKE 8 #define TSDB_RELATION_NOT_LIKE 8
#define TSDB_RELATION_ISNULL 9 #define TSDB_RELATION_ISNULL 9
#define TSDB_RELATION_NOTNULL 10 #define TSDB_RELATION_NOTNULL 10
#define TSDB_RELATION_IN 11 #define TSDB_RELATION_IN 11
#define TSDB_RELATION_NOT_IN 12 #define TSDB_RELATION_NOT_IN 12
#define TSDB_RELATION_AND 13 #define TSDB_RELATION_AND 13
#define TSDB_RELATION_OR 14 #define TSDB_RELATION_OR 14
#define TSDB_RELATION_NOT 15 #define TSDB_RELATION_NOT 15
#define TSDB_RELATION_MATCH 16 #define TSDB_RELATION_MATCH 16
#define TSDB_RELATION_NMATCH 17 #define TSDB_RELATION_NMATCH 17
#define TSDB_BINARY_OP_ADD 4000 #define TSDB_BINARY_OP_ADD 4000
#define TSDB_BINARY_OP_SUBTRACT 4001 #define TSDB_BINARY_OP_SUBTRACT 4001
#define TSDB_BINARY_OP_MULTIPLY 4002 #define TSDB_BINARY_OP_MULTIPLY 4002
#define TSDB_BINARY_OP_DIVIDE 4003 #define TSDB_BINARY_OP_DIVIDE 4003
#define TSDB_BINARY_OP_REMAINDER 4004 #define TSDB_BINARY_OP_REMAINDER 4004
#define TSDB_BINARY_OP_CONCAT 4005 #define TSDB_BINARY_OP_CONCAT 4005
#define FUNCTION_CEIL 4500 #define FUNCTION_CEIL 4500
#define FUNCTION_FLOOR 4501 #define FUNCTION_FLOOR 4501
#define FUNCTION_ABS 4502 #define FUNCTION_ABS 4502
#define FUNCTION_ROUND 4503 #define FUNCTION_ROUND 4503
#define FUNCTION_LENGTH 4800 #define FUNCTION_LENGTH 4800
#define FUNCTION_CONCAT 4801 #define FUNCTION_CONCAT 4801
#define FUNCTION_LTRIM 4802 #define FUNCTION_LTRIM 4802
#define FUNCTION_RTRIM 4803 #define FUNCTION_RTRIM 4803
#define IS_RELATION_OPTR(op) (((op) >= TSDB_RELATION_LESS) && ((op) < TSDB_RELATION_IN)) #define IS_RELATION_OPTR(op) (((op) >= TSDB_RELATION_LESS) && ((op) < TSDB_RELATION_IN))
#define IS_ARITHMETIC_OPTR(op) (((op) >= TSDB_BINARY_OP_ADD) && ((op) <= TSDB_BINARY_OP_REMAINDER)) #define IS_ARITHMETIC_OPTR(op) (((op) >= TSDB_BINARY_OP_ADD) && ((op) <= TSDB_BINARY_OP_REMAINDER))
#define TSDB_NAME_DELIMITER_LEN 1 #define TSDB_NAME_DELIMITER_LEN 1
#define TSDB_UNI_LEN 24 #define TSDB_UNI_LEN 24
#define TSDB_USER_LEN TSDB_UNI_LEN #define TSDB_USER_LEN TSDB_UNI_LEN
// ACCOUNT is a 32 bit positive integer // ACCOUNT is a 32 bit positive integer
// this is the length of its string representation, including the terminator zero // this is the length of its string representation, including the terminator zero
#define TSDB_ACCT_ID_LEN 11 #define TSDB_ACCT_ID_LEN 11
#define TSDB_MAX_COLUMNS 4096 #define TSDB_MAX_COLUMNS 4096
#define TSDB_MIN_COLUMNS 2 //PRIMARY COLUMN(timestamp) + other columns #define TSDB_MIN_COLUMNS 2 // PRIMARY COLUMN(timestamp) + other columns
#define TSDB_NODE_NAME_LEN 64 #define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string #define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 65 #define TSDB_DB_NAME_LEN 65
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_FUNC_NAME_LEN 65 #define TSDB_FUNC_NAME_LEN 65
#define TSDB_FUNC_COMMENT_LEN 4096 #define TSDB_FUNC_COMMENT_LEN 4096
#define TSDB_FUNC_CODE_LEN (65535 - 512) #define TSDB_FUNC_CODE_LEN (65535 - 512)
#define TSDB_FUNC_BUF_SIZE 512 #define TSDB_FUNC_BUF_SIZE 512
#define TSDB_FUNC_TYPE_SCALAR 1 #define TSDB_FUNC_TYPE_SCALAR 1
#define TSDB_FUNC_TYPE_AGGREGATE 2 #define TSDB_FUNC_TYPE_AGGREGATE 2
#define TSDB_FUNC_MAX_RETRIEVE 1024 #define TSDB_FUNC_MAX_RETRIEVE 1024
#define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN #define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_CONSUMER_GROUP_LEN 192 #define TSDB_CONSUMER_GROUP_LEN 193
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2) #define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_COL_NAME_LEN 65 #define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_SHOW_LEN 1024 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024u) // sql length should be less than 1mb #define TSDB_MAX_SQL_SHOW_LEN 1024
#define TSDB_MAX_ALLOWED_SQL_LEN (1 * 1024 * 1024u) // sql length should be less than 1mb
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
#define TSDB_STB_COMMENT_LEN 1024 #define TSDB_APP_NAME_LEN TSDB_UNI_LEN
/** #define TSDB_STB_COMMENT_LEN 1024
* In some scenarios uint16_t (0~65535) is used to store the row len. /**
* - Firstly, we use 65531(65535 - 4), as the SDataRow/SKVRow contains 4 bits header. * In some scenarios uint16_t (0~65535) is used to store the row len.
* - Secondly, if all cols are VarDataT type except primary key, we need 4 bits to store the offset, thus * - Firstly, we use 65531(65535 - 4), as the SDataRow/SKVRow contains 4 bits header.
* the final value is 65531-(4096-1)*4 = 49151. * - Secondly, if all cols are VarDataT type except primary key, we need 4 bits to store the offset, thus
*/ * the final value is 65531-(4096-1)*4 = 49151.
#define TSDB_MAX_BYTES_PER_ROW 49151 */
#define TSDB_MAX_TAGS_LEN 16384 #define TSDB_MAX_BYTES_PER_ROW 49151
#define TSDB_MAX_TAGS 128 #define TSDB_MAX_TAGS_LEN 16384
#define TSDB_MAX_TAG_CONDITIONS 1024 #define TSDB_MAX_TAGS 128
#define TSDB_MAX_TAG_CONDITIONS 1024
#define TSDB_AUTH_LEN 16
#define TSDB_PASSWORD_LEN 32 #define TSDB_AUTH_LEN 16
#define TSDB_USET_PASSWORD_LEN 129 #define TSDB_PASSWORD_LEN 32
#define TSDB_VERSION_LEN 12 #define TSDB_USET_PASSWORD_LEN 129
#define TSDB_LABEL_LEN 8 #define TSDB_VERSION_LEN 12
#define TSDB_LABEL_LEN 8
#define TSDB_CLUSTER_ID_LEN 40
#define TSDB_FQDN_LEN 128 #define TSDB_CLUSTER_ID_LEN 40
#define TSDB_EP_LEN (TSDB_FQDN_LEN + 6) #define TSDB_FQDN_LEN 128
#define TSDB_IPv4ADDR_LEN 16 #define TSDB_EP_LEN (TSDB_FQDN_LEN + 6)
#define TSDB_FILENAME_LEN 128 #define TSDB_IPv4ADDR_LEN 16
#define TSDB_SHOW_SQL_LEN 512 #define TSDB_FILENAME_LEN 128
#define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_SHOW_SQL_LEN 512
#define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_SHOW_SUBQUERY_LEN 1000
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_TYPE_LEN 16 #define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_ERROR_LEN 64 #define TSDB_TRANS_TYPE_LEN 16
#define TSDB_TRANS_ERROR_LEN 64
#define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128 #define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128
#define TSDB_ERROR_MSG_LEN 1024
#define TSDB_DNODE_CONFIG_LEN 128 #define TSDB_ERROR_MSG_LEN 1024
#define TSDB_DNODE_VALUE_LEN 256 #define TSDB_DNODE_CONFIG_LEN 128
#define TSDB_DNODE_VALUE_LEN 256
#define TSDB_MQTT_HOSTNAME_LEN 64
#define TSDB_MQTT_PORT_LEN 8 #define TSDB_MQTT_HOSTNAME_LEN 64
#define TSDB_MQTT_USER_LEN 24 #define TSDB_MQTT_PORT_LEN 8
#define TSDB_MQTT_PASS_LEN 24 #define TSDB_MQTT_USER_LEN 24
#define TSDB_MQTT_TOPIC_LEN 64 #define TSDB_MQTT_PASS_LEN 24
#define TSDB_MQTT_CLIENT_ID_LEN 32 #define TSDB_MQTT_TOPIC_LEN 64
#define TSDB_MQTT_CLIENT_ID_LEN 32
#define TSDB_DB_TYPE_DEFAULT 0
#define TSDB_DB_TYPE_TOPIC 1 #define TSDB_DB_TYPE_DEFAULT 0
#define TSDB_DB_TYPE_TOPIC 1
#define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE
#define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE
#define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE #define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE
#define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value #define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth #define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_CQ_SQL_SIZE 1024 #define TSDB_CQ_SQL_SIZE 1024
#define TSDB_MIN_VNODES 16 #define TSDB_MIN_VNODES 16
#define TSDB_MAX_VNODES 512 #define TSDB_MAX_VNODES 512
#define TSDB_MIN_VNODES_PER_DB 1 #define TSDB_MIN_VNODES_PER_DB 1
#define TSDB_MAX_VNODES_PER_DB 4096 #define TSDB_MAX_VNODES_PER_DB 4096
#define TSDB_DEFAULT_VN_PER_DB 2 #define TSDB_DEFAULT_VN_PER_DB 2
#define TSDB_DNODE_ROLE_ANY 0 #define TSDB_DNODE_ROLE_ANY 0
#define TSDB_DNODE_ROLE_MGMT 1 #define TSDB_DNODE_ROLE_MGMT 1
#define TSDB_DNODE_ROLE_VNODE 2 #define TSDB_DNODE_ROLE_VNODE 2
#define TSDB_MAX_REPLICA 5 #define TSDB_MAX_REPLICA 5
#define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_UD_COLUMN_INDEX (-1000) #define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_RES_COL_ID (-5000) #define TSDB_RES_COL_ID (-5000)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
#define TSDB_MIN_CACHE_BLOCK_SIZE 1
#define TSDB_MAX_CACHE_BLOCK_SIZE 128 // 128MB for each vnode
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16
#define TSDB_MIN_TOTAL_BLOCKS 3 #define TSDB_MIN_CACHE_BLOCK_SIZE 1
#define TSDB_MAX_TOTAL_BLOCKS 10000 #define TSDB_MAX_CACHE_BLOCK_SIZE 128 // 128MB for each vnode
#define TSDB_DEFAULT_TOTAL_BLOCKS 6 #define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16
#define TSDB_MIN_DAYS_PER_FILE 1 #define TSDB_MIN_TOTAL_BLOCKS 3
#define TSDB_MAX_DAYS_PER_FILE 3650 #define TSDB_MAX_TOTAL_BLOCKS 10000
#define TSDB_DEFAULT_DAYS_PER_FILE 10 #define TSDB_DEFAULT_TOTAL_BLOCKS 6
#define TSDB_MIN_KEEP 1 // data in db to be reserved. #define TSDB_MIN_DAYS_PER_FILE 1
#define TSDB_MAX_KEEP 365000 // data in db to be reserved. #define TSDB_MAX_DAYS_PER_FILE 3650
#define TSDB_DEFAULT_KEEP 3650 // ten years #define TSDB_DEFAULT_DAYS_PER_FILE 10
#define TSDB_MIN_MIN_ROW_FBLOCK 10 #define TSDB_MIN_KEEP 1 // data in db to be reserved.
#define TSDB_MAX_MIN_ROW_FBLOCK 1000 #define TSDB_MAX_KEEP 365000 // data in db to be reserved.
#define TSDB_DEFAULT_MIN_ROW_FBLOCK 100 #define TSDB_DEFAULT_KEEP 3650 // ten years
#define TSDB_MIN_MAX_ROW_FBLOCK 200 #define TSDB_MIN_MIN_ROW_FBLOCK 10
#define TSDB_MAX_MAX_ROW_FBLOCK 10000 #define TSDB_MAX_MIN_ROW_FBLOCK 1000
#define TSDB_DEFAULT_MAX_ROW_FBLOCK 4096 #define TSDB_DEFAULT_MIN_ROW_FBLOCK 100
#define TSDB_MIN_COMMIT_TIME 30 #define TSDB_MIN_MAX_ROW_FBLOCK 200
#define TSDB_MAX_COMMIT_TIME 40960 #define TSDB_MAX_MAX_ROW_FBLOCK 10000
#define TSDB_DEFAULT_COMMIT_TIME 3600 #define TSDB_DEFAULT_MAX_ROW_FBLOCK 4096
#define TSDB_MIN_FSYNC_PERIOD 0 #define TSDB_MIN_COMMIT_TIME 30
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond #define TSDB_MAX_COMMIT_TIME 40960
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second #define TSDB_DEFAULT_COMMIT_TIME 3600
#define TSDB_MIN_WAL_LEVEL 0 #define TSDB_MIN_FSYNC_PERIOD 0
#define TSDB_MAX_WAL_LEVEL 2 #define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
#define TSDB_DEFAULT_WAL_LEVEL 1 #define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
#define TSDB_MIN_PRECISION TSDB_TIME_PRECISION_MILLI #define TSDB_MIN_WAL_LEVEL 0
#define TSDB_MAX_PRECISION TSDB_TIME_PRECISION_NANO #define TSDB_MAX_WAL_LEVEL 2
#define TSDB_DEFAULT_PRECISION TSDB_TIME_PRECISION_MILLI #define TSDB_DEFAULT_WAL_LEVEL 1
#define TSDB_MIN_COMP_LEVEL 0 #define TSDB_MIN_PRECISION TSDB_TIME_PRECISION_MILLI
#define TSDB_MAX_COMP_LEVEL 2 #define TSDB_MAX_PRECISION TSDB_TIME_PRECISION_NANO
#define TSDB_DEFAULT_COMP_LEVEL 2 #define TSDB_DEFAULT_PRECISION TSDB_TIME_PRECISION_MILLI
#define TSDB_MIN_DB_REPLICA_OPTION 1 #define TSDB_MIN_COMP_LEVEL 0
#define TSDB_MAX_DB_REPLICA_OPTION 3 #define TSDB_MAX_COMP_LEVEL 2
#define TSDB_DEFAULT_DB_REPLICA_OPTION 1 #define TSDB_DEFAULT_COMP_LEVEL 2
#define TSDB_MIN_DB_QUORUM_OPTION 1 #define TSDB_MIN_DB_REPLICA_OPTION 1
#define TSDB_MAX_DB_QUORUM_OPTION 2 #define TSDB_MAX_DB_REPLICA_OPTION 3
#define TSDB_DEFAULT_DB_QUORUM_OPTION 1 #define TSDB_DEFAULT_DB_REPLICA_OPTION 1
#define TSDB_MIN_DB_UPDATE 0 #define TSDB_MIN_DB_QUORUM_OPTION 1
#define TSDB_MAX_DB_UPDATE 2 #define TSDB_MAX_DB_QUORUM_OPTION 2
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0 #define TSDB_DEFAULT_DB_QUORUM_OPTION 1
#define TSDB_MIN_DB_CACHE_LAST_ROW 0 #define TSDB_MIN_DB_UPDATE 0
#define TSDB_MAX_DB_CACHE_LAST_ROW 3 #define TSDB_MAX_DB_UPDATE 2
#define TSDB_DEFAULT_CACHE_LAST_ROW 0 #define TSDB_DEFAULT_DB_UPDATE_OPTION 0
#define TSDB_MAX_JOIN_TABLE_NUM 10 #define TSDB_MIN_DB_CACHE_LAST_ROW 0
#define TSDB_MAX_UNION_CLAUSE 5 #define TSDB_MAX_DB_CACHE_LAST_ROW 3
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
#define TSDB_MAX_FIELD_LEN 16384 #define TSDB_MAX_JOIN_TABLE_NUM 10
#define TSDB_MAX_BINARY_LEN (TSDB_MAX_FIELD_LEN-TSDB_KEYSIZE) // keep 16384 #define TSDB_MAX_UNION_CLAUSE 5
#define TSDB_MAX_NCHAR_LEN (TSDB_MAX_FIELD_LEN-TSDB_KEYSIZE) // keep 16384
#define PRIMARYKEY_TIMESTAMP_COL_ID 1
#define COL_REACH_END(colId, maxColId) ((colId) > (maxColId))
#define TSDB_MAX_RPC_THREADS 5 #define TSDB_MAX_FIELD_LEN 16384
#define TSDB_MAX_BINARY_LEN (TSDB_MAX_FIELD_LEN - TSDB_KEYSIZE) // keep 16384
#define TSDB_MAX_NCHAR_LEN (TSDB_MAX_FIELD_LEN - TSDB_KEYSIZE) // keep 16384
#define PRIMARYKEY_TIMESTAMP_COL_ID 1
#define COL_REACH_END(colId, maxColId) ((colId) > (maxColId))
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type #define TSDB_MAX_RPC_THREADS 5
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode #define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
#define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default #define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default
/* /*
* 1. ordinary sub query for select * from super_table * 1. ordinary sub query for select * from super_table
* 2. all sqlobj generated by createSubqueryObj with this flag * 2. all sqlobj generated by createSubqueryObj with this flag
*/ */
#define TSDB_QUERY_TYPE_SUBQUERY 0x02u #define TSDB_QUERY_TYPE_SUBQUERY 0x02u
#define TSDB_QUERY_TYPE_STABLE_SUBQUERY 0x04u // two-stage subquery for super table #define TSDB_QUERY_TYPE_STABLE_SUBQUERY 0x04u // two-stage subquery for super table
#define TSDB_QUERY_TYPE_TABLE_QUERY 0x08u // query ordinary table; below only apply to client side #define TSDB_QUERY_TYPE_TABLE_QUERY 0x08u // query ordinary table; below only apply to client side
#define TSDB_QUERY_TYPE_STABLE_QUERY 0x10u // query on super table #define TSDB_QUERY_TYPE_STABLE_QUERY 0x10u // query on super table
#define TSDB_QUERY_TYPE_JOIN_QUERY 0x20u // join query #define TSDB_QUERY_TYPE_JOIN_QUERY 0x20u // join query
#define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40u // select *,columns... query #define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40u // select *,columns... query
#define TSDB_QUERY_TYPE_JOIN_SEC_STAGE 0x80u // join sub query at the second stage #define TSDB_QUERY_TYPE_JOIN_SEC_STAGE 0x80u // join sub query at the second stage
#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u #define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type #define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u #define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u
#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file #define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type #define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
#define TSDB_QUERY_TYPE_NEST_SUBQUERY 0x1000u // nested sub query #define TSDB_QUERY_TYPE_NEST_SUBQUERY 0x1000u // nested sub query
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) #define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type)) #define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
#define TSDB_QUERY_CLEAR_TYPE(x, _type) ((x) &= (~_type)) #define TSDB_QUERY_CLEAR_TYPE(x, _type) ((x) &= (~_type))
#define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE) #define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE)
#define TSDB_ORDER_ASC 1 #define TSDB_ORDER_ASC 1
#define TSDB_ORDER_DESC 2 #define TSDB_ORDER_DESC 2
#define TSDB_DEFAULT_CLUSTER_HASH_SIZE 1 #define TSDB_DEFAULT_CLUSTER_HASH_SIZE 1
#define TSDB_DEFAULT_MNODES_HASH_SIZE 5 #define TSDB_DEFAULT_MNODES_HASH_SIZE 5
#define TSDB_DEFAULT_DNODES_HASH_SIZE 10 #define TSDB_DEFAULT_DNODES_HASH_SIZE 10
#define TSDB_DEFAULT_ACCOUNTS_HASH_SIZE 10 #define TSDB_DEFAULT_ACCOUNTS_HASH_SIZE 10
#define TSDB_DEFAULT_USERS_HASH_SIZE 20 #define TSDB_DEFAULT_USERS_HASH_SIZE 20
#define TSDB_DEFAULT_DBS_HASH_SIZE 100 #define TSDB_DEFAULT_DBS_HASH_SIZE 100
#define TSDB_DEFAULT_VGROUPS_HASH_SIZE 100 #define TSDB_DEFAULT_VGROUPS_HASH_SIZE 100
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100 #define TSDB_DEFAULT_STABLES_HASH_SIZE 100
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000 #define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
#define TSDB_MAX_WAL_SIZE (1024*1024*3) #define TSDB_MAX_WAL_SIZE (1024 * 1024 * 3)
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P #define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
#define TFS_MAX_TIERS 3 #define TFS_MAX_TIERS 3
#define TFS_MAX_DISKS_PER_TIER 16 #define TFS_MAX_DISKS_PER_TIER 16
#define TFS_MAX_DISKS (TFS_MAX_TIERS * TFS_MAX_DISKS_PER_TIER) #define TFS_MAX_DISKS (TFS_MAX_TIERS * TFS_MAX_DISKS_PER_TIER)
#define TFS_MIN_LEVEL 0 #define TFS_MIN_LEVEL 0
#define TFS_MAX_LEVEL (TFS_MAX_TIERS - 1) #define TFS_MAX_LEVEL (TFS_MAX_TIERS - 1)
#define TFS_PRIMARY_LEVEL 0 #define TFS_PRIMARY_LEVEL 0
#define TFS_PRIMARY_ID 0 #define TFS_PRIMARY_ID 0
#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024 #define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024
enum { TRANS_STAT_INIT = 0, TRANS_STAT_EXECUTING, TRANS_STAT_EXECUTED, TRANS_STAT_ROLLBACKING, TRANS_STAT_ROLLBACKED }; enum { TRANS_STAT_INIT = 0, TRANS_STAT_EXECUTING, TRANS_STAT_EXECUTED, TRANS_STAT_ROLLBACKING, TRANS_STAT_ROLLBACKED };
......
...@@ -45,22 +45,24 @@ struct tmq_topic_vgroup_list_t { ...@@ -45,22 +45,24 @@ struct tmq_topic_vgroup_list_t {
}; };
struct tmq_conf_t { struct tmq_conf_t {
char clientId[256]; char clientId[256];
char groupId[256]; char groupId[256];
bool auto_commit; int8_t auto_commit;
int8_t resetOffset;
tmq_commit_cb* commit_cb;
/*char* ip;*/ /*char* ip;*/
/*uint16_t port;*/ /*uint16_t port;*/
tmq_commit_cb* commit_cb;
}; };
struct tmq_t { struct tmq_t {
// conf // conf
char groupId[256]; char groupId[256];
char clientId[256]; char clientId[256];
bool autoCommit; int8_t autoCommit;
SRWLatch lock; SRWLatch lock;
int64_t consumerId; int64_t consumerId;
int32_t epoch; int32_t epoch;
int32_t resetOffsetCfg;
int64_t status; int64_t status;
tsem_t rspSem; tsem_t rspSem;
STscObj* pTscObj; STscObj* pTscObj;
...@@ -79,7 +81,6 @@ typedef struct { ...@@ -79,7 +81,6 @@ typedef struct {
// statistics // statistics
int64_t pollCnt; int64_t pollCnt;
// offset // offset
int64_t committedOffset;
int64_t currentOffset; int64_t currentOffset;
// connection info // connection info
int32_t vgId; int32_t vgId;
...@@ -115,21 +116,17 @@ typedef struct { ...@@ -115,21 +116,17 @@ typedef struct {
} SMqConsumeCbParam; } SMqConsumeCbParam;
typedef struct { typedef struct {
tmq_t* tmq; tmq_t* tmq;
SMqClientVg* pVg; /*SMqClientVg* pVg;*/
int32_t async; int32_t async;
tsem_t rspSem;
} SMqCommitCbParam;
typedef struct {
tmq_t* tmq;
tsem_t rspSem; tsem_t rspSem;
tmq_resp_err_t rspErr; tmq_resp_err_t rspErr;
} SMqResetOffsetParam; } SMqCommitCbParam;
tmq_conf_t* tmq_conf_new() { tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
conf->auto_commit = false; conf->auto_commit = false;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
return conf; return conf;
} }
...@@ -157,6 +154,20 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -157,6 +154,20 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return TMQ_CONF_INVALID; return TMQ_CONF_INVALID;
} }
} }
if (strcmp(key, "auto.offset.reset") == 0) {
if (strcmp(value, "none") == 0) {
conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
return TMQ_CONF_OK;
} else if (strcmp(value, "earliest") == 0) {
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
return TMQ_CONF_OK;
} else if (strcmp(value, "latest") == 0) {
conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
return TMQ_CONF_OK;
} else {
return TMQ_CONF_INVALID;
}
}
return TMQ_CONF_UNKNOWN; return TMQ_CONF_UNKNOWN;
} }
...@@ -190,14 +201,12 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -190,14 +201,12 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
if (pParam->tmq->commit_cb) { if (pParam->tmq->commit_cb) {
pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL); pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL);
} }
if (!pParam->async) tsem_post(&pParam->rspSem); if (!pParam->async)
return 0; tsem_post(&pParam->rspSem);
} else {
tsem_destroy(&pParam->rspSem);
int32_t tmqResetOffsetCb(void* param, const SDataBuf* pMsg, int32_t code) { free(param);
SMqResetOffsetParam* pParam = (SMqResetOffsetParam*)param; }
pParam->rspErr = code;
tsem_post(&pParam->rspSem);
return 0; return 0;
} }
...@@ -216,6 +225,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs ...@@ -216,6 +225,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
strcpy(pTmq->groupId, conf->groupId); strcpy(pTmq->groupId, conf->groupId);
pTmq->autoCommit = conf->auto_commit; pTmq->autoCommit = conf->auto_commit;
pTmq->commit_cb = conf->commit_cb; pTmq->commit_cb = conf->commit_cb;
pTmq->resetOffsetCfg = conf->resetOffset;
tsem_init(&pTmq->rspSem, 0, 0); tsem_init(&pTmq->rspSem, 0, 0);
pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
...@@ -223,18 +233,40 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs ...@@ -223,18 +233,40 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
return pTmq; return pTmq;
} }
tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) { tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
SRequestObj* pRequest = NULL; // TODO: add read write lock
SRequestObj* pRequest = NULL;
tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS;
// build msg // build msg
// send to mnode // send to mnode
SMqCMResetOffsetReq req; SMqCMCommitOffsetReq req;
req.num = offsets->cnt; SArray* pArray = NULL;
req.offsets = (SMqOffset*)offsets->elems;
if (offsets == NULL) {
pArray = taosArrayInit(0, sizeof(SMqOffset));
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqOffset offset;
strcpy(offset.topicName, pTopic->topicName);
strcpy(offset.cgroup, tmq->groupId);
offset.vgId = pVg->vgId;
offset.offset = pVg->currentOffset;
taosArrayPush(pArray, &offset);
}
}
req.num = pArray->size;
req.offsets = pArray->pData;
} else {
req.num = offsets->cnt;
req.offsets = (SMqOffset*)offsets->elems;
}
SCoder encoder; SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncodeSMqCMResetOffsetReq(&encoder, &req); tEncodeSMqCMCommitOffsetReq(&encoder, &req);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
void* buf = malloc(tlen); void* buf = malloc(tlen);
if (buf == NULL) { if (buf == NULL) {
...@@ -244,32 +276,41 @@ tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offse ...@@ -244,32 +276,41 @@ tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offse
tCoderClear(&encoder); tCoderClear(&encoder);
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
tEncodeSMqCMResetOffsetReq(&encoder, &req); tEncodeSMqCMCommitOffsetReq(&encoder, &req);
tCoderClear(&encoder); tCoderClear(&encoder);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_RESET_OFFSET); pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
if (pRequest == NULL) { if (pRequest == NULL) {
tscError("failed to malloc request"); tscError("failed to malloc request");
} }
SMqResetOffsetParam param = {0}; SMqCommitCbParam* pParam = malloc(sizeof(SMqCommitCbParam));
tsem_init(&param.rspSem, 0, 0); if (pParam == NULL) {
param.tmq = tmq; return -1;
}
pParam->tmq = tmq;
tsem_init(&pParam->rspSem, 0, 0);
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->param = &param; sendInfo->param = pParam;
sendInfo->fp = tmqResetOffsetCb; sendInfo->fp = tmqCommitCb;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&param.rspSem); if (!async) {
tsem_destroy(&param.rspSem); tsem_wait(&pParam->rspSem);
resp = pParam->rspErr;
}
return param.rspErr; if (pArray) {
taosArrayDestroy(pArray);
}
return resp;
} }
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
...@@ -641,8 +682,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -641,8 +682,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
// clang-format off // clang-format off
SMqClientVg clientVg = { SMqClientVg clientVg = {
.pollCnt = 0, .pollCnt = 0,
.committedOffset = -1, .currentOffset = pVgEp->offset,
.currentOffset = -1,
.vgId = pVgEp->vgId, .vgId = pVgEp->vgId,
.epSet = pVgEp->epSet .epSet = pVgEp->epSet
}; };
...@@ -708,23 +748,51 @@ END: ...@@ -708,23 +748,51 @@ END:
return 0; return 0;
} }
SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, int32_t type, SMqClientTopic* pTopic, tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
SMqClientVg* pVg) { const SMqOffset* pOffset = &offset->offset;
if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
return TMQ_RESP_ERR__FAIL;
}
int32_t sz = taosArrayGetSize(tmq->clientTopics);
for (int32_t i = 0; i < sz; i++) {
SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i);
if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) {
int32_t vgSz = taosArrayGetSize(clientTopic->vgs);
for (int32_t j = 0; j < vgSz; j++) {
SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
if (pVg->vgId == pOffset->vgId) {
pVg->currentOffset = pOffset->offset;
return TMQ_RESP_ERR__SUCCESS;
}
}
}
}
return TMQ_RESP_ERR__FAIL;
}
SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClientTopic* pTopic, SMqClientVg* pVg) {
int64_t reqOffset;
if (pVg->currentOffset >= 0) {
reqOffset = pVg->currentOffset;
} else {
if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {
tscError("unable to poll since no committed offset but reset offset is set to none");
return NULL;
}
reqOffset = tmq->resetOffsetCfg;
}
SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq)); SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq));
if (pReq == NULL) { if (pReq == NULL) {
return NULL; return NULL;
} }
pReq->reqType = type;
strcpy(pReq->topic, pTopic->topicName); strcpy(pReq->topic, pTopic->topicName);
pReq->blockingTime = blocking_time;
pReq->consumerId = tmq->consumerId;
strcpy(pReq->cgroup, tmq->groupId); strcpy(pReq->cgroup, tmq->groupId);
if (type == TMQ_REQ_TYPE_COMMIT_ONLY) { pReq->blockingTime = blocking_time;
pReq->offset = pVg->currentOffset; pReq->consumerId = tmq->consumerId;
} else { pReq->currentOffset = reqOffset;
pReq->offset = pVg->currentOffset + 1;
}
pReq->head.vgId = htonl(pVg->vgId); pReq->head.vgId = htonl(pVg->vgId);
pReq->head.contLen = htonl(sizeof(SMqConsumeReq)); pReq->head.contLen = htonl(sizeof(SMqConsumeReq));
...@@ -743,13 +811,13 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -743,13 +811,13 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
if (taosArrayGetSize(tmq->clientTopics) == 0) { if (taosArrayGetSize(tmq->clientTopics) == 0) {
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
printf("over1\n"); /*printf("over1\n");*/
usleep(blocking_time * 1000); usleep(blocking_time * 1000);
return NULL; return NULL;
} }
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
if (taosArrayGetSize(pTopic->vgs) == 0) { if (taosArrayGetSize(pTopic->vgs) == 0) {
printf("over2\n"); /*printf("over2\n");*/
usleep(blocking_time * 1000); usleep(blocking_time * 1000);
return NULL; return NULL;
} }
...@@ -760,8 +828,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -760,8 +828,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
/*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/ /*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
int32_t reqType = tmq->autoCommit ? TMQ_REQ_TYPE_CONSUME_AND_COMMIT : TMQ_REQ_TYPE_CONSUME_ONLY; SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, pTopic, pVg);
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, reqType, pTopic, pVg);
if (pReq == NULL) { if (pReq == NULL) {
ASSERT(false); ASSERT(false);
usleep(blocking_time * 1000); usleep(blocking_time * 1000);
...@@ -821,6 +888,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -821,6 +888,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
/*return pRequest;*/ /*return pRequest;*/
} }
#if 0
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
if (tmq_topic_vgroup_list != NULL) { if (tmq_topic_vgroup_list != NULL) {
// TODO // TODO
...@@ -831,7 +899,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v ...@@ -831,7 +899,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, TMQ_REQ_TYPE_COMMIT_ONLY, pTopic, pVg); SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, pTopic, pVg);
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)};
...@@ -858,6 +926,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v ...@@ -858,6 +926,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
return 0; return 0;
} }
#endif
void tmq_message_destroy(tmq_message_t* tmq_message) { void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return; if (tmq_message == NULL) return;
......
...@@ -285,7 +285,7 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR ...@@ -285,7 +285,7 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR
int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedU64(buf, pReq->ver); tlen += taosEncodeFixedI64(buf, pReq->ver);
tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedU32(buf, pReq->ttl); tlen += taosEncodeFixedU32(buf, pReq->ttl);
tlen += taosEncodeFixedU32(buf, pReq->keep); tlen += taosEncodeFixedU32(buf, pReq->keep);
...@@ -330,7 +330,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { ...@@ -330,7 +330,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
} }
void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
buf = taosDecodeFixedU64(buf, &(pReq->ver)); buf = taosDecodeFixedI64(buf, &(pReq->ver));
buf = taosDecodeString(buf, &(pReq->name)); buf = taosDecodeString(buf, &(pReq->name));
buf = taosDecodeFixedU32(buf, &(pReq->ttl)); buf = taosDecodeFixedU32(buf, &(pReq->ttl));
buf = taosDecodeFixedU32(buf, &(pReq->keep)); buf = taosDecodeFixedU32(buf, &(pReq->keep));
...@@ -380,7 +380,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { ...@@ -380,7 +380,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
int32_t tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) { int32_t tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedU64(buf, pReq->ver); tlen += taosEncodeFixedI64(buf, pReq->ver);
tlen += taosEncodeFixedU32(buf, taosArrayGetSize(pReq->pArray)); tlen += taosEncodeFixedU32(buf, taosArrayGetSize(pReq->pArray));
for (size_t i = 0; i < taosArrayGetSize(pReq->pArray); i++) { for (size_t i = 0; i < taosArrayGetSize(pReq->pArray); i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(pReq->pArray, i); SVCreateTbReq *pCreateTbReq = taosArrayGet(pReq->pArray, i);
...@@ -393,7 +393,7 @@ int32_t tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) { ...@@ -393,7 +393,7 @@ int32_t tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) {
void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) { void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) {
uint32_t nsize = 0; uint32_t nsize = 0;
buf = taosDecodeFixedU64(buf, &pReq->ver); buf = taosDecodeFixedI64(buf, &pReq->ver);
buf = taosDecodeFixedU32(buf, &nsize); buf = taosDecodeFixedU32(buf, &nsize);
pReq->pArray = taosArrayInit(nsize, sizeof(SVCreateTbReq)); pReq->pArray = taosArrayInit(nsize, sizeof(SVCreateTbReq));
for (size_t i = 0; i < nsize; i++) { for (size_t i = 0; i < nsize; i++) {
...@@ -407,14 +407,14 @@ void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) { ...@@ -407,14 +407,14 @@ void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) {
int32_t tSerializeSVDropTbReq(void **buf, SVDropTbReq *pReq) { int32_t tSerializeSVDropTbReq(void **buf, SVDropTbReq *pReq) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedU64(buf, pReq->ver); tlen += taosEncodeFixedI64(buf, pReq->ver);
tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedU8(buf, pReq->type); tlen += taosEncodeFixedU8(buf, pReq->type);
return tlen; return tlen;
} }
void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) { void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) {
buf = taosDecodeFixedU64(buf, &pReq->ver); buf = taosDecodeFixedI64(buf, &pReq->ver);
buf = taosDecodeString(buf, &pReq->name); buf = taosDecodeString(buf, &pReq->name);
buf = taosDecodeFixedU8(buf, &pReq->type); buf = taosDecodeFixedU8(buf, &pReq->type);
return buf; return buf;
...@@ -1393,7 +1393,7 @@ int32_t tSerializeSDropDbRsp(void *buf, int32_t bufLen, SDropDbRsp *pRsp) { ...@@ -1393,7 +1393,7 @@ int32_t tSerializeSDropDbRsp(void *buf, int32_t bufLen, SDropDbRsp *pRsp) {
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->db) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->db) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->uid) < 0) return -1; if (tEncodeI64(&encoder, pRsp->uid) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -1407,7 +1407,7 @@ int32_t tDeserializeSDropDbRsp(void *buf, int32_t bufLen, SDropDbRsp *pRsp) { ...@@ -1407,7 +1407,7 @@ int32_t tDeserializeSDropDbRsp(void *buf, int32_t bufLen, SDropDbRsp *pRsp) {
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->db) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->db) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->uid) < 0) return -1; if (tDecodeI64(&decoder, &pRsp->uid) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tCoderClear(&decoder);
...@@ -1468,7 +1468,7 @@ int32_t tDeserializeSSyncDbReq(void *buf, int32_t bufLen, SSyncDbReq *pReq) { ...@@ -1468,7 +1468,7 @@ int32_t tDeserializeSSyncDbReq(void *buf, int32_t bufLen, SSyncDbReq *pReq) {
static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) { static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) {
if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1; if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1;
if (tEncodeU64(pEncoder, pRsp->uid) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->uid) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->vgVersion) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->vgVersion) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->vgNum) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->vgNum) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->hashMethod) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->hashMethod) < 0) return -1;
...@@ -1518,7 +1518,7 @@ int32_t tSerializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pRsp ...@@ -1518,7 +1518,7 @@ int32_t tSerializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pRsp
int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) { int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) {
if (tDecodeCStrTo(pDecoder, pRsp->db) < 0) return -1; if (tDecodeCStrTo(pDecoder, pRsp->db) < 0) return -1;
if (tDecodeU64(pDecoder, &pRsp->uid) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->uid) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->vgVersion) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->vgVersion) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->vgNum) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->vgNum) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->hashMethod) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->hashMethod) < 0) return -1;
...@@ -1661,7 +1661,7 @@ static int32_t tEncodeSTableMetaRsp(SCoder *pEncoder, STableMetaRsp *pRsp) { ...@@ -1661,7 +1661,7 @@ static int32_t tEncodeSTableMetaRsp(SCoder *pEncoder, STableMetaRsp *pRsp) {
if (tEncodeCStr(pEncoder, pRsp->tbName) < 0) return -1; if (tEncodeCStr(pEncoder, pRsp->tbName) < 0) return -1;
if (tEncodeCStr(pEncoder, pRsp->stbName) < 0) return -1; if (tEncodeCStr(pEncoder, pRsp->stbName) < 0) return -1;
if (tEncodeCStr(pEncoder, pRsp->dbFName) < 0) return -1; if (tEncodeCStr(pEncoder, pRsp->dbFName) < 0) return -1;
if (tEncodeU64(pEncoder, pRsp->dbId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->dbId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->numOfTags) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->numOfTags) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->numOfColumns) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->numOfColumns) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->precision) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->precision) < 0) return -1;
...@@ -1669,8 +1669,8 @@ static int32_t tEncodeSTableMetaRsp(SCoder *pEncoder, STableMetaRsp *pRsp) { ...@@ -1669,8 +1669,8 @@ static int32_t tEncodeSTableMetaRsp(SCoder *pEncoder, STableMetaRsp *pRsp) {
if (tEncodeI8(pEncoder, pRsp->update) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->update) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->sversion) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->sversion) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->tversion) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->tversion) < 0) return -1;
if (tEncodeU64(pEncoder, pRsp->suid) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->suid) < 0) return -1;
if (tEncodeU64(pEncoder, pRsp->tuid) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->tuid) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->vgId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->vgId) < 0) return -1;
for (int32_t i = 0; i < pRsp->numOfColumns + pRsp->numOfTags; ++i) { for (int32_t i = 0; i < pRsp->numOfColumns + pRsp->numOfTags; ++i) {
SSchema *pSchema = &pRsp->pSchemas[i]; SSchema *pSchema = &pRsp->pSchemas[i];
...@@ -1684,7 +1684,7 @@ static int32_t tDecodeSTableMetaRsp(SCoder *pDecoder, STableMetaRsp *pRsp) { ...@@ -1684,7 +1684,7 @@ static int32_t tDecodeSTableMetaRsp(SCoder *pDecoder, STableMetaRsp *pRsp) {
if (tDecodeCStrTo(pDecoder, pRsp->tbName) < 0) return -1; if (tDecodeCStrTo(pDecoder, pRsp->tbName) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pRsp->stbName) < 0) return -1; if (tDecodeCStrTo(pDecoder, pRsp->stbName) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pRsp->dbFName) < 0) return -1; if (tDecodeCStrTo(pDecoder, pRsp->dbFName) < 0) return -1;
if (tDecodeU64(pDecoder, &pRsp->dbId) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->dbId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->numOfTags) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->numOfTags) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->numOfColumns) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->numOfColumns) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->precision) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->precision) < 0) return -1;
...@@ -1692,8 +1692,8 @@ static int32_t tDecodeSTableMetaRsp(SCoder *pDecoder, STableMetaRsp *pRsp) { ...@@ -1692,8 +1692,8 @@ static int32_t tDecodeSTableMetaRsp(SCoder *pDecoder, STableMetaRsp *pRsp) {
if (tDecodeI8(pDecoder, &pRsp->update) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->update) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->sversion) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->sversion) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->tversion) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->tversion) < 0) return -1;
if (tDecodeU64(pDecoder, &pRsp->suid) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->suid) < 0) return -1;
if (tDecodeU64(pDecoder, &pRsp->tuid) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->tuid) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1;
int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns; int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns;
...@@ -2093,7 +2093,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR ...@@ -2093,7 +2093,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeU64(&encoder, pReq->dbUid) < 0) return -1; if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1; if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
if (tEncodeI32(&encoder, pReq->cacheBlockSize) < 0) return -1; if (tEncodeI32(&encoder, pReq->cacheBlockSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->totalBlocks) < 0) return -1; if (tEncodeI32(&encoder, pReq->totalBlocks) < 0) return -1;
...@@ -2133,7 +2133,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * ...@@ -2133,7 +2133,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->dbUid) < 0) return -1; if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->cacheBlockSize) < 0) return -1; if (tDecodeI32(&decoder, &pReq->cacheBlockSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->totalBlocks) < 0) return -1; if (tDecodeI32(&decoder, &pReq->totalBlocks) < 0) return -1;
...@@ -2171,7 +2171,7 @@ int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) ...@@ -2171,7 +2171,7 @@ int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq)
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeU64(&encoder, pReq->dbUid) < 0) return -1; if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -2187,7 +2187,7 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq ...@@ -2187,7 +2187,7 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->dbUid) < 0) return -1; if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
...@@ -2356,35 +2356,7 @@ int32_t tDecodeSMqOffset(SCoder *decoder, SMqOffset *pOffset) { ...@@ -2356,35 +2356,7 @@ int32_t tDecodeSMqOffset(SCoder *decoder, SMqOffset *pOffset) {
return 0; return 0;
} }
int32_t tEncodeSMqVgOffsets(SCoder *encoder, const SMqVgOffsets *pOffsets) { int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq *pReq) {
if (tStartEncode(encoder) < 0) return -1;
if (tEncodeI32(encoder, pOffsets->vgId) < 0) return -1;
int32_t sz = taosArrayGetSize(pOffsets->offsets);
if (tEncodeI32(encoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SMqOffset *offset = taosArrayGet(pOffsets->offsets, i);
if (tEncodeSMqOffset(encoder, offset) < 0) return -1;
}
tEndEncode(encoder);
return encoder->pos;
}
int32_t tDecodeSMqVgOffsets(SCoder *decoder, SMqVgOffsets *pOffsets) {
int32_t sz;
if (tStartDecode(decoder) < 0) return -1;
if (tDecodeI32(decoder, &pOffsets->vgId) < 0) return -1;
if (tDecodeI32(decoder, &sz) < 0) return -1;
pOffsets->offsets = taosArrayInit(sz, sizeof(SMqOffset));
for (int32_t i = 0; i < sz; i++) {
SMqOffset offset;
if (tDecodeSMqOffset(decoder, &offset) < 0) return -1;
taosArrayPush(pOffsets->offsets, &offset);
}
tEndDecode(decoder);
return 0;
}
int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *pReq) {
if (tStartEncode(encoder) < 0) return -1; if (tStartEncode(encoder) < 0) return -1;
if (tEncodeI32(encoder, pReq->num) < 0) return -1; if (tEncodeI32(encoder, pReq->num) < 0) return -1;
for (int32_t i = 0; i < pReq->num; i++) { for (int32_t i = 0; i < pReq->num; i++) {
...@@ -2394,7 +2366,7 @@ int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *p ...@@ -2394,7 +2366,7 @@ int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *p
return encoder->pos; return encoder->pos;
} }
int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) { int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) {
if (tStartDecode(decoder) < 0) return -1; if (tStartDecode(decoder) < 0) return -1;
if (tDecodeI32(decoder, &pReq->num) < 0) return -1; if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder); pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder);
...@@ -2405,23 +2377,3 @@ int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) { ...@@ -2405,23 +2377,3 @@ int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) {
tEndDecode(decoder); tEndDecode(decoder);
return 0; return 0;
} }
#if 0
int32_t tEncodeSMqMVResetOffsetReq(SCoder *encoder, const SMqMVResetOffsetReq *pReq) {
if (tEncodeI64(encoder, pReq->leftForVer) < 0) return -1;
for (int32_t i = 0; i < pReq->num; i++) {
tEncodeSMqOffset(encoder, &pReq->offsets[i]);
}
return encoder->pos;
}
int32_t tDecodeSMqMVResetOffsetReq(SCoder *decoder, SMqMVResetOffsetReq *pReq) {
if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder);
if (pReq->offsets == NULL) return -1;
for (int32_t i = 0; i < pReq->num; i++) {
tDecodeSMqOffset(decoder, &pReq->offsets[i]);
}
return 0;
}
#endif
...@@ -25,8 +25,8 @@ ...@@ -25,8 +25,8 @@
#include "dndMnode.h" #include "dndMnode.h"
#include "dndVnodes.h" #include "dndVnodes.h"
#define INTERNAL_USER "_dnd" #define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key" #define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd" #define INTERNAL_SECRET "_pwd"
static void dndInitMsgFp(STransMgmt *pMgmt) { static void dndInitMsgFp(STransMgmt *pMgmt) {
...@@ -113,6 +113,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -113,6 +113,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = dndProcessMnodeWriteMsg;
/*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/ /*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = dndProcessMnodeWriteMsg;
...@@ -155,7 +156,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -155,7 +156,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
} }
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode * pDnode = parent; SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pRsp->msgType; tmsg_t msgType = pRsp->msgType;
...@@ -219,7 +220,7 @@ static void dndCleanupClient(SDnode *pDnode) { ...@@ -219,7 +220,7 @@ static void dndCleanupClient(SDnode *pDnode) {
} }
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
SDnode * pDnode = param; SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pReq->msgType; tmsg_t msgType = pReq->msgType;
...@@ -313,7 +314,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char ...@@ -313,7 +314,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
SAuthReq authReq = {0}; SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN); tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void * pReq = rpcMallocCont(contLen); void *pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq); tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
......
...@@ -123,6 +123,7 @@ typedef enum { ...@@ -123,6 +123,7 @@ typedef enum {
TRN_TYPE_DROP_TOPIC = 1015, TRN_TYPE_DROP_TOPIC = 1015,
TRN_TYPE_SUBSCRIBE = 1016, TRN_TYPE_SUBSCRIBE = 1016,
TRN_TYPE_REBALANCE = 1017, TRN_TYPE_REBALANCE = 1017,
TRN_TYPE_COMMIT_OFFSET = 1018,
TRN_TYPE_BASIC_SCOPE_END, TRN_TYPE_BASIC_SCOPE_END,
TRN_TYPE_GLOBAL_SCOPE = 2000, TRN_TYPE_GLOBAL_SCOPE = 2000,
TRN_TYPE_CREATE_DNODE = 2001, TRN_TYPE_CREATE_DNODE = 2001,
...@@ -176,7 +177,7 @@ typedef struct { ...@@ -176,7 +177,7 @@ typedef struct {
SArray* undoActions; SArray* undoActions;
int64_t createdTime; int64_t createdTime;
int64_t lastExecTime; int64_t lastExecTime;
uint64_t dbUid; int64_t dbUid;
char dbname[TSDB_DB_FNAME_LEN]; char dbname[TSDB_DB_FNAME_LEN];
char lastError[TSDB_TRANS_ERROR_LEN]; char lastError[TSDB_TRANS_ERROR_LEN];
} STrans; } STrans;
...@@ -304,16 +305,16 @@ typedef struct { ...@@ -304,16 +305,16 @@ typedef struct {
} SDbCfg; } SDbCfg;
typedef struct { typedef struct {
char name[TSDB_DB_FNAME_LEN]; char name[TSDB_DB_FNAME_LEN];
char acct[TSDB_USER_LEN]; char acct[TSDB_USER_LEN];
char createUser[TSDB_USER_LEN]; char createUser[TSDB_USER_LEN];
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
uint64_t uid; int64_t uid;
int32_t cfgVersion; int32_t cfgVersion;
int32_t vgVersion; int32_t vgVersion;
int8_t hashMethod; // default is 1 int8_t hashMethod; // default is 1
SDbCfg cfg; SDbCfg cfg;
} SDbObj; } SDbObj;
typedef struct { typedef struct {
...@@ -346,8 +347,8 @@ typedef struct { ...@@ -346,8 +347,8 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
uint64_t uid; int64_t uid;
uint64_t dbUid; int64_t dbUid;
int32_t version; int32_t version;
int32_t nextColId; int32_t nextColId;
int32_t numOfColumns; int32_t numOfColumns;
...@@ -465,6 +466,24 @@ static FORCE_INLINE void tDeleteSMqSubConsumer(SMqSubConsumer* pSubConsumer) { ...@@ -465,6 +466,24 @@ static FORCE_INLINE void tDeleteSMqSubConsumer(SMqSubConsumer* pSubConsumer) {
} }
} }
typedef struct {
char key[TSDB_PARTITION_KEY_LEN];
int64_t offset;
} SMqOffsetObj;
static FORCE_INLINE int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pOffset->key);
tlen += taosEncodeFixedI64(buf, pOffset->offset);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset) {
buf = taosDecodeStringTo(buf, pOffset->key);
buf = taosDecodeFixedI64(buf, &pOffset->offset);
return buf;
}
typedef struct { typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN];
int32_t status; int32_t status;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_OFFSET_H_
#define _TD_MND_OFFSET_H_
#include "mndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t mndInitOffset(SMnode *pMnode);
void mndCleanupOffset(SMnode *pMnode);
SMqOffsetObj *mndAcquireOffset(SMnode *pMnode, const char *key);
void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset);
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset);
SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw);
int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs);
static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) {
return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
}
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_OFFSET_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndOffset.h"
#include "mndAuth.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tname.h"
#define MND_OFFSET_VER_NUMBER 1
#define MND_OFFSET_RESERVE_SIZE 64
static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset);
static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset);
static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOffset, SMqOffsetObj *pNewOffset);
static int32_t mndProcessCommitOffsetReq(SMnodeMsg *pReq);
int32_t mndInitOffset(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_OFFSET,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndOffsetActionEncode,
.decodeFp = (SdbDecodeFp)mndOffsetActionDecode,
.insertFp = (SdbInsertFp)mndOffsetActionInsert,
.updateFp = (SdbUpdateFp)mndOffsetActionUpdate,
.deleteFp = (SdbDeleteFp)mndOffsetActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_MQ_COMMIT_OFFSET, mndProcessCommitOffsetReq);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupOffset(SMnode *pMnode) {}
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL;
int32_t tlen = tEncodeSMqOffsetObj(NULL, pOffset);
int32_t size = sizeof(int32_t) + tlen + MND_OFFSET_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_OFFSET, MND_OFFSET_VER_NUMBER, size);
if (pRaw == NULL) goto OFFSET_ENCODE_OVER;
buf = malloc(tlen);
if (buf == NULL) goto OFFSET_ENCODE_OVER;
void *abuf = buf;
tEncodeSMqOffsetObj(&abuf, pOffset);
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, tlen, OFFSET_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OFFSET_ENCODE_OVER);
SDB_SET_RESERVE(pRaw, dataPos, MND_OFFSET_RESERVE_SIZE, OFFSET_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, OFFSET_ENCODE_OVER);
terrno = TSDB_CODE_SUCCESS;
OFFSET_ENCODE_OVER:
tfree(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("offset:%s, failed to encode to raw:%p since %s", pOffset->key, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("offset:%s, encode to raw:%p, row:%p", pOffset->key, pRaw, pOffset);
return pRaw;
}
SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto OFFSET_DECODE_OVER;
if (sver != MND_OFFSET_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto OFFSET_DECODE_OVER;
}
int32_t size = sizeof(SMqOffsetObj);
SSdbRow *pRow = sdbAllocRow(size);
if (pRow == NULL) goto OFFSET_DECODE_OVER;
SMqOffsetObj *pOffset = sdbGetRowObj(pRow);
if (pOffset == NULL) goto OFFSET_DECODE_OVER;
int32_t dataPos = 0;
int32_t tlen;
SDB_GET_INT32(pRaw, dataPos, &tlen, OFFSET_DECODE_OVER);
buf = malloc(tlen + 1);
if (buf == NULL) goto OFFSET_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OFFSET_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_OFFSET_RESERVE_SIZE, OFFSET_DECODE_OVER);
if (tDecodeSMqOffsetObj(buf, pOffset) == NULL) {
goto OFFSET_DECODE_OVER;
}
terrno = TSDB_CODE_SUCCESS;
OFFSET_DECODE_OVER:
tfree(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("offset:%s, failed to decode from raw:%p since %s", pOffset->key, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("offset:%s, decode from raw:%p, row:%p", pOffset->key, pRaw, pOffset);
return pRow;
}
int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs) {
int32_t code = 0;
int32_t sz = taosArrayGetSize(vgs);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp *pConsumerEp = taosArrayGet(vgs, i);
SMqOffsetObj offsetObj;
if (mndMakePartitionKey(offsetObj.key, cgroup, topicName, pConsumerEp->vgId) < 0) {
return -1;
}
offsetObj.offset = -1;
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(&offsetObj);
if (pOffsetRaw == NULL) {
return -1;
}
sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY);
if (mndTransAppendRedolog(pTrans, pOffsetRaw) < 0) {
return -1;
}
}
return 0;
}
static int32_t mndProcessCommitOffsetReq(SMnodeMsg *pMsg) {
char key[TSDB_PARTITION_KEY_LEN];
SMnode *pMnode = pMsg->pMnode;
char *msgStr = pMsg->rpcMsg.pCont;
SMqCMCommitOffsetReq commitOffsetReq;
SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, msgStr, pMsg->rpcMsg.contLen, TD_DECODER);
tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_COMMIT_OFFSET, &pMsg->rpcMsg);
for (int32_t i = 0; i < commitOffsetReq.num; i++) {
SMqOffset *pOffset = &commitOffsetReq.offsets[i];
if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) {
return -1;
}
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key);
ASSERT(pOffsetObj);
pOffsetObj->offset = pOffset->offset;
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(pOffsetObj);
sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pOffsetRaw);
mndReleaseOffset(pMnode, pOffsetObj);
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("mq-commit-offset-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
mndTransDrop(pTrans);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset) {
mTrace("offset:%s, perform insert action", pOffset->key);
return 0;
}
static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset) {
mTrace("offset:%s, perform delete action", pOffset->key);
return 0;
}
static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOldOffset, SMqOffsetObj *pNewOffset) {
mTrace("offset:%s, perform update action", pOldOffset->key);
return 0;
}
SMqOffsetObj *mndAcquireOffset(SMnode *pMnode, const char *key) {
SSdb *pSdb = pMnode->pSdb;
SMqOffsetObj *pOffset = sdbAcquire(pSdb, SDB_OFFSET, key);
if (pOffset == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = TSDB_CODE_MND_OFFSET_NOT_EXIST;
}
return pOffset;
}
void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pOffset);
}
static void mndCancelGetNextOffset(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndTopic.h" #include "mndTopic.h"
...@@ -80,13 +81,13 @@ int32_t mndInitSubscribe(SMnode *pMnode) { ...@@ -80,13 +81,13 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *consumerGroup) { static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *cgroup) {
SMqSubscribeObj *pSub = tNewSubscribeObj(); SMqSubscribeObj *pSub = tNewSubscribeObj();
if (pSub == NULL) { if (pSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
char *key = mndMakeSubscribeKey(consumerGroup, pTopic->name); char *key = mndMakeSubscribeKey(cgroup, pTopic->name);
if (key == NULL) { if (key == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tDeleteSMqSubscribeObj(pSub); tDeleteSMqSubscribeObj(pSub);
...@@ -289,9 +290,15 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -289,9 +290,15 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
strcpy(topicEp.topic, topicName); strcpy(topicEp.topic, topicName);
topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp)); topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
for (int32_t k = 0; k < vgsz; k++) { for (int32_t k = 0; k < vgsz; k++) {
char offsetKey[TSDB_PARTITION_KEY_LEN];
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k); SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId, .offset = -1};
SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId}; mndMakePartitionKey(offsetKey, pConsumer->cgroup, topicName, pConsumerEp->vgId);
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey);
if (pOffsetObj != NULL) {
vgEp.offset = pOffsetObj->offset;
mndReleaseOffset(pMnode, pOffsetObj);
}
taosArrayPush(topicEp.vgs, &vgEp); taosArrayPush(topicEp.vgs, &vgEp);
} }
taosArrayPush(rsp.topics, &topicEp); taosArrayPush(rsp.topics, &topicEp);
...@@ -870,7 +877,7 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { ...@@ -870,7 +877,7 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
SUB_ENCODE_OVER: SUB_ENCODE_OVER:
tfree(buf); tfree(buf);
if (terrno != 0) { if (terrno != TSDB_CODE_SUCCESS) {
mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr()); mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
return NULL; return NULL;
...@@ -1085,6 +1092,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -1085,6 +1092,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
mDebug("create new subscription by consumer %ld, group: %s, topic %s", consumerId, cgroup, newTopicName); mDebug("create new subscription by consumer %ld, group: %s, topic %s", consumerId, cgroup, newTopicName);
pSub = mndCreateSubscription(pMnode, pTopic, cgroup); pSub = mndCreateSubscription(pMnode, pTopic, cgroup);
createSub = true; createSub = true;
mndCreateOffset(pTrans, cgroup, newTopicName, pSub->unassignedVg);
} }
SMqSubConsumer mqSubConsumer; SMqSubConsumer mqSubConsumer;
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "mndDnode.h" #include "mndDnode.h"
#include "mndFunc.h" #include "mndFunc.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndOffset.h"
#include "mndProfile.h" #include "mndProfile.h"
#include "mndQnode.h" #include "mndQnode.h"
#include "mndShow.h" #include "mndShow.h"
...@@ -77,7 +78,7 @@ static void mndTransReExecute(void *param, void *tmrId) { ...@@ -77,7 +78,7 @@ static void mndTransReExecute(void *param, void *tmrId) {
SMnode *pMnode = param; SMnode *pMnode = param;
if (mndIsMaster(pMnode)) { if (mndIsMaster(pMnode)) {
int32_t contLen = 0; int32_t contLen = 0;
void * pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pReq, .contLen = contLen};
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
} }
...@@ -89,7 +90,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { ...@@ -89,7 +90,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
SMnode *pMnode = param; SMnode *pMnode = param;
if (mndIsMaster(pMnode)) { if (mndIsMaster(pMnode)) {
int32_t contLen = 0; int32_t contLen = 0;
void * pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
} }
...@@ -197,6 +198,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { ...@@ -197,6 +198,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1; if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1; if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1; if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1; if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
...@@ -440,7 +442,7 @@ void mndProcessMsg(SMnodeMsg *pMsg) { ...@@ -440,7 +442,7 @@ void mndProcessMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
int32_t code = 0; int32_t code = 0;
tmsg_t msgType = pMsg->rpcMsg.msgType; tmsg_t msgType = pMsg->rpcMsg.msgType;
void * ahandle = pMsg->rpcMsg.ahandle; void *ahandle = pMsg->rpcMsg.ahandle;
bool isReq = (msgType & 1U); bool isReq = (msgType & 1U);
mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle); mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle);
......
...@@ -207,9 +207,17 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu ...@@ -207,9 +207,17 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeReq* pReq = pMsg->pCont; SMqConsumeReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
int64_t fetchOffset = pReq->offset; int64_t fetchOffset;
/*int64_t blockingTime = pReq->blockingTime;*/ /*int64_t blockingTime = pReq->blockingTime;*/
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = 0;
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetLastVer(pTq->pWal);
} else {
fetchOffset = pReq->currentOffset + 1;
}
SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
...@@ -226,31 +234,9 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -226,31 +234,9 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0); ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0);
ASSERT(pConsumer->consumerId == consumerId); ASSERT(pConsumer->consumerId == consumerId);
if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) { rsp.reqOffset = pReq->currentOffset;
pTopic->committedOffset = pReq->offset;
/*printf("offset %ld committed\n", pTopic->committedOffset);*/
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) {
if (pTopic->committedOffset < pReq->offset - 1) {
pTopic->committedOffset = pReq->offset - 1;
/*printf("offset %ld committed\n", pTopic->committedOffset);*/
}
}
rsp.committedOffset = pTopic->committedOffset;
rsp.reqOffset = pReq->offset;
rsp.skipLogNum = 0; rsp.skipLogNum = 0;
if (fetchOffset <= pTopic->committedOffset) {
fetchOffset = pTopic->committedOffset + 1;
}
SWalHead* pHead; SWalHead* pHead;
while (1) { while (1) {
int8_t pos = fetchOffset % TQ_BUFFER_SIZE; int8_t pos = fetchOffset % TQ_BUFFER_SIZE;
......
...@@ -22,14 +22,15 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { ...@@ -22,14 +22,15 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
// ser request version // set request version
void *pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); void *pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int64_t ver = pVnode->state.processed++; int64_t ver = pVnode->state.processed++;
taosEncodeFixedI64(&pBuf, ver); taosEncodeFixedI64(&pBuf, ver);
if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) { if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) {
/*ASSERT(false);*/
// TODO: handle error // TODO: handle error
/*ASSERT(false);*/
vError("vnode:%d write wal error since %s", pVnode->vgId, terrstr());
} }
} }
......
...@@ -35,7 +35,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) ...@@ -35,7 +35,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id)
} else { } else {
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) {
qError("submit msg error while set stream msg, %s" PRIx64, id); qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册