diff --git a/example/src/tmq.c b/example/src/tmq.c index ac7098b254652b9486bf5bd49ff5fe922f943f02..26e5ea82c19a3f0ac90689a4b4f1362caa193ec6 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -28,7 +28,7 @@ int32_t init_env() { return -1; } - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); return -1; @@ -62,6 +62,23 @@ int32_t init_env() { return -1; } taos_free_result(pRes); + return 0; +} + +int32_t create_topic() { + printf("create topic"); + TAOS_RES* pRes; + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); const char* sql = "select * from tu1"; pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); @@ -193,6 +210,7 @@ int main(int argc, char* argv[]) { printf("env init\n"); code = init_env(); } + create_topic(); tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); /*perf_loop(tmq, topic_list);*/ diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b42730d851f077b1bb30136ae10361f88b199fdf..5164fb6ccd2323764ac3aec259559ee572fbde4f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -745,6 +745,7 @@ typedef struct { int8_t cacheLastRow; int8_t replica; int8_t selfIndex; + int8_t streamMode; SReplica replicas[TSDB_MAX_REPLICA]; } SCreateVnodeReq, SAlterVnodeReq; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 314aa036fbe192728cb1f1d0ff6b3b17d4644e11..ac46c0c48c4d0b2169029833936c1313cf2d361d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -45,6 +45,8 @@ int32_t tInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter) { } int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { + ASSERT(pIter->len >= 0); + if (pIter->len == 0) { pIter->len += sizeof(SSubmitReq); } else { @@ -2109,6 +2111,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1; if (tEncodeI8(&encoder, pReq->replica) < 0) return -1; if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1; + if (tEncodeI8(&encoder, pReq->streamMode) < 0) return -1; for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { SReplica *pReplica = &pReq->replicas[i]; if (tEncodeSReplica(&encoder, pReplica) < 0) return -1; @@ -2148,6 +2151,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1; if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1; if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->streamMode) < 0) return -1; for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { SReplica *pReplica = &pReq->replicas[i]; if (tDecodeSReplica(&decoder, pReplica) < 0) return -1; diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index ebb2d1b4f0e4b9d63ae862bfa47b755bb3dcb9f9..b82d991179a8b698b4be348a8f51ff164cb5e2d6 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -507,6 +507,7 @@ static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->isHeapAllocator = true; pCfg->ttl = 4; pCfg->keep = pCreate->daysToKeep0; + pCfg->streamMode = pCreate->streamMode; pCfg->isWeak = true; pCfg->tsdbCfg.keep = pCreate->daysToKeep0; pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 20c72d118ba282322d4336dd5ca134ea16e2061b..e64191f7153c87301fa14d74e44cdb75b5f2cd89 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -337,6 +337,7 @@ typedef struct { int64_t pointsWritten; int8_t compact; int8_t replica; + int8_t streamMode; SVnodeGid vnodeGid[TSDB_MAX_REPLICA]; } SVgObj; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 122982b7f26176e793746b8a6c803845efd28d85..9c8a4ce586fc81db61759df6819435f1c9911680 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -395,24 +395,27 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbReq *pCreat dbObj.vgVersion = 1; dbObj.hashMethod = 1; memcpy(dbObj.createUser, pUser->user, TSDB_USER_LEN); - dbObj.cfg = (SDbCfg){.numOfVgroups = pCreate->numOfVgroups, - .cacheBlockSize = pCreate->cacheBlockSize, - .totalBlocks = pCreate->totalBlocks, - .daysPerFile = pCreate->daysPerFile, - .daysToKeep0 = pCreate->daysToKeep0, - .daysToKeep1 = pCreate->daysToKeep1, - .daysToKeep2 = pCreate->daysToKeep2, - .minRows = pCreate->minRows, - .maxRows = pCreate->maxRows, - .fsyncPeriod = pCreate->fsyncPeriod, - .commitTime = pCreate->commitTime, - .precision = pCreate->precision, - .compression = pCreate->compression, - .walLevel = pCreate->walLevel, - .replications = pCreate->replications, - .quorum = pCreate->quorum, - .update = pCreate->update, - .cacheLastRow = pCreate->cacheLastRow}; + dbObj.cfg = (SDbCfg){ + .numOfVgroups = pCreate->numOfVgroups, + .cacheBlockSize = pCreate->cacheBlockSize, + .totalBlocks = pCreate->totalBlocks, + .daysPerFile = pCreate->daysPerFile, + .daysToKeep0 = pCreate->daysToKeep0, + .daysToKeep1 = pCreate->daysToKeep1, + .daysToKeep2 = pCreate->daysToKeep2, + .minRows = pCreate->minRows, + .maxRows = pCreate->maxRows, + .fsyncPeriod = pCreate->fsyncPeriod, + .commitTime = pCreate->commitTime, + .precision = pCreate->precision, + .compression = pCreate->compression, + .walLevel = pCreate->walLevel, + .replications = pCreate->replications, + .quorum = pCreate->quorum, + .update = pCreate->update, + .cacheLastRow = pCreate->cacheLastRow, + .streamMode = pCreate->streamMode, + }; mndSetDefaultDbCfg(&dbObj.cfg); @@ -1400,4 +1403,4 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3 static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 9dba166b95e360c6c8f1be18b47f67e03874f4cf..a5c4c41244e32d79d99b6072601267f4e923e4bb 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -1099,6 +1099,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { pConsumerEp->consumerId = consumerId; taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp); if (pConsumerEp->oldConsumerId == -1) { + mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, newTopicName, + pConsumerEp->consumerId); mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); } else { mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index a2438a3b8ee0d3167e95f7e771b7ff16dc719ffb..b437b444177d5594c2c637b2741fe367b0305674 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -21,7 +21,7 @@ #include "mndShow.h" #include "mndTrans.h" -#define TSDB_VGROUP_VER_NUMBER 1 +#define TSDB_VGROUP_VER_NUMBER 1 #define TSDB_VGROUP_RESERVE_SIZE 64 static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw); @@ -214,6 +214,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg createReq.cacheLastRow = pDb->cfg.cacheLastRow; createReq.replica = pVgroup->replica; createReq.selfIndex = -1; + createReq.streamMode = pVgroup->streamMode; for (int32_t v = 0; v < pVgroup->replica; ++v) { SReplica *pReplica = &createReq.replicas[v]; @@ -255,8 +256,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg return pReq; } -void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, - int32_t *pContLen) { +void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { SDropVnodeReq dropReq = {0}; dropReq.dnodeId = pDnode->id; dropReq.vgId = pVgroup->vgId; @@ -399,6 +399,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { pVgroup->createdTime = taosGetTimestampMs(); pVgroup->updateTime = pVgroups->createdTime; pVgroup->version = 1; + pVgroup->streamMode = pDb->cfg.streamMode; pVgroup->hashBegin = hashMin + hashInterval * v; if (v == pDb->cfg.numOfVgroups - 1) { pVgroup->hashEnd = hashMax; @@ -700,4 +701,4 @@ static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, i static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); -} \ No newline at end of file +} diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index eaf30da1bb8bcc3a47e828988c0a977587a2ea1e..1fb0b05f9e10e837ad6a79cfe4a23d06c1d15914 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -51,6 +51,7 @@ typedef struct { bool isHeapAllocator; uint32_t ttl; uint32_t keep; + int8_t streamMode; bool isWeak; STsdbCfg tsdbCfg; SMetaCfg metaCfg; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 5c39c65f9ff12d25c881213ac4ac53e29c6dda45..ade280eecc85f9346331bf5a91a79c8afbdf1198 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -43,13 +43,17 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { SVCreateTbReq vCreateTbReq; SVCreateTbBatchReq vCreateTbBatchReq; - void *ptr = vnodeMalloc(pVnode, pMsg->contLen); - if (ptr == NULL) { - // TODO: handle error - } + void *ptr = NULL; - // TODO: copy here need to be extended - memcpy(ptr, pMsg->pCont, pMsg->contLen); + if (pVnode->config.streamMode == 0) { + ptr = vnodeMalloc(pVnode, pMsg->contLen); + if (ptr == NULL) { + // TODO: handle error + } + + // TODO: copy here need to be extended + memcpy(ptr, pMsg->pCont, pMsg->contLen); + } // todo: change the interface here int64_t ver; @@ -109,17 +113,19 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // } break; case TDMT_VND_SUBMIT: - if (tsdbInsertData(pVnode->pTsdb, (SSubmitReq *)ptr, NULL) < 0) { - // TODO: handle error + if (pVnode->config.streamMode == 0) { + if (tsdbInsertData(pVnode->pTsdb, (SSubmitReq *)ptr, NULL) < 0) { + // TODO: handle error + } } break; case TDMT_VND_MQ_SET_CONN: { - if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead))) < 0) { + if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { // TODO: handle error } } break; case TDMT_VND_MQ_REB: { - if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead))) < 0) { + if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { } } break; default: