提交 82bcecc4 编写于 作者: L Liu Jicong

add stream mode config

上级 8305f7e4
...@@ -28,7 +28,7 @@ int32_t init_env() { ...@@ -28,7 +28,7 @@ int32_t init_env() {
return -1; return -1;
} }
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
...@@ -62,6 +62,23 @@ int32_t init_env() { ...@@ -62,6 +62,23 @@ int32_t init_env() {
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
return 0;
}
int32_t create_topic() {
printf("create topic");
TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
const char* sql = "select * from tu1"; const char* sql = "select * from tu1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
...@@ -193,6 +210,7 @@ int main(int argc, char* argv[]) { ...@@ -193,6 +210,7 @@ int main(int argc, char* argv[]) {
printf("env init\n"); printf("env init\n");
code = init_env(); code = init_env();
} }
create_topic();
tmq_t* tmq = build_consumer(); tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list(); tmq_list_t* topic_list = build_topic_list();
/*perf_loop(tmq, topic_list);*/ /*perf_loop(tmq, topic_list);*/
......
...@@ -745,6 +745,7 @@ typedef struct { ...@@ -745,6 +745,7 @@ typedef struct {
int8_t cacheLastRow; int8_t cacheLastRow;
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
int8_t streamMode;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
} SCreateVnodeReq, SAlterVnodeReq; } SCreateVnodeReq, SAlterVnodeReq;
......
...@@ -45,6 +45,8 @@ int32_t tInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter) { ...@@ -45,6 +45,8 @@ int32_t tInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
} }
int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
ASSERT(pIter->len >= 0);
if (pIter->len == 0) { if (pIter->len == 0) {
pIter->len += sizeof(SSubmitReq); pIter->len += sizeof(SSubmitReq);
} else { } else {
...@@ -2109,6 +2111,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR ...@@ -2109,6 +2111,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1; if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1; if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1; if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
if (tEncodeI8(&encoder, pReq->streamMode) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i]; SReplica *pReplica = &pReq->replicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1; if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
...@@ -2148,6 +2151,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * ...@@ -2148,6 +2151,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1; if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1; if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1; if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->streamMode) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i]; SReplica *pReplica = &pReq->replicas[i];
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1; if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
......
...@@ -507,6 +507,7 @@ static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { ...@@ -507,6 +507,7 @@ static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->isHeapAllocator = true; pCfg->isHeapAllocator = true;
pCfg->ttl = 4; pCfg->ttl = 4;
pCfg->keep = pCreate->daysToKeep0; pCfg->keep = pCreate->daysToKeep0;
pCfg->streamMode = pCreate->streamMode;
pCfg->isWeak = true; pCfg->isWeak = true;
pCfg->tsdbCfg.keep = pCreate->daysToKeep0; pCfg->tsdbCfg.keep = pCreate->daysToKeep0;
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2; pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2;
......
...@@ -337,6 +337,7 @@ typedef struct { ...@@ -337,6 +337,7 @@ typedef struct {
int64_t pointsWritten; int64_t pointsWritten;
int8_t compact; int8_t compact;
int8_t replica; int8_t replica;
int8_t streamMode;
SVnodeGid vnodeGid[TSDB_MAX_REPLICA]; SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
} SVgObj; } SVgObj;
......
...@@ -395,24 +395,27 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbReq *pCreat ...@@ -395,24 +395,27 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbReq *pCreat
dbObj.vgVersion = 1; dbObj.vgVersion = 1;
dbObj.hashMethod = 1; dbObj.hashMethod = 1;
memcpy(dbObj.createUser, pUser->user, TSDB_USER_LEN); memcpy(dbObj.createUser, pUser->user, TSDB_USER_LEN);
dbObj.cfg = (SDbCfg){.numOfVgroups = pCreate->numOfVgroups, dbObj.cfg = (SDbCfg){
.cacheBlockSize = pCreate->cacheBlockSize, .numOfVgroups = pCreate->numOfVgroups,
.totalBlocks = pCreate->totalBlocks, .cacheBlockSize = pCreate->cacheBlockSize,
.daysPerFile = pCreate->daysPerFile, .totalBlocks = pCreate->totalBlocks,
.daysToKeep0 = pCreate->daysToKeep0, .daysPerFile = pCreate->daysPerFile,
.daysToKeep1 = pCreate->daysToKeep1, .daysToKeep0 = pCreate->daysToKeep0,
.daysToKeep2 = pCreate->daysToKeep2, .daysToKeep1 = pCreate->daysToKeep1,
.minRows = pCreate->minRows, .daysToKeep2 = pCreate->daysToKeep2,
.maxRows = pCreate->maxRows, .minRows = pCreate->minRows,
.fsyncPeriod = pCreate->fsyncPeriod, .maxRows = pCreate->maxRows,
.commitTime = pCreate->commitTime, .fsyncPeriod = pCreate->fsyncPeriod,
.precision = pCreate->precision, .commitTime = pCreate->commitTime,
.compression = pCreate->compression, .precision = pCreate->precision,
.walLevel = pCreate->walLevel, .compression = pCreate->compression,
.replications = pCreate->replications, .walLevel = pCreate->walLevel,
.quorum = pCreate->quorum, .replications = pCreate->replications,
.update = pCreate->update, .quorum = pCreate->quorum,
.cacheLastRow = pCreate->cacheLastRow}; .update = pCreate->update,
.cacheLastRow = pCreate->cacheLastRow,
.streamMode = pCreate->streamMode,
};
mndSetDefaultDbCfg(&dbObj.cfg); mndSetDefaultDbCfg(&dbObj.cfg);
...@@ -1400,4 +1403,4 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3 ...@@ -1400,4 +1403,4 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) { static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }
\ No newline at end of file
...@@ -1099,6 +1099,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -1099,6 +1099,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
pConsumerEp->consumerId = consumerId; pConsumerEp->consumerId = consumerId;
taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp); taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
if (pConsumerEp->oldConsumerId == -1) { if (pConsumerEp->oldConsumerId == -1) {
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, newTopicName,
pConsumerEp->consumerId);
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
} else { } else {
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include "mndShow.h" #include "mndShow.h"
#include "mndTrans.h" #include "mndTrans.h"
#define TSDB_VGROUP_VER_NUMBER 1 #define TSDB_VGROUP_VER_NUMBER 1
#define TSDB_VGROUP_RESERVE_SIZE 64 #define TSDB_VGROUP_RESERVE_SIZE 64
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw); static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
...@@ -214,6 +214,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg ...@@ -214,6 +214,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.cacheLastRow = pDb->cfg.cacheLastRow; createReq.cacheLastRow = pDb->cfg.cacheLastRow;
createReq.replica = pVgroup->replica; createReq.replica = pVgroup->replica;
createReq.selfIndex = -1; createReq.selfIndex = -1;
createReq.streamMode = pVgroup->streamMode;
for (int32_t v = 0; v < pVgroup->replica; ++v) { for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &createReq.replicas[v]; SReplica *pReplica = &createReq.replicas[v];
...@@ -255,8 +256,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg ...@@ -255,8 +256,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
return pReq; return pReq;
} }
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
int32_t *pContLen) {
SDropVnodeReq dropReq = {0}; SDropVnodeReq dropReq = {0};
dropReq.dnodeId = pDnode->id; dropReq.dnodeId = pDnode->id;
dropReq.vgId = pVgroup->vgId; dropReq.vgId = pVgroup->vgId;
...@@ -399,6 +399,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { ...@@ -399,6 +399,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
pVgroup->createdTime = taosGetTimestampMs(); pVgroup->createdTime = taosGetTimestampMs();
pVgroup->updateTime = pVgroups->createdTime; pVgroup->updateTime = pVgroups->createdTime;
pVgroup->version = 1; pVgroup->version = 1;
pVgroup->streamMode = pDb->cfg.streamMode;
pVgroup->hashBegin = hashMin + hashInterval * v; pVgroup->hashBegin = hashMin + hashInterval * v;
if (v == pDb->cfg.numOfVgroups - 1) { if (v == pDb->cfg.numOfVgroups - 1) {
pVgroup->hashEnd = hashMax; pVgroup->hashEnd = hashMax;
...@@ -700,4 +701,4 @@ static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, i ...@@ -700,4 +701,4 @@ static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, i
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) { static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }
\ No newline at end of file
...@@ -51,6 +51,7 @@ typedef struct { ...@@ -51,6 +51,7 @@ typedef struct {
bool isHeapAllocator; bool isHeapAllocator;
uint32_t ttl; uint32_t ttl;
uint32_t keep; uint32_t keep;
int8_t streamMode;
bool isWeak; bool isWeak;
STsdbCfg tsdbCfg; STsdbCfg tsdbCfg;
SMetaCfg metaCfg; SMetaCfg metaCfg;
......
...@@ -43,13 +43,17 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { ...@@ -43,13 +43,17 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SVCreateTbReq vCreateTbReq; SVCreateTbReq vCreateTbReq;
SVCreateTbBatchReq vCreateTbBatchReq; SVCreateTbBatchReq vCreateTbBatchReq;
void *ptr = vnodeMalloc(pVnode, pMsg->contLen); void *ptr = NULL;
if (ptr == NULL) {
// TODO: handle error
}
// TODO: copy here need to be extended if (pVnode->config.streamMode == 0) {
memcpy(ptr, pMsg->pCont, pMsg->contLen); ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) {
// TODO: handle error
}
// TODO: copy here need to be extended
memcpy(ptr, pMsg->pCont, pMsg->contLen);
}
// todo: change the interface here // todo: change the interface here
int64_t ver; int64_t ver;
...@@ -109,17 +113,19 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -109,17 +113,19 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// } // }
break; break;
case TDMT_VND_SUBMIT: case TDMT_VND_SUBMIT:
if (tsdbInsertData(pVnode->pTsdb, (SSubmitReq *)ptr, NULL) < 0) { if (pVnode->config.streamMode == 0) {
// TODO: handle error if (tsdbInsertData(pVnode->pTsdb, (SSubmitReq *)ptr, NULL) < 0) {
// TODO: handle error
}
} }
break; break;
case TDMT_VND_MQ_SET_CONN: { case TDMT_VND_MQ_SET_CONN: {
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead))) < 0) { if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// TODO: handle error // TODO: handle error
} }
} break; } break;
case TDMT_VND_MQ_REB: { case TDMT_VND_MQ_REB: {
if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead))) < 0) { if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
} }
} break; } break;
default: default:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册