未验证 提交 08942d5a 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #11893 from taosdata/feature/tq

feat(tmq): support show
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#include <string.h> #include <string.h>
#include <time.h> #include <time.h>
#include "taos.h" #include "taos.h"
#include "osSleep.h"
static int running = 1; static int running = 1;
static void msg_process(TAOS_RES* msg) { static void msg_process(TAOS_RES* msg) {
......
...@@ -66,13 +66,13 @@ typedef struct SDataBlockInfo { ...@@ -66,13 +66,13 @@ typedef struct SDataBlockInfo {
int32_t rows; int32_t rows;
int32_t rowSize; int32_t rowSize;
union { union {
int64_t uid; // from which table of uid, comes from this data block int64_t uid; // from which table of uid, comes from this data block
int64_t blockId; int64_t blockId;
}; };
uint64_t groupId; // no need to serialize uint64_t groupId; // no need to serialize
int16_t numOfCols; int16_t numOfCols;
int16_t hasVarCol; int16_t hasVarCol;
int16_t capacity; int16_t capacity;
} SDataBlockInfo; } SDataBlockInfo;
typedef struct SSDataBlock { typedef struct SSDataBlock {
...@@ -131,59 +131,6 @@ static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) { ...@@ -131,59 +131,6 @@ static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { blockDestroyInner(pBlock); } static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { blockDestroyInner(pBlock); }
static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) {
int32_t tlen = 0;
int32_t sz = 0;
// tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics);
if (pRsp->numOfTopics == 0) return tlen;
tlen += taosEncodeSSchemaWrapper(buf, pRsp->schema);
if (pRsp->pBlockData) {
sz = taosArrayGetSize(pRsp->pBlockData);
}
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
tlen += tEncodeDataBlock(buf, pBlock);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) {
int32_t sz;
// buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
buf = taosDecodeFixedI32(buf, &pRsp->numOfTopics);
if (pRsp->numOfTopics == 0) return buf;
pRsp->schema = (SSchemaWrapper*)taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pRsp->schema == NULL) return NULL;
buf = taosDecodeSSchemaWrapper(buf, pRsp->schema);
buf = taosDecodeFixedI32(buf, &sz);
pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock block = {0};
tDecodeDataBlock(buf, &block);
taosArrayPush(pRsp->pBlockData, &block);
}
return buf;
}
static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) {
if (pRsp->schema) {
if (pRsp->schema->nCols) {
taosMemoryFreeClear(pRsp->schema->pSchema);
}
taosMemoryFree(pRsp->schema);
}
taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))blockDestroyInner);
pRsp->pBlockData = NULL;
}
//====================================================================================================================== //======================================================================================================================
// the following structure shared by parser and executor // the following structure shared by parser and executor
typedef struct SColumn { typedef struct SColumn {
...@@ -204,22 +151,22 @@ typedef struct SColumn { ...@@ -204,22 +151,22 @@ typedef struct SColumn {
} SColumn; } SColumn;
typedef struct STableBlockDistInfo { typedef struct STableBlockDistInfo {
uint16_t rowSize; uint16_t rowSize;
uint16_t numOfFiles; uint16_t numOfFiles;
uint32_t numOfTables; uint32_t numOfTables;
uint64_t totalSize; uint64_t totalSize;
uint64_t totalRows; uint64_t totalRows;
int32_t maxRows; int32_t maxRows;
int32_t minRows; int32_t minRows;
int32_t firstSeekTimeUs; int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable; uint32_t numOfRowsInMemTable;
uint32_t numOfSmallBlocks; uint32_t numOfSmallBlocks;
SArray *dataBlockInfos; SArray* dataBlockInfos;
} STableBlockDistInfo; } STableBlockDistInfo;
enum { enum {
FUNC_PARAM_TYPE_VALUE = 0x1, FUNC_PARAM_TYPE_VALUE = 0x1,
FUNC_PARAM_TYPE_COLUMN= 0x2, FUNC_PARAM_TYPE_COLUMN = 0x2,
}; };
typedef struct SFunctParam { typedef struct SFunctParam {
...@@ -249,7 +196,7 @@ typedef struct SExprInfo { ...@@ -249,7 +196,7 @@ typedef struct SExprInfo {
struct tExprNode* pExpr; struct tExprNode* pExpr;
} SExprInfo; } SExprInfo;
#define QUERY_ASC_FORWARD_STEP 1 #define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1 #define QUERY_DESC_FORWARD_STEP -1
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
......
...@@ -1650,103 +1650,11 @@ typedef struct { ...@@ -1650,103 +1650,11 @@ typedef struct {
char data[]; char data[];
} SVShowTablesFetchRsp; } SVShowTablesFetchRsp;
typedef struct SMqCMGetSubEpReq { typedef struct {
int64_t consumerId; int64_t consumerId;
int32_t epoch; int32_t epoch;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
} SMqCMGetSubEpReq; } SMqAskEpReq;
static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pMsg->contLen);
tlen += taosEncodeFixedI32(buf, pMsg->vgId);
return tlen;
}
typedef struct SMqHbRsp {
int8_t status; // idle or not
int8_t vnodeChanged;
int8_t epChanged; // should use new epset
int8_t reserved;
SEpSet epSet;
} SMqHbRsp;
static FORCE_INLINE int32_t taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pRsp->status);
tlen += taosEncodeFixedI8(buf, pRsp->vnodeChanged);
tlen += taosEncodeFixedI8(buf, pRsp->epChanged);
tlen += taosEncodeSEpSet(buf, &pRsp->epSet);
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
buf = taosDecodeFixedI8(buf, &pRsp->status);
buf = taosDecodeFixedI8(buf, &pRsp->vnodeChanged);
buf = taosDecodeFixedI8(buf, &pRsp->epChanged);
buf = taosDecodeSEpSet(buf, &pRsp->epSet);
return buf;
}
typedef struct SMqHbOneTopicBatchRsp {
char topicName[TSDB_TOPIC_FNAME_LEN];
SArray* rsps; // SArray<SMqHbRsp>
} SMqHbOneTopicBatchRsp;
static FORCE_INLINE int32_t taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pBatchRsp->topicName);
int32_t sz = taosArrayGetSize(pBatchRsp->rsps);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i);
tlen += taosEncodeSMqHbRsp(buf, pRsp);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) {
int32_t sz;
buf = taosDecodeStringTo(buf, pBatchRsp->topicName);
buf = taosDecodeFixedI32(buf, &sz);
pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
for (int32_t i = 0; i < sz; i++) {
SMqHbRsp rsp;
buf = taosDecodeSMqHbRsp(buf, &rsp);
buf = taosArrayPush(pBatchRsp->rsps, &rsp);
}
return buf;
}
typedef struct SMqHbBatchRsp {
int64_t consumerId;
SArray* batchRsps; // SArray<SMqHbOneTopicBatchRsp>
} SMqHbBatchRsp;
static FORCE_INLINE int32_t taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId);
int32_t sz;
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*)taosArrayGet(pBatchRsp->batchRsps, i);
tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) {
buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp));
for (int32_t i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp rsp;
buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp);
buf = taosArrayPush(pBatchRsp->batchRsps, &rsp);
}
return buf;
}
typedef struct { typedef struct {
int32_t key; int32_t key;
...@@ -2470,22 +2378,6 @@ typedef struct { ...@@ -2470,22 +2378,6 @@ typedef struct {
int64_t consumerId; int64_t consumerId;
} SMqRspHead; } SMqRspHead;
#if 0
typedef struct {
SMsgHead head;
int64_t consumerId;
int64_t blockingTime;
int32_t epoch;
int8_t withSchema;
char cgroup[TSDB_CGROUP_LEN];
int64_t currentOffset;
uint64_t reqId;
char topic[TSDB_TOPIC_FNAME_LEN];
} SMqPollReq;
#endif
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
char subKey[TSDB_SUBSCRIBE_KEY_LEN]; char subKey[TSDB_SUBSCRIBE_KEY_LEN];
...@@ -2509,18 +2401,6 @@ typedef struct { ...@@ -2509,18 +2401,6 @@ typedef struct {
SSchemaWrapper schema; SSchemaWrapper schema;
} SMqSubTopicEp; } SMqSubTopicEp;
typedef struct {
SMqRspHead head;
int64_t reqOffset;
int64_t rspOffset;
int32_t skipLogNum;
// TODO: replace with topic name
int32_t numOfTopics;
// TODO: remove from msg
SSchemaWrapper* schema;
SArray* pBlockData; // SArray<SSDataBlock>
} SMqPollRsp;
typedef struct { typedef struct {
SMqRspHead head; SMqRspHead head;
int64_t reqOffset; int64_t reqOffset;
...@@ -2644,7 +2524,7 @@ typedef struct { ...@@ -2644,7 +2524,7 @@ typedef struct {
SMqRspHead head; SMqRspHead head;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
SArray* topics; // SArray<SMqSubTopicEp> SArray* topics; // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp; } SMqAskEpRsp;
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
// taosMemoryFree(pSubTopicEp->schema.pSchema); // taosMemoryFree(pSubTopicEp->schema.pSchema);
...@@ -2666,10 +2546,6 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { ...@@ -2666,10 +2546,6 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
return buf; return buf;
} }
static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) {
taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
}
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic); tlen += taosEncodeString(buf, pTopicEp->topic);
...@@ -2702,7 +2578,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE ...@@ -2702,7 +2578,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
return buf; return buf;
} }
static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) { static FORCE_INLINE int32_t tEncodeSMqAskEpRsp(void** buf, const SMqAskEpRsp* pRsp) {
int32_t tlen = 0; int32_t tlen = 0;
// tlen += taosEncodeString(buf, pRsp->cgroup); // tlen += taosEncodeString(buf, pRsp->cgroup);
int32_t sz = taosArrayGetSize(pRsp->topics); int32_t sz = taosArrayGetSize(pRsp->topics);
...@@ -2714,7 +2590,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu ...@@ -2714,7 +2590,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu
return tlen; return tlen;
} }
static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) { static FORCE_INLINE void* tDecodeSMqAskEpRsp(void* buf, SMqAskEpRsp* pRsp) {
// buf = taosDecodeStringTo(buf, pRsp->cgroup); // buf = taosDecodeStringTo(buf, pRsp->cgroup);
int32_t sz; int32_t sz;
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
...@@ -2730,6 +2606,10 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p ...@@ -2730,6 +2606,10 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
return buf; return buf;
} }
static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
}
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -145,7 +145,7 @@ enum { ...@@ -145,7 +145,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-mq-ask-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "mnode-mq-ask-ep", SMqAskEpReq, SMqAskEpReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
......
...@@ -125,12 +125,12 @@ extern const int32_t TYPE_BYTES[15]; ...@@ -125,12 +125,12 @@ extern const int32_t TYPE_BYTES[15];
#define TSDB_INS_TABLE_QUERIES "queries" #define TSDB_INS_TABLE_QUERIES "queries"
#define TSDB_INS_TABLE_VNODES "vnodes" #define TSDB_INS_TABLE_VNODES "vnodes"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_CONNECTIONS "connections" #define TSDB_PERFS_TABLE_CONNECTIONS "connections"
#define TSDB_PERFS_TABLE_QUERIES "queries" #define TSDB_PERFS_TABLE_QUERIES "queries"
#define TSDB_PERFS_TABLE_TOPICS "topics" #define TSDB_PERFS_TABLE_TOPICS "topics"
#define TSDB_PERFS_TABLE_CONSUMERS "consumers" #define TSDB_PERFS_TABLE_CONSUMERS "consumers"
#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes" #define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions"
#define TSDB_INDEX_TYPE_SMA "SMA" #define TSDB_INDEX_TYPE_SMA "SMA"
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT" #define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
...@@ -286,8 +286,9 @@ typedef enum ELogicConditionType { ...@@ -286,8 +286,9 @@ typedef enum ELogicConditionType {
#define TSDB_IPv4ADDR_LEN 16 #define TSDB_IPv4ADDR_LEN 16
#define TSDB_FILENAME_LEN 128 #define TSDB_FILENAME_LEN 128
#define TSDB_SHOW_SQL_LEN 512 #define TSDB_SHOW_SQL_LEN 512
#define TSDB_SHOW_SUBQUERY_LEN 1000
#define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_SHOW_SUBQUERY_LEN 1000
#define TSDB_SHOW_LIST_LEN 1000
#define TSDB_TRANS_STAGE_LEN 12 #define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_TYPE_LEN 16 #define TSDB_TRANS_TYPE_LEN 16
......
...@@ -25,7 +25,14 @@ ...@@ -25,7 +25,14 @@
#include "tref.h" #include "tref.h"
#include "ttimer.h" #include "ttimer.h"
int32_t tmqAskEp(tmq_t* tmq, bool sync); int32_t tmqAskEp(tmq_t* tmq, bool async);
typedef struct {
int8_t inited;
tmr_h timer;
} SMqMgmt;
static SMqMgmt tmqMgmt = {0};
typedef struct { typedef struct {
int8_t tmqRspType; int8_t tmqRspType;
...@@ -33,9 +40,9 @@ typedef struct { ...@@ -33,9 +40,9 @@ typedef struct {
} SMqRspWrapper; } SMqRspWrapper;
typedef struct { typedef struct {
int8_t tmqRspType; int8_t tmqRspType;
int32_t epoch; int32_t epoch;
SMqCMGetSubEpRsp msg; SMqAskEpRsp msg;
} SMqAskEpRspWrapper; } SMqAskEpRspWrapper;
struct tmq_list_t { struct tmq_list_t {
...@@ -64,13 +71,6 @@ struct tmq_conf_t { ...@@ -64,13 +71,6 @@ struct tmq_conf_t {
tmq_commit_cb* commit_cb; tmq_commit_cb* commit_cb;
}; };
typedef struct {
int8_t inited;
tmr_h timer;
} SMqMgmt;
static SMqMgmt tmqMgmt = {0};
struct tmq_t { struct tmq_t {
// conf // conf
char groupId[TSDB_CGROUP_LEN]; char groupId[TSDB_CGROUP_LEN];
...@@ -164,7 +164,7 @@ typedef struct { ...@@ -164,7 +164,7 @@ typedef struct {
typedef struct { typedef struct {
tmq_t* tmq; tmq_t* tmq;
int32_t code; int32_t code;
int32_t sync; int32_t async;
tsem_t rspSem; tsem_t rspSem;
} SMqAskEpCbParam; } SMqAskEpCbParam;
...@@ -188,6 +188,7 @@ typedef struct { ...@@ -188,6 +188,7 @@ typedef struct {
tmq_conf_t* tmq_conf_new() { tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
conf->autoCommit = false; conf->autoCommit = false;
conf->autoCommitInterval = 5000;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
return conf; return conf;
} }
...@@ -324,7 +325,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { ...@@ -324,7 +325,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
if (pTaskType == NULL) break; if (pTaskType == NULL) break;
if (*pTaskType == TMQ_DELAYED_TASK__HB) { if (*pTaskType == TMQ_DELAYED_TASK__HB) {
tmqAskEp(tmq, false); tmqAskEp(tmq, true);
taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer); taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmq_commit(tmq, NULL, true); tmq_commit(tmq, NULL, true);
...@@ -472,8 +473,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -472,8 +473,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
// set conf // set conf
strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId); strcpy(pTmq->groupId, conf->groupId);
/*pTmq->autoCommit = conf->autoCommit;*/ pTmq->autoCommit = conf->autoCommit;
pTmq->autoCommit = 0;
pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->autoCommitInterval = conf->autoCommitInterval;
pTmq->commit_cb = conf->commit_cb; pTmq->commit_cb = conf->commit_cb;
pTmq->resetOffsetCfg = conf->resetOffset; pTmq->resetOffsetCfg = conf->resetOffset;
...@@ -662,8 +662,8 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -662,8 +662,8 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
if (code != 0) goto FAIL; if (code != 0) goto FAIL;
// TODO: add max retry cnt // TODO: add max retry cnt
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, true)) { while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
tscDebug("not ready, retry\n"); tscDebug("not ready, retry");
taosMsleep(500); taosMsleep(500);
} }
...@@ -854,7 +854,7 @@ CREATE_MSG_FAIL: ...@@ -854,7 +854,7 @@ CREATE_MSG_FAIL:
return -1; return -1;
} }
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
/*printf("call update ep %d\n", epoch);*/ /*printf("call update ep %d\n", epoch);*/
bool set = false; bool set = false;
int32_t topicNumGet = taosArrayGetSize(pRsp->topics); int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
...@@ -936,7 +936,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -936,7 +936,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = pParam->tmq; tmq_t* tmq = pParam->tmq;
pParam->code = code; pParam->code = code;
if (code != 0) { if (code != 0) {
tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->sync); tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
goto END; goto END;
} }
...@@ -950,15 +950,15 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -950,15 +950,15 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
goto END; goto END;
} }
if (pParam->sync) { if (!pParam->async) {
SMqCMGetSubEpRsp rsp; SMqAskEpRsp rsp;
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
if (tmqUpdateEp(tmq, head->epoch, &rsp)) { if (tmqUpdateEp(tmq, head->epoch, &rsp)) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
} }
tDeleteSMqCMGetSubEpRsp(&rsp); tDeleteSMqAskEpRsp(&rsp);
} else { } else {
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper)); SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
if (pWrapper == NULL) { if (pWrapper == NULL) {
...@@ -969,7 +969,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -969,7 +969,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP; pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
pWrapper->epoch = head->epoch; pWrapper->epoch = head->epoch;
memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg); tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
taosWriteQitem(tmq->mqueue, pWrapper); taosWriteQitem(tmq->mqueue, pWrapper);
/*tsem_post(&tmq->rspSem);*/ /*tsem_post(&tmq->rspSem);*/
...@@ -978,13 +978,13 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -978,13 +978,13 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
END: END:
/*atomic_store_8(&tmq->epStatus, 0);*/ /*atomic_store_8(&tmq->epStatus, 0);*/
if (pParam->sync) { if (!pParam->async) {
tsem_post(&pParam->rspSem); tsem_post(&pParam->rspSem);
} }
return code; return code;
} }
int32_t tmqAskEp(tmq_t* tmq, bool sync) { int32_t tmqAskEp(tmq_t* tmq, bool async) {
int32_t code = 0; int32_t code = 0;
#if 0 #if 0
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
...@@ -995,8 +995,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { ...@@ -995,8 +995,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
} }
atomic_store_32(&tmq->epSkipCnt, 0); atomic_store_32(&tmq->epSkipCnt, 0);
#endif #endif
int32_t tlen = sizeof(SMqCMGetSubEpReq); int32_t tlen = sizeof(SMqAskEpReq);
SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen); SMqAskEpReq* req = taosMemoryMalloc(tlen);
if (req == NULL) { if (req == NULL) {
tscError("failed to malloc get subscribe ep buf"); tscError("failed to malloc get subscribe ep buf");
/*atomic_store_8(&tmq->epStatus, 0);*/ /*atomic_store_8(&tmq->epStatus, 0);*/
...@@ -1014,7 +1014,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { ...@@ -1014,7 +1014,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
return -1; return -1;
} }
pParam->tmq = tmq; pParam->tmq = tmq;
pParam->sync = sync; pParam->async = async;
tsem_init(&pParam->rspSem, 0, 0); tsem_init(&pParam->rspSem, 0, 0);
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
...@@ -1036,7 +1036,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { ...@@ -1036,7 +1036,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
sendInfo->param = pParam; sendInfo->param = pParam;
sendInfo->fp = tmqAskEpCb; sendInfo->fp = tmqAskEpCb;
sendInfo->msgType = TDMT_MND_GET_SUB_EP; sendInfo->msgType = TDMT_MND_MQ_ASK_EP;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
...@@ -1045,7 +1045,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { ...@@ -1045,7 +1045,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (sync) { if (!async) {
tsem_wait(&pParam->rspSem); tsem_wait(&pParam->rspSem);
code = pParam->code; code = pParam->code;
taosMemoryFree(pParam); taosMemoryFree(pParam);
...@@ -1209,7 +1209,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) ...@@ -1209,7 +1209,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
/*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/ /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) { if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
SMqCMGetSubEpRsp* rspMsg = &pEpRspWrapper->msg; SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg); tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
/*tmqClearUnhandleMsg(tmq);*/ /*tmqClearUnhandleMsg(tmq);*/
*pReset = true; *pReset = true;
...@@ -1271,15 +1271,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -1271,15 +1271,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
SMqRspObj* rspObj; SMqRspObj* rspObj;
int64_t startTime = taosGetTimestampMs(); int64_t startTime = taosGetTimestampMs();
// TODO: put into delayed queue
#if 0
int8_t status = atomic_load_8(&tmq->status);
while (0 != tmqAskEp(tmq, status != TMQ_CONSUMER_STATUS__READY)) {
tscDebug("not ready, retry\n");
taosSsleep(1);
}
#endif
rspObj = tmqHandleAllRsp(tmq, blocking_time, false); rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
if (rspObj) { if (rspObj) {
return (TAOS_RES*)rspObj; return (TAOS_RES*)rspObj;
......
...@@ -211,7 +211,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) { ...@@ -211,7 +211,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_MQ_ASK_EP, mmProcessReadMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
......
...@@ -440,14 +440,12 @@ static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset) ...@@ -440,14 +440,12 @@ static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset)
} }
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int64_t createTime; int64_t createTime;
int64_t updateTime; int64_t updateTime;
int64_t uid; int64_t uid;
// TODO: use subDbUid
int64_t dbUid; int64_t dbUid;
int64_t subDbUid;
int32_t version; int32_t version;
int8_t subType; // db or table int8_t subType; // db or table
int8_t withTbName; int8_t withTbName;
......
...@@ -59,7 +59,7 @@ int32_t mndInitConsumer(SMnode *pMnode) { ...@@ -59,7 +59,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete}; .deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessAskEpReq); mndSetMsgHandle(pMnode, TDMT_MND_MQ_ASK_EP, mndProcessAskEpReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_LOST, mndProcessConsumerLostMsg); mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
...@@ -86,7 +86,7 @@ static int32_t mndProcessConsumerLostMsg(SNodeMsg *pMsg) { ...@@ -86,7 +86,7 @@ static int32_t mndProcessConsumerLostMsg(SNodeMsg *pMsg) {
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return 0;
FAIL: FAIL:
// TODO delete consumer tDeleteSMqConsumerObj(pConsumerNew);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
...@@ -197,11 +197,11 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { ...@@ -197,11 +197,11 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
} }
static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) { static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode; SMnode *pMnode = pMsg->pNode;
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont; SMqAskEpReq *pReq = (SMqAskEpReq *)pMsg->rpcMsg.pCont;
SMqCMGetSubEpRsp rsp = {0}; SMqAskEpRsp rsp = {0};
int64_t consumerId = be64toh(pReq->consumerId); int64_t consumerId = be64toh(pReq->consumerId);
int32_t epoch = ntohl(pReq->epoch); int32_t epoch = ntohl(pReq->epoch);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pNode, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pNode, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
...@@ -300,7 +300,7 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) { ...@@ -300,7 +300,7 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
} }
// encode rsp // encode rsp
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp); int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
void *buf = rpcMallocCont(tlen); void *buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -311,10 +311,10 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) { ...@@ -311,10 +311,10 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
((SMqRspHead *)buf)->consumerId = pConsumer->consumerId; ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); tEncodeSMqAskEpRsp(&abuf, &rsp);
// release consumer and free memory // release consumer and free memory
tDeleteSMqCMGetSubEpRsp(&rsp); tDeleteSMqAskEpRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
// send rsp // send rsp
...@@ -322,7 +322,7 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) { ...@@ -322,7 +322,7 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
pMsg->rspLen = tlen; pMsg->rspLen = tlen;
return 0; return 0;
FAIL: FAIL:
tDeleteSMqCMGetSubEpRsp(&rsp); tDeleteSMqAskEpRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
return -1; return -1;
} }
......
...@@ -215,7 +215,7 @@ SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) { ...@@ -215,7 +215,7 @@ SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) {
if (pSubNew == NULL) return NULL; if (pSubNew == NULL) return NULL;
memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN); memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN);
taosInitRWLatch(&pSubNew->lock); taosInitRWLatch(&pSubNew->lock);
pSubNew->vgNum = -1; pSubNew->vgNum = 0;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
// TODO set free fp // TODO set free fp
SMqConsumerEpInSub epInSub = { SMqConsumerEpInSub epInSub = {
......
...@@ -41,29 +41,31 @@ static const SPerfsTableSchema queriesSchema[] = { ...@@ -41,29 +41,31 @@ static const SPerfsTableSchema queriesSchema[] = {
static const SPerfsTableSchema topicSchema[] = { static const SPerfsTableSchema topicSchema[] = {
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
/*{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},*/ {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
/*{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},*/ // TODO config
}; };
static const SPerfsTableSchema consumerSchema[] = { static const SPerfsTableSchema consumerSchema[] = {
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "status", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "status", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
// ep {.name = "topics", .bytes = TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
// up time {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
// topics {.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
}; };
static const SPerfsTableSchema subscribeSchema[] = { static const SPerfsTableSchema subscriptionSchema[] = {
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "offset", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "committed_offset", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "current_offset", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "skip_log_cnt", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
}; };
static const SPerfsTableMeta perfsMeta[] = { static const SPerfsTableMeta perfsMeta[] = {
...@@ -71,7 +73,7 @@ static const SPerfsTableMeta perfsMeta[] = { ...@@ -71,7 +73,7 @@ static const SPerfsTableMeta perfsMeta[] = {
{TSDB_PERFS_TABLE_QUERIES, queriesSchema, tListLen(queriesSchema)}, {TSDB_PERFS_TABLE_QUERIES, queriesSchema, tListLen(queriesSchema)},
{TSDB_PERFS_TABLE_TOPICS, topicSchema, tListLen(topicSchema)}, {TSDB_PERFS_TABLE_TOPICS, topicSchema, tListLen(topicSchema)},
{TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)}, {TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)},
{TSDB_PERFS_TABLE_SUBSCRIBES, subscribeSchema, tListLen(subscribeSchema)}, {TSDB_PERFS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema)},
}; };
// connection/application/ // connection/application/
......
...@@ -478,6 +478,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -478,6 +478,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
SVgObj* pVgroup = NULL; SVgObj* pVgroup = NULL;
SQueryPlan* pPlan = NULL; SQueryPlan* pPlan = NULL;
SSubplan* plan = NULL; SSubplan* plan = NULL;
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) { if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
pPlan = qStringToQueryPlan(pTopic->physicalPlan); pPlan = qStringToQueryPlan(pTopic->physicalPlan);
if (pPlan == NULL) { if (pPlan == NULL) {
...@@ -485,10 +486,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -485,10 +486,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
return -1; return -1;
} }
ASSERT(pSub->vgNum == -1);
pSub->vgNum = 0;
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans); int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
if (levelNum != 1) { if (levelNum != 1) {
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
...@@ -529,7 +526,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -529,7 +526,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
pVgEp->vgId = pVgroup->vgId; pVgEp->vgId = pVgroup->vgId;
taosArrayPush(pEpInSub->vgs, &pVgEp); taosArrayPush(pEpInSub->vgs, &pVgEp);
mDebug("init subscribption %s, assign vg: %d", pSub->key, pVgEp->vgId); mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId);
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) { if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
int32_t msgLen; int32_t msgLen;
......
...@@ -76,7 +76,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -76,7 +76,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
SDB_SET_INT64(pRaw, dataPos, pTopic->subDbUid, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);
...@@ -139,7 +138,6 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -139,7 +138,6 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->subDbUid, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER); SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER); SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER);
...@@ -520,29 +518,33 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -520,29 +518,33 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic); pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
int32_t cols = 0; SColumnInfoData *pColInfo;
SName n;
int32_t cols = 0;
char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);
SName n;
tNameFromString(&n, pTopic->name, T_NAME_ACCT|T_NAME_DB);
tNameGetDbName(&n, varDataVal(topicName)); tNameGetDbName(&n, varDataVal(topicName));
varDataSetLen(topicName, strlen(varDataVal(topicName))); varDataSetLen(topicName, strlen(varDataVal(topicName)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false); colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
char dbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&n, varDataVal(dbName));
varDataSetLen(dbName, strlen(varDataVal(dbName)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false); colDataAppend(pColInfo, numOfRows, (const char *)dbName, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN); tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN);
varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE])); varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)sql, false); colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
// taosMemoryFree(sql);
numOfRows++; numOfRows++;
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
} }
......
...@@ -251,11 +251,14 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { ...@@ -251,11 +251,14 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body, int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body,
int32_t bodyLen) { int32_t bodyLen) {
if (pWal == NULL) return -1;
int code = 0; int code = 0;
// no wal // no wal
if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0; if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0;
if (bodyLen > TSDB_MAX_WAL_SIZE) {
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return -1;
}
if (index == pWal->vers.lastVer + 1) { if (index == pWal->vers.lastVer + 1) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) { if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
......
...@@ -270,8 +270,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exist ...@@ -270,8 +270,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exist
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill")
// mnode-topic // mnode-mq
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with aggregation is unsupported") TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with aggregation is unsupported")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer waiting for rebalance")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist")
// mnode-sma // mnode-sma
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册