未验证 提交 b773eaf2 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10186 from taosdata/feature/tq

rewrite rebalance
...@@ -1127,10 +1127,14 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq ...@@ -1127,10 +1127,14 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq
return buf; return buf;
} }
typedef struct SMqTmrMsg { typedef struct {
int32_t reserved; int32_t reserved;
} SMqTmrMsg; } SMqTmrMsg;
typedef struct {
int64_t consumerId;
} SMqDoRebalanceMsg;
typedef struct { typedef struct {
int64_t status; int64_t status;
} SMVSubscribeRsp; } SMVSubscribeRsp;
...@@ -1707,13 +1711,13 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) ...@@ -1707,13 +1711,13 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
return buf; return buf;
} }
typedef struct SMqTbData { typedef struct {
int64_t uid; int64_t uid;
int32_t numOfRows; int32_t numOfRows;
char* colData; char* colData;
} SMqTbData; } SMqTbData;
typedef struct SMqTopicBlk { typedef struct {
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
int64_t committedOffset; int64_t committedOffset;
int64_t reqOffset; int64_t reqOffset;
...@@ -1724,7 +1728,7 @@ typedef struct SMqTopicBlk { ...@@ -1724,7 +1728,7 @@ typedef struct SMqTopicBlk {
SMqTbData* tbData; SMqTbData* tbData;
} SMqTopicData; } SMqTopicData;
typedef struct SMqConsumeRsp { typedef struct {
int64_t consumerId; int64_t consumerId;
SSchemaWrapper* schemas; SSchemaWrapper* schemas;
int64_t committedOffset; int64_t committedOffset;
...@@ -1736,7 +1740,7 @@ typedef struct SMqConsumeRsp { ...@@ -1736,7 +1740,7 @@ typedef struct SMqConsumeRsp {
} SMqConsumeRsp; } SMqConsumeRsp;
// one req for one vg+topic // one req for one vg+topic
typedef struct SMqConsumeReq { typedef struct {
SMsgHead head; SMsgHead head;
//0: commit only, current offset //0: commit only, current offset
//1: consume only, poll next offset //1: consume only, poll next offset
...@@ -1752,17 +1756,17 @@ typedef struct SMqConsumeReq { ...@@ -1752,17 +1756,17 @@ typedef struct SMqConsumeReq {
char topic[TSDB_TOPIC_FNAME_LEN]; char topic[TSDB_TOPIC_FNAME_LEN];
} SMqConsumeReq; } SMqConsumeReq;
typedef struct SMqSubVgEp { typedef struct {
int32_t vgId; int32_t vgId;
SEpSet epSet; SEpSet epSet;
} SMqSubVgEp; } SMqSubVgEp;
typedef struct SMqSubTopicEp { typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN]; char topic[TSDB_TOPIC_FNAME_LEN];
SArray* vgs; // SArray<SMqSubVgEp> SArray* vgs; // SArray<SMqSubVgEp>
} SMqSubTopicEp; } SMqSubTopicEp;
typedef struct SMqCMGetSubEpRsp { typedef struct {
int64_t consumerId; int64_t consumerId;
int64_t epoch; int64_t epoch;
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
......
...@@ -141,7 +141,8 @@ enum { ...@@ -141,7 +141,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-timer", SMqTmrMsg, SMqTmrMsg) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMqTmrMsg, SMqTmrMsg)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
// Requests handled by VNODE // Requests handled by VNODE
TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG)
......
...@@ -27,6 +27,7 @@ typedef struct SMnodeMsg SMnodeMsg; ...@@ -27,6 +27,7 @@ typedef struct SMnodeMsg SMnodeMsg;
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef int32_t (*PutReqToMReadQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef struct SMnodeLoad { typedef struct SMnodeLoad {
...@@ -64,6 +65,7 @@ typedef struct { ...@@ -64,6 +65,7 @@ typedef struct {
SMnodeCfg cfg; SMnodeCfg cfg;
SDnode *pDnode; SDnode *pDnode;
PutReqToMWriteQFp putReqToMWriteQFp; PutReqToMWriteQFp putReqToMWriteQFp;
PutReqToMReadQFp putReqToMReadQFp;
SendReqToDnodeFp sendReqToDnodeFp; SendReqToDnodeFp sendReqToDnodeFp;
SendReqToMnodeFp sendReqToMnodeFp; SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectRspFp sendRedirectRspFp; SendRedirectRspFp sendRedirectRspFp;
......
...@@ -209,6 +209,9 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -209,6 +209,9 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SName name = {0}; SName name = {0};
char* dbName = getDbOfConnection(tmq->pTscObj); char* dbName = getDbOfConnection(tmq->pTscObj);
if (dbName == NULL) {
return TMQ_RESP_ERR__FAIL;
}
tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName)); tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName));
tNameFromString(&name, topicName, T_NAME_TABLE); tNameFromString(&name, topicName, T_NAME_TABLE);
......
...@@ -565,7 +565,6 @@ TEST(testCase, insert_test) { ...@@ -565,7 +565,6 @@ TEST(testCase, insert_test) {
#endif #endif
#if 0
TEST(testCase, projection_query_tables) { TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
...@@ -585,7 +584,7 @@ TEST(testCase, projection_query_tables) { ...@@ -585,7 +584,7 @@ TEST(testCase, projection_query_tables) {
} }
taos_free_result(pRes); taos_free_result(pRes);
for(int32_t i = 0; i < 100000; ++i) { for(int32_t i = 0; i < 10000000; ++i) {
char sql[512] = {0}; char sql[512] = {0};
sprintf(sql, "insert into tu values(now+%da, %d)", i, i); sprintf(sql, "insert into tu values(now+%da, %d)", i, i);
TAOS_RES* p = taos_query(pConn, sql); TAOS_RES* p = taos_query(pConn, sql);
...@@ -616,6 +615,7 @@ TEST(testCase, projection_query_tables) { ...@@ -616,6 +615,7 @@ TEST(testCase, projection_query_tables) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#if 0
TEST(testCase, projection_query_stables) { TEST(testCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......
...@@ -46,7 +46,6 @@ TEST(testCase, create_topic_ctb_Test) { ...@@ -46,7 +46,6 @@ TEST(testCase, create_topic_ctb_Test) {
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
} }
//taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr); ASSERT_TRUE(pFields == nullptr);
......
...@@ -256,6 +256,12 @@ static bool dndNeedDeployMnode(SDnode *pDnode) { ...@@ -256,6 +256,12 @@ static bool dndNeedDeployMnode(SDnode *pDnode) {
static int32_t dndPutMsgToMWriteQ(SDnode *pDnode, SRpcMsg *pRpcMsg) { static int32_t dndPutMsgToMWriteQ(SDnode *pDnode, SRpcMsg *pRpcMsg) {
dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpcMsg); dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpcMsg);
return 0;
}
static int32_t dndPutMsgToMReadQ(SDnode *pDnode, SRpcMsg* pRpcMsg) {
dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pRpcMsg);
return 0;
} }
static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
...@@ -264,6 +270,7 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -264,6 +270,7 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption->sendReqToMnodeFp = dndSendReqToMnode; pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dndSendRedirectRsp; pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->putReqToMWriteQFp = dndPutMsgToMWriteQ; pOption->putReqToMWriteQFp = dndPutMsgToMWriteQ;
pOption->putReqToMReadQFp = dndPutMsgToMReadQ;
pOption->dnodeId = dndGetDnodeId(pDnode); pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode); pOption->clusterId = dndGetClusterId(pDnode);
pOption->cfg.sver = pDnode->env.sver; pOption->cfg.sver = pDnode->env.sver;
......
...@@ -28,6 +28,8 @@ void mndCleanupConsumer(SMnode *pMnode); ...@@ -28,6 +28,8 @@ void mndCleanupConsumer(SMnode *pMnode);
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup);
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer); SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
......
...@@ -343,42 +343,22 @@ typedef struct { ...@@ -343,42 +343,22 @@ typedef struct {
char payload[]; char payload[];
} SShowObj; } SShowObj;
#if 0 typedef struct {
typedef struct SConsumerObj {
uint64_t uid;
int64_t createTime;
int64_t updateTime;
//uint64_t dbUid;
int32_t version;
SRWLatch lock;
SArray* topics;
} SConsumerObj;
typedef struct SMqTopicConsumer {
int64_t consumerId;
SList* topicList;
} SMqTopicConsumer;
#endif
typedef struct SMqConsumerEp {
int32_t vgId; // -1 for unassigned int32_t vgId; // -1 for unassigned
int32_t status; int32_t status;
SEpSet epSet; SEpSet epSet;
int64_t oldConsumerId;
int64_t consumerId; // -1 for unassigned int64_t consumerId; // -1 for unassigned
int64_t lastConsumerHbTs;
int64_t lastVgHbTs;
char* qmsg; char* qmsg;
} SMqConsumerEp; } SMqConsumerEp;
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pConsumerEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId); tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
tlen += taosEncodeFixedI32(buf, pConsumerEp->status); tlen += taosEncodeFixedI32(buf, pConsumerEp->status);
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
tlen += taosEncodeFixedI64(buf, pConsumerEp->oldConsumerId);
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
tlen += taosEncodeFixedI64(buf, pConsumerEp->lastConsumerHbTs);
tlen += taosEncodeFixedI64(buf, pConsumerEp->lastVgHbTs);
//tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
tlen += taosEncodeString(buf, pConsumerEp->qmsg); tlen += taosEncodeString(buf, pConsumerEp->qmsg);
return tlen; return tlen;
} }
...@@ -387,10 +367,8 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu ...@@ -387,10 +367,8 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId); buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
buf = taosDecodeFixedI32(buf, &pConsumerEp->status); buf = taosDecodeFixedI32(buf, &pConsumerEp->status);
buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
buf = taosDecodeFixedI64(buf, &pConsumerEp->oldConsumerId);
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = taosDecodeFixedI64(buf, &pConsumerEp->lastConsumerHbTs);
buf = taosDecodeFixedI64(buf, &pConsumerEp->lastVgHbTs);
//buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
buf = taosDecodeString(buf, &pConsumerEp->qmsg); buf = taosDecodeString(buf, &pConsumerEp->qmsg);
return buf; return buf;
} }
...@@ -401,97 +379,89 @@ static FORCE_INLINE void tDeleteSMqConsumerEp(SMqConsumerEp* pConsumerEp) { ...@@ -401,97 +379,89 @@ static FORCE_INLINE void tDeleteSMqConsumerEp(SMqConsumerEp* pConsumerEp) {
} }
} }
// unit for rebalance typedef struct {
typedef struct SMqSubscribeObj { int64_t consumerId;
SArray* vgInfo; // SArray<SMqConsumerEp>
} SMqSubConsumer;
static FORCE_INLINE int32_t tEncodeSMqSubConsumer(void** buf, const SMqSubConsumer* pConsumer) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
int32_t sz = taosArrayGetSize(pConsumer->vgInfo);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp* pCEp = taosArrayGet(pConsumer->vgInfo, i);
tlen += tEncodeSMqConsumerEp(buf, pCEp);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSMqSubConsumer(void** buf, SMqSubConsumer* pConsumer) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->vgInfo = taosArrayInit(sz, sizeof(SMqConsumerEp));
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp consumerEp;
buf = tDecodeSMqConsumerEp(buf, &consumerEp);
taosArrayPush(pConsumer->vgInfo, &consumerEp);
}
return buf;
}
static FORCE_INLINE void tDeleteSMqSubConsumer(SMqSubConsumer* pSubConsumer) {
if (pSubConsumer->vgInfo) {
taosArrayDestroyEx(pSubConsumer->vgInfo, (void (*)(void*))tDeleteSMqConsumerEp);
pSubConsumer->vgInfo = NULL;
}
}
typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN];
int32_t epoch; int32_t status;
// TODO: replace with priority queue int32_t vgNum;
int32_t nextConsumerIdx; SArray* consumers; // SArray<SMqSubConsumer>
SArray* availConsumer; // SArray<int64_t> (consumerId)
SArray* assigned; // SArray<SMqConsumerEp>
SArray* idleConsumer; // SArray<SMqConsumerEp>
SArray* lostConsumer; // SArray<SMqConsumerEp>
SArray* unassignedVg; // SArray<SMqConsumerEp> SArray* unassignedVg; // SArray<SMqConsumerEp>
} SMqSubscribeObj; } SMqSubscribeObj;
static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
SMqSubscribeObj* pSub = malloc(sizeof(SMqSubscribeObj)); SMqSubscribeObj* pSub = calloc(1, sizeof(SMqSubscribeObj));
if (pSub == NULL) { if (pSub == NULL) {
return NULL; return NULL;
} }
pSub->key[0] = 0; pSub->consumers = taosArrayInit(0, sizeof(SMqSubConsumer));
pSub->epoch = 0; if (pSub->consumers == NULL) {
goto _err;
pSub->availConsumer = taosArrayInit(0, sizeof(int64_t));
if (pSub->availConsumer == NULL) {
free(pSub);
return NULL;
}
pSub->assigned = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->assigned == NULL) {
taosArrayDestroy(pSub->availConsumer);
free(pSub);
return NULL;
}
pSub->lostConsumer = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->lostConsumer == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned);
free(pSub);
return NULL;
}
pSub->idleConsumer = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->idleConsumer == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned);
taosArrayDestroy(pSub->lostConsumer);
free(pSub);
return NULL;
} }
pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp)); pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->unassignedVg == NULL) { if (pSub->unassignedVg == NULL) {
taosArrayDestroy(pSub->availConsumer); goto _err;
taosArrayDestroy(pSub->assigned);
taosArrayDestroy(pSub->lostConsumer);
taosArrayDestroy(pSub->idleConsumer);
free(pSub);
return NULL;
} }
pSub->key[0] = 0;
pSub->vgNum = 0;
pSub->status = 0;
return pSub; return pSub;
_err:
tfree(pSub->unassignedVg);
tfree(pSub->consumers);
tfree(pSub);
return NULL;
} }
static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) { static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeString(buf, pSub->key); tlen += taosEncodeString(buf, pSub->key);
tlen += taosEncodeFixedI32(buf, pSub->epoch); tlen += taosEncodeFixedI32(buf, pSub->vgNum);
tlen += taosEncodeFixedI32(buf, pSub->status);
int32_t sz; int32_t sz;
sz = taosArrayGetSize(pSub->availConsumer); sz = taosArrayGetSize(pSub->consumers);
tlen += taosEncodeFixedI32(buf, sz); tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int64_t* pConsumerId = taosArrayGet(pSub->availConsumer, i); SMqSubConsumer* pSubConsumer = taosArrayGet(pSub->consumers, i);
tlen += taosEncodeFixedI64(buf, *pConsumerId); tlen += tEncodeSMqSubConsumer(buf, pSubConsumer);
}
sz = taosArrayGetSize(pSub->assigned);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp* pCEp = taosArrayGet(pSub->assigned, i);
tlen += tEncodeSMqConsumerEp(buf, pCEp);
}
sz = taosArrayGetSize(pSub->lostConsumer);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp* pCEp = taosArrayGet(pSub->lostConsumer, i);
tlen += tEncodeSMqConsumerEp(buf, pCEp);
}
sz = taosArrayGetSize(pSub->idleConsumer);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp* pCEp = taosArrayGet(pSub->idleConsumer, i);
tlen += tEncodeSMqConsumerEp(buf, pCEp);
} }
sz = taosArrayGetSize(pSub->unassignedVg); sz = taosArrayGetSize(pSub->unassignedVg);
...@@ -506,68 +476,25 @@ static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeOb ...@@ -506,68 +476,25 @@ static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeOb
static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) { static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) {
buf = taosDecodeStringTo(buf, pSub->key); buf = taosDecodeStringTo(buf, pSub->key);
buf = taosDecodeFixedI32(buf, &pSub->epoch); buf = taosDecodeFixedI32(buf, &pSub->vgNum);
buf = taosDecodeFixedI32(buf, &pSub->status);
int32_t sz; int32_t sz;
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pSub->availConsumer = taosArrayInit(sz, sizeof(int64_t)); pSub->consumers = taosArrayInit(sz, sizeof(SMqSubConsumer));
if (pSub->availConsumer == NULL) { if (pSub->consumers == NULL) {
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
int64_t consumerId;
buf = taosDecodeFixedI64(buf, &consumerId);
taosArrayPush(pSub->availConsumer, &consumerId);
}
buf = taosDecodeFixedI32(buf, &sz);
pSub->assigned = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->assigned == NULL) {
taosArrayDestroy(pSub->availConsumer);
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp = {0};
buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->assigned, &cEp);
}
buf = taosDecodeFixedI32(buf, &sz);
pSub->lostConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->lostConsumer == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned);
return NULL; return NULL;
} }
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp = {0}; SMqSubConsumer subConsumer = {0};
buf = tDecodeSMqConsumerEp(buf, &cEp); buf = tDecodeSMqSubConsumer(buf, &subConsumer);
taosArrayPush(pSub->lostConsumer, &cEp); taosArrayPush(pSub->consumers, &subConsumer);
} }
buf = taosDecodeFixedI32(buf, &sz);
pSub->idleConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->idleConsumer == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned);
taosArrayDestroy(pSub->lostConsumer);
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp = {0};
buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->idleConsumer, &cEp);
}
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp)); pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->unassignedVg == NULL) { if (pSub->unassignedVg == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned);
taosArrayDestroy(pSub->lostConsumer);
taosArrayDestroy(pSub->idleConsumer);
return NULL; return NULL;
} }
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
...@@ -575,50 +502,29 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) ...@@ -575,50 +502,29 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
buf = tDecodeSMqConsumerEp(buf, &cEp); buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->unassignedVg, &cEp); taosArrayPush(pSub->unassignedVg, &cEp);
} }
return buf; return buf;
} }
static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) { static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
if (pSub->availConsumer) { if (pSub->consumers) {
taosArrayDestroy(pSub->availConsumer); taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer);
pSub->availConsumer = NULL; //taosArrayDestroy(pSub->consumers);
} pSub->consumers = NULL;
if (pSub->assigned) {
//taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp);
taosArrayDestroy(pSub->assigned);
pSub->assigned = NULL;
} }
if (pSub->unassignedVg) { if (pSub->unassignedVg) {
//taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
taosArrayDestroy(pSub->unassignedVg); //taosArrayDestroy(pSub->unassignedVg);
pSub->unassignedVg = NULL; pSub->unassignedVg = NULL;
} }
if (pSub->idleConsumer) {
//taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
taosArrayDestroy(pSub->idleConsumer);
pSub->idleConsumer = NULL;
}
if (pSub->lostConsumer) {
//taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
taosArrayDestroy(pSub->lostConsumer);
pSub->lostConsumer = NULL;
}
} }
typedef struct SMqCGroup { typedef struct {
char name[TSDB_CONSUMER_GROUP_LEN];
int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal
SList* consumerIds; // SList<int64_t>
SList* idleVGroups; // SList<int32_t>
} SMqCGroup;
typedef struct SMqTopicObj {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int64_t createTime; int64_t createTime;
int64_t updateTime; int64_t updateTime;
uint64_t uid; int64_t uid;
int64_t dbUid; int64_t dbUid;
int32_t version; int32_t version;
SRWLatch lock; SRWLatch lock;
...@@ -626,79 +532,23 @@ typedef struct SMqTopicObj { ...@@ -626,79 +532,23 @@ typedef struct SMqTopicObj {
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
// SHashObj *cgroups; // SHashObj<SMqCGroup>
// SHashObj *consumers; // SHashObj<SMqConsumerObj>
} SMqTopicObj; } SMqTopicObj;
// TODO: add cache and change name to id typedef struct {
typedef struct SMqConsumerTopic {
char name[TSDB_TOPIC_FNAME_LEN];
int32_t epoch;
// vg assigned to the consumer on the topic
SArray* pVgInfo; // SArray<int32_t>
} SMqConsumerTopic;
static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqTopicObj* pTopic,
SMqSubscribeObj* pSub, int64_t* oldConsumerId) {
SMqConsumerTopic* pCTopic = malloc(sizeof(SMqConsumerTopic));
if (pCTopic == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
strcpy(pCTopic->name, pTopic->name);
pCTopic->epoch = 0;
pCTopic->pVgInfo = taosArrayInit(0, sizeof(int32_t));
int32_t unassignedVgSz = taosArrayGetSize(pSub->unassignedVg);
if (unassignedVgSz > 0) {
SMqConsumerEp* pCEp = taosArrayPop(pSub->unassignedVg);
*oldConsumerId = pCEp->consumerId;
pCEp->consumerId = consumerId;
taosArrayPush(pCTopic->pVgInfo, &pCEp->vgId);
taosArrayPush(pSub->assigned, pCEp);
}
return pCTopic;
}
static FORCE_INLINE int32_t tEncodeSMqConsumerTopic(void** buf, SMqConsumerTopic* pConsumerTopic) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pConsumerTopic->name);
tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch);
int32_t sz = 0;
if (pConsumerTopic->pVgInfo != NULL) {
sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
}
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
int32_t* pVgInfo = taosArrayGet(pConsumerTopic->pVgInfo, i);
tlen += taosEncodeFixedI32(buf, *pVgInfo);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSMqConsumerTopic(void* buf, SMqConsumerTopic* pConsumerTopic) {
buf = taosDecodeStringTo(buf, pConsumerTopic->name);
buf = taosDecodeFixedI32(buf, &pConsumerTopic->epoch);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pConsumerTopic->pVgInfo = taosArrayInit(sz, sizeof(SMqConsumerTopic));
for (int32_t i = 0; i < sz; i++) {
int32_t vgInfo;
buf = taosDecodeFixedI32(buf, &vgInfo);
taosArrayPush(pConsumerTopic->pVgInfo, &vgInfo);
}
return buf;
}
typedef struct SMqConsumerObj {
int64_t consumerId; int64_t consumerId;
int64_t connId; int64_t connId;
SRWLatch lock; SRWLatch lock;
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray* topics; // SArray<SMqConsumerTopic> SArray* topics; // SArray<char*>
int64_t epoch; int64_t epoch;
// stat // stat
int64_t pollCnt; int64_t pollCnt;
// status
int32_t status;
// heartbeat from the consumer reset hbStatus to 0
// each checkConsumerAlive msg add hbStatus by 1
// if checkConsumerAlive > CONSUMER_REBALANCE_CNT, mask to lost
int32_t hbStatus;
} SMqConsumerObj; } SMqConsumerObj;
static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) { static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) {
...@@ -711,88 +561,29 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerO ...@@ -711,88 +561,29 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerO
int32_t sz = taosArrayGetSize(pConsumer->topics); int32_t sz = taosArrayGetSize(pConsumer->topics);
tlen += taosEncodeFixedI32(buf, sz); tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqConsumerTopic* pConsumerTopic = taosArrayGet(pConsumer->topics, i); char* topic = taosArrayGetP(pConsumer->topics, i);
tlen += tEncodeSMqConsumerTopic(buf, pConsumerTopic); tlen += taosEncodeString(buf, topic);
} }
return tlen; return tlen;
} }
static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pConsumer) { static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pConsumer) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId); buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
buf = taosDecodeFixedI64(buf, &pConsumer->connId); buf = taosDecodeFixedI64(buf, &pConsumer->connId);
buf = taosDecodeFixedI64(buf, &pConsumer->epoch); buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt); buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt);
buf = taosDecodeStringTo(buf, pConsumer->cgroup); buf = taosDecodeStringTo(buf, pConsumer->cgroup);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pConsumer->topics = taosArrayInit(sz, sizeof(SMqConsumerObj)); pConsumer->topics = taosArrayInit(sz, sizeof(SMqConsumerObj));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqConsumerTopic cTopic; char* topic;
buf = tDecodeSMqConsumerTopic(buf, &cTopic); buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->topics, &cTopic); taosArrayPush(pConsumer->topics, &topic);
} }
return buf; return buf;
} }
typedef struct SMqSubConsumerObj {
int64_t consumerUid; // if -1, unassigned
SList* vgId; // SList<int32_t>
} SMqSubConsumerObj;
typedef struct SMqSubCGroupObj {
char name[TSDB_CONSUMER_GROUP_LEN];
SList* consumers; // SList<SMqConsumerObj>
} SMqSubCGroupObj;
typedef struct SMqSubTopicObj {
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
int64_t uid;
int64_t dbUid;
int32_t version;
SRWLatch lock;
int32_t sqlLen;
char* sql;
char* logicalPlan;
char* physicalPlan;
SList* cgroups; // SList<SMqSubCGroupObj>
} SMqSubTopicObj;
typedef struct SMqConsumerSubObj {
int64_t topicUid;
SList* vgIds; // SList<int64_t>
} SMqConsumerSubObj;
typedef struct SMqConsumerHbObj {
int64_t consumerId;
SList* consumerSubs; // SList<SMqConsumerSubObj>
} SMqConsumerHbObj;
typedef struct SMqVGroupSubObj {
int64_t topicUid;
SList* consumerIds; // SList<int64_t>
} SMqVGroupSubObj;
typedef struct SMqVGroupHbObj {
int64_t vgId;
SList* vgSubs; // SList<SMqVGroupSubObj>
} SMqVGroupHbObj;
#if 0
typedef struct SCGroupObj {
char name[TSDB_TOPIC_NAME_LEN];
int64_t createTime;
int64_t updateTime;
uint64_t uid;
//uint64_t dbUid;
int32_t version;
SRWLatch lock;
SList* consumerIds;
} SCGroupObj;
#endif
typedef struct SMnodeMsg { typedef struct SMnodeMsg {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
......
...@@ -96,6 +96,7 @@ typedef struct SMnode { ...@@ -96,6 +96,7 @@ typedef struct SMnode {
SendReqToMnodeFp sendReqToMnodeFp; SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectRspFp sendRedirectRspFp; SendRedirectRspFp sendRedirectRspFp;
PutReqToMWriteQFp putReqToMWriteQFp; PutReqToMWriteQFp putReqToMWriteQFp;
PutReqToMReadQFp putReqToMReadQFp;
} SMnode; } SMnode;
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg);
......
...@@ -53,6 +53,19 @@ int32_t mndInitConsumer(SMnode *pMnode) { ...@@ -53,6 +53,19 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {} void mndCleanupConsumer(SMnode *pMnode) {}
SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) {
SMqConsumerObj* pConsumer = malloc(sizeof(SMqConsumerObj));
if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pConsumer->epoch = 1;
pConsumer->consumerId = consumerId;
strcpy(pConsumer->cgroup, cgroup);
taosInitRWLatch(&pConsumer->lock);
return pConsumer;
}
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL; void* buf = NULL;
...@@ -164,148 +177,3 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) { ...@@ -164,148 +177,3 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pConsumer); sdbRelease(pSdb, pConsumer);
} }
#if 0
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname);
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
return -1;
}
SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname);
if (pConsumer == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_CONSUMER;
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
taosRLockLatch(&pConsumer->lock);
int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags;
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
STableMetaRsp *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pConsumer->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseConsumer(pMnode, pConsumer);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN);
pMeta->numOfTags = htonl(pConsumer->numOfTags);
pMeta->numOfColumns = htonl(pConsumer->numOfColumns);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pConsumer->version);
pMeta->tuid = htonl(pConsumer->uid);
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pSrcSchema = &pConsumer->pSchema[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pConsumer->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseConsumer(pMnode, pConsumer);
pMsg->pCont = pMeta;
pMsg->contLen = contLen;
mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags);
return 0;
}
static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumOfConsumers) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
int32_t numOfConsumers = 0;
void *pIter = NULL;
while (1) {
SMqConsumerObj *pConsumer = NULL;
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) break;
numOfConsumers++;
sdbRelease(pSdb, pConsumer);
}
*pNumOfConsumers = numOfConsumers;
return 0;
}
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
if (mndGetNumOfConsumers(pMnode, pShow->db, &pShow->numOfRows) != 0) {
return -1;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tags");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htonl(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
return 0;
}
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
#endif
...@@ -31,9 +31,20 @@ ...@@ -31,9 +31,20 @@
#define MND_SUBSCRIBE_VER_NUMBER 1 #define MND_SUBSCRIBE_VER_NUMBER 1
#define MND_SUBSCRIBE_RESERVE_SIZE 64 #define MND_SUBSCRIBE_RESERVE_SIZE 64
#define MND_SUBSCRIBE_REBALANCE_MS 5000 #define MND_SUBSCRIBE_REBALANCE_CNT 3
static char *mndMakeSubscribeKey(char *cgroup, char *topicName); enum {
MQ_CONSUMER_STATUS__INIT = 1,
MQ_CONSUMER_STATUS__ACTIVE,
MQ_CONSUMER_STATUS__LOST,
};
enum {
MQ_SUBSCRIBE_STATUS__ACTIVE = 1,
MQ_SUBSCRIBE_STATUS__DELETED,
};
static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName);
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
...@@ -48,9 +59,10 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); ...@@ -48,9 +59,10 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg); static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg); static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub, const SMqConsumerEp *pSub);
int64_t oldConsumerId);
static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub);
int32_t mndInitSubscribe(SMnode *pMnode) { int32_t mndInitSubscribe(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SUBSCRIBE, SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
...@@ -68,12 +80,140 @@ int32_t mndInitSubscribe(SMnode *pMnode) { ...@@ -68,12 +80,140 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *consumerGroup) {
SMqSubscribeObj *pSub = tNewSubscribeObj();
if (pSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
char *key = mndMakeSubscribeKey(consumerGroup, pTopic->name);
if (key == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tDeleteSMqSubscribeObj(pSub);
free(pSub);
return NULL;
}
strcpy(pSub->key, key);
free(key);
if (mndInitUnassignedVg(pMnode, pTopic, pSub) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tDeleteSMqSubscribeObj(pSub);
free(pSub);
return NULL;
}
// TODO: disable alter subscribed table
return pSub;
}
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqTopicObj *pTopic,
const SMqConsumerEp *pConsumerEp, const char *cgroup) {
SMqSetCVgReq req = {0};
strcpy(req.cgroup, cgroup);
strcpy(req.topicName, pTopic->name);
req.sql = pTopic->sql;
req.logicalPlan = pTopic->logicalPlan;
req.physicalPlan = pTopic->physicalPlan;
req.qmsg = pConsumerEp->qmsg;
req.oldConsumerId = pConsumerEp->oldConsumerId;
req.newConsumerId = pConsumerEp->consumerId;
req.vgId = pConsumerEp->vgId;
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *buf = malloc(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
pMsgHead->vgId = htonl(pConsumerEp->vgId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSMqSetCVgReq(&abuf, &req);
*pBuf = buf;
*pLen = tlen;
return 0;
}
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic,
const SMqConsumerEp *pConsumerEp, const char *cgroup) {
int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
void *buf;
int32_t tlen;
if (mndBuildRebalanceMsg(&buf, &tlen, pTopic, pConsumerEp, cgroup) < 0) {
return -1;
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + tlen;
action.msgType = TDMT_VND_MQ_SET_CONN;
mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(buf);
return -1;
}
return 0;
}
static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
SMqSetCVgReq req = {0};
req.oldConsumerId = pConsumerEp->consumerId;
req.newConsumerId = -1;
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *buf = malloc(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
pMsgHead->vgId = htonl(pConsumerEp->vgId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSMqSetCVgReq(&abuf, &req);
*pBuf = buf;
*pLen = tlen;
return 0;
}
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
void *buf;
int32_t tlen;
if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) {
return -1;
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + tlen;
action.msgType = TDMT_VND_MQ_SET_CONN;
mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(buf);
return -1;
}
return 0;
}
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont; SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
SMqCMGetSubEpRsp rsp = {0}; SMqCMGetSubEpRsp rsp = {0};
int64_t consumerId = be64toh(pReq->consumerId); int64_t consumerId = be64toh(pReq->consumerId);
int64_t currentTs = taosGetTimestampMs();
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
...@@ -85,49 +225,35 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -85,49 +225,35 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
strcpy(rsp.cgroup, pReq->cgroup); strcpy(rsp.cgroup, pReq->cgroup);
rsp.consumerId = consumerId; rsp.consumerId = consumerId;
rsp.epoch = pConsumer->epoch; rsp.epoch = pConsumer->epoch;
if (pReq->epoch != rsp.epoch) {
SArray *pTopics = pConsumer->topics; SArray *pTopics = pConsumer->topics;
int32_t sz = taosArrayGetSize(pTopics); int sz = taosArrayGetSize(pTopics);
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
for (int32_t i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SMqSubTopicEp topicEp; char *topicName = taosArrayGetP(pTopics, i);
SMqConsumerTopic *pConsumerTopic = taosArrayGet(pTopics, i); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName);
strcpy(topicEp.topic, pConsumerTopic->name);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, pConsumerTopic->name);
ASSERT(pSub); ASSERT(pSub);
bool found = 0; int csz = taosArrayGetSize(pSub->consumers);
bool changed = 0; //TODO: change to bsearch
for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) { for (int j = 0; j < csz; j++) {
if (*(int64_t *)taosArrayGet(pSub->availConsumer, j) == consumerId) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
found = 1; if (consumerId == pSubConsumer->consumerId) {
break; int vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
} SMqSubTopicEp topicEp;
} topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
if (found == 0) { for (int k = 0; k < vgsz; k++) {
taosArrayPush(pSub->availConsumer, &consumerId); SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
}
int32_t assignedSz = taosArrayGetSize(pSub->assigned); SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId};
topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp));
for (int32_t j = 0; j < assignedSz; j++) {
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, j);
if (pCEp->consumerId == consumerId) {
pCEp->lastConsumerHbTs = currentTs;
SMqSubVgEp vgEp = {.epSet = pCEp->epSet, .vgId = pCEp->vgId};
taosArrayPush(topicEp.vgs, &vgEp); taosArrayPush(topicEp.vgs, &vgEp);
changed = 1;
}
} }
if (taosArrayGetSize(topicEp.vgs) != 0) {
taosArrayPush(rsp.topics, &topicEp); taosArrayPush(rsp.topics, &topicEp);
break;
} }
if (changed || found) {
SSdbRaw *pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
sdbWrite(pMnode->pSdb, pRaw);
} }
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
} }
}
int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp); int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
void *buf = rpcMallocCont(tlen); void *buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
...@@ -157,17 +283,131 @@ static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) { ...@@ -157,17 +283,131 @@ static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SMqSubscribeObj *pSub = NULL; SMqConsumerObj *pConsumer;
void *pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub); void *pIter = NULL;
int64_t currentTs = taosGetTimestampMs(); while (1) {
int32_t sz; pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
while (pIter != NULL) { if (pIter == NULL) break;
int32_t hbStatus = atomic_fetch_add_32(&pConsumer->hbStatus, 1);
if (hbStatus > MND_SUBSCRIBE_REBALANCE_CNT) {
int32_t old =
atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
if (old == MQ_CONSUMER_STATUS__ACTIVE) {
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
pRebMsg->consumerId = pConsumer->consumerId;
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg)};
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
}
}
}
return 0;
}
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SMqDoRebalanceMsg *pReq = (SMqDoRebalanceMsg *)pMsg->rpcMsg.pCont;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pReq->consumerId);
int topicSz = taosArrayGetSize(pConsumer->topics);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
for (int i = 0; i < topicSz; i++) {
char *topic = taosArrayGetP(pConsumer->topics, i);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
int32_t consumerNum = taosArrayGetSize(pSub->consumers);
if (consumerNum != 0) {
int32_t vgNum = pSub->vgNum;
int32_t vgEachConsumer = vgNum / consumerNum;
int32_t left = vgNum % consumerNum;
int32_t leftUsed = 0;
SArray *unassignedVgStash = taosArrayInit(0, sizeof(SMqConsumerEp));
SArray *unassignedConsumer = taosArrayInit(0, sizeof(int32_t));
for (int32_t j = 0; j < consumerNum; j++) {
bool changed = false;
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
int32_t vgOneConsumer = taosArrayGetSize(pSubConsumer->vgInfo);
bool canUseLeft = leftUsed < left;
if (vgOneConsumer > vgEachConsumer + canUseLeft) {
changed = true;
if (canUseLeft) leftUsed++;
// put into unassigned
while (taosArrayGetSize(pSubConsumer->vgInfo) > vgEachConsumer + canUseLeft) {
SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
ASSERT(pConsumerEp != NULL);
taosArrayPush(unassignedVgStash, pConsumerEp);
// build msg and persist into trans
}
} else if (vgOneConsumer < vgEachConsumer) {
changed = true;
// assign from unassigned
while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer) {
// if no unassgined, save j
if (taosArrayGetSize(unassignedVgStash) == 0) {
taosArrayPush(unassignedConsumer, &j);
break;
}
SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
ASSERT(pConsumerEp != NULL);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
pConsumerEp->consumerId = pSubConsumer->consumerId;
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
// build msg and persist into trans
}
}
if (changed) {
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
pRebConsumer->epoch++;
SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pConsumerRaw);
}
}
for (int32_t j = 0; j < taosArrayGetSize(unassignedConsumer); j++) {
int32_t consumerIdx = *(int32_t *)taosArrayGet(unassignedConsumer, j);
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, consumerIdx);
while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer) {
SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
ASSERT(pConsumerEp != NULL);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
pConsumerEp->consumerId = pSubConsumer->consumerId;
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
// build msg and persist into trans
}
}
ASSERT(taosArrayGetSize(unassignedVgStash) == 0);
// send msg to vnode
// log rebalance statistics
/*SSdbRaw *pSubRaw = mndSubscribeActionEncode(pSub);*/
/*sdbSetRawStatus(pSubRaw, SDB_STATUS_READY);*/
/*mndTransAppendRedolog(pTrans, pSubRaw);*/
}
mndReleaseSubscribe(pMnode, pSub);
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndReleaseConsumer(pMnode, pConsumer);
return -1;
}
mndTransDrop(pTrans);
mndReleaseConsumer(pMnode, pConsumer);
return 0;
}
#if 0
//update consumer status for the subscribption
for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) { for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i); SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
int64_t consumerId = pCEp->consumerId; int64_t consumerId = pCEp->consumerId;
if (pCEp->lastConsumerHbTs != -1 && currentTs - pCEp->lastConsumerHbTs > MND_SUBSCRIBE_REBALANCE_MS) { if (pCEp->status != -1) {
int32_t consumerHbStatus = atomic_fetch_add_32(&pCEp->consumerHbStatus, 1);
if (consumerHbStatus < MND_SUBSCRIBE_REBALANCE_CNT) {
continue;
}
// put consumer into lostConsumer // put consumer into lostConsumer
taosArrayPush(pSub->lostConsumer, pCEp); SMqConsumerEp* lostConsumer = taosArrayPush(pSub->lostConsumer, pCEp);
lostConsumer->qmsg = NULL;
// put vg into unassigned // put vg into unassigned
taosArrayPush(pSub->unassignedVg, pCEp); taosArrayPush(pSub->unassignedVg, pCEp);
// remove from assigned // remove from assigned
...@@ -192,91 +432,76 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -192,91 +432,76 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
#endif #endif
} }
} }
if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0 && taosArrayGetSize(pSub->availConsumer) > 0) { // no available consumer, skip rebalance
if (taosArrayGetSize(pSub->availConsumer) == 0) {
continue;
}
taosArrayGet(pSub->availConsumer, 0);
// rebalance condition1 : have unassigned vg
// assign vg to a consumer, trying to find the least assigned one
if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0) {
char *topic = NULL; char *topic = NULL;
char *cgroup = NULL; char *cgroup = NULL;
mndSplitSubscribeKey(pSub->key, &topic, &cgroup); mndSplitSubscribeKey(pSub->key, &topic, &cgroup);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
// create trans
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx); int64_t consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx);
pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg); SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
int64_t oldConsumerId = pCEp->consumerId; pCEp->oldConsumerId = pCEp->consumerId;
pCEp->consumerId = consumerId; pCEp->consumerId = consumerId;
taosArrayPush(pSub->assigned, pCEp); taosArrayPush(pSub->assigned, pCEp);
pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
pConsumer->epoch++; pConsumer->epoch++;
/*SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);*/ SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/ sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
/*sdbWriteNotFree(pMnode->pSdb, pConsumerRaw);*/ sdbWrite(pMnode->pSdb, pConsumerRaw);
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
// build msg void* msg;
int32_t msgLen;
SMqSetCVgReq req = {0}; mndBuildRebalanceMsg(&msg, &msgLen, pTopic, pCEp, cgroup, topic);
strcpy(req.cgroup, cgroup);
strcpy(req.topicName, topic);
req.sql = pTopic->sql;
req.logicalPlan = pTopic->logicalPlan;
req.physicalPlan = pTopic->physicalPlan;
req.qmsg = pCEp->qmsg;
req.oldConsumerId = oldConsumerId;
req.newConsumerId = consumerId;
req.vgId = pCEp->vgId;
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *buf = malloc(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
pMsgHead->vgId = htonl(pCEp->vgId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSMqSetCVgReq(&abuf, &req);
// persist msg // persist msg
// TODO: no need for txn
STransAction action = {0}; STransAction action = {0};
action.epSet = pCEp->epSet; action.epSet = pCEp->epSet;
action.pCont = buf; action.pCont = msg;
action.contLen = sizeof(SMsgHead) + tlen; action.contLen = sizeof(SMsgHead) + msgLen;
action.msgType = TDMT_VND_MQ_SET_CONN; action.msgType = TDMT_VND_MQ_SET_CONN;
mndTransAppendRedoAction(pTrans, &action); mndTransAppendRedoAction(pTrans, &action);
// persist raw // persist data
SSdbRaw *pRaw = mndSubActionEncode(pSub); SSdbRaw *pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pRaw); mndTransAppendRedolog(pTrans, pRaw);
tfree(topic);
tfree(cgroup);
} }
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
} }
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
mndTransDrop(pTrans); mndTransDrop(pTrans);
tfree(topic);
tfree(cgroup);
} }
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); // rebalance condition2 : imbalance assignment
} }
return 0; return 0;
} }
#endif
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
// convert phyplan to dag SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
SArray *pArray = NULL; SArray *pArray = NULL;
SArray *inner = taosArrayGet(pDag->pSubplans, 0); SArray *inner = taosArrayGet(pDag->pSubplans, 0);
SSubplan *plan = taosArrayGetP(inner, 0); SSubplan *plan = taosArrayGetP(inner, 0);
SSdb *pSdb = pMnode->pSdb; SArray *unassignedVg = pSub->unassignedVg;
SVgObj *pVgroup = NULL;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
...@@ -284,6 +509,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas ...@@ -284,6 +509,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pTopic->dbUid) continue; if (pVgroup->dbUid != pTopic->dbUid) continue;
pSub->vgNum++;
plan->execNode.nodeId = pVgroup->vgId; plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup); plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup);
...@@ -298,47 +524,41 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas ...@@ -298,47 +524,41 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
return -1; return -1;
} }
SMqConsumerEp CEp = {0}; SMqConsumerEp consumerEp = {0};
CEp.status = 0; consumerEp.status = 0;
CEp.consumerId = -1; consumerEp.consumerId = -1;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
STaskInfo *pTaskInfo = taosArrayGet(pArray, 0); STaskInfo *pTaskInfo = taosArrayGet(pArray, 0);
CEp.epSet = pTaskInfo->addr.epset; consumerEp.epSet = pTaskInfo->addr.epset;
CEp.vgId = pTaskInfo->addr.nodeId; consumerEp.vgId = pTaskInfo->addr.nodeId;
ASSERT(CEp.vgId == pVgroup->vgId); ASSERT(consumerEp.vgId == pVgroup->vgId);
CEp.qmsg = strdup(pTaskInfo->msg->msg); consumerEp.qmsg = strdup(pTaskInfo->msg->msg);
taosArrayPush(unassignedVg, &CEp); taosArrayPush(unassignedVg, &consumerEp);
// TODO: free taskInfo // TODO: free taskInfo
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
/*SEpSet *pEpSet = &plan->execNode.epset;*/
/*pEpSet->inUse = 0;*/
/*addEpIntoEpSet(pEpSet, "localhost", 6030);*/
} }
/*qDestroyQueryDag(pDag);*/ /*qDestroyQueryDag(pDag);*/
return 0; return 0;
} }
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pCEp, const SMqConsumerEp *pConsumerEp) {
int64_t oldConsumerId) { int32_t vgId = pConsumerEp->vgId;
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
for (int32_t i = 0; i < sz; i++) {
int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
SMqSetCVgReq req = { SMqSetCVgReq req = {
.vgId = vgId, .vgId = vgId,
.oldConsumerId = oldConsumerId, .oldConsumerId = pConsumerEp->oldConsumerId,
.newConsumerId = pConsumer->consumerId, .newConsumerId = pConsumerEp->consumerId,
.sql = pTopic->sql,
.logicalPlan = pTopic->logicalPlan,
.physicalPlan = pTopic->physicalPlan,
.qmsg = pConsumerEp->qmsg,
}; };
strcpy(req.cgroup, pConsumer->cgroup);
strcpy(req.cgroup, cgroup);
strcpy(req.topicName, pTopic->name); strcpy(req.topicName, pTopic->name);
req.sql = pTopic->sql;
req.logicalPlan = pTopic->logicalPlan;
req.physicalPlan = pTopic->physicalPlan;
req.qmsg = pCEp->qmsg;
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *buf = malloc(sizeof(SMsgHead) + tlen); void *buf = malloc(sizeof(SMsgHead) + tlen);
if (buf == NULL) { if (buf == NULL) {
...@@ -365,7 +585,6 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume ...@@ -365,7 +585,6 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
free(buf); free(buf);
return -1; return -1;
} }
}
return 0; return 0;
} }
...@@ -373,7 +592,7 @@ void mndCleanupSubscribe(SMnode *pMnode) {} ...@@ -373,7 +592,7 @@ void mndCleanupSubscribe(SMnode *pMnode) {}
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL; void *buf = NULL;
int32_t tlen = tEncodeSubscribeObj(NULL, pSub); int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE; int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
...@@ -408,7 +627,7 @@ SUB_ENCODE_OVER: ...@@ -408,7 +627,7 @@ SUB_ENCODE_OVER:
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL; void *buf = NULL;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
...@@ -443,7 +662,6 @@ SUB_DECODE_OVER: ...@@ -443,7 +662,6 @@ SUB_DECODE_OVER:
tfree(buf); tfree(buf);
if (terrno != TSDB_CODE_SUCCESS) { if (terrno != TSDB_CODE_SUCCESS) {
mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr()); mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr());
// TODO free subscribeobj
tfree(pRow); tfree(pRow);
return NULL; return NULL;
} }
...@@ -467,7 +685,7 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc ...@@ -467,7 +685,7 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc
return 0; return 0;
} }
static char *mndMakeSubscribeKey(char *cgroup, char *topicName) { static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) {
char *key = malloc(TSDB_SHOW_SUBQUERY_LEN); char *key = malloc(TSDB_SHOW_SUBQUERY_LEN);
if (key == NULL) { if (key == NULL) {
return NULL; return NULL;
...@@ -501,8 +719,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -501,8 +719,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SCMSubscribeReq subscribe; SCMSubscribeReq subscribe;
tDeserializeSCMSubscribeReq(msgStr, &subscribe); tDeserializeSCMSubscribeReq(msgStr, &subscribe);
int64_t consumerId = subscribe.consumerId; int64_t consumerId = subscribe.consumerId;
char *consumerGroup = subscribe.consumerGroup; char *cgroup = subscribe.consumerGroup;
int32_t cgroupLen = strlen(consumerGroup);
SArray *newSub = subscribe.topicNames; SArray *newSub = subscribe.topicNames;
int newTopicNum = subscribe.topicNum; int newTopicNum = subscribe.topicNum;
...@@ -511,24 +728,18 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -511,24 +728,18 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SArray *oldSub = NULL; SArray *oldSub = NULL;
int oldTopicNum = 0; int oldTopicNum = 0;
bool createConsumer = false;
// create consumer if not exist // create consumer if not exist
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
// create consumer // create consumer
pConsumer = malloc(sizeof(SMqConsumerObj)); pConsumer = mndCreateConsumer(consumerId, cgroup);
if (pConsumer == NULL) { createConsumer = true;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pConsumer->epoch = 1;
pConsumer->consumerId = consumerId;
strcpy(pConsumer->cgroup, consumerGroup);
taosInitRWLatch(&pConsumer->lock);
} else { } else {
pConsumer->epoch++; pConsumer->epoch++;
oldSub = pConsumer->topics; oldSub = pConsumer->topics;
} }
pConsumer->topics = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic)); pConsumer->topics = newSub;
if (oldSub != NULL) { if (oldSub != NULL) {
oldTopicNum = taosArrayGetSize(oldSub); oldTopicNum = taosArrayGetSize(oldSub);
...@@ -546,14 +757,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -546,14 +757,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
char *oldTopicName = NULL; char *oldTopicName = NULL;
if (i >= newTopicNum) { if (i >= newTopicNum) {
// encode unset topic msg to all vnodes related to that topic // encode unset topic msg to all vnodes related to that topic
oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name; oldTopicName = taosArrayGetP(oldSub, j);
j++; j++;
} else if (j >= oldTopicNum) { } else if (j >= oldTopicNum) {
newTopicName = taosArrayGetP(newSub, i); newTopicName = taosArrayGetP(newSub, i);
i++; i++;
} else { } else {
newTopicName = taosArrayGetP(newSub, i); newTopicName = taosArrayGetP(newSub, i);
oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name; oldTopicName = taosArrayGetP(oldSub, j);
int comp = compareLenPrefixedStr(newTopicName, oldTopicName); int comp = compareLenPrefixedStr(newTopicName, oldTopicName);
if (comp == 0) { if (comp == 0) {
...@@ -572,54 +783,25 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -572,54 +783,25 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
} }
if (oldTopicName != NULL) { if (oldTopicName != NULL) {
#if 0 ASSERT(newTopicName == NULL);
// cancel subscribe of that old topic
ASSERT(pNewTopic == NULL); // cancel subscribe of old topic
char *oldTopicName = pOldTopic->name; SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName);
SList *vgroups = pOldTopic->vgroups; ASSERT(pSub);
SListIter iter; int csz = taosArrayGetSize(pSub->consumers);
tdListInitIter(vgroups, &iter, TD_LIST_FORWARD); for (int ci = 0; ci < csz; ci++) {
SListNode *pn; SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci);
if (pSubConsumer->consumerId == consumerId) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName); int vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
ASSERT(pTopic != NULL); for (int vgi = 0; vgi < vgsz; vgi++) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, oldTopicName); SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp);
while ((pn = tdListNext(&iter)) != NULL) {
int32_t vgId = *(int64_t *)pn->data;
// acquire and get epset
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
// TODO what time to release?
if (pVgObj == NULL) {
// TODO handle error
continue;
}
// build reset msg
void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup);
// TODO:serialize
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
STransAction action = {0}; break;
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = pMqVgSetReq;
action.contLen = 0; // TODO
action.msgType = TDMT_VND_MQ_SET_CONN;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMqVgSetReq);
mndTransDrop(pTrans);
// TODO free
return -1;
} }
} }
// delete data in mnode pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;
taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen);
mndReleaseSubscribe(pMnode, pSub);
mndReleaseTopic(pMnode, pTopic);
#endif
} else if (newTopicName != NULL) { } else if (newTopicName != NULL) {
// save subscribe info to mnode
ASSERT(oldTopicName == NULL); ASSERT(oldTopicName == NULL);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
...@@ -628,111 +810,53 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -628,111 +810,53 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
continue; continue;
} }
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName);
bool create = false; bool createSub = false;
if (pSub == NULL) {
mDebug("create new subscription, group: %s, topic %s", consumerGroup, newTopicName);
pSub = tNewSubscribeObj();
if (pSub == NULL) { if (pSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; mDebug("create new subscription by consumer %ld, group: %s, topic %s", consumerId, cgroup, newTopicName);
return -1; pSub = mndCreateSubscription(pMnode, pTopic, cgroup);
createSub = true;
} }
char *key = mndMakeSubscribeKey(consumerGroup, newTopicName);
if (key == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
strcpy(pSub->key, key);
free(key);
// set unassigned vg
if (mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg) < 0) {
// TODO: free memory
return -1;
}
// TODO: disable alter
create = true;
}
taosArrayPush(pSub->availConsumer, &consumerId);
int64_t oldConsumerId; SMqSubConsumer mqSubConsumer;
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub, &oldConsumerId); mqSubConsumer.consumerId = consumerId;
taosArrayPush(pConsumer->topics, pConsumerTopic); mqSubConsumer.vgInfo = taosArrayInit(0, sizeof(SMqConsumerEp));
taosArrayPush(pSub->consumers, &mqSubConsumer);
if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) { // if have un assigned vg, assign one to the consumer
ASSERT(taosArrayGetSize(pConsumerTopic->pVgInfo) == 1); if (taosArrayGetSize(pSub->unassignedVg) > 0) {
int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo); SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
SMqConsumerEp *pCEp = taosArrayGetLast(pSub->assigned); pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
if (pCEp->vgId == vgId) { pConsumerEp->consumerId = consumerId;
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic, pCEp, oldConsumerId) < 0) { taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
// TODO mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
return -1;
}
}
// send setmsg to vnode
} }
SSdbRaw *pRaw = mndSubActionEncode(pSub); SSdbRaw *pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pRaw); mndTransAppendRedolog(pTrans, pRaw);
if (!create) mndReleaseSubscribe(pMnode, pSub);
#if 0
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
if (pGroup == NULL) {
// add new group
pGroup = malloc(sizeof(SMqCGroup));
if (pGroup == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pGroup->consumerIds = tdListNew(sizeof(int64_t));
if (pGroup->consumerIds == NULL) {
free(pGroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pGroup->status = 0;
// add into cgroups
taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup));
}
/*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/
// put the consumer into list
// rebalance will be triggered by timer
tdListAppend(pGroup->consumerIds, &consumerId);
SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic); if (!createSub) mndReleaseSubscribe(pMnode, pSub);
sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY); mndReleaseTopic(pMnode, pTopic);
// TODO: error handling
mndTransAppendRedolog(pTrans, pTopicRaw);
#endif
/*mndReleaseTopic(pMnode, pTopic);*/
/*mndReleaseSubscribe(pMnode, pSub);*/
} }
} }
// part3. persist consumerObj
// destroy old sub if (oldSub) taosArrayDestroyEx(oldSub, free);
if (oldSub) taosArrayDestroy(oldSub);
// put new sub into consumerobj
// persist consumerObj // persist consumerObj
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer); SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
// TODO: error handling
mndTransAppendRedolog(pTrans, pConsumerRaw); mndTransAppendRedolog(pTrans, pConsumerRaw);
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("mq-subscribe-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
if (newSub) taosArrayDestroy(newSub);
mndTransDrop(pTrans); mndTransDrop(pTrans);
/*mndReleaseConsumer(pMnode, pConsumer);*/ if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
return -1; return -1;
} }
if (newSub) taosArrayDestroy(newSub);
mndTransDrop(pTrans); mndTransDrop(pTrans);
/*mndReleaseConsumer(pMnode, pConsumer);*/ if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
...@@ -741,146 +865,6 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) { ...@@ -741,146 +865,6 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) {
return 0; return 0;
} }
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("subscribe:%s, start to retrieve meta", pInfo->tbName);
#if 0
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
return -1;
}
SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname);
if (pConsumer == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_CONSUMER;
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
taosRLockLatch(&pConsumer->lock);
int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags;
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
STableMetaRsp *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pConsumer->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseConsumer(pMnode, pConsumer);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN);
pMeta->numOfTags = htonl(pConsumer->numOfTags);
pMeta->numOfColumns = htonl(pConsumer->numOfColumns);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pConsumer->version);
pMeta->tuid = htonl(pConsumer->uid);
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pSrcSchema = &pConsumer->pSchema[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pConsumer->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseConsumer(pMnode, pConsumer);
pMsg->pCont = pMeta;
pMsg->contLen = contLen;
mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags);
#endif
return 0;
}
static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumOfConsumers) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
int32_t numOfConsumers = 0;
void *pIter = NULL;
while (1) {
SMqConsumerObj *pConsumer = NULL;
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) break;
numOfConsumers++;
sdbRelease(pSdb, pConsumer);
}
*pNumOfConsumers = numOfConsumers;
return 0;
}
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
if (mndGetNumOfConsumers(pMnode, pShow->db, &pShow->numOfRows) != 0) {
return -1;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tags");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htonl(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
......
...@@ -76,7 +76,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { ...@@ -76,7 +76,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
if (mndIsMaster(pMnode)) { if (mndIsMaster(pMnode)) {
SMqTmrMsg *pMsg = rpcMallocCont(sizeof(SMqTmrMsg)); SMqTmrMsg *pMsg = rpcMallocCont(sizeof(SMqTmrMsg));
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pMsg, .contLen = sizeof(SMqTmrMsg)}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pMsg, .contLen = sizeof(SMqTmrMsg)};
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
} }
taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer); taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer);
...@@ -249,6 +249,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -249,6 +249,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pDnode = pOption->pDnode; pMnode->pDnode = pOption->pDnode;
pMnode->putReqToMWriteQFp = pOption->putReqToMWriteQFp; pMnode->putReqToMWriteQFp = pOption->putReqToMWriteQFp;
pMnode->putReqToMReadQFp = pOption->putReqToMReadQFp;
pMnode->sendReqToDnodeFp = pOption->sendReqToDnodeFp; pMnode->sendReqToDnodeFp = pOption->sendReqToDnodeFp;
pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp; pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp;
pMnode->sendRedirectRspFp = pOption->sendRedirectRspFp; pMnode->sendRedirectRspFp = pOption->sendRedirectRspFp;
......
...@@ -124,7 +124,7 @@ int tsdbLoadBlockIdx(SReadH *pReadh); ...@@ -124,7 +124,7 @@ int tsdbLoadBlockIdx(SReadH *pReadh);
int tsdbSetReadTable(SReadH *pReadh, STable *pTable); int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget);
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo);
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds); int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds);
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
......
...@@ -22,7 +22,7 @@ static void tsdbResetReadFile(SReadH *pReadh); ...@@ -22,7 +22,7 @@ static void tsdbResetReadFile(SReadH *pReadh);
static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols); static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols);
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows, static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows,
int maxPoints, char *buffer, int bufferSize); int maxPoints, char *buffer, int bufferSize);
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds,
int numOfColIds); int numOfColIds);
static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol); static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol);
...@@ -271,7 +271,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { ...@@ -271,7 +271,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
return 0; return 0;
} }
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds) { int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds) {
ASSERT(pBlock->numOfSubBlocks > 0); ASSERT(pBlock->numOfSubBlocks > 0);
int8_t update = pReadh->pRepo->config.update; int8_t update = pReadh->pRepo->config.update;
...@@ -548,7 +548,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 ...@@ -548,7 +548,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
return 0; return 0;
} }
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds,
int numOfColIds) { int numOfColIds) {
ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1); ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID); ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册