diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 319bd9fde67b49112c968c75c7177b8453a2b66c..b6dd90a4e2586f44a3865e73ef335be92e778919 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -59,7 +59,7 @@ typedef struct { SWalCfg walCfg; uint32_t hashBegin; uint32_t hashEnd; - int8_t hashMethod; + int8_t hashMethod; } SVnodeCfg; typedef struct { @@ -202,6 +202,22 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); /* ------------------------- TQ READ --------------------------- */ +enum { + TQ_STREAM_TOKEN__DATA = 1, + TQ_STREAM_TOKEN__WATERMARK, + TQ_STREAM_TOKEN__CHECKPOINT, +}; + +typedef struct { + int8_t type; + int8_t reserved[7]; + union { + void *data; + int64_t wmTs; + int64_t checkpointId; + }; +} STqStreamToken; + STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList) { diff --git a/source/dnode/vnode/src/inc/tqPush.h b/source/dnode/vnode/src/inc/tqPush.h index 32fd7c3ddf4cd5efd0a5125ae67865afbadfb580..a6121c5dc1b7e5b29626a2b6cb59bd9bd63b40b9 100644 --- a/source/dnode/vnode/src/inc/tqPush.h +++ b/source/dnode/vnode/src/inc/tqPush.h @@ -16,9 +16,11 @@ #ifndef _TQ_PUSH_H_ #define _TQ_PUSH_H_ +#include "executor.h" #include "thash.h" #include "trpc.h" #include "ttimer.h" +#include "vnode.h" #ifdef __cplusplus extern "C" { @@ -39,11 +41,12 @@ typedef struct { } STqClientPusher; typedef struct { - int8_t type; - int8_t nodeType; - int8_t reserved[6]; - int64_t streamId; - SEpSet epSet; + int8_t type; + int8_t nodeType; + int8_t reserved[6]; + int64_t streamId; + qTaskInfo_t task; + // TODO sync function } STqStreamPusher; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 16809f1527b05eceafd3c54507a8d029145ff2f3..d15481b4aa54c6c5009f0030eca63b0685a749ec 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -67,6 +67,26 @@ void tqClose(STQ* pTq) { } int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { + if (msgType != TDMT_VND_SUBMIT) return 0; + void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL); + while (pIter != NULL) { + STqPusher* pusher = *(STqPusher**)pIter; + if (pusher->type == TQ_PUSHER_TYPE__STREAM) { + STqStreamPusher* streamPusher = (STqStreamPusher*)pusher; + // repack + STqStreamToken* token = malloc(sizeof(STqStreamToken)); + if (token == NULL) { + taosHashCancelIterate(pTq->tqPushMgr->pHash, pIter); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + token->type = TQ_STREAM_TOKEN__DATA; + token->data = msg; + // set input + // exec + } + // send msg to ep + } // iterate hash // process all msg // if waiting diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index fea65846be749d2fe73f5319477a989e86e2cc33..4186f29e2aa7111ce53f143247bbaf5f15018d66 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -73,7 +73,7 @@ STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet streamPusher->type = TQ_PUSHER_TYPE__STREAM; streamPusher->nodeType = 0; streamPusher->streamId = streamId; - memcpy(&streamPusher->epSet, pEpSet, sizeof(SEpSet)); + /*memcpy(&streamPusher->epSet, pEpSet, sizeof(SEpSet));*/ if (taosHashPut(pushMgr->pHash, &streamId, sizeof(int64_t), &streamPusher, sizeof(void*)) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 2f24df03099a728afe30c1b8383e68e2b0face62..92a111298fa093699dd75cbadec5846d11fefa44 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -12,7 +12,6 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#define _DEFAULT_SOURCE #include "vnode.h" @@ -37,6 +36,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t pMsg->length = htonl(pMsg->length); pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + // iterate and convert if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; while (true) { if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 699da7579028dfd690a24a4136ea59617c398d00..0f6ac0b214741eab7f365603457479ff778cf876 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -34,9 +34,6 @@ int32_t walCommit(SWal *pWal, int64_t ver) { int32_t walRollback(SWal *pWal, int64_t ver) { int code; char fnameStr[WAL_FILE_LEN]; - if (ver == pWal->vers.lastVer) { - return 0; - } if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) { terrno = TSDB_CODE_WAL_INVALID_VER; return -1; diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index 230555e016d58178b42e258b7c350aa0804d513b..5bfea9ab5e2773bf3e4b6c080eba1a9e889f65a1 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -124,13 +124,8 @@ class WalRetentionEnv : public ::testing::Test { void SetUp() override { SWalCfg cfg; - cfg.rollPeriod = -1, - cfg.segSize = -1, - cfg.retentionPeriod = -1, - cfg.retentionSize = 0, - cfg.rollPeriod = 0, - cfg.vgId = 0, - cfg.level = TAOS_WAL_FSYNC; + cfg.rollPeriod = -1, cfg.segSize = -1, cfg.retentionPeriod = -1, cfg.retentionSize = 0, cfg.rollPeriod = 0, + cfg.vgId = 0, cfg.level = TAOS_WAL_FSYNC; pWal = walOpen(pathName, &cfg); ASSERT(pWal != NULL); } @@ -241,6 +236,12 @@ TEST_F(WalCleanEnv, rollback) { ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, i); } + code = walRollback(pWal, 12); + ASSERT_NE(code, 0); + ASSERT_EQ(pWal->vers.lastVer, 9); + code = walRollback(pWal, 9); + ASSERT_EQ(code, 0); + ASSERT_EQ(pWal->vers.lastVer, 8); code = walRollback(pWal, 5); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, 4); @@ -324,7 +325,7 @@ TEST_F(WalKeepEnv, readHandleRead) { TEST_F(WalRetentionEnv, repairMeta1) { walResetEnv(); int code; - + int i; for (i = 0; i < 100; i++) { char newStr[100]; @@ -336,14 +337,14 @@ TEST_F(WalRetentionEnv, repairMeta1) { TearDown(); - //getchar(); + // getchar(); char buf[100]; sprintf(buf, "%s/meta-ver%d", pathName, 0); taosRemoveFile(buf); sprintf(buf, "%s/meta-ver%d", pathName, 1); taosRemoveFile(buf); SetUp(); - //getchar(); + // getchar(); ASSERT_EQ(pWal->vers.lastVer, 99); @@ -401,5 +402,4 @@ TEST_F(WalRetentionEnv, repairMeta1) { EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); } } - }