diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 12d1c8587944b1e6d506abe1d8c4eb125b1276d5..3f0752ea59b7724ea34d6ccf3883c6bd92f68140 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1012,6 +1012,7 @@ typedef struct { // for tsma int8_t isTsma; + void* pTsma; } SCreateVnodeReq; diff --git a/include/util/tdef.h b/include/util/tdef.h index 1e1770d1b9faa73fc716ed4db0bf5873e24f8c84..61cd335dea8f308e2af933fdf50f699507ab931f 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -342,7 +342,7 @@ typedef enum ELogicConditionType { #define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF #define TSDB_MIN_ROLLUP_FILE_FACTOR 0 -#define TSDB_MAX_ROLLUP_FILE_FACTOR 1 +#define TSDB_MAX_ROLLUP_FILE_FACTOR 10 #define TSDB_DEFAULT_ROLLUP_FILE_FACTOR 0.1 #define TSDB_MIN_TABLE_TTL 0 #define TSDB_DEFAULT_TABLE_TTL 0 diff --git a/include/util/tencode.h b/include/util/tencode.h index cbacd59fa7873c4cb05b8fdaefb321ae3f854e5b..05e4c013c44b58da2a84cd9f784b087bfdb73cb0 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -179,23 +179,23 @@ static int32_t tDecodeCStrTo(SDecoder* pCoder, char* val); TD_CODER_MOVE_POS(CODER, sizeof(*(PVAL))); \ return 0; -#define TD_DECODE_VARIANT_MACRO(CODER, PVAL, TYPE) \ - int32_t i = 0; \ - *(PVAL) = 0; \ - for (;;) { \ - if (TD_CODER_CHECK_CAPACITY_FAILED(CODER, 1)) return -1; \ - TYPE tval = TD_CODER_CURRENT(CODER)[0]; \ - if (tval < ENCODE_LIMIT) { \ - *(PVAL) |= (tval << (7 * i)); \ - TD_CODER_MOVE_POS(pCoder, 1); \ - break; \ - } else { \ - *(PVAL) |= (((tval) & (ENCODE_LIMIT - 1)) << (7 * i)); \ - i++; \ - TD_CODER_MOVE_POS(pCoder, 1); \ - } \ - } \ - \ +#define TD_DECODE_VARIANT_MACRO(CODER, PVAL, TYPE) \ + int32_t i = 0; \ + if (PVAL) *(PVAL) = 0; \ + for (;;) { \ + if (TD_CODER_CHECK_CAPACITY_FAILED(CODER, 1)) return -1; \ + TYPE tval = TD_CODER_CURRENT(CODER)[0]; \ + if (tval < ENCODE_LIMIT) { \ + if (PVAL) *(PVAL) |= (tval << (7 * i)); \ + TD_CODER_MOVE_POS(pCoder, 1); \ + break; \ + } else { \ + if (PVAL) *(PVAL) |= (((tval) & (ENCODE_LIMIT - 1)) << (7 * i)); \ + i++; \ + TD_CODER_MOVE_POS(pCoder, 1); \ + } \ + } \ + \ return 0; // 8 @@ -378,14 +378,16 @@ static FORCE_INLINE int32_t tDecodeDouble(SDecoder* pCoder, double* val) { } static FORCE_INLINE int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint32_t* len) { - if (tDecodeU32v(pCoder, len) < 0) return -1; + uint32_t length = 0; + if (tDecodeU32v(pCoder, &length) < 0) return -1; + if (len) *len = length; - if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, *len)) return -1; + if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1; if (val) { *val = (uint8_t*)TD_CODER_CURRENT(pCoder); } - TD_CODER_MOVE_POS(pCoder, *len); + TD_CODER_MOVE_POS(pCoder, length); return 0; } @@ -410,14 +412,16 @@ static int32_t tDecodeCStrTo(SDecoder* pCoder, char* val) { } static FORCE_INLINE int32_t tDecodeBinaryAlloc(SDecoder* pCoder, void** val, uint64_t* len) { - if (tDecodeU64v(pCoder, len) < 0) return -1; + uint64_t length = 0; + if (tDecodeU64v(pCoder, &length) < 0) return -1; + if (len) *len = length; - if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, *len)) return -1; - *val = taosMemoryMalloc(*len); + if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1; + *val = taosMemoryMalloc(length); if (*val == NULL) return -1; - memcpy(*val, TD_CODER_CURRENT(pCoder), *len); + memcpy(*val, TD_CODER_CURRENT(pCoder), length); - TD_CODER_MOVE_POS(pCoder, *len); + TD_CODER_MOVE_POS(pCoder, length); return 0; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7615f7b070ca350a27cc7d6a05520386fcaa6759..05b90092e5bef5176bd6ccbf80c6276737f30912 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2921,6 +2921,11 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR } if (tEncodeI8(&encoder, pReq->isTsma) < 0) return -1; + if (pReq->isTsma) { + uint32_t tsmaLen = (uint32_t)(htonl(((SMsgHead *)pReq->pTsma)->contLen)); + if (tEncodeBinary(&encoder, (const uint8_t *)pReq->pTsma, tsmaLen) < 0) return -1; + } + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -2984,6 +2989,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * } if (tDecodeI8(&decoder, &pReq->isTsma) < 0) return -1; + if (pReq->isTsma) { + if (tDecodeBinaryAlloc(&decoder, &pReq->pTsma, NULL) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); @@ -2993,6 +3001,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * int32_t tFreeSCreateVnodeReq(SCreateVnodeReq *pReq) { taosArrayDestroy(pReq->pRetensions); pReq->pRetensions = NULL; + if(pReq->isTsma) { + taosMemoryFreeClear(pReq->pTsma); + } return 0; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 8374db129ffacd5ba7776662cb0bc393ac77667e..27ea19a5dcccd2aa7fc26f319203ddf5483e267d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -140,6 +140,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->szCache = pCreate->pages; pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024; pCfg->isWeak = true; + pCfg->isTsma = pCreate->isTsma; pCfg->tsdbCfg.compression = pCreate->compression; pCfg->tsdbCfg.precision = pCreate->precision; pCfg->tsdbCfg.days = pCreate->daysPerFile; @@ -209,7 +210,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb); if (pImpl == NULL) { - dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); + dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr()); code = terrno; goto _OVER; } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 2164c98c83ec2a976695084692ecfb179d21ca0a..29a3992682f90ced077c111871e27c01761713ff 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -345,6 +345,7 @@ typedef struct { int8_t isTsma; int8_t replica; SVnodeGid vnodeGid[TSDB_MAX_REPLICA]; + void* pTsma; } SVgObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 7b5d1b6c32eaf802eb0683a385acd85498b8a88d..388b0d0c22de80366acdc47b100d85af6eca8e78 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -409,7 +409,8 @@ static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { +static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, + SSmaObj *pSma) { SVnodeGid *pVgid = pVgroup->vnodeGid + 0; SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); if (pDnode == NULL) return -1; @@ -419,9 +420,14 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, mndReleaseDnode(pMnode, pDnode); // todo add sma info here + int32_t smaContLen = 0; + void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen); + if (pSmaReq == NULL) return -1; + pVgroup->pTsma = pSmaReq; int32_t contLen = 0; void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); + taosMemoryFreeClear(pSmaReq); if (pReq == NULL) return -1; action.pCont = pReq; @@ -514,7 +520,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; - if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg) != 0) goto _OVER; + if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER; if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index e05b38a7c0345293eb53caeab2eb680f6d113651..dc1d292830433f98af118b7c921f41e23227110e 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -218,6 +218,8 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg createReq.hashMethod = pDb->cfg.hashMethod; createReq.numOfRetensions = pDb->cfg.numOfRetensions; createReq.pRetensions = pDb->cfg.pRetensions; + createReq.isTsma = pVgroup->isTsma; + createReq.pTsma = pVgroup->pTsma; for (int32_t v = 0; v < pVgroup->replica; ++v) { SReplica *pReplica = &createReq.replicas[v]; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e4343e3bbf63a9dd847cc1bd2f79e2ef35721cd3..bc493135acad135945dab70b2d92312e95762899 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -159,12 +159,13 @@ struct SVnodeCfg { uint64_t szBuf; bool isHeap; bool isWeak; + int8_t isTsma; + int8_t hashMethod; STsdbCfg tsdbCfg; SWalCfg walCfg; SSyncCfg syncCfg; uint32_t hashBegin; uint32_t hashEnd; - int8_t hashMethod; }; typedef struct { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index a1e397f11d8454b1f0a61afcfde2eeac65c361ea..42c215ee7c276ac6ed7d299390bef7e2083741ce 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -414,7 +414,7 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 } taosMemoryFreeClear(pReq); } else { - smaWarn("vgId:%d no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno)); + smaDebug("vgId:%d no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno)); } taosArrayDestroy(pResult); diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index a66ecc493d7cbef19370349568398d084dc5bc27..56cc3d637405f98688cc68ce970aca21b1e38734 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -56,6 +56,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "szBuf", pCfg->szBuf) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeap) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "isTsma", pCfg->isTsma) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1; @@ -130,6 +131,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { if(code < 0) return -1; tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak, code); if(code < 0) return -1; + tjsonGetNumberValue(pJson, "isTsma", pCfg->isTsma, code); + if(code < 0) return -1; tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision, code); if(code < 0) return -1; tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update, code); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index a90bb7afcb6847ba0cb803d7a7e58720159bf10f..178ef28e5dd07c9d18bfd92213049d82568c667f 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -97,7 +97,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { } // open tsdb - if (!vnodeIsRollup(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, TSDB_TYPE_TSDB) < 0) { + if (!vnodeIsRollup(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL) < 0) { vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; }