From 0e690344542ecd5c5669ce5b6d7b04107d3930dd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 17 Dec 2021 17:44:36 +0800 Subject: [PATCH] refactor tq --- include/common/taosmsg.h | 2 +- include/dnode/vnode/tq/tq.h | 171 +++++++------- source/dnode/mgmt/impl/src/dndTransport.c | 2 +- source/dnode/vnode/impl/src/vnodeWrite.c | 6 +- source/dnode/vnode/tq/src/tq.c | 270 +++++++++++++--------- source/dnode/vnode/tq/src/tqMetaStore.c | 2 +- source/dnode/vnode/tq/test/tqMetaTest.cpp | 125 ++++------ 7 files changed, 295 insertions(+), 283 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 1cc6c3b5a2..55f51c285c 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -50,7 +50,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET, "mq-set" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET_CUR, "mq-set-cur" ) // message from client to mnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index 7993a8f1ab..6082105700 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -18,84 +18,71 @@ #include "mallocator.h" #include "os.h" +#include "tlist.h" #include "tutil.h" #ifdef __cplusplus extern "C" { #endif -typedef struct TmqMsgHead { +typedef struct STqMsgHead { int32_t protoVer; int32_t msgType; int64_t cgId; int64_t clientId; -} TmqMsgHead; +} STqMsgHead; -typedef struct TmqOneAck { +typedef struct STqOneAck { int64_t topicId; int64_t consumeOffset; -} TmqOneAck; +} STqOneAck; -typedef struct TmqAcks { +typedef struct STqAcks { int32_t ackNum; // should be sorted - TmqOneAck acks[]; -} TmqAcks; + STqOneAck acks[]; +} STqAcks; -// TODO: put msgs into common -typedef struct TmqConnectReq { - TmqMsgHead head; - TmqAcks acks; -} TmqConnectReq; - -typedef struct TmqConnectRsp { - TmqMsgHead head; - int8_t status; -} TmqConnectRsp; - -typedef struct TmqDisconnectReq { - TmqMsgHead head; -} TmqDiscconectReq; - -typedef struct TmqDisconnectRsp { - TmqMsgHead head; - int8_t status; -} TmqDisconnectRsp; +typedef struct STqSetCurReq { + STqMsgHead head; + int64_t topicId; + int64_t offset; +} STqSetCurReq; typedef struct STqConsumeReq { - TmqMsgHead head; - TmqAcks acks; + STqMsgHead head; + STqAcks acks; } STqConsumeReq; -typedef struct TmqMsgContent { +typedef struct STqMsgContent { int64_t topicId; int64_t msgLen; char msg[]; -} TmqMsgContent; +} STqMsgContent; typedef struct STqConsumeRsp { - TmqMsgHead head; + STqMsgHead head; int64_t bodySize; - TmqMsgContent msgs[]; + STqMsgContent msgs[]; } STqConsumeRsp; -typedef struct TmqSubscribeReq { - TmqMsgHead head; +typedef struct STqSubscribeReq { + STqMsgHead head; int32_t topicNum; int64_t topic[]; -} TmqSubscribeReq; +} STqSubscribeReq; -typedef struct tmqSubscribeRsp { - TmqMsgHead head; +typedef struct STqSubscribeRsp { + STqMsgHead head; int64_t vgId; char ep[TSDB_EP_LEN]; // TSDB_EP_LEN -} TmqSubscribeRsp; +} STqSubscribeRsp; -typedef struct TmqHeartbeatReq { -} TmqHeartbeatReq; +typedef struct STqHeartbeatReq { +} STqHeartbeatReq; -typedef struct TmqHeartbeatRsp { -} TmqHeartbeatRsp; +typedef struct STqHeartbeatRsp { +} STqHeartbeatRsp; typedef struct STqTopicVhandle { int64_t topicId; @@ -113,39 +100,41 @@ typedef struct STqBufferItem { // executors are identical but not concurrent // so there must be a copy in each item void* executor; + int32_t status; int64_t size; void* content; -} STqBufferItem; +} STqMsgItem; -typedef struct STqBufferHandle { +typedef struct STqTopic { // char* topic; //c style, end with '\0' // int64_t cgId; // void* ahandle; - int64_t nextConsumeOffset; - int64_t floatingCursor; - int64_t topicId; - int32_t head; - int32_t tail; - STqBufferItem buffer[TQ_BUFFER_SIZE]; -} STqBufferHandle; + int64_t nextConsumeOffset; + int64_t floatingCursor; + int64_t topicId; + int32_t head; + int32_t tail; + STqMsgItem buffer[TQ_BUFFER_SIZE]; +} STqTopic; typedef struct STqListHandle { - STqBufferHandle bufHandle; + STqTopic topic; struct STqListHandle* next; -} STqListHandle; +} STqList; -typedef struct STqGroupHandle { - int64_t cId; - int64_t cgId; - void* ahandle; - int32_t topicNum; - STqListHandle* head; -} STqGroupHandle; +typedef struct STqGroup { + int64_t cId; + int64_t cgId; + void* ahandle; + int32_t topicNum; + STqList* head; + SList* topicList; // SList +} STqGroup; typedef struct STqQueryExec { - void* src; - STqBufferItem* dest; - void* executor; + void* src; + STqMsgItem* dest; + void* executor; } STqQueryExec; typedef struct STqQueryMsg { @@ -209,15 +198,15 @@ typedef void (*FTqDelete)(void*); #define TQ_DUP_INTXN_REWRITE 0 #define TQ_DUP_INTXN_REJECT 2 -static inline bool TqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } +static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } -static inline bool TqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; } +static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; } static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST; #define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE -typedef struct TqMetaHandle { +typedef struct STqMetaHandle { int64_t key; int64_t offset; int64_t serializedSize; @@ -225,23 +214,25 @@ typedef struct TqMetaHandle { void* valueInTxn; } STqMetaHandle; -typedef struct TqMetaList { - STqMetaHandle handle; - struct TqMetaList* next; - // struct TqMetaList* inTxnPrev; - // struct TqMetaList* inTxnNext; - struct TqMetaList* unpersistPrev; - struct TqMetaList* unpersistNext; +typedef struct STqMetaList { + STqMetaHandle handle; + struct STqMetaList* next; + // struct STqMetaList* inTxnPrev; + // struct STqMetaList* inTxnNext; + struct STqMetaList* unpersistPrev; + struct STqMetaList* unpersistNext; } STqMetaList; -typedef struct TqMetaStore { +typedef struct STqMetaStore { STqMetaList* bucket[TQ_BUCKET_SIZE]; // a table head STqMetaList* unpersistHead; + // TODO:temporaral use, to be replaced by unified tfile int fileFd; // TODO:temporaral use, to be replaced by unified tfile - int idxFd; + int idxFd; + char* dirPath; int32_t tqConfigFlag; FTqSerialize pSerializer; @@ -250,8 +241,8 @@ typedef struct TqMetaStore { } STqMetaStore; typedef struct STQ { - // the collection of group handle - // the handle of kvstore + // the collection of groups + // the handle of meta kvstore char* path; STqCfg* tqConfig; STqLogReader* tqLogReader; @@ -266,23 +257,25 @@ void tqClose(STQ*); // void* will be replace by a msg type int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); -int tqSetCursor(STQ*, void* msg); - int tqConsume(STQ*, STqConsumeReq*); -STqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); +int tqSetCursor(STQ*, STqSetCurReq* pMsg); +int tqBufferSetOffset(STqTopic*, int64_t offset); + +STqTopic* tqFindTopic(STqGroup*, int64_t topicId); + +STqGroup* tqGetGroup(STQ*, int64_t clientId); + +STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); +int tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); +int tqRegisterContext(STqGroup*, void* ahandle); +int tqSendLaunchQuery(STqMsgItem*, int64_t offset); -STqGroupHandle* tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); -int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); -int tqMoveOffsetToNext(STqGroupHandle*); -int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); -int tqRegisterContext(STqGroupHandle*, void* ahandle); -int tqLaunchQuery(STqGroupHandle*); -int tqSendLaunchQuery(STqGroupHandle*); +int tqSerializeGroup(const STqGroup*, STqSerializedHead**); -int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** ppHead); +const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**); -const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHandle** gHandle); +static int tqQueryExecuting(int32_t status) { return status; } #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 1db92644ae..8202bfcf2a 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -44,7 +44,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TSDB_MSG_TYPE_MQ_SET] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_MQ_SET_CUR] = dndProcessVnodeWriteMsg; // msg from client to mnode pMgmt->msgFp[TSDB_MSG_TYPE_CONNECT] = dndProcessMnodeReadMsg; diff --git a/source/dnode/vnode/impl/src/vnodeWrite.c b/source/dnode/vnode/impl/src/vnodeWrite.c index 85e044266a..fc977ce03f 100644 --- a/source/dnode/vnode/impl/src/vnodeWrite.c +++ b/source/dnode/vnode/impl/src/vnodeWrite.c @@ -16,17 +16,13 @@ #include "vnodeDef.h" int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { - SVnodeReq *pVnodeReq; - switch (pMsg->msgType) { - case TSDB_MSG_TYPE_MQ_SET: + case TSDB_MSG_TYPE_MQ_SET_CUR: if (tqSetCursor(pVnode->pTq, pMsg->pCont) < 0) { // TODO: handle error } break; } - - void *pBuf = pMsg->pCont; return 0; } diff --git a/source/dnode/vnode/tq/src/tq.c b/source/dnode/vnode/tq/src/tq.c index 2c07529219..88fa54cd8a 100644 --- a/source/dnode/vnode/tq/src/tq.c +++ b/source/dnode/vnode/tq/src/tq.c @@ -24,21 +24,16 @@ // handle management message // -int tqGetgHandleSSize(const STqGroupHandle* gHandle); -int tqBufHandleSSize(); -int tqBufItemSSize(); +int tqGroupSSize(const STqGroup* pGroup); +int tqTopicSSize(); +int tqItemSSize(); -STqGroupHandle* tqFindHandle(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { - STqGroupHandle* gHandle; - return NULL; -} - -void* tqSerializeListHandle(STqListHandle* listHandle, void* ptr); -void* tqSerializeBufHandle(STqBufferHandle* bufHandle, void* ptr); -void* tqSerializeBufItem(STqBufferItem* bufItem, void* ptr); +void* tqSerializeListHandle(STqList* listHandle, void* ptr); +void* tqSerializeTopic(STqTopic* pTopic, void* ptr); +void* tqSerializeItem(STqMsgItem* pItem, void* ptr); -const void* tqDeserializeBufHandle(const void* pBytes, STqBufferHandle* bufHandle); -const void* tqDeserializeBufItem(const void* pBytes, STqBufferItem* bufItem); +const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic); +const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem); STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac) { STQ* pTq = malloc(sizeof(STQ)); @@ -54,8 +49,7 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA if (pTq->tqMemRef.pAllocator == NULL) { // TODO } - pTq->tqMeta = - tqStoreOpen(path, (FTqSerialize)tqSerializeGroupHandle, (FTqDeserialize)tqDeserializeGroupHandle, free, 0); + pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0); if (pTq->tqMeta == NULL) { // TODO: free STQ return NULL; @@ -63,14 +57,14 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA return pTq; } -static int tqProtoCheck(TmqMsgHead* pMsg) { return pMsg->protoVer == 0; } +static int tqProtoCheck(STqMsgHead* pMsg) { return pMsg->protoVer == 0; } -static int tqAckOneTopic(STqBufferHandle* bHandle, TmqOneAck* pAck, STqQueryMsg** ppQuery) { +static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuery) { // clean old item and move forward int32_t consumeOffset = pAck->consumeOffset; int idx = consumeOffset % TQ_BUFFER_SIZE; - ASSERT(bHandle->buffer[idx].content && bHandle->buffer[idx].executor); - tfree(bHandle->buffer[idx].content); + ASSERT(pTopic->buffer[idx].content && pTopic->buffer[idx].executor); + tfree(pTopic->buffer[idx].content); if (1 /* TODO: need to launch new query */) { STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg)); if (pNewQuery == NULL) { @@ -78,29 +72,29 @@ static int tqAckOneTopic(STqBufferHandle* bHandle, TmqOneAck* pAck, STqQueryMsg* return -1; } // TODO: lock executor - pNewQuery->exec->executor = bHandle->buffer[idx].executor; + pNewQuery->exec->executor = pTopic->buffer[idx].executor; // TODO: read from wal and assign to src pNewQuery->exec->src = 0; - pNewQuery->exec->dest = &bHandle->buffer[idx]; + pNewQuery->exec->dest = &pTopic->buffer[idx]; pNewQuery->next = *ppQuery; *ppQuery = pNewQuery; } return 0; } -static int tqAck(STqGroupHandle* gHandle, TmqAcks* pAcks) { +static int tqAck(STqGroup* pGroup, STqAcks* pAcks) { int32_t ackNum = pAcks->ackNum; - TmqOneAck* acks = pAcks->acks; + STqOneAck* acks = pAcks->acks; // double ptr for acks and list - int i = 0; - STqListHandle* node = gHandle->head; - int ackCnt = 0; - STqQueryMsg* pQuery = NULL; + int i = 0; + STqList* node = pGroup->head; + int ackCnt = 0; + STqQueryMsg* pQuery = NULL; while (i < ackNum && node->next) { - if (acks[i].topicId == node->next->bufHandle.topicId) { + if (acks[i].topicId == node->next->topic.topicId) { ackCnt++; - tqAckOneTopic(&node->next->bufHandle, &acks[i], &pQuery); - } else if (acks[i].topicId < node->next->bufHandle.topicId) { + tqAckOneTopic(&node->next->topic, &acks[i], &pQuery); + } else if (acks[i].topicId < node->next->topic.topicId) { i++; } else { node = node->next; @@ -112,28 +106,29 @@ static int tqAck(STqGroupHandle* gHandle, TmqAcks* pAcks) { return ackCnt; } -static int tqCommitTCGroup(STqGroupHandle* handle) { +static int tqCommitGroup(STqGroup* pGroup) { // persist modification into disk return 0; } -int tqCreateTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroupHandle** handle) { +int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup** ppGroup) { // create in disk - STqGroupHandle* gHandle = (STqGroupHandle*)malloc(sizeof(STqGroupHandle)); - if (gHandle == NULL) { + STqGroup* pGroup = (STqGroup*)malloc(sizeof(STqGroup)); + if (pGroup == NULL) { // TODO return -1; } - memset(gHandle, 0, sizeof(STqGroupHandle)); + *ppGroup = pGroup; + memset(pGroup, 0, sizeof(STqGroup)); return 0; } -STqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { - STqGroupHandle* gHandle = tqHandleGet(pTq->tqMeta, cId); - if (gHandle == NULL) { - int code = tqCreateTCGroup(pTq, topicId, cgId, cId, &gHandle); - if (code != 0) { +STqGroup* tqOpenGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { + STqGroup* pGroup = tqHandleGet(pTq->tqMeta, cId); + if (pGroup == NULL) { + int code = tqCreateGroup(pTq, topicId, cgId, cId, &pGroup); + if (code < 0) { // TODO return NULL; } @@ -141,23 +136,26 @@ STqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t c // create // open - return gHandle; + return pGroup; } -int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { return 0; } +int tqCloseGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { + // TODO + return 0; +} -int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { +int tqDropGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { // delete from disk return 0; } -static int tqFetch(STqGroupHandle* gHandle, void** msg) { - STqListHandle* head = gHandle->head; - STqListHandle* node = head; - int totSize = 0; +static int tqFetch(STqGroup* pGroup, void** msg) { + STqList* head = pGroup->head; + STqList* node = head; + int totSize = 0; // TODO: make it a macro int sizeLimit = 4 * 1024; - TmqMsgContent* buffer = malloc(sizeLimit); + STqMsgContent* buffer = malloc(sizeLimit); if (buffer == NULL) { // TODO:memory insufficient return -1; @@ -166,25 +164,25 @@ static int tqFetch(STqGroupHandle* gHandle, void** msg) { // until all topic iterated or msgs over sizeLimit while (node->next) { node = node->next; - STqBufferHandle* bufHandle = &node->bufHandle; - int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE; - if (bufHandle->buffer[idx].content != NULL && bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset) { - totSize += bufHandle->buffer[idx].size; + STqTopic* topicHandle = &node->topic; + int idx = topicHandle->nextConsumeOffset % TQ_BUFFER_SIZE; + if (topicHandle->buffer[idx].content != NULL && topicHandle->buffer[idx].offset == topicHandle->nextConsumeOffset) { + totSize += topicHandle->buffer[idx].size; if (totSize > sizeLimit) { void* ptr = realloc(buffer, totSize); if (ptr == NULL) { - totSize -= bufHandle->buffer[idx].size; + totSize -= topicHandle->buffer[idx].size; // TODO:memory insufficient // return msgs already copied break; } } - *((int64_t*)buffer) = bufHandle->topicId; + *((int64_t*)buffer) = topicHandle->topicId; buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); - *((int64_t*)buffer) = bufHandle->buffer[idx].size; + *((int64_t*)buffer) = topicHandle->buffer[idx].size; buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); - memcpy(buffer, bufHandle->buffer[idx].content, bufHandle->buffer[idx].size); - buffer = POINTER_SHIFT(buffer, bufHandle->buffer[idx].size); + memcpy(buffer, topicHandle->buffer[idx].content, topicHandle->buffer[idx].size); + buffer = POINTER_SHIFT(buffer, topicHandle->buffer[idx].size); if (totSize > sizeLimit) { break; } @@ -193,11 +191,19 @@ static int tqFetch(STqGroupHandle* gHandle, void** msg) { return totSize; } -STqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { return NULL; } +STqGroup* tqGetGroup(STQ* pTq, int64_t clientId) { return tqHandleGet(pTq->tqMeta, clientId); } -int tqLaunchQuery(STqGroupHandle* gHandle) { return 0; } - -int tqSendLaunchQuery(STqGroupHandle* gHandle) { return 0; } +int tqSendLaunchQuery(STqMsgItem* bufItem, int64_t offset) { + if (tqQueryExecuting(bufItem->status)) { + return 0; + } + bufItem->status = 1; + // load data from wal or buffer pool + // put into exec + // send exec into non blocking queue + // when query finished, put into buffer pool + return 0; +} /*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/ /*return 0;*/ @@ -214,23 +220,69 @@ int tqCommit(STQ* pTq) { return 0; } -int tqSetCursor(STQ* pTq, void* msg) { +int tqBufferSetOffset(STqTopic* pTopic, int64_t offset) { + int code; + memset(pTopic->buffer, 0, sizeof(pTopic->buffer)); + // launch query + for (int i = offset; i < offset + TQ_BUFFER_SIZE; i++) { + int pos = i % TQ_BUFFER_SIZE; + code = tqSendLaunchQuery(&pTopic->buffer[pos], offset); + if (code < 0) { + // TODO: error handling + } + } + // set offset + pTopic->nextConsumeOffset = offset; + pTopic->floatingCursor = offset; + return 0; +} + +STqTopic* tqFindTopic(STqGroup* pGroup, int64_t topicId) { + // TODO + return NULL; +} + +int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) { + int code; + int64_t clientId = pMsg->head.clientId; + int64_t topicId = pMsg->topicId; + int64_t offset = pMsg->offset; + STqGroup* gHandle = tqGetGroup(pTq, clientId); + if (gHandle == NULL) { + // client not connect + return -1; + } + STqTopic* topicHandle = tqFindTopic(gHandle, topicId); + if (topicHandle == NULL) { + return -1; + } + if (pMsg->offset == topicHandle->nextConsumeOffset) { + return 0; + } + // TODO: check log last version + + code = tqBufferSetOffset(topicHandle, offset); + if (code < 0) { + // set error code + return -1; + } + return 0; } int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { - if (!tqProtoCheck((TmqMsgHead*)pMsg)) { + if (!tqProtoCheck((STqMsgHead*)pMsg)) { // proto version invalid return -1; } - int64_t clientId = pMsg->head.clientId; - STqGroupHandle* gHandle = tqGetGroupHandle(pTq, clientId); - if (gHandle == NULL) { + int64_t clientId = pMsg->head.clientId; + STqGroup* pGroup = tqGetGroup(pTq, clientId); + if (pGroup == NULL) { // client not connect return -1; } if (pMsg->acks.ackNum != 0) { - if (tqAck(gHandle, &pMsg->acks) != 0) { + if (tqAck(pGroup, &pMsg->acks) != 0) { // ack not success return -1; } @@ -238,22 +290,22 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg; - if (tqFetch(gHandle, (void**)&pRsp->msgs) <= 0) { + if (tqFetch(pGroup, (void**)&pRsp->msgs) <= 0) { // fetch error return -1; } // judge and launch new query - if (tqLaunchQuery(gHandle)) { - // launch query error - return -1; - } + /*if (tqSendLaunchQuery(gHandle)) {*/ + // launch query error + /*return -1;*/ + /*}*/ return 0; } -int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** ppHead) { +int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) { // calculate size - int sz = tqGetgHandleSSize(gHandle) + sizeof(STqSerializedHead); + int sz = tqGroupSSize(pGroup) + sizeof(STqSerializedHead); if (sz > (*ppHead)->ssize) { void* tmpPtr = realloc(*ppHead, sz); if (tmpPtr == NULL) { @@ -266,52 +318,52 @@ int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** pp } void* ptr = (*ppHead)->content; // do serialization - *(int64_t*)ptr = gHandle->cId; + *(int64_t*)ptr = pGroup->cId; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - *(int64_t*)ptr = gHandle->cgId; + *(int64_t*)ptr = pGroup->cgId; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - *(int32_t*)ptr = gHandle->topicNum; + *(int32_t*)ptr = pGroup->topicNum; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); - if (gHandle->topicNum > 0) { - tqSerializeListHandle(gHandle->head, ptr); + if (pGroup->topicNum > 0) { + tqSerializeListHandle(pGroup->head, ptr); } return 0; } -void* tqSerializeListHandle(STqListHandle* listHandle, void* ptr) { - STqListHandle* node = listHandle; +void* tqSerializeListHandle(STqList* listHandle, void* ptr) { + STqList* node = listHandle; ASSERT(node != NULL); while (node) { - ptr = tqSerializeBufHandle(&node->bufHandle, ptr); + ptr = tqSerializeTopic(&node->topic, ptr); node = node->next; } return ptr; } -void* tqSerializeBufHandle(STqBufferHandle* bufHandle, void* ptr) { - *(int64_t*)ptr = bufHandle->nextConsumeOffset; +void* tqSerializeTopic(STqTopic* pTopic, void* ptr) { + *(int64_t*)ptr = pTopic->nextConsumeOffset; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - *(int64_t*)ptr = bufHandle->topicId; + *(int64_t*)ptr = pTopic->topicId; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - *(int32_t*)ptr = bufHandle->head; + *(int32_t*)ptr = pTopic->head; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); - *(int32_t*)ptr = bufHandle->tail; + *(int32_t*)ptr = pTopic->tail; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); for (int i = 0; i < TQ_BUFFER_SIZE; i++) { - ptr = tqSerializeBufItem(&bufHandle->buffer[i], ptr); + ptr = tqSerializeItem(&pTopic->buffer[i], ptr); } return ptr; } -void* tqSerializeBufItem(STqBufferItem* bufItem, void* ptr) { +void* tqSerializeItem(STqMsgItem* bufItem, void* ptr) { // TODO: do we need serialize this? // mainly for executor return ptr; } -const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHandle** ppGHandle) { - STqGroupHandle* gHandle = *ppGHandle; - const void* ptr = pHead->content; +const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup) { + STqGroup* gHandle = *ppGroup; + const void* ptr = pHead->content; gHandle->cId = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); gHandle->cgId = *(int64_t*)ptr; @@ -320,63 +372,63 @@ const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHan gHandle->topicNum = *(int32_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); gHandle->head = NULL; - STqListHandle* node = gHandle->head; + STqList* node = gHandle->head; for (int i = 0; i < gHandle->topicNum; i++) { if (gHandle->head == NULL) { - if ((node = malloc(sizeof(STqListHandle))) == NULL) { + if ((node = malloc(sizeof(STqList))) == NULL) { // TODO: error return NULL; } node->next = NULL; - ptr = tqDeserializeBufHandle(ptr, &node->bufHandle); + ptr = tqDeserializeTopic(ptr, &node->topic); gHandle->head = node; } else { - node->next = malloc(sizeof(STqListHandle)); + node->next = malloc(sizeof(STqList)); if (node->next == NULL) { // TODO: error return NULL; } node->next->next = NULL; - ptr = tqDeserializeBufHandle(ptr, &node->next->bufHandle); + ptr = tqDeserializeTopic(ptr, &node->next->topic); node = node->next; } } return ptr; } -const void* tqDeserializeBufHandle(const void* pBytes, STqBufferHandle* bufHandle) { +const void* tqDeserializeTopic(const void* pBytes, STqTopic* topic) { const void* ptr = pBytes; - bufHandle->nextConsumeOffset = *(int64_t*)ptr; + topic->nextConsumeOffset = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - bufHandle->topicId = *(int64_t*)ptr; + topic->topicId = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - bufHandle->head = *(int32_t*)ptr; + topic->head = *(int32_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); - bufHandle->tail = *(int32_t*)ptr; + topic->tail = *(int32_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); for (int i = 0; i < TQ_BUFFER_SIZE; i++) { - ptr = tqDeserializeBufItem(ptr, &bufHandle->buffer[i]); + ptr = tqDeserializeItem(ptr, &topic->buffer[i]); } return ptr; } -const void* tqDeserializeBufItem(const void* pBytes, STqBufferItem* bufItem) { return pBytes; } +const void* tqDeserializeItem(const void* pBytes, STqMsgItem* bufItem) { return pBytes; } // TODO: make this a macro -int tqGetgHandleSSize(const STqGroupHandle* gHandle) { +int tqGroupSSize(const STqGroup* gHandle) { return sizeof(int64_t) * 2 // cId + cgId + sizeof(int32_t) // topicNum - + gHandle->topicNum * tqBufHandleSSize(); + + gHandle->topicNum * tqTopicSSize(); } // TODO: make this a macro -int tqBufHandleSSize() { +int tqTopicSSize() { return sizeof(int64_t) * 2 // nextConsumeOffset + topicId + sizeof(int32_t) * 2 // head + tail - + TQ_BUFFER_SIZE * tqBufItemSSize(); + + TQ_BUFFER_SIZE * tqItemSSize(); } -int tqBufItemSSize() { +int tqItemSSize() { // TODO: do this need serialization? // mainly for executor return 0; diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c index 082f0ad28e..5a88e2176f 100644 --- a/source/dnode/vnode/tq/src/tqMetaStore.c +++ b/source/dnode/vnode/tq/src/tqMetaStore.c @@ -472,7 +472,7 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va if (pNode->handle.key == key) { // TODO: think about thread safety if (pNode->handle.valueInTxn) { - if (TqDupIntxnReject(pMeta->tqConfigFlag)) { + if (tqDupIntxnReject(pMeta->tqConfigFlag)) { return -2; } if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { diff --git a/source/dnode/vnode/tq/test/tqMetaTest.cpp b/source/dnode/vnode/tq/test/tqMetaTest.cpp index 58263efa71..d3c9b50e4a 100644 --- a/source/dnode/vnode/tq/test/tqMetaTest.cpp +++ b/source/dnode/vnode/tq/test/tqMetaTest.cpp @@ -10,8 +10,8 @@ struct Foo { }; int FooSerializer(const void* pObj, STqSerializedHead** ppHead) { - Foo* foo = (Foo*) pObj; - if((*ppHead) == NULL || (*ppHead)->ssize < sizeof(STqSerializedHead) + sizeof(int32_t)) { + Foo* foo = (Foo*)pObj; + if ((*ppHead) == NULL || (*ppHead)->ssize < sizeof(STqSerializedHead) + sizeof(int32_t)) { *ppHead = (STqSerializedHead*)realloc(*ppHead, sizeof(STqSerializedHead) + sizeof(int32_t)); (*ppHead)->ssize = sizeof(STqSerializedHead) + sizeof(int32_t); } @@ -20,36 +20,28 @@ int FooSerializer(const void* pObj, STqSerializedHead** ppHead) { } const void* FooDeserializer(const STqSerializedHead* pHead, void** ppObj) { - if(*ppObj == NULL) { + if (*ppObj == NULL) { *ppObj = realloc(*ppObj, sizeof(int32_t)); } Foo* pFoo = *(Foo**)ppObj; - pFoo->a = *(int32_t*)pHead->content; + pFoo->a = *(int32_t*)pHead->content; return NULL; } -void FooDeleter(void* pObj) { - free(pObj); -} +void FooDeleter(void* pObj) { free(pObj); } class TqMetaUpdateAppendTest : public ::testing::Test { - protected: - - void SetUp() override { - taosRemoveDir(pathName); - pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter, - TQ_UPDATE_APPEND - ); - ASSERT(pMeta); - } - - void TearDown() override { - tqStoreClose(pMeta); - } - - TqMetaStore* pMeta; - const char* pathName = "/tmp/tq_test"; + protected: + void SetUp() override { + taosRemoveDir(pathName); + pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); + ASSERT(pMeta); + } + + void TearDown() override { tqStoreClose(pMeta); } + + STqMetaStore* pMeta; + const char* pathName = "/tmp/tq_test"; }; TEST_F(TqMetaUpdateAppendTest, copyPutTest) { @@ -57,11 +49,11 @@ TEST_F(TqMetaUpdateAppendTest, copyPutTest) { foo.a = 3; tqHandleCopyPut(pMeta, 1, &foo, sizeof(Foo)); - Foo* pFoo = (Foo*) tqHandleGet(pMeta, 1); + Foo* pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); tqHandleCommit(pMeta, 1); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo->a, 3); } @@ -78,10 +70,7 @@ TEST_F(TqMetaUpdateAppendTest, persistTest) { EXPECT_EQ(pBar == NULL, true); tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter, - TQ_UPDATE_APPEND - ); + pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); ASSERT(pMeta); pBar = (Foo*)tqHandleGet(pMeta, 1); @@ -97,7 +86,7 @@ TEST_F(TqMetaUpdateAppendTest, uncommittedTest) { pFoo->a = 3; tqHandleMovePut(pMeta, 1, pFoo); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); } @@ -106,11 +95,11 @@ TEST_F(TqMetaUpdateAppendTest, abortTest) { pFoo->a = 3; tqHandleMovePut(pMeta, 1, pFoo); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); tqHandleAbort(pMeta, 1); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); } @@ -119,32 +108,29 @@ TEST_F(TqMetaUpdateAppendTest, deleteTest) { pFoo->a = 3; tqHandleMovePut(pMeta, 1, pFoo); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); tqHandleCommit(pMeta, 1); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); ASSERT_EQ(pFoo != NULL, true); EXPECT_EQ(pFoo->a, 3); tqHandleDel(pMeta, 1); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); ASSERT_EQ(pFoo != NULL, true); EXPECT_EQ(pFoo->a, 3); tqHandleCommit(pMeta, 1); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter, - TQ_UPDATE_APPEND - ); + pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); ASSERT(pMeta); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); } @@ -162,10 +148,7 @@ TEST_F(TqMetaUpdateAppendTest, intxnPersist) { EXPECT_EQ(pFoo1->a, 3); tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter, - TQ_UPDATE_APPEND - ); + pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); ASSERT(pMeta); pFoo1 = (Foo*)tqHandleGet(pMeta, 1); @@ -177,10 +160,7 @@ TEST_F(TqMetaUpdateAppendTest, intxnPersist) { EXPECT_EQ(pFoo1->a, 4); tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter, - TQ_UPDATE_APPEND - ); + pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); ASSERT(pMeta); pFoo1 = (Foo*)tqHandleGet(pMeta, 1); @@ -190,13 +170,13 @@ TEST_F(TqMetaUpdateAppendTest, intxnPersist) { TEST_F(TqMetaUpdateAppendTest, multiplePage) { srand(0); std::vector v; - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { v.push_back(rand()); Foo foo; foo.a = v[i]; tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); } - for(int i = 0; i < 500; i++) { + for (int i = 0; i < 500; i++) { tqHandleCommit(pMeta, i); Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; @@ -204,38 +184,34 @@ TEST_F(TqMetaUpdateAppendTest, multiplePage) { } tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter, - TQ_UPDATE_APPEND - ); + pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); ASSERT(pMeta); - - for(int i = 500; i < 1000; i++) { + + for (int i = 500; i < 1000; i++) { tqHandleCommit(pMeta, i); Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; EXPECT_EQ(pFoo->a, v[i]); } - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; EXPECT_EQ(pFoo->a, v[i]); } - } TEST_F(TqMetaUpdateAppendTest, multipleRewrite) { srand(0); std::vector v; - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { v.push_back(rand()); Foo foo; foo.a = v[i]; tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); } - for(int i = 0; i < 500; i++) { + for (int i = 0; i < 500; i++) { tqHandleCommit(pMeta, i); v[i] = rand(); Foo foo; @@ -243,25 +219,22 @@ TEST_F(TqMetaUpdateAppendTest, multipleRewrite) { tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); } - for(int i = 500; i < 1000; i++) { + for (int i = 500; i < 1000; i++) { v[i] = rand(); Foo foo; foo.a = v[i]; tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); } - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { tqHandleCommit(pMeta, i); } tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter, - TQ_UPDATE_APPEND - ); + pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); ASSERT(pMeta); - - for(int i = 500; i < 1000; i++) { + + for (int i = 500; i < 1000; i++) { v[i] = rand(); Foo foo; foo.a = v[i]; @@ -269,40 +242,38 @@ TEST_F(TqMetaUpdateAppendTest, multipleRewrite) { tqHandleCommit(pMeta, i); } - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; EXPECT_EQ(pFoo->a, v[i]); } - } TEST_F(TqMetaUpdateAppendTest, dupCommit) { srand(0); std::vector v; - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { v.push_back(rand()); Foo foo; foo.a = v[i]; tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); } - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { int ret = tqHandleCommit(pMeta, i); EXPECT_EQ(ret, 0); ret = tqHandleCommit(pMeta, i); EXPECT_EQ(ret, -1); } - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { int ret = tqHandleCommit(pMeta, i); EXPECT_EQ(ret, -1); } - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; EXPECT_EQ(pFoo->a, v[i]); } - } -- GitLab