From 6beee204d825ef8a5fc0a2412f1064a1bb9ae615 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 21 Dec 2021 20:07:32 +0800 Subject: [PATCH] add tq query --- include/dnode/vnode/tq/tq.h | 29 +++-- include/dnode/vnode/vnode.h | 10 ++ include/libs/wal/wal.h | 5 +- include/util/ttimer.h | 2 + source/dnode/vnode/tq/CMakeLists.txt | 1 + source/dnode/vnode/tq/inc/tqInt.h | 1 + source/dnode/vnode/tq/src/tq.c | 182 ++++++++++++++++++++++----- source/libs/wal/src/walRead.c | 8 +- 8 files changed, 197 insertions(+), 41 deletions(-) diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index 60a8c252c0..5eeaaa1011 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -22,6 +22,7 @@ #include "taoserror.h" #include "taosmsg.h" #include "tlist.h" +#include "trpc.h" #include "tutil.h" #ifdef __cplusplus @@ -54,6 +55,7 @@ typedef struct STqSetCurReq { typedef struct STqConsumeReq { STqMsgHead head; + int64_t blockingTime; // milisec STqAcks acks; } STqConsumeReq; @@ -107,6 +109,17 @@ typedef struct STqExec { struct STqExec* (*deserialize)(char*); } STqExec; +typedef struct STqRspHandle { + void* handle; + void* ahandle; +} STqRspHandle; + +typedef enum { + TQ_ITEM_READY, + TQ_ITEM_PROCESS, + TQ_ITEM_EMPTY +} STqItemStatus; + typedef struct STqBufferItem { int64_t offset; // executors are identical but not concurrent @@ -135,13 +148,13 @@ typedef struct STqListHandle { } STqList; typedef struct STqGroup { - int64_t clientId; - int64_t cgId; - void* ahandle; - int32_t topicNum; - STqList* head; - SList* topicList; // SList - void* returnMsg; // SVReadMsg + int64_t clientId; + int64_t cgId; + void* ahandle; + int32_t topicNum; + //STqList* head; + SList* topicList; // SList + STqRspHandle rspHandle; } STqGroup; typedef struct STqQueryMsg { @@ -264,7 +277,7 @@ void tqClose(STQ*); // void* will be replace by a msg type int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); -int tqConsume(STQ*, STqConsumeReq*); +int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp); int tqSetCursor(STQ*, STqSetCurReq* pMsg); int tqBufferSetOffset(STqTopic*, int64_t offset); diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 8458ad9da3..a373828ebd 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -122,6 +122,16 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs); */ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +/** + * @brief Process a consume message. + * + * @param pVnode The vnode object. + * @param pMsg The request message + * @param pRsp The response message + * @return int 0 for success, -1 for failure + */ +int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); + /** * @brief Process the sync request * diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 89f24cf3a4..67d2009d3b 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -174,8 +174,11 @@ SWalReadHandle *walOpenReadHandle(SWal *); void walCloseReadHandle(SWalReadHandle *); int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver); +// deprecated +#if 0 int32_t walRead(SWal *, SWalHead **, int64_t ver); -// int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); +int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); +#endif // lifecycle check int64_t walGetFirstVer(SWal *); diff --git a/include/util/ttimer.h b/include/util/ttimer.h index 987d3f3cdc..89ec6cd8d9 100644 --- a/include/util/ttimer.h +++ b/include/util/ttimer.h @@ -16,6 +16,8 @@ #ifndef _TD_UTIL_TIMER_H #define _TD_UTIL_TIMER_H +#include "os.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/source/dnode/vnode/tq/CMakeLists.txt b/source/dnode/vnode/tq/CMakeLists.txt index 8d59c7b07a..7cb7499d64 100644 --- a/source/dnode/vnode/tq/CMakeLists.txt +++ b/source/dnode/vnode/tq/CMakeLists.txt @@ -12,6 +12,7 @@ target_link_libraries( PUBLIC os PUBLIC util PUBLIC common + PUBLIC transport ) if(${BUILD_TEST}) diff --git a/source/dnode/vnode/tq/inc/tqInt.h b/source/dnode/vnode/tq/inc/tqInt.h index 5685a29d03..107f5d5103 100644 --- a/source/dnode/vnode/tq/inc/tqInt.h +++ b/source/dnode/vnode/tq/inc/tqInt.h @@ -18,6 +18,7 @@ #include "tq.h" #include "tlog.h" +#include "trpc.h" #ifdef __cplusplus extern "C" { #endif diff --git a/source/dnode/vnode/tq/src/tq.c b/source/dnode/vnode/tq/src/tq.c index d8dfe4ddcf..972740183d 100644 --- a/source/dnode/vnode/tq/src/tq.c +++ b/source/dnode/vnode/tq/src/tq.c @@ -14,7 +14,9 @@ */ #include "tqInt.h" +#include "osSocket.h" #include "tqMetaStore.h" +#include "osAtomic.h" // static // read next version data @@ -51,16 +53,22 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA } pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0); if (pTq->tqMeta == NULL) { - // TODO: free STQ + free(pTq); + allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator); return NULL; } + return pTq; } + void tqClose(STQ* pTq) { // TODO } -static int tqProtoCheck(STqMsgHead* pMsg) { return pMsg->protoVer == 0; } +static int tqProtoCheck(STqMsgHead* pMsg) { + // TODO + return pMsg->protoVer == 0; +} static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuery) { // clean old item and move forward @@ -121,9 +129,15 @@ int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup // TODO return -1; } - *ppGroup = pGroup; memset(pGroup, 0, sizeof(STqGroup)); + pGroup->topicList = tdListNew(sizeof(STqTopic)); + if(pGroup->topicList == NULL) { + free(pGroup); + return -1; + } + *ppGroup = pGroup; + return 0; } @@ -152,46 +166,55 @@ int tqDropGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { return 0; } -static int tqFetch(STqGroup* pGroup, void** msg) { - STqList* head = pGroup->head; - STqList* node = head; +static int tqFetch(STqGroup* pGroup, STqConsumeRsp** pRsp) { + STqList* pHead = pGroup->head; + STqList* pNode = pHead; int totSize = 0; + int numOfMsgs = 0; // TODO: make it a macro - int sizeLimit = 4 * 1024; - STqMsgContent* buffer = malloc(sizeLimit); - if (buffer == NULL) { - // TODO:memory insufficient + int sizeLimit = 4 * 1024; + + void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + sizeLimit); + if (ptr == NULL) { + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return -1; } + *pRsp = ptr; + STqMsgContent* buffer = (*pRsp)->msgs; + // iterate the list to get msgs of all topics // until all topic iterated or msgs over sizeLimit - while (node->next) { - node = node->next; - STqTopic* topicHandle = &node->topic; - int idx = topicHandle->nextConsumeOffset % TQ_BUFFER_SIZE; - if (topicHandle->buffer[idx].content != NULL && topicHandle->buffer[idx].offset == topicHandle->nextConsumeOffset) { - totSize += topicHandle->buffer[idx].size; + while (pNode->next) { + pNode = pNode->next; + STqTopic* pTopic = &pNode->topic; + int idx = pTopic->nextConsumeOffset % TQ_BUFFER_SIZE; + if (pTopic->buffer[idx].content != NULL && pTopic->buffer[idx].offset == pTopic->nextConsumeOffset) { + totSize += pTopic->buffer[idx].size; if (totSize > sizeLimit) { - void* ptr = realloc(buffer, totSize); + void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + totSize); if (ptr == NULL) { - totSize -= topicHandle->buffer[idx].size; - // TODO:memory insufficient + totSize -= pTopic->buffer[idx].size; + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; // return msgs already copied break; } + *pRsp = ptr; + break; } - *((int64_t*)buffer) = topicHandle->topicId; + *((int64_t*)buffer) = pTopic->topicId; buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); - *((int64_t*)buffer) = topicHandle->buffer[idx].size; + *((int64_t*)buffer) = pTopic->buffer[idx].size; buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); - memcpy(buffer, topicHandle->buffer[idx].content, topicHandle->buffer[idx].size); - buffer = POINTER_SHIFT(buffer, topicHandle->buffer[idx].size); + memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size); + buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size); + numOfMsgs++; if (totSize > sizeLimit) { break; } } } - return totSize; + (*pRsp)->bodySize = totSize; + return numOfMsgs; } STqGroup* tqGetGroup(STQ* pTq, int64_t clientId) { return tqHandleGet(pTq->tqMeta, clientId); } @@ -273,25 +296,126 @@ int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) { return 0; } -int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { +// temporary +int tqProcessCMsg(STQ* pTq, STqConsumeReq* pMsg, STqRspHandle* pRsp) { int64_t clientId = pMsg->head.clientId; STqGroup* pGroup = tqGetGroup(pTq, clientId); if (pGroup == NULL) { terrno = TSDB_CODE_TQ_GROUP_NOT_SET; return -1; } + pGroup->rspHandle.handle = pRsp->handle; + pGroup->rspHandle.ahandle = pRsp->ahandle; + + return 0; +} + +int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { + STqConsumeReq *pMsg = pReq->pCont; + int64_t clientId = pMsg->head.clientId; + STqGroup* pGroup = tqGetGroup(pTq, clientId); + if (pGroup == NULL) { + terrno = TSDB_CODE_TQ_GROUP_NOT_SET; + return -1; + } + + SList* topicList = pGroup->topicList; + + int totSize = 0; + int numOfMsgs = 0; + int sizeLimit = 4096; + + + STqConsumeRsp *pCsmRsp = (*pRsp)->pCont; + void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit); + if (ptr == NULL) { + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; + return -1; + } + (*pRsp)->pCont = ptr; + + SListIter iter; + tdListInitIter(topicList, &iter, TD_LIST_FORWARD); + + STqMsgContent* buffer = NULL; + SArray* pArray = taosArrayInit(0, sizeof(void*)); + + SListNode *pn; + while((pn = tdListNext(&iter)) != NULL) { + STqTopic* pTopic = *(STqTopic**)pn->data; + int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE; + STqMsgItem* pItem = &pTopic->buffer[idx]; + if (pItem->content != NULL && pItem->offset == pTopic->floatingCursor) { + if(pItem->status == TQ_ITEM_READY) { + //if has data + totSize += pTopic->buffer[idx].size; + if (totSize > sizeLimit) { + void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + totSize); + if (ptr == NULL) { + totSize -= pTopic->buffer[idx].size; + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; + // return msgs already copied + break; + } + (*pRsp)->pCont = ptr; + break; + } + *((int64_t*)buffer) = htonll(pTopic->topicId); + buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); + *((int64_t*)buffer) = htonll(pTopic->buffer[idx].size); + buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); + memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size); + buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size); + numOfMsgs++; + if (totSize > sizeLimit) { + break; + } + } else if(pItem->status == TQ_ITEM_PROCESS) { + //if not have data but in process + + } else if(pItem->status == TQ_ITEM_EMPTY){ + //if not have data and not in process + int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS); + if(old != TQ_ITEM_EMPTY) { + continue; + } + pItem->offset = pTopic->floatingCursor; + taosArrayPush(pArray, &pItem); + } else { + ASSERT(0); + } + + } + } + + for(int i = 0; i < pArray->size; i++) { + STqMsgItem* pItem = taosArrayGet(pArray, i); + + void* raw; + //read from wal + //get msgType + //if submitblk + pItem->executor->assign(pItem->executor->runtimeEnv, raw); + SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv); + pItem->content = content; + //if other type, send just put into buffer + pItem->content = raw; + + int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY); + ASSERT(old == TQ_ITEM_PROCESS); + + } - STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg; - int numOfMsgs = tqFetch(pGroup, (void**)&pRsp->msgs); if (numOfMsgs < 0) { return -1; } + if (numOfMsgs == 0) { // most recent data has been fetched // enable timer for blocking wait - // once new data written during wait time - // launch query and response + // once new data written when waiting, launch query and rsp + return -1; } // fetched a num of msgs, rpc response diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 48eb84b536..c80fb4eed8 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -170,6 +170,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { return 0; } +#if 0 int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { int code; code = walSeekVer(pWal, ver); @@ -207,6 +208,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { return 0; } -/*int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {*/ -/*return 0;*/ -/*}*/ +int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) { +return 0; +} +#endif -- GitLab