提交 45cf8aa1 编写于 作者: H Hongze Cheng

refact meta 3

上级 dac63922
......@@ -1436,26 +1436,34 @@ typedef struct {
func_id_t* pFuncIds;
} SRSmaParam;
int tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam);
int tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam);
typedef struct SVCreateStbReq {
const char* name;
tb_uid_t suid;
int8_t rollup;
int32_t ttl;
int16_t nCols;
SSchema* pSchema;
int16_t nTags;
SSchema* pSchemaTg;
SRSmaParam* pRSmaParam;
} SVCreateStbReq;
int tEncodeSVCreateStbReq(SCoder* pCoder, const SVCreateStbReq* pReq);
int tDecodeSVCreateStbReq(SCoder* pCoder, SVCreateStbReq* pReq);
typedef struct SVCreateStbRsp {
int code;
} SVCreateStbRsp;
typedef struct SVCreateTbReq {
char* name;
uint32_t ttl;
uint32_t keep;
uint8_t type;
union {
uint8_t info;
struct {
uint8_t rollup : 1; // 1 means rollup sma
uint8_t type : 7;
};
};
union {
struct {
tb_uid_t suid;
int16_t nCols;
SSchema* pSchema;
int16_t nTagCols;
SSchema* pTagSchema;
SRSmaParam* pRSmaParam;
} stbCfg;
struct {
tb_uid_t suid;
SKVRow pTag;
......
......@@ -17,7 +17,8 @@
#define _TD_UTIL_ENCODE_H_
#include "tcoding.h"
#include "tfreelist.h"
#include "tlist.h"
// #include "tfreelist.h"
#ifdef __cplusplus
extern "C" {
......@@ -62,10 +63,14 @@ struct SCoderNode {
CODER_NODE_FIELDS
};
typedef struct SCoderMem {
struct SCoderMem* next;
} SCoderMem;
typedef struct {
td_coder_t type;
td_endian_t endian;
SFreeList fl;
SCoderMem* mList;
CODER_NODE_FIELDS
TD_SLIST(SCoderNode) stack;
} SCoder;
......@@ -74,7 +79,17 @@ typedef struct {
#define TD_CODER_CURRENT(CODER) ((CODER)->data + (CODER)->pos)
#define TD_CODER_MOVE_POS(CODER, MOVE) ((CODER)->pos += (MOVE))
#define TD_CODER_CHECK_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE))
#define TCODER_MALLOC(PTR, TYPE, SIZE, CODER) TFL_MALLOC(PTR, TYPE, SIZE, &((CODER)->fl))
#define TCODER_MALLOC(PCODER, SIZE) \
({ \
void* ptr = NULL; \
SCoderMem* pMem = (SCoderMem*)taosMemoryMalloc(sizeof(*pMem) + (SIZE)); \
if (pMem) { \
pMem->next = (PCODER)->mList; \
(PCODER)->mList = pMem; \
ptr = (void*)&pMem[1]; \
} \
ptr; \
})
void tCoderInit(SCoder* pCoder, td_endian_t endian, uint8_t* data, int32_t size, td_coder_t type);
void tCoderClear(SCoder* pCoder);
......
......@@ -478,37 +478,38 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedU32(buf, pReq->ttl);
tlen += taosEncodeFixedU32(buf, pReq->keep);
tlen += taosEncodeFixedU8(buf, pReq->info);
tlen += taosEncodeFixedU8(buf, pReq->type);
// tlen += taosEncodeFixedU8(buf, pReq->info);
switch (pReq->type) {
case TD_SUPER_TABLE:
tlen += taosEncodeFixedI64(buf, pReq->stbCfg.suid);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nCols);
for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type);
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].flags);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name);
}
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nTagCols);
for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type);
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].flags);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pTagSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name);
}
if (pReq->rollup && pReq->stbCfg.pRSmaParam) {
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
tlen += taosEncodeBinary(buf, (const void *)&param->xFilesFactor, sizeof(param->xFilesFactor));
tlen += taosEncodeFixedI32(buf, param->delay);
tlen += taosEncodeFixedI8(buf, param->nFuncIds);
for (int8_t i = 0; i < param->nFuncIds; ++i) {
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
}
}
break;
// case TD_SUPER_TABLE:
// tlen += taosEncodeFixedI64(buf, pReq->stbCfg.suid);
// tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nCols);
// for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
// tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type);
// tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].flags);
// tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pSchema[i].colId);
// tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes);
// tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name);
// }
// tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nTagCols);
// for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
// tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type);
// tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].flags);
// tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pTagSchema[i].colId);
// tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes);
// tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name);
// }
// if (pReq->rollup && pReq->stbCfg.pRSmaParam) {
// SRSmaParam *param = pReq->stbCfg.pRSmaParam;
// tlen += taosEncodeBinary(buf, (const void *)&param->xFilesFactor, sizeof(param->xFilesFactor));
// tlen += taosEncodeFixedI32(buf, param->delay);
// tlen += taosEncodeFixedI8(buf, param->nFuncIds);
// for (int8_t i = 0; i < param->nFuncIds; ++i) {
// tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
// }
// }
// break;
case TD_CHILD_TABLE:
tlen += taosEncodeFixedI64(buf, pReq->ctbCfg.suid);
tlen += tdEncodeKVRow(buf, pReq->ctbCfg.pTag);
......@@ -534,47 +535,48 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
buf = taosDecodeString(buf, &(pReq->name));
buf = taosDecodeFixedU32(buf, &(pReq->ttl));
buf = taosDecodeFixedU32(buf, &(pReq->keep));
buf = taosDecodeFixedU8(buf, &(pReq->info));
buf = taosDecodeFixedU8(buf, &pReq->type);
// buf = taosDecodeFixedU8(buf, &(pReq->info));
switch (pReq->type) {
case TD_SUPER_TABLE:
buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid));
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nCols));
pReq->stbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchema));
for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type));
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].flags));
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.pSchema[i].colId));
buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes));
buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name);
}
buf = taosDecodeFixedI16(buf, &pReq->stbCfg.nTagCols);
pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema));
for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type));
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].flags));
buf = taosDecodeFixedI16(buf, &pReq->stbCfg.pTagSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name);
}
if (pReq->rollup) {
pReq->stbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam));
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
buf = taosDecodeBinaryTo(buf, (void *)&param->xFilesFactor, sizeof(param->xFilesFactor));
buf = taosDecodeFixedI32(buf, &param->delay);
buf = taosDecodeFixedI8(buf, &param->nFuncIds);
if (param->nFuncIds > 0) {
param->pFuncIds = (func_id_t *)taosMemoryMalloc(param->nFuncIds * sizeof(func_id_t));
for (int8_t i = 0; i < param->nFuncIds; ++i) {
buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
}
} else {
param->pFuncIds = NULL;
}
} else {
pReq->stbCfg.pRSmaParam = NULL;
}
break;
// case TD_SUPER_TABLE:
// buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid));
// buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nCols));
// pReq->stbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchema));
// for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
// buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type));
// buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].flags));
// buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.pSchema[i].colId));
// buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes));
// buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name);
// }
// buf = taosDecodeFixedI16(buf, &pReq->stbCfg.nTagCols);
// pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema));
// for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
// buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type));
// buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].flags));
// buf = taosDecodeFixedI16(buf, &pReq->stbCfg.pTagSchema[i].colId);
// buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes);
// buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name);
// }
// if (pReq->rollup) {
// pReq->stbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam));
// SRSmaParam *param = pReq->stbCfg.pRSmaParam;
// buf = taosDecodeBinaryTo(buf, (void *)&param->xFilesFactor, sizeof(param->xFilesFactor));
// buf = taosDecodeFixedI32(buf, &param->delay);
// buf = taosDecodeFixedI8(buf, &param->nFuncIds);
// if (param->nFuncIds > 0) {
// param->pFuncIds = (func_id_t *)taosMemoryMalloc(param->nFuncIds * sizeof(func_id_t));
// for (int8_t i = 0; i < param->nFuncIds; ++i) {
// buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
// }
// } else {
// param->pFuncIds = NULL;
// }
// } else {
// pReq->stbCfg.pRSmaParam = NULL;
// }
// break;
case TD_CHILD_TABLE:
buf = taosDecodeFixedI64(buf, &pReq->ctbCfg.suid);
buf = tdDecodeKVRow(buf, &pReq->ctbCfg.pTag);
......@@ -3221,7 +3223,7 @@ int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq
int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) {
if (tStartDecode(decoder) < 0) return -1;
if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
TCODER_MALLOC(pReq->offsets, SMqOffset *, pReq->num * sizeof(SMqOffset), decoder);
pReq->offsets = (SMqOffset *)TCODER_MALLOC(decoder, sizeof(SMqOffset) * pReq->num);
if (pReq->offsets == NULL) return -1;
for (int32_t i = 0; i < pReq->num; i++) {
tDecodeSMqOffset(decoder, &pReq->offsets[i]);
......@@ -3581,3 +3583,55 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosMemoryFreeClear(pReq->sql);
taosMemoryFreeClear(pReq->ast);
}
int tEncodeSVCreateStbReq(SCoder *pCoder, const SVCreateStbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeI8(pCoder, pReq->rollup) < 0) return -1;
if (tEncodeI32(pCoder, pReq->ttl) < 0) return -1;
if (tEncodeI16v(pCoder, pReq->nCols) < 0) return -1;
for (int iCol = 0; iCol < pReq->nCols; iCol++) {
if (tEncodeSSchema(pCoder, pReq->pSchema + iCol) < 0) return -1;
}
if (tEncodeI16v(pCoder, pReq->nTags) < 0) return -1;
for (int iTag = 0; iTag < pReq->nTags; iTag++) {
if (tEncodeSSchema(pCoder, pReq->pSchemaTg + iTag) < 0) return -1;
}
// if (pReq->rollup) {
// if (tEncodeSRSmaParam(pCoder, pReq->pRSmaParam) < 0) return -1;
// }
tEndEncode(pCoder);
return 0;
}
int tDecodeSVCreateStbReq(SCoder *pCoder, SVCreateStbReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->ttl) < 0) return -1;
if (tDecodeI16v(pCoder, &pReq->nCols) < 0) return -1;
// TCODER_MALLOC(pReq->pSchema, SSchema, sizeof(SSchema) * pReq->nCols, pCoder);
pReq->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pReq->nCols);
for (int iCol = 0; iCol < pReq->nCols; iCol++) {
if (tDecodeSSchema(pCoder, pReq->pSchema + iCol) < 0) return -1;
}
if (tDecodeI16v(pCoder, &pReq->nTags) < 0) return -1;
// TCODER_MALLOC(pReq->pSchemaTg, SSchema, sizeof(SSchema) * pReq->nTags, pCoder);
pReq->pSchemaTg = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pReq->nTags);
for (int iTag = 0; iTag < pReq->nTags; iTag++) {
if (tDecodeSSchema(pCoder, pReq->pSchemaTg + iTag) < 0) return -1;
}
// if (pReq->rollup) {
// if (tDecodeSRSmaParam(pCoder, pReq->pRSmaParam) < 0) return -1;
// }
tEndDecode(pCoder);
return 0;
}
......@@ -72,7 +72,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags + pStb->numOfSmas) * sizeof(SSchema) +
+ pStb->commentLen + pStb->ast1Len + pStb->ast2Len + TSDB_STB_RESERVE_SIZE;
+pStb->commentLen + pStb->ast1Len + pStb->ast2Len + TSDB_STB_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size);
if (pRaw == NULL) goto _OVER;
......@@ -394,6 +394,7 @@ static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSch
}
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
#if 0
SName name = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char dbFName[TSDB_DB_FNAME_LEN] = {0};
......@@ -452,7 +453,7 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
taosMemoryFreeClear(pRSmaParam->pFuncIds);
taosMemoryFreeClear(pRSmaParam);
}
taosMemoryFreeClear(req.stbCfg.pSchema);
// taosMemoryFreeClear(req.stbCfg.pSchema);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
......@@ -468,8 +469,10 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
taosMemoryFreeClear(pRSmaParam->pFuncIds);
taosMemoryFreeClear(pRSmaParam);
}
taosMemoryFreeClear(req.stbCfg.pSchema);
// taosMemoryFreeClear(req.stbCfg.pSchema);
return pHead;
#endif
return NULL;
}
static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
......
......@@ -41,7 +41,6 @@ int vnodeDecodeConfig(const SJson* pJson, void* pObj);
int vnodeScheduleTask(int (*execute)(void*), void* arg);
// vnodeBufPool ====================
#if 1
typedef struct SVBufPoolNode SVBufPoolNode;
struct SVBufPoolNode {
SVBufPoolNode* prev;
......@@ -64,42 +63,6 @@ int vnodeCloseBufPool(SVnode* pVnode);
void vnodeBufPoolReset(SVBufPool* pPool);
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void vnodeBufPoolFree(SVBufPool* pPool, void* p);
#else
// SVBufPool
int vnodeOpenBufPool(SVnode* pVnode);
void vnodeCloseBufPool(SVnode* pVnode);
int vnodeBufPoolSwitch(SVnode* pVnode);
int vnodeBufPoolRecycle(SVnode* pVnode);
void* vnodeMalloc(SVnode* pVnode, uint64_t size);
bool vnodeBufPoolIsFull(SVnode* pVnode);
SMemAllocatorFactory* vBufPoolGetMAF(SVnode* pVnode);
// SVMemAllocator
typedef struct SVArenaNode {
TD_SLIST_NODE(SVArenaNode);
uint64_t size; // current node size
void* ptr;
char data[];
} SVArenaNode;
typedef struct SVMemAllocator {
T_REF_DECLARE()
TD_DLIST_NODE(SVMemAllocator);
uint64_t capacity;
uint64_t ssize;
uint64_t lsize;
SVArenaNode* pNode;
TD_SLIST(SVArenaNode) nlist;
} SVMemAllocator;
SVMemAllocator* vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize);
void vmaDestroy(SVMemAllocator* pVMA);
void vmaReset(SVMemAllocator* pVMA);
void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size);
void vmaFree(SVMemAllocator* pVMA, void* ptr);
bool vmaIsFull(SVMemAllocator* pVMA);
#endif
// vnodeQuery ====================
int vnodeQueryOpen(SVnode* pVnode);
......@@ -108,6 +71,7 @@ int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit ====================
int vnodeBegin(SVnode* pVnode);
int vnodeShouldCommit(SVnode* pVnode);
int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
......
......@@ -55,6 +55,14 @@ int vnodeBegin(SVnode *pVnode) {
return 0;
}
int vnodeShouldCommit(SVnode *pVnode) {
if (pVnode->inUse->size > pVnode->config.szBuf / 3) {
return 1;
}
return 0;
}
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
char fname[TSDB_FILENAME_LEN];
TdFilePtr pFile;
......
......@@ -74,11 +74,11 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
nCols = pSW->nCols;
if (pTbCfg->type == META_SUPER_TABLE) {
nTagCols = pTbCfg->stbCfg.nTagCols;
pTagSchema = pTbCfg->stbCfg.pTagSchema;
// nTagCols = pTbCfg->stbCfg.nTagCols;
// pTagSchema = pTbCfg->stbCfg.pTagSchema;
} else if (pTbCfg->type == META_CHILD_TABLE) {
nTagCols = pStbCfg->stbCfg.nTagCols;
pTagSchema = pStbCfg->stbCfg.pTagSchema;
// nTagCols = pStbCfg->stbCfg.nTagCols;
// pTagSchema = pStbCfg->stbCfg.pTagSchema;
} else {
nTagCols = 0;
pTagSchema = NULL;
......@@ -132,7 +132,7 @@ _exit:
if (pTbCfg) {
taosMemoryFreeClear(pTbCfg->name);
if (pTbCfg->type == META_SUPER_TABLE) {
taosMemoryFree(pTbCfg->stbCfg.pTagSchema);
// taosMemoryFree(pTbCfg->stbCfg.pTagSchema);
} else if (pTbCfg->type == META_SUPER_TABLE) {
kvRowFree(pTbCfg->ctbCfg.pTag);
}
......
......@@ -15,7 +15,7 @@
#include "vnodeInt.h"
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq);
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq, int len);
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq);
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp);
......@@ -43,43 +43,53 @@ int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
void *ptr = NULL;
void *pReq;
int len;
int ret;
if (pVnode->config.streamMode == 0) {
// ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) {
// TODO: handle error
}
vTrace("vgId: %d start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
version);
// TODO: copy here need to be extended
memcpy(ptr, pMsg->pCont, pMsg->contLen);
}
// skip header
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
len = pMsg->contLen - sizeof(SMsgHead);
// todo: change the interface here
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
// TODO: handle error
vError("vgId: %d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
switch (pMsg->msgType) {
/* META */
case TDMT_VND_CREATE_STB:
ret = vnodeProcessCreateStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
break;
case TDMT_VND_CREATE_TABLE:
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
vnodeProcessCreateTbReq(pVnode, pMsg, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pRsp);
ret = vnodeProcessCreateStbReq(pVnode, pReq, len);
break;
case TDMT_VND_ALTER_STB:
vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
vnodeProcessAlterStbReq(pVnode, pReq);
break;
case TDMT_VND_DROP_STB:
vTrace("vgId:%d, process drop stb req", TD_VID(pVnode));
break;
case TDMT_VND_CREATE_TABLE:
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
vnodeProcessCreateTbReq(pVnode, pMsg, pReq, pRsp);
break;
case TDMT_VND_ALTER_TABLE:
break;
case TDMT_VND_DROP_TABLE:
break;
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// TODO
}
} break;
/* TSDB */
case TDMT_VND_SUBMIT:
pRsp->msgType = TDMT_VND_SUBMIT_RSP;
vnodeProcessSubmitReq(pVnode, ptr, pRsp);
break;
/* TQ */
case TDMT_VND_MQ_SET_CONN: {
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// TODO: handle error
......@@ -103,20 +113,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
0) < 0) {
}
} break;
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// TODO
}
// } break;
// case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
// } break;
// case TDMT_VND_DROP_SMA: { // timeRangeSMA
// if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// // TODO
// }
} break;
default:
ASSERT(0);
break;
......@@ -125,7 +121,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
pVnode->state.applied = version;
// Check if it needs to commit
if (0 /*vnodeShouldCommit(pVnode)*/) {
if (vnodeShouldCommit(pVnode)) {
// tsem_wait(&(pVnode->canCommit));
if (vnodeAsyncCommit(pVnode) < 0) {
// TODO: handle error
......@@ -197,7 +193,7 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return 0;
}
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) {
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq, int len) {
SVCreateTbReq vCreateTbReq = {0};
tDeserializeSVCreateTbReq(pReq, &vCreateTbReq);
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
......@@ -205,13 +201,13 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) {
return -1;
}
taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema);
if (vCreateTbReq.stbCfg.pRSmaParam) {
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds);
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
}
taosMemoryFree(vCreateTbReq.name);
// taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
// taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema);
// if (vCreateTbReq.stbCfg.pRSmaParam) {
// taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds);
// taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
// }
// taosMemoryFree(vCreateTbReq.name);
return 0;
}
......@@ -243,12 +239,12 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
// TODO: to encapsule a free API
taosMemoryFree(pCreateTbReq->name);
if (pCreateTbReq->type == TD_SUPER_TABLE) {
taosMemoryFree(pCreateTbReq->stbCfg.pSchema);
taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema);
if (pCreateTbReq->stbCfg.pRSmaParam) {
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam->pFuncIds);
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam);
}
// taosMemoryFree(pCreateTbReq->stbCfg.pSchema);
// taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema);
// if (pCreateTbReq->stbCfg.pRSmaParam) {
// taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam->pFuncIds);
// taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam);
// }
} else if (pCreateTbReq->type == TD_CHILD_TABLE) {
taosMemoryFree(pCreateTbReq->ctbCfg.pTag);
} else {
......@@ -276,12 +272,12 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) {
vTrace("vgId:%d, process alter stb req", TD_VID(pVnode));
tDeserializeSVCreateTbReq(pReq, &vAlterTbReq);
// TODO: to encapsule a free API
taosMemoryFree(vAlterTbReq.stbCfg.pSchema);
taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema);
if (vAlterTbReq.stbCfg.pRSmaParam) {
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds);
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
}
// taosMemoryFree(vAlterTbReq.stbCfg.pSchema);
// taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema);
// if (vAlterTbReq.stbCfg.pRSmaParam) {
// taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds);
// taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
// }
taosMemoryFree(vAlterTbReq.name);
return 0;
}
......
......@@ -33,12 +33,19 @@ void tCoderInit(SCoder* pCoder, td_endian_t endian, uint8_t* data, int32_t size,
pCoder->data = data;
pCoder->size = size;
pCoder->pos = 0;
tFreeListInit(&(pCoder->fl));
pCoder->mList = NULL;
TD_SLIST_INIT(&(pCoder->stack));
}
void tCoderClear(SCoder* pCoder) {
tFreeListClear(&(pCoder->fl));
SCoderMem* pMem;
// clear memory
for (pMem = pCoder->mList; pMem; pMem = pCoder->mList) {
pCoder->mList = pMem->next;
taosMemoryFree(pMem);
}
struct SCoderNode* pNode;
for (;;) {
pNode = TD_SLIST_HEAD(&(pCoder->stack));
......
......@@ -230,7 +230,7 @@ static int32_t tSStructA_v1_decode(SCoder *pCoder, SStructA_v1 *pSAV1) {
const char *tstr;
uint64_t len;
if (tDecodeCStrAndLen(pCoder, &tstr, &len) < 0) return -1;
TCODER_MALLOC(pSAV1->A_c, char*, len + 1, pCoder);
pSAV1->A_c = (char *)TCODER_MALLOC(pCoder, len + 1);
memcpy(pSAV1->A_c, tstr, len + 1);
tEndDecode(pCoder);
......@@ -269,7 +269,7 @@ static int32_t tSStructA_v2_decode(SCoder *pCoder, SStructA_v2 *pSAV2) {
const char *tstr;
uint64_t len;
if (tDecodeCStrAndLen(pCoder, &tstr, &len) < 0) return -1;
TCODER_MALLOC(pSAV2->A_c, char*, len + 1, pCoder);
pSAV2->A_c = (char *)TCODER_MALLOC(pCoder, len + 1);
memcpy(pSAV2->A_c, tstr, len + 1);
// ------------------------NEW FIELDS DECODE-------------------------------
......@@ -305,7 +305,7 @@ static int32_t tSFinalReq_v1_encode(SCoder *pCoder, const SFinalReq_v1 *ps1) {
static int32_t tSFinalReq_v1_decode(SCoder *pCoder, SFinalReq_v1 *ps1) {
if (tStartDecode(pCoder) < 0) return -1;
TCODER_MALLOC(ps1->pA, SStructA_v1*, sizeof(*(ps1->pA)), pCoder);
ps1->pA = (SStructA_v1 *)TCODER_MALLOC(pCoder, sizeof(*(ps1->pA)));
if (tSStructA_v1_decode(pCoder, ps1->pA) < 0) return -1;
if (tDecodeI32(pCoder, &ps1->v_a) < 0) return -1;
if (tDecodeI8(pCoder, &ps1->v_b) < 0) return -1;
......@@ -339,7 +339,7 @@ static int32_t tSFinalReq_v2_encode(SCoder *pCoder, const SFinalReq_v2 *ps2) {
static int32_t tSFinalReq_v2_decode(SCoder *pCoder, SFinalReq_v2 *ps2) {
if (tStartDecode(pCoder) < 0) return -1;
TCODER_MALLOC(ps2->pA, SStructA_v2*, sizeof(*(ps2->pA)), pCoder);
ps2->pA = (SStructA_v2 *)TCODER_MALLOC(pCoder, sizeof(*(ps2->pA)));
if (tSStructA_v2_decode(pCoder, ps2->pA) < 0) return -1;
if (tDecodeI32(pCoder, &ps2->v_a) < 0) return -1;
if (tDecodeI8(pCoder, &ps2->v_b) < 0) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册