diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index b8bfd72efa41f00a6b97275f9353b3321dd85470..60a8c252c0b511e145e611814827626a48ed86bd 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -16,8 +16,11 @@ #ifndef _TD_TQ_H_ #define _TD_TQ_H_ +#include "common.h" #include "mallocator.h" #include "os.h" +#include "taoserror.h" +#include "taosmsg.h" #include "tlist.h" #include "tutil.h" @@ -97,10 +100,9 @@ typedef struct STqTopicVhandle { typedef struct STqExec { void* runtimeEnv; - // return type will be SSDataBlock - void* (*exec)(void* runtimeEnv); - // inputData type will be submitblk - void* (*assign)(void* runtimeEnv, void* inputData); + SSDataBlock* (*exec)(void* runtimeEnv); + void* (*assign)(void* runtimeEnv, SSubmitBlk* inputData); + void (*clear)(void* runtimeEnv); char* (*serialize)(struct STqExec*); struct STqExec* (*deserialize)(char*); } STqExec; @@ -133,22 +135,17 @@ typedef struct STqListHandle { } STqList; typedef struct STqGroup { - int64_t cId; + int64_t clientId; int64_t cgId; void* ahandle; int32_t topicNum; STqList* head; SList* topicList; // SList + void* returnMsg; // SVReadMsg } STqGroup; -typedef struct STqQueryExec { - void* src; - STqMsgItem* dest; - void* executor; -} STqQueryExec; - typedef struct STqQueryMsg { - STqQueryExec* exec; + STqMsgItem* item; struct STqQueryMsg* next; } STqQueryMsg; @@ -258,12 +255,10 @@ typedef struct STQ { STqLogReader* tqLogReader; STqMemRef tqMemRef; STqMetaStore* tqMeta; - STqExec* tqExec; } STQ; // open in each vnode -STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac, - STqExec* tqExec); +STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac); void tqClose(STQ*); // void* will be replace by a msg type diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 4f1ef7da7b6fa0a93fdb2d3cb20bf9f45c57ae91..f15e4c1c975996926b8995f767c3ca45b351b2ba 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -353,6 +353,20 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909) //"Invalid msg length") #define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) //"Invalid msg type") +// tq +#define TSDB_CODE_TQ_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0A00) //"Invalid configuration") +#define TSDB_CODE_TQ_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0A01) //"Tq init failed") +#define TSDB_CODE_TQ_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0A02) //"No diskspace for tq") +#define TSDB_CODE_TQ_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0A03) //"No permission for disk files") +#define TSDB_CODE_TQ_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0A04) //"Data file(s) corrupted") +#define TSDB_CODE_TQ_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0A05) //"Out of memory") +#define TSDB_CODE_TQ_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0A06) //"File already exists") +#define TSDB_CODE_TQ_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0A07) //"Failed to create dir") +#define TSDB_CODE_TQ_META_NO_SUCH_KEY TAOS_DEF_ERROR_CODE(0, 0x0A08) //"Target key not found") +#define TSDB_CODE_TQ_META_KEY_NOT_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A09) //"Target key not in transaction") +#define TSDB_CODE_TQ_META_KEY_DUP_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A0A) //"Target key duplicated in transaction") +#define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B) //"Group of corresponding client is not set by mnode") + // wal #define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) //"Unexpected generic error in wal") #define TSDB_CODE_WAL_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x1001) //"WAL file is corrupted") diff --git a/include/util/tlog.h b/include/util/tlog.h index 5e6604598dd22ad7755e4af51921fa3ee80a6abe..5c91398cdcf952046406d55e15911109a4c66435 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -42,11 +42,11 @@ extern int32_t qDebugFlag; extern int32_t wDebugFlag; extern int32_t sDebugFlag; extern int32_t tsdbDebugFlag; +extern int32_t tqDebugFlag; extern int32_t cqDebugFlag; extern int32_t debugFlag; extern int32_t ctgDebugFlag; - #define DEBUG_FATAL 1U #define DEBUG_ERROR DEBUG_FATAL #define DEBUG_WARN 2U diff --git a/source/dnode/vnode/tq/CMakeLists.txt b/source/dnode/vnode/tq/CMakeLists.txt index 536e97d5f79b58442b2d0a6af5758bb4ec854fc1..8d59c7b07a9c3935187ba6b644d7f95f88179776 100644 --- a/source/dnode/vnode/tq/CMakeLists.txt +++ b/source/dnode/vnode/tq/CMakeLists.txt @@ -11,6 +11,7 @@ target_link_libraries( PUBLIC wal PUBLIC os PUBLIC util + PUBLIC common ) if(${BUILD_TEST}) diff --git a/source/dnode/vnode/tq/inc/tqInt.h b/source/dnode/vnode/tq/inc/tqInt.h index 022b5998162b907382643d1e01a8cf0b05596696..5685a29d03e7dc15a974d9728d0c6ec07d4cceb3 100644 --- a/source/dnode/vnode/tq/inc/tqInt.h +++ b/source/dnode/vnode/tq/inc/tqInt.h @@ -17,11 +17,20 @@ #define _TD_TQ_INT_H_ #include "tq.h" - +#include "tlog.h" #ifdef __cplusplus extern "C" { #endif +extern int32_t tqDebugFlag; + +#define tqFatal(...) { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); }} +#define tqError(...) { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); }} +#define tqWarn(...) { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", 255, __VA_ARGS__); }} +#define tqInfo(...) { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", 255, __VA_ARGS__); }} +#define tqDebug(...) { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }} +#define tqTrace(...) { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }} + // create persistent storage for meta info such as consuming offset // return value > 0: cgId // return value <= 0: error code diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h index 5bcedaed748d7d0668181183d09de7470ff630e2..ef71d8bf145ae535edf7740e864661e576078814 100644 --- a/source/dnode/vnode/tq/inc/tqMetaStore.h +++ b/source/dnode/vnode/tq/inc/tqMetaStore.h @@ -17,7 +17,7 @@ #define _TQ_META_STORE_H_ #include "os.h" -#include "tq.h" +#include "tqInt.h" #ifdef __cplusplus extern "C" { diff --git a/source/dnode/vnode/tq/src/tq.c b/source/dnode/vnode/tq/src/tq.c index 7beab8a983267b4fb61493cc4a0f5ec90657da79..5c8578fa67608cd813b2208e6e30b81d28b81489 100644 --- a/source/dnode/vnode/tq/src/tq.c +++ b/source/dnode/vnode/tq/src/tq.c @@ -35,11 +35,10 @@ void* tqSerializeItem(STqMsgItem* pItem, void* ptr); const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic); const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem); -STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac, - STqExec* tqExec) { +STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac) { STQ* pTq = malloc(sizeof(STQ)); if (pTq == NULL) { - // TODO: memory error + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return NULL; } pTq->path = strdup(path); @@ -48,14 +47,13 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA pTq->tqMemRef.pAlloctorFactory = allocFac; pTq->tqMemRef.pAllocator = allocFac->create(allocFac); if (pTq->tqMemRef.pAllocator == NULL) { - // TODO + // TODO: error code of buffer pool } pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0); if (pTq->tqMeta == NULL) { // TODO: free STQ return NULL; } - pTq->tqExec = tqExec; return pTq; } @@ -70,16 +68,16 @@ static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuer if (1 /* TODO: need to launch new query */) { STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg)); if (pNewQuery == NULL) { - // TODO: memory insufficient + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return -1; } // TODO: lock executor - pNewQuery->exec->executor = pTopic->buffer[idx].executor; // TODO: read from wal and assign to src - pNewQuery->exec->src = 0; - pNewQuery->exec->dest = &pTopic->buffer[idx]; - pNewQuery->next = *ppQuery; - *ppQuery = pNewQuery; + /*pNewQuery->exec->executor = pTopic->buffer[idx].executor;*/ + /*pNewQuery->exec->src = 0;*/ + /*pNewQuery->exec->dest = &pTopic->buffer[idx];*/ + /*pNewQuery->next = *ppQuery;*/ + /**ppQuery = pNewQuery;*/ } return 0; } @@ -134,10 +132,10 @@ STqGroup* tqOpenGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { // TODO return NULL; } + tqHandleMovePut(pTq->tqMeta, cId, pGroup); } + ASSERT(pGroup); - // create - // open return pGroup; } @@ -272,6 +270,33 @@ int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) { return 0; } +int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { + int64_t clientId = pMsg->head.clientId; + STqGroup* pGroup = tqGetGroup(pTq, clientId); + if (pGroup == NULL) { + terrno = TSDB_CODE_TQ_GROUP_NOT_SET; + return -1; + } + + STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg; + int numOfMsgs = tqFetch(pGroup, (void**)&pRsp->msgs); + if (numOfMsgs < 0) { + return -1; + } + if (numOfMsgs == 0) { + // most recent data has been fetched + + // enable timer for blocking wait + // once new data written during wait time + // launch query and response + } + + // fetched a num of msgs, rpc response + + return 0; +} + +#if 0 int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { if (!tqProtoCheck((STqMsgHead*)pMsg)) { // proto version invalid @@ -304,6 +329,7 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { /*}*/ return 0; } +#endif int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) { // calculate size @@ -320,7 +346,7 @@ int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) { } void* ptr = (*ppHead)->content; // do serialization - *(int64_t*)ptr = pGroup->cId; + *(int64_t*)ptr = pGroup->clientId; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); *(int64_t*)ptr = pGroup->cgId; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); @@ -366,7 +392,7 @@ void* tqSerializeItem(STqMsgItem* bufItem, void* ptr) { const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup) { STqGroup* gHandle = *ppGroup; const void* ptr = pHead->content; - gHandle->cId = *(int64_t*)ptr; + gHandle->clientId = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); gHandle->cgId = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c index 5a88e2176f03d87a2ed5b44abe8b7cb026797981..57e20010e3b12b615875e1ecaba1eddd05161562 100644 --- a/source/dnode/vnode/tq/src/tqMetaStore.c +++ b/source/dnode/vnode/tq/src/tqMetaStore.c @@ -56,6 +56,7 @@ static inline int tqReadLastPage(int fd, STqIdxPageBuf* pBuf) { int offset = tqSeekLastPage(fd); int nBytes; if ((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) { + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } if (nBytes == 0) { @@ -71,7 +72,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial int32_t tqConfigFlag) { STqMetaStore* pMeta = malloc(sizeof(STqMetaStore)); if (pMeta == NULL) { - // close + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return NULL; } memset(pMeta, 0, sizeof(STqMetaStore)); @@ -79,8 +80,9 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial // concat data file name and index file name size_t pathLen = strlen(path); pMeta->dirPath = malloc(pathLen + 1); - if (pMeta->dirPath != NULL) { - // TODO: memory insufficient + if (pMeta->dirPath == NULL) { + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; + return NULL; } strcpy(pMeta->dirPath, path); @@ -88,13 +90,14 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial strcpy(name, path); if (taosDirExist(name) != 0 && taosMkDir(name) != 0) { - ASSERT(false); + terrno = TSDB_CODE_TQ_FAILED_TO_CREATE_DIR; + tqError("failed to create dir:%s since %s ", name, terrstr()); } strcat(name, "/" TQ_IDX_NAME); int idxFd = open(name, O_RDWR | O_CREAT, 0755); if (idxFd < 0) { - ASSERT(false); - // close file + terrno = TAOS_SYSTEM_ERROR(errno); + tqError("failed to open file:%s since %s ", name, terrstr()); // free memory return NULL; } @@ -102,9 +105,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial pMeta->idxFd = idxFd; pMeta->unpersistHead = malloc(sizeof(STqMetaList)); if (pMeta->unpersistHead == NULL) { - ASSERT(false); - // close file - // free memory + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return NULL; } memset(pMeta->unpersistHead, 0, sizeof(STqMetaList)); @@ -114,7 +115,8 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial strcat(name, "/" TQ_META_NAME); int fileFd = open(name, O_RDWR | O_CREAT, 0755); if (fileFd < 0) { - ASSERT(false); + terrno = TAOS_SYSTEM_ERROR(errno); + tqError("failed to open file:%s since %s", name, terrstr()); return NULL; } @@ -129,7 +131,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial STqIdxPageBuf idxBuf; STqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE); if (serializedObj == NULL) { - // TODO:memory insufficient + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; } int idxRead; int allocated = TQ_PAGE_SIZE; @@ -137,14 +139,16 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial while ((idxRead = read(idxFd, &idxBuf, TQ_PAGE_SIZE))) { if (idxRead == -1) { // TODO: handle error - ASSERT(false); + terrno = TAOS_SYSTEM_ERROR(errno); + tqError("failed to read tq index file since %s", terrstr()); } ASSERT(idxBuf.head.writeOffset == idxRead); // loop read every entry for (int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) { STqMetaList* pNode = malloc(sizeof(STqMetaList)); if (pNode == NULL) { - // TODO: free memory and return error + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; + // TODO: free memory } memset(pNode, 0, sizeof(STqMetaList)); memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE); @@ -153,7 +157,8 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial if (allocated < pNode->handle.serializedSize) { void* ptr = realloc(serializedObj, pNode->handle.serializedSize); if (ptr == NULL) { - // TODO: memory insufficient + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; + // TODO: free memory } serializedObj = ptr; allocated = pNode->handle.serializedSize; @@ -292,7 +297,7 @@ int32_t tqStorePersist(STqMetaStore* pMeta) { STqMetaList* pNode = pHead->unpersistNext; STqSerializedHead* pSHead = malloc(sizeof(STqSerializedHead)); if (pSHead == NULL) { - // TODO: memory error + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return -1; } pSHead->ver = TQ_SVER; @@ -403,7 +408,6 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu STqMetaList* pNode = pMeta->bucket[bucketKey]; while (pNode) { if (pNode->handle.key == key) { - // TODO: think about thread safety if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { pMeta->pDeleter(pNode->handle.valueInUse); } @@ -416,7 +420,7 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu } STqMetaList* pNewNode = malloc(sizeof(STqMetaList)); if (pNewNode == NULL) { - // TODO: memory error + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return -1; } memset(pNewNode, 0, sizeof(STqMetaList)); @@ -470,10 +474,10 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va STqMetaList* pNode = pMeta->bucket[bucketKey]; while (pNode) { if (pNode->handle.key == key) { - // TODO: think about thread safety if (pNode->handle.valueInTxn) { if (tqDupIntxnReject(pMeta->tqConfigFlag)) { - return -2; + terrno = TSDB_CODE_TQ_META_KEY_DUP_IN_TXN; + return -1; } if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { pMeta->pDeleter(pNode->handle.valueInTxn); @@ -488,7 +492,7 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va } STqMetaList* pNewNode = malloc(sizeof(STqMetaList)); if (pNewNode == NULL) { - // TODO: memory error + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return -1; } memset(pNewNode, 0, sizeof(STqMetaList)); @@ -505,7 +509,7 @@ int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) { return int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) { void* vmem = malloc(vsize); if (vmem == NULL) { - // TODO: memory error + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return -1; } memcpy(vmem, value, vsize); @@ -535,6 +539,7 @@ int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) { while (pNode) { if (pNode->handle.key == key) { if (pNode->handle.valueInTxn == NULL) { + terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN; return -1; } if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { @@ -548,7 +553,8 @@ int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) { pNode = pNode->next; } } - return -2; + terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY; + return -1; } int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) { @@ -564,12 +570,14 @@ int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) { tqLinkUnpersist(pMeta, pNode); return 0; } + terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN; return -1; } else { pNode = pNode->next; } } - return -2; + terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY; + return -1; } int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) { @@ -588,7 +596,7 @@ int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) { pNode = pNode->next; } } - // no such key + terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY; return -1; }