未验证 提交 945e2900 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #16177 from taosdata/feature/stream

refactor(tmq): use tdb to store check info
...@@ -2555,10 +2555,14 @@ typedef struct { ...@@ -2555,10 +2555,14 @@ typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN]; char topic[TSDB_TOPIC_FNAME_LEN];
int64_t ntbUid; int64_t ntbUid;
SArray* colIdList; // SArray<int16_t> SArray* colIdList; // SArray<int16_t>
} SCheckAlterInfo; } STqCheckInfo;
int32_t tEncodeSCheckAlterInfo(SEncoder* pEncoder, const SCheckAlterInfo* pInfo); int32_t tEncodeSTqCheckInfo(SEncoder* pEncoder, const STqCheckInfo* pInfo);
int32_t tDecodeSCheckAlterInfo(SDecoder* pDecoder, SCheckAlterInfo* pInfo); int32_t tDecodeSTqCheckInfo(SDecoder* pDecoder, STqCheckInfo* pInfo);
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
} STqDelCheckInfoReq;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
......
...@@ -188,7 +188,8 @@ enum { ...@@ -188,7 +188,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset) TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_CHECK_ALTER_INFO, "vnode-alter-check-info", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ADD_CHECK_INFO, "vnode-add-check-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE_CHECK_INFO, "vnode-delete-check-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
......
...@@ -515,7 +515,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -515,7 +515,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
......
...@@ -4262,7 +4262,6 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp ...@@ -4262,7 +4262,6 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) { int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) {
if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1; if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1;
if (tEncodeI64(encoder, pOffset->offset) < 0) return -1; if (tEncodeI64(encoder, pOffset->offset) < 0) return -1;
...@@ -4300,7 +4299,6 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder *decoder, SMqCMCommitOffsetReq *pRe ...@@ -4300,7 +4299,6 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder *decoder, SMqCMCommitOffsetReq *pRe
tEndDecode(decoder); tEndDecode(decoder);
return 0; return 0;
} }
int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
...@@ -5590,7 +5588,6 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { ...@@ -5590,7 +5588,6 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
return 0; return 0;
} }
#if 1
int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
if (pVal->type == TMQ_OFFSET__RESET_NONE) { if (pVal->type == TMQ_OFFSET__RESET_NONE) {
snprintf(buf, maxLen, "offset(reset to none)"); snprintf(buf, maxLen, "offset(reset to none)");
...@@ -5609,7 +5606,6 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { ...@@ -5609,7 +5606,6 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
} }
return 0; return 0;
} }
#endif
bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
if (pLeft->type == pRight->type) { if (pLeft->type == pRight->type) {
...@@ -5643,7 +5639,7 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { ...@@ -5643,7 +5639,7 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
return 0; return 0;
} }
int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) { int32_t tEncodeSTqCheckInfo(SEncoder *pEncoder, const STqCheckInfo *pInfo) {
if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1; if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1; if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1;
int32_t sz = taosArrayGetSize(pInfo->colIdList); int32_t sz = taosArrayGetSize(pInfo->colIdList);
...@@ -5655,7 +5651,7 @@ int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) ...@@ -5655,7 +5651,7 @@ int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo)
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeSCheckAlterInfo(SDecoder *pDecoder, SCheckAlterInfo *pInfo) { int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) {
if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1; if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1; if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1;
int32_t sz; int32_t sz;
......
...@@ -225,7 +225,8 @@ SArray *mmGetMsgHandles() { ...@@ -225,7 +225,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -361,7 +361,8 @@ SArray *vmGetMsgHandles() { ...@@ -361,7 +361,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndOffset.h" #include "mndOffset.h"
#include "mndPrivilege.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndPrivilege.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndTopic.h" #include "mndTopic.h"
...@@ -305,7 +305,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { ...@@ -305,7 +305,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
sdbRelease(pSdb, pOffset); sdbRelease(pSdb, pOffset);
} }
return code; return code;
} }
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) { int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
......
...@@ -57,7 +57,8 @@ int32_t mndInitTopic(SMnode *pMnode) { ...@@ -57,7 +57,8 @@ int32_t mndInitTopic(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_CHECK_ALTER_INFO_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_ADD_CHECK_INFO_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DELETE_CHECK_INFO_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
...@@ -450,7 +451,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -450,7 +451,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (topicObj.ntbUid != 0) { if (topicObj.ntbUid != 0) {
SCheckAlterInfo info; STqCheckInfo info;
memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN); memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN);
info.ntbUid = topicObj.ntbUid; info.ntbUid = topicObj.ntbUid;
info.colIdList = topicObj.ntbColIds; info.colIdList = topicObj.ntbColIds;
...@@ -470,7 +471,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -470,7 +471,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
// encoder check alter info // encoder check alter info
int32_t len; int32_t len;
int32_t code; int32_t code;
tEncodeSize(tEncodeSCheckAlterInfo, &info, len, code); tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
if (code < 0) { if (code < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans); mndTransDrop(pTrans);
...@@ -481,7 +482,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -481,7 +482,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, len); tEncoderInit(&encoder, abuf, len);
if (tEncodeSCheckAlterInfo(&encoder, &info) < 0) { if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
...@@ -493,7 +494,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -493,7 +494,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = buf; action.pCont = buf;
action.contLen = sizeof(SMsgHead) + len; action.contLen = sizeof(SMsgHead) + len;
action.msgType = TDMT_VND_CHECK_ALTER_INFO; action.msgType = TDMT_VND_ADD_CHECK_INFO;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf); taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
...@@ -659,12 +660,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -659,12 +660,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
#if 0
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) { if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0); ASSERT(0);
mndTransDrop(pTrans); mndTransDrop(pTrans);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
return -1; return -1;
} }
#endif
// TODO check if rebalancing // TODO check if rebalancing
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
...@@ -675,6 +678,37 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -675,6 +678,37 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
return -1; return -1;
} }
if (pTopic->ntbUid != 0) {
// broadcast to all vnode
void *pIter = NULL;
SVgObj *pVgroup = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN);
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
action.msgType = TDMT_VND_DELETE_CHECK_INFO;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans);
return -1;
}
}
}
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic); int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
......
...@@ -117,16 +117,15 @@ typedef struct { ...@@ -117,16 +117,15 @@ typedef struct {
struct STQ { struct STQ {
SVnode* pVnode; SVnode* pVnode;
char* path; char* path;
SHashObj* pushMgr; // consumerId -> STqHandle* SHashObj* pPushMgr; // consumerId -> STqHandle*
SHashObj* handles; // subKey -> STqHandle SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore; STqOffsetStore* pOffsetStore;
TDB* pMetaStore; TDB* pMetaDB;
TTB* pExecStore; TTB* pExecStore;
TTB* pCheckStore;
TTB* pAlterInfoStore;
SStreamMeta* pStreamMeta; SStreamMeta* pStreamMeta;
}; };
...@@ -155,6 +154,9 @@ int32_t tqMetaClose(STQ* pTq); ...@@ -155,6 +154,9 @@ int32_t tqMetaClose(STQ* pTq);
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle); int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
int32_t tqMetaRestoreHandle(STQ* pTq); int32_t tqMetaRestoreHandle(STQ* pTq);
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
typedef struct { typedef struct {
int32_t size; int32_t size;
......
...@@ -163,13 +163,16 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); ...@@ -163,13 +163,16 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen); // tq-mq
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver); int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); // tq-stream
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
......
...@@ -60,11 +60,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ...@@ -60,11 +60,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->pVnode = pVnode; pTq->pVnode = pVnode;
pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
if (tqMetaOpen(pTq) < 0) { if (tqMetaOpen(pTq) < 0) {
ASSERT(0); ASSERT(0);
...@@ -85,9 +85,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ...@@ -85,9 +85,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
void tqClose(STQ* pTq) { void tqClose(STQ* pTq) {
if (pTq) { if (pTq) {
tqOffsetClose(pTq->pOffsetStore); tqOffsetClose(pTq->pOffsetStore);
taosHashCleanup(pTq->handles); taosHashCleanup(pTq->pHandle);
taosHashCleanup(pTq->pushMgr); taosHashCleanup(pTq->pPushMgr);
taosHashCleanup(pTq->pAlterInfo); taosHashCleanup(pTq->pCheckInfo);
taosMemoryFree(pTq->path); taosMemoryFree(pTq->path);
tqMetaClose(pTq); tqMetaClose(pTq);
streamMetaClose(pTq->pStreamMeta); streamMetaClose(pTq->pStreamMeta);
...@@ -183,7 +183,12 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con ...@@ -183,7 +183,12 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
return 0; return 0;
} }
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver) { static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
pLeft->val.version <= pRight->val.version;
}
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
STqOffset offset = {0}; STqOffset offset = {0};
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen); tDecoderInit(&decoder, msg, msgLen);
...@@ -199,19 +204,24 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve ...@@ -199,19 +204,24 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
} else if (offset.val.type == TMQ_OFFSET__LOG) { } else if (offset.val.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey, tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
TD_VID(pTq->pVnode), offset.val.version); TD_VID(pTq->pVnode), offset.val.version);
if (offset.val.version + 1 == version) {
offset.val.version += 1;
}
} else { } else {
ASSERT(0); ASSERT(0);
} }
/*STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);*/ STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
/*if (pOffset != NULL) {*/ if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) {
/*if (pOffset->val.type == TMQ_OFFSET__LOG && pOffset->val.version < offset.val.version) {*/ return 0;
}
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
if (offset.val.type == TMQ_OFFSET__LOG) { if (offset.val.type == TMQ_OFFSET__LOG) {
STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle) { if (pHandle) {
if (walRefVer(pHandle->pRef, offset.val.version) < 0) { if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
ASSERT(0); ASSERT(0);
...@@ -220,6 +230,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve ...@@ -220,6 +230,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
} }
} }
// rsp
/*}*/ /*}*/
/*}*/ /*}*/
...@@ -229,15 +241,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve ...@@ -229,15 +241,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pTq->pAlterInfo, pIter); pIter = taosHashIterate(pTq->pCheckInfo, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
SCheckAlterInfo* pCheck = (SCheckAlterInfo*)pIter; STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
if (pCheck->ntbUid == tbUid) { if (pCheck->ntbUid == tbUid) {
int32_t sz = taosArrayGetSize(pCheck->colIdList); int32_t sz = taosArrayGetSize(pCheck->colIdList);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i); int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
if (forbidColId == colId) { if (forbidColId == colId) {
taosHashCancelIterate(pTq->pAlterInfo, pIter); taosHashCancelIterate(pTq->pCheckInfo, pIter);
return -1; return -1;
} }
} }
...@@ -289,7 +301,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -289,7 +301,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SWalCkHead* pCkHead = NULL; SWalCkHead* pCkHead = NULL;
// 1.find handle // 1.find handle
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
/*ASSERT(pHandle);*/ /*ASSERT(pHandle);*/
if (pHandle == NULL) { if (pHandle == NULL) {
tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId, tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
...@@ -478,10 +490,10 @@ OVER: ...@@ -478,10 +490,10 @@ OVER:
return code; return code;
} }
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey)); int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
ASSERT(code == 0); ASSERT(code == 0);
tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
...@@ -492,27 +504,43 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -492,27 +504,43 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
return 0; return 0;
} }
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SCheckAlterInfo info = {0}; STqCheckInfo info = {0};
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen); tDecoderInit(&decoder, msg, msgLen);
if (tDecodeSCheckAlterInfo(&decoder, &info) < 0) { if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (taosHashPut(pTq->pAlterInfo, info.topic, strlen(info.topic), &info, sizeof(SCheckAlterInfo)) < 0) { if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
return 0; return 0;
} }
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SMqRebVgReq req = {0}; SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req); tDecodeSMqRebVgReq(msg, &req);
// todo lock // todo lock
STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
if (req.oldConsumerId != -1) { if (req.oldConsumerId != -1) {
tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey, tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey,
...@@ -579,7 +607,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -579,7 +607,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList); tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
} }
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId); tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
// TODO // TODO
...@@ -668,34 +696,9 @@ FAIL: ...@@ -668,34 +696,9 @@ FAIL:
return code; return code;
} }
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
// //
return streamMetaAddSerializedTask(pTq->pStreamMeta, msg, msgLen); return streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
#if 0
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0);
goto FAIL;
}
tDecoderClear(&decoder);
if (tqExpandTask(pTq, pTask) < 0) {
goto FAIL;
}
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
return 0;
FAIL:
if (pTask) taosMemoryFree(pTask);
return -1;
#endif
} }
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
...@@ -817,7 +820,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -817,7 +820,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
} }
} }
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
......
...@@ -43,86 +43,116 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { ...@@ -43,86 +43,116 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
return 0; return 0;
} }
int32_t tqMetaRestoreHandle(STQ* pTq) { int32_t tqMetaOpen(STQ* pTq) {
TBC* pCur = NULL; if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB) < 0) {
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
void* pKey = NULL; if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore) < 0) {
int kLen = 0; ASSERT(0);
void* pVal = NULL; return -1;
int vLen = 0; }
SDecoder decoder;
tdbTbcMoveToFirst(pCur); if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore) < 0) {
ASSERT(0);
return -1;
}
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { if (tqMetaRestoreHandle(pTq) < 0) {
STqHandle handle; return -1;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); }
tDecodeSTqHandle(&decoder, &handle);
handle.pRef = walOpenRef(pTq->pVnode->pWal); if (tqMetaRestoreCheckInfo(pTq) < 0) {
if (handle.pRef == NULL) { return -1;
ASSERT(0); }
return -1;
}
walRefVer(handle.pRef, handle.snapshotVer);
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { return 0;
SReadHandle reader = { }
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTableReader = true,
.initTqReader = true,
.version = handle.snapshotVer,
};
handle.execHandle.execCol.task = qCreateQueueExecTaskInfo( int32_t tqMetaClose(STQ* pTq) {
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); if (pTq->pExecStore) {
ASSERT(handle.execHandle.execCol.task); tdbTbClose(pTq->pExecStore);
void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.execCol.task, &scanner);
ASSERT(scanner);
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader);
} else {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
}
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle));
} }
if (pTq->pCheckStore) {
tdbTbcClose(pCur); tdbTbClose(pTq->pCheckStore);
}
tdbClose(pTq->pMetaDB);
return 0; return 0;
} }
int32_t tqMetaOpen(STQ* pTq) { int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) {
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaStore) < 0) { TXN txn;
ASSERT(0); if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1; return -1;
} }
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) { if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
return -1; return -1;
} }
if (tqMetaRestoreHandle(pTq) < 0) { if (tdbTbUpsert(pTq->pExecStore, key, strlen(key), value, vLen, &txn) < 0) {
return -1;
}
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
return -1; return -1;
} }
return 0; return 0;
} }
int32_t tqMetaClose(STQ* pTq) { int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
if (pTq->pExecStore) { TXN txn;
tdbTbClose(pTq->pExecStore);
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
ASSERT(0);
}
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), &txn) < 0) {
/*ASSERT(0);*/
}
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
return 0;
}
int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
TBC* pCur = NULL;
if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) {
ASSERT(0);
return -1;
}
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
SDecoder decoder;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqCheckInfo info;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tDecoderClear(&decoder);
if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
} }
tdbClose(pTq->pMetaStore); tdbTbcClose(pCur);
return 0; return 0;
} }
...@@ -153,7 +183,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { ...@@ -153,7 +183,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
ASSERT(0); ASSERT(0);
} }
if (tdbBegin(pTq->pMetaStore, &txn) < 0) { if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
ASSERT(0); ASSERT(0);
} }
...@@ -161,7 +191,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { ...@@ -161,7 +191,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
ASSERT(0); ASSERT(0);
} }
if (tdbCommit(pTq->pMetaStore, &txn) < 0) { if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
ASSERT(0); ASSERT(0);
} }
...@@ -177,7 +207,7 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { ...@@ -177,7 +207,7 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
ASSERT(0); ASSERT(0);
} }
if (tdbBegin(pTq->pMetaStore, &txn) < 0) { if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
ASSERT(0); ASSERT(0);
} }
...@@ -185,9 +215,67 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { ...@@ -185,9 +215,67 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
/*ASSERT(0);*/ /*ASSERT(0);*/
} }
if (tdbCommit(pTq->pMetaStore, &txn) < 0) { if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
ASSERT(0); ASSERT(0);
} }
return 0; return 0;
} }
int32_t tqMetaRestoreHandle(STQ* pTq) {
TBC* pCur = NULL;
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
ASSERT(0);
return -1;
}
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
SDecoder decoder;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqHandle handle;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
handle.pRef = walOpenRef(pTq->pVnode->pWal);
if (handle.pRef == NULL) {
ASSERT(0);
return -1;
}
walRefVer(handle.pRef, handle.snapshotVer);
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SReadHandle reader = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTableReader = true,
.initTqReader = true,
.version = handle.snapshotVer,
};
handle.execHandle.execCol.task = qCreateQueueExecTaskInfo(
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
ASSERT(handle.execHandle.execCol.task);
void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.execCol.task, &scanner);
ASSERT(scanner);
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader);
} else {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
}
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
}
tdbTbcClose(pCur);
return 0;
}
...@@ -394,7 +394,7 @@ int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) { ...@@ -394,7 +394,7 @@ int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pTq->handles, pIter); pIter = taosHashIterate(pTq->pHandle, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
STqHandle* pExec = (STqHandle*)pIter; STqHandle* pExec = (STqHandle*)pIter;
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
......
...@@ -165,9 +165,9 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { ...@@ -165,9 +165,9 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
STQ* pTq = pWriter->pTq; STQ* pTq = pWriter->pTq;
if (rollback) { if (rollback) {
ASSERT(0); tdbAbort(pWriter->pTq->pMetaDB, &pWriter->txn);
} else { } else {
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
if (code) goto _err; if (code) goto _err;
} }
......
...@@ -196,36 +196,42 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -196,36 +196,42 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
break; break;
/* TQ */ /* TQ */
case TDMT_VND_MQ_VG_CHANGE: case TDMT_VND_MQ_VG_CHANGE:
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessVgChangeReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err; goto _err;
} }
break; break;
case TDMT_VND_MQ_VG_DELETE: case TDMT_VND_MQ_VG_DELETE:
if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { if (tqProcessVgDeleteReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
goto _err; goto _err;
} }
break; break;
case TDMT_VND_MQ_COMMIT_OFFSET: case TDMT_VND_MQ_COMMIT_OFFSET:
if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessOffsetCommitReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead), version) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err; goto _err;
} }
break; break;
case TDMT_VND_CHECK_ALTER_INFO: case TDMT_VND_ADD_CHECK_INFO:
if (tqProcessCheckAlterInfoReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessAddCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err;
}
break;
case TDMT_VND_DELETE_CHECK_INFO:
if (tqProcessDelCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err; goto _err;
} }
break; break;
case TDMT_STREAM_TASK_DEPLOY: { case TDMT_STREAM_TASK_DEPLOY: {
if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessTaskDeployReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err; goto _err;
} }
} break; } break;
case TDMT_STREAM_TASK_DROP: { case TDMT_STREAM_TASK_DROP: {
if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { if (tqProcessTaskDropReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
goto _err; goto _err;
} }
} break; } break;
......
...@@ -81,7 +81,7 @@ void streamMetaClose(SStreamMeta* pMeta) { ...@@ -81,7 +81,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
} }
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen) { int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册