diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 3e8c29c8767e563d1084a916e6b6edd3dadb472d..9f6418e2ed995bd33218b2c844960dd999519eaa 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1436,26 +1436,34 @@ typedef struct { func_id_t* pFuncIds; } SRSmaParam; +int tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam); +int tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam); + +typedef struct SVCreateStbReq { + const char* name; + tb_uid_t suid; + int8_t rollup; + int32_t ttl; + int16_t nCols; + SSchema* pSchema; + int16_t nTags; + SSchema* pSchemaTg; + SRSmaParam* pRSmaParam; +} SVCreateStbReq; + +int tEncodeSVCreateStbReq(SCoder* pCoder, const SVCreateStbReq* pReq); +int tDecodeSVCreateStbReq(SCoder* pCoder, SVCreateStbReq* pReq); + +typedef struct SVCreateStbRsp { + int code; +} SVCreateStbRsp; + typedef struct SVCreateTbReq { char* name; uint32_t ttl; uint32_t keep; + uint8_t type; union { - uint8_t info; - struct { - uint8_t rollup : 1; // 1 means rollup sma - uint8_t type : 7; - }; - }; - union { - struct { - tb_uid_t suid; - int16_t nCols; - SSchema* pSchema; - int16_t nTagCols; - SSchema* pTagSchema; - SRSmaParam* pRSmaParam; - } stbCfg; struct { tb_uid_t suid; SKVRow pTag; diff --git a/include/util/tencode.h b/include/util/tencode.h index 7c877ae4283fc662cf5cd43d59b571e931b08a02..ff8080510130ce804523258cb1bb5b13e82e6183 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -17,7 +17,8 @@ #define _TD_UTIL_ENCODE_H_ #include "tcoding.h" -#include "tfreelist.h" +#include "tlist.h" +// #include "tfreelist.h" #ifdef __cplusplus extern "C" { @@ -62,10 +63,14 @@ struct SCoderNode { CODER_NODE_FIELDS }; +typedef struct SCoderMem { + struct SCoderMem* next; +} SCoderMem; + typedef struct { td_coder_t type; td_endian_t endian; - SFreeList fl; + SCoderMem* mList; CODER_NODE_FIELDS TD_SLIST(SCoderNode) stack; } SCoder; @@ -74,7 +79,17 @@ typedef struct { #define TD_CODER_CURRENT(CODER) ((CODER)->data + (CODER)->pos) #define TD_CODER_MOVE_POS(CODER, MOVE) ((CODER)->pos += (MOVE)) #define TD_CODER_CHECK_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE)) -#define TCODER_MALLOC(PTR, TYPE, SIZE, CODER) TFL_MALLOC(PTR, TYPE, SIZE, &((CODER)->fl)) +#define TCODER_MALLOC(PCODER, SIZE) \ + ({ \ + void* ptr = NULL; \ + SCoderMem* pMem = (SCoderMem*)taosMemoryMalloc(sizeof(*pMem) + (SIZE)); \ + if (pMem) { \ + pMem->next = (PCODER)->mList; \ + (PCODER)->mList = pMem; \ + ptr = (void*)&pMem[1]; \ + } \ + ptr; \ + }) void tCoderInit(SCoder* pCoder, td_endian_t endian, uint8_t* data, int32_t size, td_coder_t type); void tCoderClear(SCoder* pCoder); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ed50ab7ff6f0f02c4c8e70ca13ab9f4321776f30..024388f3d85ce59c155de91dcb498cf808e703f2 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -478,37 +478,38 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeFixedU32(buf, pReq->ttl); tlen += taosEncodeFixedU32(buf, pReq->keep); - tlen += taosEncodeFixedU8(buf, pReq->info); + tlen += taosEncodeFixedU8(buf, pReq->type); + // tlen += taosEncodeFixedU8(buf, pReq->info); switch (pReq->type) { - case TD_SUPER_TABLE: - tlen += taosEncodeFixedI64(buf, pReq->stbCfg.suid); - tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nCols); - for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) { - tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type); - tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].flags); - tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pSchema[i].colId); - tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes); - tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name); - } - tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nTagCols); - for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) { - tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type); - tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].flags); - tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pTagSchema[i].colId); - tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes); - tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name); - } - if (pReq->rollup && pReq->stbCfg.pRSmaParam) { - SRSmaParam *param = pReq->stbCfg.pRSmaParam; - tlen += taosEncodeBinary(buf, (const void *)¶m->xFilesFactor, sizeof(param->xFilesFactor)); - tlen += taosEncodeFixedI32(buf, param->delay); - tlen += taosEncodeFixedI8(buf, param->nFuncIds); - for (int8_t i = 0; i < param->nFuncIds; ++i) { - tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]); - } - } - break; + // case TD_SUPER_TABLE: + // tlen += taosEncodeFixedI64(buf, pReq->stbCfg.suid); + // tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nCols); + // for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) { + // tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type); + // tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].flags); + // tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pSchema[i].colId); + // tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes); + // tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name); + // } + // tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nTagCols); + // for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) { + // tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type); + // tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].flags); + // tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pTagSchema[i].colId); + // tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes); + // tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name); + // } + // if (pReq->rollup && pReq->stbCfg.pRSmaParam) { + // SRSmaParam *param = pReq->stbCfg.pRSmaParam; + // tlen += taosEncodeBinary(buf, (const void *)¶m->xFilesFactor, sizeof(param->xFilesFactor)); + // tlen += taosEncodeFixedI32(buf, param->delay); + // tlen += taosEncodeFixedI8(buf, param->nFuncIds); + // for (int8_t i = 0; i < param->nFuncIds; ++i) { + // tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]); + // } + // } + // break; case TD_CHILD_TABLE: tlen += taosEncodeFixedI64(buf, pReq->ctbCfg.suid); tlen += tdEncodeKVRow(buf, pReq->ctbCfg.pTag); @@ -534,47 +535,48 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { buf = taosDecodeString(buf, &(pReq->name)); buf = taosDecodeFixedU32(buf, &(pReq->ttl)); buf = taosDecodeFixedU32(buf, &(pReq->keep)); - buf = taosDecodeFixedU8(buf, &(pReq->info)); + buf = taosDecodeFixedU8(buf, &pReq->type); + // buf = taosDecodeFixedU8(buf, &(pReq->info)); switch (pReq->type) { - case TD_SUPER_TABLE: - buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid)); - buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nCols)); - pReq->stbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchema)); - for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) { - buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type)); - buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].flags)); - buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.pSchema[i].colId)); - buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes)); - buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name); - } - buf = taosDecodeFixedI16(buf, &pReq->stbCfg.nTagCols); - pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema)); - for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) { - buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type)); - buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].flags)); - buf = taosDecodeFixedI16(buf, &pReq->stbCfg.pTagSchema[i].colId); - buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes); - buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name); - } - if (pReq->rollup) { - pReq->stbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam)); - SRSmaParam *param = pReq->stbCfg.pRSmaParam; - buf = taosDecodeBinaryTo(buf, (void *)¶m->xFilesFactor, sizeof(param->xFilesFactor)); - buf = taosDecodeFixedI32(buf, ¶m->delay); - buf = taosDecodeFixedI8(buf, ¶m->nFuncIds); - if (param->nFuncIds > 0) { - param->pFuncIds = (func_id_t *)taosMemoryMalloc(param->nFuncIds * sizeof(func_id_t)); - for (int8_t i = 0; i < param->nFuncIds; ++i) { - buf = taosDecodeFixedI32(buf, param->pFuncIds + i); - } - } else { - param->pFuncIds = NULL; - } - } else { - pReq->stbCfg.pRSmaParam = NULL; - } - break; + // case TD_SUPER_TABLE: + // buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid)); + // buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nCols)); + // pReq->stbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchema)); + // for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) { + // buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type)); + // buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].flags)); + // buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.pSchema[i].colId)); + // buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes)); + // buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name); + // } + // buf = taosDecodeFixedI16(buf, &pReq->stbCfg.nTagCols); + // pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema)); + // for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) { + // buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type)); + // buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].flags)); + // buf = taosDecodeFixedI16(buf, &pReq->stbCfg.pTagSchema[i].colId); + // buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes); + // buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name); + // } + // if (pReq->rollup) { + // pReq->stbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam)); + // SRSmaParam *param = pReq->stbCfg.pRSmaParam; + // buf = taosDecodeBinaryTo(buf, (void *)¶m->xFilesFactor, sizeof(param->xFilesFactor)); + // buf = taosDecodeFixedI32(buf, ¶m->delay); + // buf = taosDecodeFixedI8(buf, ¶m->nFuncIds); + // if (param->nFuncIds > 0) { + // param->pFuncIds = (func_id_t *)taosMemoryMalloc(param->nFuncIds * sizeof(func_id_t)); + // for (int8_t i = 0; i < param->nFuncIds; ++i) { + // buf = taosDecodeFixedI32(buf, param->pFuncIds + i); + // } + // } else { + // param->pFuncIds = NULL; + // } + // } else { + // pReq->stbCfg.pRSmaParam = NULL; + // } + // break; case TD_CHILD_TABLE: buf = taosDecodeFixedI64(buf, &pReq->ctbCfg.suid); buf = tdDecodeKVRow(buf, &pReq->ctbCfg.pTag); @@ -3221,7 +3223,7 @@ int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) { if (tStartDecode(decoder) < 0) return -1; if (tDecodeI32(decoder, &pReq->num) < 0) return -1; - TCODER_MALLOC(pReq->offsets, SMqOffset *, pReq->num * sizeof(SMqOffset), decoder); + pReq->offsets = (SMqOffset *)TCODER_MALLOC(decoder, sizeof(SMqOffset) * pReq->num); if (pReq->offsets == NULL) return -1; for (int32_t i = 0; i < pReq->num; i++) { tDecodeSMqOffset(decoder, &pReq->offsets[i]); @@ -3581,3 +3583,55 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { taosMemoryFreeClear(pReq->sql); taosMemoryFreeClear(pReq->ast); } + +int tEncodeSVCreateStbReq(SCoder *pCoder, const SVCreateStbReq *pReq) { + if (tStartEncode(pCoder) < 0) return -1; + + if (tEncodeCStr(pCoder, pReq->name) < 0) return -1; + if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; + if (tEncodeI8(pCoder, pReq->rollup) < 0) return -1; + if (tEncodeI32(pCoder, pReq->ttl) < 0) return -1; + if (tEncodeI16v(pCoder, pReq->nCols) < 0) return -1; + for (int iCol = 0; iCol < pReq->nCols; iCol++) { + if (tEncodeSSchema(pCoder, pReq->pSchema + iCol) < 0) return -1; + } + if (tEncodeI16v(pCoder, pReq->nTags) < 0) return -1; + for (int iTag = 0; iTag < pReq->nTags; iTag++) { + if (tEncodeSSchema(pCoder, pReq->pSchemaTg + iTag) < 0) return -1; + } + // if (pReq->rollup) { + // if (tEncodeSRSmaParam(pCoder, pReq->pRSmaParam) < 0) return -1; + // } + + tEndEncode(pCoder); + return 0; +} + +int tDecodeSVCreateStbReq(SCoder *pCoder, SVCreateStbReq *pReq) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; + if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1; + if (tDecodeI32(pCoder, &pReq->ttl) < 0) return -1; + if (tDecodeI16v(pCoder, &pReq->nCols) < 0) return -1; + + // TCODER_MALLOC(pReq->pSchema, SSchema, sizeof(SSchema) * pReq->nCols, pCoder); + pReq->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pReq->nCols); + for (int iCol = 0; iCol < pReq->nCols; iCol++) { + if (tDecodeSSchema(pCoder, pReq->pSchema + iCol) < 0) return -1; + } + + if (tDecodeI16v(pCoder, &pReq->nTags) < 0) return -1; + // TCODER_MALLOC(pReq->pSchemaTg, SSchema, sizeof(SSchema) * pReq->nTags, pCoder); + pReq->pSchemaTg = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pReq->nTags); + for (int iTag = 0; iTag < pReq->nTags; iTag++) { + if (tDecodeSSchema(pCoder, pReq->pSchemaTg + iTag) < 0) return -1; + } + // if (pReq->rollup) { + // if (tDecodeSRSmaParam(pCoder, pReq->pRSmaParam) < 0) return -1; + // } + + tEndDecode(pCoder); + return 0; +} diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index a777d177bd0c0362703b5e7f14192c8e2178fc2d..1afb91b92c195c572f4ca62dc94bae82e51d7954 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -72,7 +72,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { terrno = TSDB_CODE_OUT_OF_MEMORY; int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags + pStb->numOfSmas) * sizeof(SSchema) + - + pStb->commentLen + pStb->ast1Len + pStb->ast2Len + TSDB_STB_RESERVE_SIZE; + +pStb->commentLen + pStb->ast1Len + pStb->ast2Len + TSDB_STB_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size); if (pRaw == NULL) goto _OVER; @@ -394,6 +394,7 @@ static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSch } static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { +#if 0 SName name = {0}; tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); char dbFName[TSDB_DB_FNAME_LEN] = {0}; @@ -452,7 +453,7 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt taosMemoryFreeClear(pRSmaParam->pFuncIds); taosMemoryFreeClear(pRSmaParam); } - taosMemoryFreeClear(req.stbCfg.pSchema); + // taosMemoryFreeClear(req.stbCfg.pSchema); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -468,8 +469,10 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt taosMemoryFreeClear(pRSmaParam->pFuncIds); taosMemoryFreeClear(pRSmaParam); } - taosMemoryFreeClear(req.stbCfg.pSchema); + // taosMemoryFreeClear(req.stbCfg.pSchema); return pHead; +#endif + return NULL; } static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index afbea8663fecebed65757f1c2a8d980f5e78cfff..75976c58b5f026eee1249c6bbfdd668399b9ecfb 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -41,7 +41,6 @@ int vnodeDecodeConfig(const SJson* pJson, void* pObj); int vnodeScheduleTask(int (*execute)(void*), void* arg); // vnodeBufPool ==================== -#if 1 typedef struct SVBufPoolNode SVBufPoolNode; struct SVBufPoolNode { SVBufPoolNode* prev; @@ -64,42 +63,6 @@ int vnodeCloseBufPool(SVnode* pVnode); void vnodeBufPoolReset(SVBufPool* pPool); void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void vnodeBufPoolFree(SVBufPool* pPool, void* p); -#else -// SVBufPool -int vnodeOpenBufPool(SVnode* pVnode); -void vnodeCloseBufPool(SVnode* pVnode); -int vnodeBufPoolSwitch(SVnode* pVnode); -int vnodeBufPoolRecycle(SVnode* pVnode); -void* vnodeMalloc(SVnode* pVnode, uint64_t size); -bool vnodeBufPoolIsFull(SVnode* pVnode); - -SMemAllocatorFactory* vBufPoolGetMAF(SVnode* pVnode); - -// SVMemAllocator -typedef struct SVArenaNode { - TD_SLIST_NODE(SVArenaNode); - uint64_t size; // current node size - void* ptr; - char data[]; -} SVArenaNode; - -typedef struct SVMemAllocator { - T_REF_DECLARE() - TD_DLIST_NODE(SVMemAllocator); - uint64_t capacity; - uint64_t ssize; - uint64_t lsize; - SVArenaNode* pNode; - TD_SLIST(SVArenaNode) nlist; -} SVMemAllocator; - -SVMemAllocator* vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize); -void vmaDestroy(SVMemAllocator* pVMA); -void vmaReset(SVMemAllocator* pVMA); -void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size); -void vmaFree(SVMemAllocator* pVMA, void* ptr); -bool vmaIsFull(SVMemAllocator* pVMA); -#endif // vnodeQuery ==================== int vnodeQueryOpen(SVnode* pVnode); @@ -108,6 +71,7 @@ int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); // vnodeCommit ==================== int vnodeBegin(SVnode* pVnode); +int vnodeShouldCommit(SVnode* pVnode); int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 8b2f22f1bac68d4b3b0f2658f26a7d84f410a74b..60fa2c8aaea448dc63c6aad9d37bc8935d87abab 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -55,6 +55,14 @@ int vnodeBegin(SVnode *pVnode) { return 0; } +int vnodeShouldCommit(SVnode *pVnode) { + if (pVnode->inUse->size > pVnode->config.szBuf / 3) { + return 1; + } + + return 0; +} + int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { char fname[TSDB_FILENAME_LEN]; TdFilePtr pFile; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 4202c02a0c0b755fcc9a6082494377a6257c7855..42d29b6d61c751140634c56b77ee46f8222539c6 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -74,11 +74,11 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { nCols = pSW->nCols; if (pTbCfg->type == META_SUPER_TABLE) { - nTagCols = pTbCfg->stbCfg.nTagCols; - pTagSchema = pTbCfg->stbCfg.pTagSchema; + // nTagCols = pTbCfg->stbCfg.nTagCols; + // pTagSchema = pTbCfg->stbCfg.pTagSchema; } else if (pTbCfg->type == META_CHILD_TABLE) { - nTagCols = pStbCfg->stbCfg.nTagCols; - pTagSchema = pStbCfg->stbCfg.pTagSchema; + // nTagCols = pStbCfg->stbCfg.nTagCols; + // pTagSchema = pStbCfg->stbCfg.pTagSchema; } else { nTagCols = 0; pTagSchema = NULL; @@ -132,7 +132,7 @@ _exit: if (pTbCfg) { taosMemoryFreeClear(pTbCfg->name); if (pTbCfg->type == META_SUPER_TABLE) { - taosMemoryFree(pTbCfg->stbCfg.pTagSchema); + // taosMemoryFree(pTbCfg->stbCfg.pTagSchema); } else if (pTbCfg->type == META_SUPER_TABLE) { kvRowFree(pTbCfg->ctbCfg.pTag); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fb611efad8de47d4a1f3fb0d2f475e0f70eccab6..671e182a028d7a7c4f4ec0897a2adac92000d30e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -15,7 +15,7 @@ #include "vnodeInt.h" -static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq); +static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq, int len); static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp); static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq); static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp); @@ -43,43 +43,53 @@ int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { void *ptr = NULL; + void *pReq; + int len; int ret; - if (pVnode->config.streamMode == 0) { - // ptr = vnodeMalloc(pVnode, pMsg->contLen); - if (ptr == NULL) { - // TODO: handle error - } + vTrace("vgId: %d start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), + version); - // TODO: copy here need to be extended - memcpy(ptr, pMsg->pCont, pMsg->contLen); - } + // skip header + pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + len = pMsg->contLen - sizeof(SMsgHead); // todo: change the interface here if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { - // TODO: handle error + vError("vgId: %d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); + return -1; } switch (pMsg->msgType) { + /* META */ case TDMT_VND_CREATE_STB: - ret = vnodeProcessCreateStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))); - break; - case TDMT_VND_CREATE_TABLE: - pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; - vnodeProcessCreateTbReq(pVnode, pMsg, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pRsp); + ret = vnodeProcessCreateStbReq(pVnode, pReq, len); break; case TDMT_VND_ALTER_STB: - vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))); + vnodeProcessAlterStbReq(pVnode, pReq); break; case TDMT_VND_DROP_STB: vTrace("vgId:%d, process drop stb req", TD_VID(pVnode)); break; + case TDMT_VND_CREATE_TABLE: + pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; + vnodeProcessCreateTbReq(pVnode, pMsg, pReq, pRsp); + break; + case TDMT_VND_ALTER_TABLE: + break; case TDMT_VND_DROP_TABLE: break; + case TDMT_VND_CREATE_SMA: { // timeRangeSMA + if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { + // TODO + } + } break; + /* TSDB */ case TDMT_VND_SUBMIT: pRsp->msgType = TDMT_VND_SUBMIT_RSP; vnodeProcessSubmitReq(pVnode, ptr, pRsp); break; + /* TQ */ case TDMT_VND_MQ_SET_CONN: { if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { // TODO: handle error @@ -103,20 +113,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg 0) < 0) { } } break; - case TDMT_VND_CREATE_SMA: { // timeRangeSMA - - if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { - // TODO - } - // } break; - // case TDMT_VND_CANCEL_SMA: { // timeRangeSMA - // } break; - // case TDMT_VND_DROP_SMA: { // timeRangeSMA - // if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { - // // TODO - // } - - } break; default: ASSERT(0); break; @@ -125,7 +121,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg pVnode->state.applied = version; // Check if it needs to commit - if (0 /*vnodeShouldCommit(pVnode)*/) { + if (vnodeShouldCommit(pVnode)) { // tsem_wait(&(pVnode->canCommit)); if (vnodeAsyncCommit(pVnode) < 0) { // TODO: handle error @@ -197,7 +193,7 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { return 0; } -static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) { +static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq, int len) { SVCreateTbReq vCreateTbReq = {0}; tDeserializeSVCreateTbReq(pReq, &vCreateTbReq); if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { @@ -205,13 +201,13 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) { return -1; } - taosMemoryFree(vCreateTbReq.stbCfg.pSchema); - taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema); - if (vCreateTbReq.stbCfg.pRSmaParam) { - taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds); - taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam); - } - taosMemoryFree(vCreateTbReq.name); + // taosMemoryFree(vCreateTbReq.stbCfg.pSchema); + // taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema); + // if (vCreateTbReq.stbCfg.pRSmaParam) { + // taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds); + // taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam); + // } + // taosMemoryFree(vCreateTbReq.name); return 0; } @@ -243,12 +239,12 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR // TODO: to encapsule a free API taosMemoryFree(pCreateTbReq->name); if (pCreateTbReq->type == TD_SUPER_TABLE) { - taosMemoryFree(pCreateTbReq->stbCfg.pSchema); - taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema); - if (pCreateTbReq->stbCfg.pRSmaParam) { - taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam->pFuncIds); - taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam); - } + // taosMemoryFree(pCreateTbReq->stbCfg.pSchema); + // taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema); + // if (pCreateTbReq->stbCfg.pRSmaParam) { + // taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam->pFuncIds); + // taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam); + // } } else if (pCreateTbReq->type == TD_CHILD_TABLE) { taosMemoryFree(pCreateTbReq->ctbCfg.pTag); } else { @@ -276,12 +272,12 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) { vTrace("vgId:%d, process alter stb req", TD_VID(pVnode)); tDeserializeSVCreateTbReq(pReq, &vAlterTbReq); // TODO: to encapsule a free API - taosMemoryFree(vAlterTbReq.stbCfg.pSchema); - taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema); - if (vAlterTbReq.stbCfg.pRSmaParam) { - taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds); - taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam); - } + // taosMemoryFree(vAlterTbReq.stbCfg.pSchema); + // taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema); + // if (vAlterTbReq.stbCfg.pRSmaParam) { + // taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds); + // taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam); + // } taosMemoryFree(vAlterTbReq.name); return 0; } diff --git a/source/util/src/tencode.c b/source/util/src/tencode.c index c40d5b02e6fcce760e63d9a2c756c6bf352ee258..037eba5ed2ebf754dd7e3b31922bfbbe2e9a23cc 100644 --- a/source/util/src/tencode.c +++ b/source/util/src/tencode.c @@ -33,12 +33,19 @@ void tCoderInit(SCoder* pCoder, td_endian_t endian, uint8_t* data, int32_t size, pCoder->data = data; pCoder->size = size; pCoder->pos = 0; - tFreeListInit(&(pCoder->fl)); + pCoder->mList = NULL; TD_SLIST_INIT(&(pCoder->stack)); } void tCoderClear(SCoder* pCoder) { - tFreeListClear(&(pCoder->fl)); + SCoderMem* pMem; + + // clear memory + for (pMem = pCoder->mList; pMem; pMem = pCoder->mList) { + pCoder->mList = pMem->next; + taosMemoryFree(pMem); + } + struct SCoderNode* pNode; for (;;) { pNode = TD_SLIST_HEAD(&(pCoder->stack)); diff --git a/source/util/test/encodeTest.cpp b/source/util/test/encodeTest.cpp index b4da43dfb72600dbcc6ea5fda1fdde220e99c803..9ddbd24353c34ff1c3a81f4384fdac8f30200dfd 100644 --- a/source/util/test/encodeTest.cpp +++ b/source/util/test/encodeTest.cpp @@ -230,7 +230,7 @@ static int32_t tSStructA_v1_decode(SCoder *pCoder, SStructA_v1 *pSAV1) { const char *tstr; uint64_t len; if (tDecodeCStrAndLen(pCoder, &tstr, &len) < 0) return -1; - TCODER_MALLOC(pSAV1->A_c, char*, len + 1, pCoder); + pSAV1->A_c = (char *)TCODER_MALLOC(pCoder, len + 1); memcpy(pSAV1->A_c, tstr, len + 1); tEndDecode(pCoder); @@ -269,7 +269,7 @@ static int32_t tSStructA_v2_decode(SCoder *pCoder, SStructA_v2 *pSAV2) { const char *tstr; uint64_t len; if (tDecodeCStrAndLen(pCoder, &tstr, &len) < 0) return -1; - TCODER_MALLOC(pSAV2->A_c, char*, len + 1, pCoder); + pSAV2->A_c = (char *)TCODER_MALLOC(pCoder, len + 1); memcpy(pSAV2->A_c, tstr, len + 1); // ------------------------NEW FIELDS DECODE------------------------------- @@ -305,7 +305,7 @@ static int32_t tSFinalReq_v1_encode(SCoder *pCoder, const SFinalReq_v1 *ps1) { static int32_t tSFinalReq_v1_decode(SCoder *pCoder, SFinalReq_v1 *ps1) { if (tStartDecode(pCoder) < 0) return -1; - TCODER_MALLOC(ps1->pA, SStructA_v1*, sizeof(*(ps1->pA)), pCoder); + ps1->pA = (SStructA_v1 *)TCODER_MALLOC(pCoder, sizeof(*(ps1->pA))); if (tSStructA_v1_decode(pCoder, ps1->pA) < 0) return -1; if (tDecodeI32(pCoder, &ps1->v_a) < 0) return -1; if (tDecodeI8(pCoder, &ps1->v_b) < 0) return -1; @@ -339,7 +339,7 @@ static int32_t tSFinalReq_v2_encode(SCoder *pCoder, const SFinalReq_v2 *ps2) { static int32_t tSFinalReq_v2_decode(SCoder *pCoder, SFinalReq_v2 *ps2) { if (tStartDecode(pCoder) < 0) return -1; - TCODER_MALLOC(ps2->pA, SStructA_v2*, sizeof(*(ps2->pA)), pCoder); + ps2->pA = (SStructA_v2 *)TCODER_MALLOC(pCoder, sizeof(*(ps2->pA))); if (tSStructA_v2_decode(pCoder, ps2->pA) < 0) return -1; if (tDecodeI32(pCoder, &ps2->v_a) < 0) return -1; if (tDecodeI8(pCoder, &ps2->v_b) < 0) return -1;