提交 0ba5427a 编写于 作者: L Liu Jicong

handle subscribe

上级 8938c902
...@@ -159,6 +159,7 @@ enum { ...@@ -159,6 +159,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)
......
...@@ -400,6 +400,8 @@ int32_t* taosGetErrno(); ...@@ -400,6 +400,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) //"Unexpected generic error in wal") #define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) //"Unexpected generic error in wal")
#define TSDB_CODE_WAL_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x1001) //"WAL file is corrupted") #define TSDB_CODE_WAL_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x1001) //"WAL file is corrupted")
#define TSDB_CODE_WAL_SIZE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x1002) //"WAL size exceeds limit") #define TSDB_CODE_WAL_SIZE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x1002) //"WAL size exceeds limit")
#define TSDB_CODE_WAL_INVALID_VER TAOS_DEF_ERROR_CODE(0, 0x1003) //"WAL invalid version")
#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004) //"WAL out of memory")
// tfs // tfs
#define TSDB_CODE_FS_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory") #define TSDB_CODE_FS_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory")
......
...@@ -25,11 +25,8 @@ extern "C" { ...@@ -25,11 +25,8 @@ extern "C" {
int32_t mndInitConsumer(SMnode *pMnode); int32_t mndInitConsumer(SMnode *pMnode);
void mndCleanupConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode);
SConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId);
void mndReleaseConsumer(SMnode *pMnode, SConsumerObj *pConsumer); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
SCGroupObj *mndAcquireCGroup(SMnode *pMnode, char *consumerGroup);
void mndReleaseCGroup(SMnode *pMnode, SCGroupObj *pCGroup);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -302,6 +302,7 @@ typedef struct { ...@@ -302,6 +302,7 @@ typedef struct {
char payload[]; char payload[];
} SShowObj; } SShowObj;
#if 0
typedef struct SConsumerObj { typedef struct SConsumerObj {
uint64_t uid; uint64_t uid;
int64_t createTime; int64_t createTime;
...@@ -309,17 +310,59 @@ typedef struct SConsumerObj { ...@@ -309,17 +310,59 @@ typedef struct SConsumerObj {
//uint64_t dbUid; //uint64_t dbUid;
int32_t version; int32_t version;
SRWLatch lock; SRWLatch lock;
SList* topics; SArray* topics;
} SConsumerObj; } SConsumerObj;
typedef struct SMqTopicConsumer {
int64_t consumerId;
SList* topicList;
} SMqTopicConsumer;
#endif
typedef struct SMqCGroup {
char name[TSDB_CONSUMER_GROUP_LEN];
int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal
SList *consumerIds; // SList<int64_t>
SList *idleVGroups; // SList<int32_t>
} SMqCGroup;
typedef struct SMqTopicObj {
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
uint64_t uid;
uint64_t dbUid;
int32_t version;
SRWLatch lock;
int32_t sqlLen;
char *sql;
char *logicalPlan;
char *physicalPlan;
SHashObj *cgroups; // SHashObj<SMqCGroup>
} SMqTopicObj;
// TODO: add cache and change name to id
typedef struct SMqConsumerTopic {
char name[TSDB_TOPIC_FNAME_LEN];
SList *vgroups; // SList<int32_t>
} SMqConsumerTopic;
typedef struct SMqConsumerObj {
SRWLatch lock;
int64_t consumerId;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray *topics; // SArray<SMqConsumerTopic>
} SMqConsumerObj;
typedef struct SMqSubConsumerObj { typedef struct SMqSubConsumerObj {
int64_t consumerUid; // if -1, unassigned int64_t consumerUid; // if -1, unassigned
SList* vgId; //SList<int32_t> SList *vgId; // SList<int32_t>
} SMqSubConsumerObj; } SMqSubConsumerObj;
typedef struct SMqSubCGroupObj { typedef struct SMqSubCGroupObj {
char name[TSDB_CONSUMER_GROUP_LEN]; char name[TSDB_CONSUMER_GROUP_LEN];
SList* consumers; //SList<SMqConsumerObj> SList *consumers; // SList<SMqConsumerObj>
} SMqSubCGroupObj; } SMqSubCGroupObj;
typedef struct SMqSubTopicObj { typedef struct SMqSubTopicObj {
...@@ -332,32 +375,33 @@ typedef struct SMqSubTopicObj { ...@@ -332,32 +375,33 @@ typedef struct SMqSubTopicObj {
int32_t version; int32_t version;
SRWLatch lock; SRWLatch lock;
int32_t sqlLen; int32_t sqlLen;
char* sql; char *sql;
char* logicalPlan; char *logicalPlan;
char* physicalPlan; char *physicalPlan;
SList* cgroups; //SList<SMqSubCGroupObj> SList *cgroups; // SList<SMqSubCGroupObj>
} SMqSubTopicObj; } SMqSubTopicObj;
typedef struct SMqConsumerSubObj { typedef struct SMqConsumerSubObj {
int64_t topicUid; int64_t topicUid;
SList* vgIds; //SList<int64_t> SList *vgIds; // SList<int64_t>
} SMqConsumerSubObj; } SMqConsumerSubObj;
typedef struct SMqConsumerHbObj { typedef struct SMqConsumerHbObj {
int64_t consumerId; int64_t consumerId;
SList* consumerSubs; //SList<SMqConsumerSubObj> SList *consumerSubs; // SList<SMqConsumerSubObj>
} SMqConsumerHbObj; } SMqConsumerHbObj;
typedef struct SMqVGroupSubObj { typedef struct SMqVGroupSubObj {
int64_t topicUid; int64_t topicUid;
SList* consumerIds; //SList<int64_t> SList *consumerIds; // SList<int64_t>
} SMqVGroupSubObj; } SMqVGroupSubObj;
typedef struct SMqVGroupHbObj { typedef struct SMqVGroupHbObj {
int64_t vgId; int64_t vgId;
SList* vgSubs; //SList<SMqVGroupSubObj> SList *vgSubs; // SList<SMqVGroupSubObj>
} SMqVGroupHbObj; } SMqVGroupHbObj;
#if 0
typedef struct SCGroupObj { typedef struct SCGroupObj {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
int64_t createTime; int64_t createTime;
...@@ -368,24 +412,7 @@ typedef struct SCGroupObj { ...@@ -368,24 +412,7 @@ typedef struct SCGroupObj {
SRWLatch lock; SRWLatch lock;
SList* consumerIds; SList* consumerIds;
} SCGroupObj; } SCGroupObj;
#endif
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
uint64_t uid;
uint64_t dbUid;
int32_t version;
SRWLatch lock;
int32_t execLen;
void* executor;
int32_t sqlLen;
char* sql;
char* logicalPlan;
char* physicalPlan;
SList* consumerIds;
} STopicObj;
typedef struct SMnodeMsg { typedef struct SMnodeMsg {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
......
...@@ -25,8 +25,11 @@ extern "C" { ...@@ -25,8 +25,11 @@ extern "C" {
int32_t mndInitTopic(SMnode *pMnode); int32_t mndInitTopic(SMnode *pMnode);
void mndCleanupTopic(SMnode *pMnode); void mndCleanupTopic(SMnode *pMnode);
STopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName); SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName);
void mndReleaseTopic(SMnode *pMnode, STopicObj *pTopic); void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic);
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -24,16 +24,17 @@ ...@@ -24,16 +24,17 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h" #include "tname.h"
#define MND_CONSUMER_VER_NUMBER 1 #define MND_CONSUMER_VER_NUMBER 1
#define MND_CONSUMER_RESERVE_SIZE 64 #define MND_CONSUMER_RESERVE_SIZE 64
static SSdbRaw *mndConsumerActionEncode(SConsumerObj *pConsumer); static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
static int32_t mndConsumerActionInsert(SSdb *pSdb, SConsumerObj *pConsumer); static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SConsumerObj *pConsumer); static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SConsumerObj *pConsumer, SConsumerObj *pNewConsumer); static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
static int32_t mndProcessCreateConsumerMsg(SMnodeMsg *pMsg); static int32_t mndProcessCreateConsumerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropConsumerMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropConsumerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg); static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg);
...@@ -57,8 +58,8 @@ int32_t mndInitConsumer(SMnode *pMnode) { ...@@ -57,8 +58,8 @@ int32_t mndInitConsumer(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete}; .deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp); /*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq); /*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
...@@ -66,84 +67,40 @@ int32_t mndInitConsumer(SMnode *pMnode) { ...@@ -66,84 +67,40 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {} void mndCleanupConsumer(SMnode *pMnode) {}
static SSdbRaw *mndCGroupActionEncode(SCGroupObj *pCGroup) { static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) {
int32_t size = sizeof(SConsumerObj) + MND_CONSUMER_RESERVE_SIZE; return 0;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
if (pRaw == NULL) return NULL;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pCGroup->name, TSDB_TABLE_FNAME_LEN);
SDB_SET_INT64(pRaw, dataPos, pCGroup->createTime);
SDB_SET_INT64(pRaw, dataPos, pCGroup->updateTime);
SDB_SET_INT64(pRaw, dataPos, pCGroup->uid);
/*SDB_SET_INT64(pRaw, dataPos, pConsumer->dbUid);*/
SDB_SET_INT32(pRaw, dataPos, pCGroup->version);
int32_t sz = listNEles(pCGroup->consumerIds);
SDB_SET_INT32(pRaw, dataPos, sz);
SListIter iter;
tdListInitIter(pCGroup->consumerIds, &iter, TD_LIST_FORWARD);
SListNode *pn = NULL;
while ((pn = tdListNext(&iter)) != NULL) {
int64_t consumerId = *(int64_t *)pn->data;
SDB_SET_INT64(pRaw, dataPos, consumerId);
}
SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE);
SDB_SET_DATALEN(pRaw, dataPos);
return pRaw;
}
static SSdbRow *mndCGroupActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != MND_CONSUMER_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode cgroup since %s", terrstr());
return NULL;
}
// TODO: maximum size is not known
int32_t size = sizeof(SCGroupObj) + 128 * sizeof(int64_t);
SSdbRow *pRow = sdbAllocRow(size);
SCGroupObj *pCGroup = sdbGetRowObj(pRow);
if (pCGroup == NULL) return NULL;
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pCGroup->name, TSDB_TABLE_FNAME_LEN);
SDB_GET_INT64(pRaw, pRow, dataPos, &pCGroup->createTime);
SDB_GET_INT64(pRaw, pRow, dataPos, &pCGroup->updateTime);
SDB_GET_INT64(pRaw, pRow, dataPos, &pCGroup->uid);
/*SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->dbUid);*/
SDB_GET_INT32(pRaw, pRow, dataPos, &pCGroup->version);
int32_t sz;
SDB_GET_INT32(pRaw, pRow, dataPos, &sz);
// TODO: free list when failing
tdListInit(pCGroup->consumerIds, sizeof(int64_t));
for (int i = 0; i < sz; i++) {
int64_t consumerId;
SDB_GET_INT64(pRaw, pRow, dataPos, &consumerId);
tdListAppend(pCGroup->consumerIds, &consumerId);
}
SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_CONSUMER_RESERVE_SIZE);
return pRow;
} }
static SSdbRaw *mndConsumerActionEncode(SConsumerObj *pConsumer) { static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
int32_t size = sizeof(SConsumerObj) + MND_CONSUMER_RESERVE_SIZE; int32_t size = sizeof(SMqConsumerObj) + MND_CONSUMER_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
if (pRaw == NULL) return NULL; if (pRaw == NULL) return NULL;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT64(pRaw, dataPos, pConsumer->uid); int32_t topicNum = taosArrayGetSize(pConsumer->topics);
SDB_SET_INT64(pRaw, dataPos, pConsumer->createTime); SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId);
SDB_SET_INT64(pRaw, dataPos, pConsumer->updateTime); int32_t len = strlen(pConsumer->cgroup);
/*SDB_SET_INT64(pRaw, dataPos, pConsumer->dbUid);*/ SDB_SET_INT32(pRaw, dataPos, len);
SDB_SET_INT32(pRaw, dataPos, pConsumer->version); SDB_SET_BINARY(pRaw, dataPos, pConsumer->cgroup, len);
SDB_SET_INT32(pRaw, dataPos, topicNum);
for (int i = 0; i < topicNum; i++) {
int32_t len;
SMqConsumerTopic *pConsumerTopic = taosArrayGet(pConsumer->topics, i);
len = strlen(pConsumerTopic->name);
SDB_SET_INT32(pRaw, dataPos, len);
SDB_SET_BINARY(pRaw, dataPos, pConsumerTopic->name, len);
int vgSize;
if (pConsumerTopic->vgroups == NULL) {
vgSize = 0;
} else {
vgSize = listNEles(pConsumerTopic->vgroups);
}
SDB_SET_INT32(pRaw, dataPos, vgSize);
for (int j = 0; j < vgSize; j++) {
// SList* head;
/*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/
}
}
SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE); SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE);
SDB_SET_DATALEN(pRaw, dataPos); SDB_SET_DATALEN(pRaw, dataPos);
...@@ -161,56 +118,67 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { ...@@ -161,56 +118,67 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
return NULL; return NULL;
} }
int32_t size = sizeof(SConsumerObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); int32_t size = sizeof(SMqConsumerObj);
SSdbRow *pRow = sdbAllocRow(size); SSdbRow *pRow = sdbAllocRow(size);
SConsumerObj *pConsumer = sdbGetRowObj(pRow); SMqConsumerObj *pConsumer = sdbGetRowObj(pRow);
if (pConsumer == NULL) return NULL; if (pConsumer == NULL) return NULL;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->uid); SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->consumerId);
SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->createTime); int32_t len, topicNum;
SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->updateTime); SDB_GET_INT32(pRaw, pRow, dataPos, &len);
/*SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->dbUid);*/ SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->cgroup, len);
SDB_GET_INT32(pRaw, pRow, dataPos, &pConsumer->version); SDB_GET_INT32(pRaw, pRow, dataPos, &topicNum);
for (int i = 0; i < topicNum; i++) {
int32_t topicLen;
SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic));
if (pConsumerTopic == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
// TODO
return NULL;
}
/*pConsumerTopic->vgroups = taosArrayInit(topicNum, sizeof(SMqConsumerTopic));*/
SDB_GET_INT32(pRaw, pRow, dataPos, &topicLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumerTopic->name, topicLen);
int32_t vgSize;
SDB_GET_INT32(pRaw, pRow, dataPos, &vgSize);
}
SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_CONSUMER_RESERVE_SIZE); SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_CONSUMER_RESERVE_SIZE);
return pRow; return pRow;
} }
static int32_t mndConsumerActionInsert(SSdb *pSdb, SConsumerObj *pConsumer) { static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
mTrace("consumer:%ld, perform insert action", pConsumer->uid); mTrace("consumer:%ld, perform insert action", pConsumer->consumerId);
return 0; return 0;
} }
static int32_t mndConsumerActionDelete(SSdb *pSdb, SConsumerObj *pConsumer) { static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
mTrace("consumer:%ld, perform delete action", pConsumer->uid); mTrace("consumer:%ld, perform delete action", pConsumer->consumerId);
return 0; return 0;
} }
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SConsumerObj *pOldConsumer, SConsumerObj *pNewConsumer) { static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
mTrace("consumer:%ld, perform update action", pOldConsumer->uid); mTrace("consumer:%ld, perform update action", pOldConsumer->consumerId);
atomic_exchange_32(&pOldConsumer->updateTime, pNewConsumer->updateTime);
atomic_exchange_32(&pOldConsumer->version, pNewConsumer->version);
taosWLockLatch(&pOldConsumer->lock);
// TODO handle update // TODO handle update
/*taosWLockLatch(&pOldConsumer->lock);*/
/*taosWUnLockLatch(&pOldConsumer->lock);*/
taosWUnLockLatch(&pOldConsumer->lock);
return 0; return 0;
} }
SConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId) { SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId); SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
/*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/ /*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/
} }
return pConsumer; return pConsumer;
} }
void mndReleaseConsumer(SMnode *pMnode, SConsumerObj *pConsumer) { void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pConsumer); sdbRelease(pSdb, pConsumer);
} }
...@@ -220,48 +188,185 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -220,48 +188,185 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
char *msgStr = pMsg->rpcMsg.pCont; char *msgStr = pMsg->rpcMsg.pCont;
SCMSubscribeReq *pSubscribe; SCMSubscribeReq *pSubscribe;
tDeserializeSCMSubscribeReq(msgStr, pSubscribe); tDeserializeSCMSubscribeReq(msgStr, pSubscribe);
int topicNum = pSubscribe->topicNum;
int64_t consumerId = pSubscribe->consumerId; int64_t consumerId = pSubscribe->consumerId;
char *consumerGroup = pSubscribe->consumerGroup; char *consumerGroup = pSubscribe->consumerGroup;
// get consumer group and add client into it
SCGroupObj *pCGroup = sdbAcquire(pMnode->pSdb, SDB_CGROUP, consumerGroup);
if (pCGroup != NULL) {
// iterate the list until finding the consumer
// add consumer to cgroup list if not found
// put new record
}
SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); SArray *newSub = NULL;
if (pConsumer != NULL) { int newTopicNum = pSubscribe->topicNum;
//reset topic list if (newTopicNum) {
newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));
} }
for (int i = 0; i < newTopicNum; i++) {
for (int i = 0; i < topicNum; i++) { char *topic = pSubscribe->topicName[i];
char *topicName = pSubscribe->topicName[i]; SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic));
STopicObj *pTopic = mndAcquireTopic(pMnode, topicName); if (pConsumerTopic == NULL) {
//get terrno = TSDB_CODE_OUT_OF_MEMORY;
// consumer id // TODO: free
SList *list = pTopic->consumerIds; return -1;
// add the consumer if not in the list }
// pConsumerTopic->vgroups = tdListNew(sizeof(int64_t));
SList* topicList = pConsumer->topics; taosArrayPush(newSub, pConsumerTopic);
//add to topic free(pConsumerTopic);
} }
taosArraySortString(newSub, taosArrayCompareString);
return 0; SArray *oldSub = NULL;
} int oldTopicNum = 0;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
// create consumer
pConsumer = malloc(sizeof(SMqConsumerObj));
if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
strcpy(pConsumer->cgroup, pSubscribe->consumerGroup);
static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg) { return 0; } } else {
oldSub = pConsumer->topics;
oldTopicNum = taosArrayGetSize(oldSub);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
if (pTrans == NULL) {
return -1;
}
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg) { return 0; } int i = 0, j = 0;
while (i < newTopicNum || j < oldTopicNum) {
SMqConsumerTopic *pOldTopic = NULL;
SMqConsumerTopic *pNewTopic = NULL;
if (i >= newTopicNum) {
// encode unset topic msg to all vnodes related to that topic
pOldTopic = taosArrayGet(oldSub, j);
j++;
} else if (j >= oldTopicNum) {
pNewTopic = taosArrayGet(newSub, i);
} else {
pNewTopic = taosArrayGet(newSub, i);
pOldTopic = taosArrayGet(oldSub, j);
char *newName = pNewTopic->name;
char *oldName = pOldTopic->name;
int comp = compareLenPrefixedStr(newName, oldName);
if (comp == 0) {
// do nothing
pOldTopic = pNewTopic = NULL;
i++;
j++;
continue;
} else if (comp < 0) {
pOldTopic = NULL;
i++;
} else {
pNewTopic = NULL;
j++;
}
}
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; } if (pOldTopic != NULL) {
ASSERT(pNewTopic == NULL);
char *oldTopicName = pOldTopic->name;
SList *vgroups = pOldTopic->vgroups;
SListIter iter;
tdListInitIter(vgroups, &iter, TD_LIST_FORWARD);
SListNode *pn;
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName);
ASSERT(pTopic != NULL);
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup));
while ((pn = tdListNext(&iter)) != NULL) {
int32_t vgId = *(int64_t *)pn->data;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
// TODO release
if (pVgObj == NULL) {
// TODO handle error
continue;
}
// acquire and get epset
void *pMqVgSetReq =
mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, pSubscribe->consumerId, pSubscribe->consumerGroup);
// TODO:serialize
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = pMqVgSetReq;
action.contLen = 0; // TODO
action.msgType = TDMT_VND_MQ_SET_CONN;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMqVgSetReq);
mndTransDrop(pTrans);
// TODO free
return -1;
}
}
taosHashRemove(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup));
} else if (pNewTopic != NULL) {
ASSERT(pOldTopic == NULL);
char *newTopicName = pNewTopic->name;
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
ASSERT(pTopic != NULL);
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup));
if (pGroup == NULL) {
// add new group
pGroup = malloc(sizeof(SMqCGroup));
if (pGroup == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pGroup->consumerIds = tdListNew(sizeof(int64_t));
if (pGroup->consumerIds == NULL) {
free(pGroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pGroup->status = 0;
// add into cgroups
taosHashPut(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup), pGroup,
sizeof(SMqCGroup));
}
// put the consumer into list
// rebalance will be triggered by timer
tdListAppend(pGroup->consumerIds, &pSubscribe->consumerId);
SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic);
sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY);
// TODO: error handling
mndTransAppendRedolog(pTrans, pTopicRaw);
} else {
ASSERT(0);
}
}
// destroy old sub
taosArrayDestroy(oldSub);
// put new sub into consumerobj
pConsumer->topics = newSub;
// persist consumerObj
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
// TODO: error handling
mndTransAppendRedolog(pTrans, pConsumerRaw);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg) { // TODO: free memory
mndTransProcessRsp(pMsg); mndTransDrop(pTrans);
return 0; return 0;
} }
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
STableInfoMsg *pInfo = pMsg->rpcMsg.pCont; STableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
...@@ -339,7 +444,7 @@ static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumO ...@@ -339,7 +444,7 @@ static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumO
int32_t numOfConsumers = 0; int32_t numOfConsumers = 0;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
SConsumerObj *pConsumer = NULL; SMqConsumerObj *pConsumer = NULL;
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) break; if (pIter == NULL) break;
...@@ -402,49 +507,6 @@ static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMs ...@@ -402,49 +507,6 @@ static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMs
return 0; return 0;
} }
static int32_t mndRetrieveCGroup(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SCGroupObj *pCGroup = NULL;
int32_t cols = 0;
char *pWrite;
char prefix[64] = {0};
tstrncpy(prefix, pShow->db, 64);
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix);
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pCGroup);
if (pShow->pIter == NULL) break;
if (strncmp(pCGroup->name, prefix, prefixLen) != 0) {
sdbRelease(pSdb, pCGroup);
continue;
}
cols = 0;
char consumerName[TSDB_TABLE_NAME_LEN] = {0};
tstrncpy(consumerName, pCGroup->name + prefixLen, TSDB_TABLE_NAME_LEN);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, consumerName);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pCGroup->createTime;
cols++;
numOfRows++;
sdbRelease(pSdb, pCGroup);
}
pShow->numOfReads += numOfRows;
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
return numOfRows;
}
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndTopic.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
...@@ -27,18 +28,16 @@ ...@@ -27,18 +28,16 @@
#define MND_TOPIC_VER_NUMBER 1 #define MND_TOPIC_VER_NUMBER 1
#define MND_TOPIC_RESERVE_SIZE 64 #define MND_TOPIC_RESERVE_SIZE 64
static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic); static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw); static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionInsert(SSdb *pSdb, STopicObj *pTopic); static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, STopicObj *pTopic); static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg);
static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pTopic, STopicObj *pNewTopic); static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg);
static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg); static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg); static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg); static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
int32_t mndInitTopic(SMnode *pMnode) { int32_t mndInitTopic(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_TOPIC, SSdbTable table = {.sdbType = SDB_TOPIC,
...@@ -58,8 +57,9 @@ int32_t mndInitTopic(SMnode *pMnode) { ...@@ -58,8 +57,9 @@ int32_t mndInitTopic(SMnode *pMnode) {
void mndCleanupTopic(SMnode *pMnode) {} void mndCleanupTopic(SMnode *pMnode) {}
static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) { SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
int32_t size = sizeof(STopicObj) + MND_TOPIC_RESERVE_SIZE; int32_t len;
int32_t size = sizeof(SMqTopicObj) + MND_TOPIC_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
if (pRaw == NULL) return NULL; if (pRaw == NULL) return NULL;
...@@ -71,10 +71,15 @@ static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) { ...@@ -71,10 +71,15 @@ static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) {
SDB_SET_INT64(pRaw, dataPos, pTopic->uid); SDB_SET_INT64(pRaw, dataPos, pTopic->uid);
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid); SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid);
SDB_SET_INT32(pRaw, dataPos, pTopic->version); SDB_SET_INT32(pRaw, dataPos, pTopic->version);
SDB_SET_INT32(pRaw, dataPos, pTopic->execLen);
SDB_SET_BINARY(pRaw, dataPos, pTopic->executor, pTopic->execLen);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen);
len = strlen(pTopic->logicalPlan);
SDB_SET_INT32(pRaw, dataPos, len);
SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len);
len = strlen(pTopic->physicalPlan);
SDB_SET_INT32(pRaw, dataPos, len);
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len);
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE);
SDB_SET_DATALEN(pRaw, dataPos); SDB_SET_DATALEN(pRaw, dataPos);
...@@ -82,7 +87,7 @@ static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) { ...@@ -82,7 +87,7 @@ static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) {
return pRaw; return pRaw;
} }
static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
...@@ -92,11 +97,12 @@ static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -92,11 +97,12 @@ static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
return NULL; return NULL;
} }
int32_t size = sizeof(STopicObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); int32_t size = sizeof(SMqTopicObj) + TSDB_MAX_COLUMNS * sizeof(SSchema);
SSdbRow *pRow = sdbAllocRow(size); SSdbRow *pRow = sdbAllocRow(size);
STopicObj *pTopic = sdbGetRowObj(pRow); SMqTopicObj *pTopic = sdbGetRowObj(pRow);
if (pTopic == NULL) return NULL; if (pTopic == NULL) return NULL;
int32_t len;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN); SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN);
SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->db, TSDB_DB_FNAME_LEN); SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->db, TSDB_DB_FNAME_LEN);
...@@ -105,49 +111,51 @@ static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -105,49 +111,51 @@ static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->uid); SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->uid);
SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->dbUid); SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->dbUid);
SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->version); SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->version);
SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->execLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->executor, pTopic->execLen);
SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->sqlLen); SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->sqlLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->sql, pTopic->sqlLen); SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->sql, pTopic->sqlLen);
SDB_GET_INT32(pRaw, pRow, dataPos, &len);
SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->logicalPlan, len);
SDB_GET_INT32(pRaw, pRow, dataPos, &len);
SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->physicalPlan, len);
SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_TOPIC_RESERVE_SIZE); SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_TOPIC_RESERVE_SIZE);
return pRow; return pRow;
} }
static int32_t mndTopicActionInsert(SSdb *pSdb, STopicObj *pTopic) { static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
mTrace("topic:%s, perform insert action", pTopic->name); mTrace("topic:%s, perform insert action", pTopic->name);
return 0; return 0;
} }
static int32_t mndTopicActionDelete(SSdb *pSdb, STopicObj *pTopic) { static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
mTrace("topic:%s, perform delete action", pTopic->name); mTrace("topic:%s, perform delete action", pTopic->name);
return 0; return 0;
} }
static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pOldTopic, STopicObj *pNewTopic) { static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
mTrace("topic:%s, perform update action", pOldTopic->name); mTrace("topic:%s, perform update action", pOldTopic->name);
atomic_exchange_32(&pOldTopic->updateTime, pNewTopic->updateTime); atomic_exchange_32(&pOldTopic->updateTime, pNewTopic->updateTime);
atomic_exchange_32(&pOldTopic->version, pNewTopic->version); atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
taosWLockLatch(&pOldTopic->lock); taosWLockLatch(&pOldTopic->lock);
//TODO handle update // TODO handle update
taosWUnLockLatch(&pOldTopic->lock); taosWUnLockLatch(&pOldTopic->lock);
return 0; return 0;
} }
STopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) { SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
STopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName); SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
if (pTopic == NULL) { if (pTopic == NULL) {
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
} }
return pTopic; return pTopic;
} }
void mndReleaseTopic(SMnode *pMnode, STopicObj *pTopic) { void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
} }
...@@ -162,7 +170,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { ...@@ -162,7 +170,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
return mndAcquireDb(pMnode, db); return mndAcquireDb(pMnode, db);
} }
static SDDropTopicMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) { static SDDropTopicMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
int32_t contLen = sizeof(SDDropTopicMsg); int32_t contLen = sizeof(SDDropTopicMsg);
SDDropTopicMsg *pDrop = calloc(1, contLen); SDDropTopicMsg *pDrop = calloc(1, contLen);
...@@ -185,7 +193,7 @@ static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *pCreate) { ...@@ -185,7 +193,7 @@ static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *pCreate) {
} }
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) { static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
STopicObj topicObj = {0}; SMqTopicObj topicObj = {0};
tstrncpy(topicObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); tstrncpy(topicObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
topicObj.createTime = taosGetTimestampMs(); topicObj.createTime = taosGetTimestampMs();
...@@ -197,13 +205,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq ...@@ -197,13 +205,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj);
if (pTopicRaw == NULL) return -1; if (pTopicRaw == NULL) return -1;
if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1; if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1;
// TODO: replace with trans to support recovery
return sdbWrite(pMnode->pSdb, pTopicRaw); return sdbWrite(pMnode->pSdb, pTopicRaw);
} }
static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
char *msgStr = pMsg->rpcMsg.pCont; char *msgStr = pMsg->rpcMsg.pCont;
SCMCreateTopicReq* pCreate; SCMCreateTopicReq *pCreate;
tDeserializeSCMCreateTopicReq(msgStr, pCreate); tDeserializeSCMCreateTopicReq(msgStr, pCreate);
mDebug("topic:%s, start to create", pCreate->name); mDebug("topic:%s, start to create", pCreate->name);
...@@ -213,7 +222,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { ...@@ -213,7 +222,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
return -1; return -1;
} }
STopicObj *pTopic = mndAcquireTopic(pMnode, pCreate->name); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pCreate->name);
if (pTopic != NULL) { if (pTopic != NULL) {
sdbRelease(pMnode->pSdb, pTopic); sdbRelease(pMnode->pSdb, pTopic);
if (pCreate->igExists) { if (pCreate->igExists) {
...@@ -245,9 +254,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { ...@@ -245,9 +254,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pTopic) { static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic) { return 0; }
return 0;
}
static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
...@@ -255,7 +262,7 @@ static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { ...@@ -255,7 +262,7 @@ static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) {
mDebug("topic:%s, start to drop", pDrop->name); mDebug("topic:%s, start to drop", pDrop->name);
STopicObj *pTopic = mndAcquireTopic(pMnode, pDrop->name); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pDrop->name);
if (pTopic == NULL) { if (pTopic == NULL) {
if (pDrop->igNotExists) { if (pDrop->igNotExists) {
mDebug("topic:%s, not exist, ignore not exist is set", pDrop->name); mDebug("topic:%s, not exist, ignore not exist is set", pDrop->name);
...@@ -361,7 +368,7 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo ...@@ -361,7 +368,7 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
int32_t numOfTopics = 0; int32_t numOfTopics = 0;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
STopicObj *pTopic = NULL; SMqTopicObj *pTopic = NULL;
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
if (pIter == NULL) break; if (pIter == NULL) break;
...@@ -440,13 +447,13 @@ static void mndExtractTableName(char *tableId, char *name) { ...@@ -440,13 +447,13 @@ static void mndExtractTableName(char *tableId, char *name) {
} }
static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
STopicObj *pTopic = NULL; SMqTopicObj *pTopic = NULL;
int32_t cols = 0; int32_t cols = 0;
char *pWrite; char *pWrite;
char prefix[64] = {0}; char prefix[64] = {0};
tstrncpy(prefix, pShow->db, 64); tstrncpy(prefix, pShow->db, 64);
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
......
...@@ -261,15 +261,18 @@ int walLoadMeta(SWal* pWal) { ...@@ -261,15 +261,18 @@ int walLoadMeta(SWal* pWal) {
memset(buf, 0, size + 5); memset(buf, 0, size + 5);
int tfd = tfOpenRead(fnameStr); int tfd = tfOpenRead(fnameStr);
if (tfRead(tfd, buf, size) != size) { if (tfRead(tfd, buf, size) != size) {
tfClose(tfd);
free(buf); free(buf);
return -1; return -1;
} }
// load into fileInfoSet // load into fileInfoSet
int code = walMetaDeserialize(pWal, buf); int code = walMetaDeserialize(pWal, buf);
if (code != 0) { if (code != 0) {
tfClose(tfd);
free(buf); free(buf);
return -1; return -1;
} }
tfClose(tfd);
free(buf); free(buf);
return 0; return 0;
} }
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "tfile.h" #include "tfile.h"
#include "walInt.h" #include "walInt.h"
#include "taoserror.h"
SWalReadHandle *walOpenReadHandle(SWal *pWal) { SWalReadHandle *walOpenReadHandle(SWal *pWal) {
SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle)); SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle));
...@@ -30,6 +31,7 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) { ...@@ -30,6 +31,7 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) {
pRead->status = 0; pRead->status = 0;
pRead->pHead = malloc(sizeof(SWalHead)); pRead->pHead = malloc(sizeof(SWalHead));
if (pRead->pHead == NULL) { if (pRead->pHead == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
free(pRead); free(pRead);
return NULL; return NULL;
} }
...@@ -55,16 +57,19 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i ...@@ -55,16 +57,19 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry); int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
code = tfLseek(idxTfd, offset, SEEK_SET); code = tfLseek(idxTfd, offset, SEEK_SET);
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
SWalIdxEntry entry; SWalIdxEntry entry;
if (tfRead(idxTfd, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { if (tfRead(idxTfd, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
// TODO:deserialize // TODO:deserialize
ASSERT(entry.ver == ver); ASSERT(entry.ver == ver);
code = tfLseek(logTfd, entry.offset, SEEK_SET); code = tfLseek(logTfd, entry.offset, SEEK_SET);
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
return code; return code;
...@@ -79,6 +84,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { ...@@ -79,6 +84,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
walBuildLogName(pRead->pWal, fileFirstVer, fnameStr); walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
int64_t logTfd = tfOpenRead(fnameStr); int64_t logTfd = tfOpenRead(fnameStr);
if (logTfd < 0) { if (logTfd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -100,6 +106,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { ...@@ -100,6 +106,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
return 0; return 0;
} }
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1; return -1;
} }
if (ver < pWal->vers.snapshotVer) { if (ver < pWal->vers.snapshotVer) {
...@@ -113,7 +120,6 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { ...@@ -113,7 +120,6 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
if (pRead->curFileFirstVer != pRet->firstVer) { if (pRead->curFileFirstVer != pRet->firstVer) {
code = walReadChangeFile(pRead, pRet->firstVer); code = walReadChangeFile(pRead, pRet->firstVer);
if (code < 0) { if (code < 0) {
// TODO: set error flag
return -1; return -1;
} }
} }
...@@ -132,7 +138,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { ...@@ -132,7 +138,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
// TODO: check wal life // TODO: check wal life
if (pRead->curVersion != ver) { if (pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver); code = walReadSeekVer(pRead, ver);
if (code != 0) { if (code < 0) {
return -1; return -1;
} }
} }
...@@ -145,11 +151,13 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { ...@@ -145,11 +151,13 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
} }
code = walValidHeadCksum(pRead->pHead); code = walValidHeadCksum(pRead->pHead);
if (code != 0) { if (code != 0) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
if (pRead->capacity < pRead->pHead->head.len) { if (pRead->capacity < pRead->pHead->head.len) {
void *ptr = realloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len); void *ptr = realloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1; return -1;
} }
pRead->pHead = ptr; pRead->pHead = ptr;
...@@ -163,6 +171,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { ...@@ -163,6 +171,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
code = walValidBodyCksum(pRead->pHead); code = walValidBodyCksum(pRead->pHead);
if (code != 0) { if (code != 0) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
pRead->curVersion++; pRead->curVersion++;
......
...@@ -30,17 +30,20 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { ...@@ -30,17 +30,20 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
int64_t idxOff = walGetVerIdxOffset(pWal, ver); int64_t idxOff = walGetVerIdxOffset(pWal, ver);
code = tfLseek(idxTfd, idxOff, SEEK_SET); code = tfLseek(idxTfd, idxOff, SEEK_SET);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
SWalIdxEntry entry; SWalIdxEntry entry;
// TODO:deserialize // TODO:deserialize
code = tfRead(idxTfd, &entry, sizeof(SWalIdxEntry)); code = tfRead(idxTfd, &entry, sizeof(SWalIdxEntry));
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
ASSERT(entry.ver == ver); ASSERT(entry.ver == ver);
code = tfLseek(logTfd, entry.offset, SEEK_CUR); code = tfLseek(logTfd, entry.offset, SEEK_CUR);
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
return code; return code;
...@@ -56,11 +59,13 @@ int walChangeFileToLast(SWal* pWal) { ...@@ -56,11 +59,13 @@ int walChangeFileToLast(SWal* pWal) {
walBuildIdxName(pWal, fileFirstVer, fnameStr); walBuildIdxName(pWal, fileFirstVer, fnameStr);
idxTfd = tfOpenReadWrite(fnameStr); idxTfd = tfOpenReadWrite(fnameStr);
if (idxTfd < 0) { if (idxTfd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
walBuildLogName(pWal, fileFirstVer, fnameStr); walBuildLogName(pWal, fileFirstVer, fnameStr);
logTfd = tfOpenReadWrite(fnameStr); logTfd = tfOpenReadWrite(fnameStr);
if (logTfd < 0) { if (logTfd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
// switch file // switch file
...@@ -76,11 +81,12 @@ int walChangeFile(SWal* pWal, int64_t ver) { ...@@ -76,11 +81,12 @@ int walChangeFile(SWal* pWal, int64_t ver) {
code = tfClose(pWal->writeLogTfd); code = tfClose(pWal->writeLogTfd);
if (code != 0) { if (code != 0) {
// TODO // TODO
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
code = tfClose(pWal->writeIdxTfd); code = tfClose(pWal->writeIdxTfd);
if (code != 0) { if (code != 0) {
// TODO terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
SWalFileInfo tmpInfo; SWalFileInfo tmpInfo;
...@@ -113,6 +119,7 @@ int walSeekVer(SWal* pWal, int64_t ver) { ...@@ -113,6 +119,7 @@ int walSeekVer(SWal* pWal, int64_t ver) {
return 0; return 0;
} }
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1; return -1;
} }
if (ver < pWal->vers.snapshotVer) { if (ver < pWal->vers.snapshotVer) {
......
...@@ -25,6 +25,7 @@ int32_t walCommit(SWal *pWal, int64_t ver) { ...@@ -25,6 +25,7 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer); ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer); ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
if (ver < pWal->vers.commitVer || ver > pWal->vers.lastVer) { if (ver < pWal->vers.commitVer || ver > pWal->vers.lastVer) {
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1; return -1;
} }
pWal->vers.commitVer = ver; pWal->vers.commitVer = ver;
...@@ -38,6 +39,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -38,6 +39,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return 0; return 0;
} }
if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1; return -1;
} }
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册