提交 323ce214 编写于 作者: H Hongze Cheng

refact encode APIs

上级 4a9ae285
...@@ -338,8 +338,8 @@ typedef struct SEpSet { ...@@ -338,8 +338,8 @@ typedef struct SEpSet {
SEp eps[TSDB_MAX_REPLICA]; SEp eps[TSDB_MAX_REPLICA];
} SEpSet; } SEpSet;
int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp); int32_t tEncodeSEpSet(SEncoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp); int32_t tDecodeSEpSet(SDecoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp); int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(const void* buf, SEpSet* pEp); void* taosDecodeSEpSet(const void* buf, SEpSet* pEp);
...@@ -611,8 +611,8 @@ typedef struct { ...@@ -611,8 +611,8 @@ typedef struct {
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp); int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp); int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp); int32_t tSerializeSUseDbRspImp(SEncoder* pEncoder, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp); int32_t tDeserializeSUseDbRspImp(SDecoder* pDecoder, SUseDbRsp* pRsp);
void tFreeSUsedbRsp(SUseDbRsp* pRsp); void tFreeSUsedbRsp(SUseDbRsp* pRsp);
typedef struct { typedef struct {
...@@ -1527,8 +1527,8 @@ typedef struct { ...@@ -1527,8 +1527,8 @@ typedef struct {
char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2 char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2
} SRSmaParam; } SRSmaParam;
int32_t tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam); int32_t tEncodeSRSmaParam(SEncoder* pCoder, const SRSmaParam* pRSmaParam);
int32_t tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam); int32_t tDecodeSRSmaParam(SDecoder* pCoder, SRSmaParam* pRSmaParam);
// TDMT_VND_CREATE_STB ============== // TDMT_VND_CREATE_STB ==============
typedef struct SVCreateStbReq { typedef struct SVCreateStbReq {
...@@ -1540,8 +1540,8 @@ typedef struct SVCreateStbReq { ...@@ -1540,8 +1540,8 @@ typedef struct SVCreateStbReq {
SRSmaParam pRSmaParam; SRSmaParam pRSmaParam;
} SVCreateStbReq; } SVCreateStbReq;
int tEncodeSVCreateStbReq(SCoder* pCoder, const SVCreateStbReq* pReq); int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
int tDecodeSVCreateStbReq(SCoder* pCoder, SVCreateStbReq* pReq); int tDecodeSVCreateStbReq(SDecoder* pCoder, SVCreateStbReq* pReq);
// TDMT_VND_DROP_STB ============== // TDMT_VND_DROP_STB ==============
typedef struct SVDropStbReq { typedef struct SVDropStbReq {
...@@ -1549,8 +1549,8 @@ typedef struct SVDropStbReq { ...@@ -1549,8 +1549,8 @@ typedef struct SVDropStbReq {
tb_uid_t suid; tb_uid_t suid;
} SVDropStbReq; } SVDropStbReq;
int32_t tEncodeSVDropStbReq(SCoder* pCoder, const SVDropStbReq* pReq); int32_t tEncodeSVDropStbReq(SEncoder* pCoder, const SVDropStbReq* pReq);
int32_t tDecodeSVDropStbReq(SCoder* pCoder, SVDropStbReq* pReq); int32_t tDecodeSVDropStbReq(SDecoder* pCoder, SVDropStbReq* pReq);
#define TD_CREATE_IF_NOT_EXISTS 0x1 #define TD_CREATE_IF_NOT_EXISTS 0x1
typedef struct SVCreateTbReq { typedef struct SVCreateTbReq {
...@@ -1571,8 +1571,8 @@ typedef struct SVCreateTbReq { ...@@ -1571,8 +1571,8 @@ typedef struct SVCreateTbReq {
}; };
} SVCreateTbReq; } SVCreateTbReq;
int tEncodeSVCreateTbReq(SCoder* pCoder, const SVCreateTbReq* pReq); int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
int tDecodeSVCreateTbReq(SCoder* pCoder, SVCreateTbReq* pReq); int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq);
typedef struct { typedef struct {
int32_t nReqs; int32_t nReqs;
...@@ -1582,15 +1582,15 @@ typedef struct { ...@@ -1582,15 +1582,15 @@ typedef struct {
}; };
} SVCreateTbBatchReq; } SVCreateTbBatchReq;
int tEncodeSVCreateTbBatchReq(SCoder* pCoder, const SVCreateTbBatchReq* pReq); int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SCoder* pCoder, SVCreateTbBatchReq* pReq); int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq);
typedef struct { typedef struct {
int32_t code; int32_t code;
} SVCreateTbRsp, SVUpdateTbRsp; } SVCreateTbRsp, SVUpdateTbRsp;
int tEncodeSVCreateTbRsp(SCoder* pCoder, const SVCreateTbRsp* pRsp); int tEncodeSVCreateTbRsp(SEncoder* pCoder, const SVCreateTbRsp* pRsp);
int tDecodeSVCreateTbRsp(SCoder* pCoder, SVCreateTbRsp* pRsp); int tDecodeSVCreateTbRsp(SDecoder* pCoder, SVCreateTbRsp* pRsp);
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
...@@ -1603,8 +1603,8 @@ typedef struct { ...@@ -1603,8 +1603,8 @@ typedef struct {
}; };
} SVCreateTbBatchRsp; } SVCreateTbBatchRsp;
int tEncodeSVCreateTbBatchRsp(SCoder* pCoder, const SVCreateTbBatchRsp* pRsp); int tEncodeSVCreateTbBatchRsp(SEncoder* pCoder, const SVCreateTbBatchRsp* pRsp);
int tDecodeSVCreateTbBatchRsp(SCoder* pCoder, SVCreateTbBatchRsp* pRsp); int tDecodeSVCreateTbBatchRsp(SDecoder* pCoder, SVCreateTbBatchRsp* pRsp);
int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
...@@ -1627,8 +1627,8 @@ typedef struct { ...@@ -1627,8 +1627,8 @@ typedef struct {
}; };
} SVDropTbBatchReq; } SVDropTbBatchReq;
int32_t tEncodeSVDropTbBatchReq(SCoder* pCoder, const SVDropTbBatchReq* pReq); int32_t tEncodeSVDropTbBatchReq(SEncoder* pCoder, const SVDropTbBatchReq* pReq);
int32_t tDecodeSVDropTbBatchReq(SCoder* pCoder, SVDropTbBatchReq* pReq); int32_t tDecodeSVDropTbBatchReq(SDecoder* pCoder, SVDropTbBatchReq* pReq);
typedef struct { typedef struct {
int32_t nRsps; int32_t nRsps;
...@@ -1638,8 +1638,8 @@ typedef struct { ...@@ -1638,8 +1638,8 @@ typedef struct {
}; };
} SVDropTbBatchRsp; } SVDropTbBatchRsp;
int32_t tEncodeSVDropTbBatchRsp(SCoder* pCoder, const SVDropTbBatchRsp* pRsp); int32_t tEncodeSVDropTbBatchRsp(SEncoder* pCoder, const SVDropTbBatchRsp* pRsp);
int32_t tDecodeSVDropTbBatchRsp(SCoder* pCoder, SVDropTbBatchRsp* pRsp); int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp);
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
...@@ -1821,14 +1821,14 @@ static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) { ...@@ -1821,14 +1821,14 @@ static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp); int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp);
int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp); int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp);
static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) { static FORCE_INLINE int32_t tEncodeSKv(SEncoder* pEncoder, const SKv* pKv) {
if (tEncodeI32(pEncoder, pKv->key) < 0) return -1; if (tEncodeI32(pEncoder, pKv->key) < 0) return -1;
if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1; if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1;
if (tEncodeBinary(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1; if (tEncodeBinary(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1;
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) { static FORCE_INLINE int32_t tDecodeSKv(SDecoder* pDecoder, SKv* pKv) {
if (tDecodeI32(pDecoder, &pKv->key) < 0) return -1; if (tDecodeI32(pDecoder, &pKv->key) < 0) return -1;
if (tDecodeI32(pDecoder, &pKv->valueLen) < 0) return -1; if (tDecodeI32(pDecoder, &pKv->valueLen) < 0) return -1;
pKv->value = taosMemoryMalloc(pKv->valueLen + 1); pKv->value = taosMemoryMalloc(pKv->valueLen + 1);
...@@ -1837,13 +1837,13 @@ static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) { ...@@ -1837,13 +1837,13 @@ static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) {
return 0; return 0;
} }
static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) { static FORCE_INLINE int32_t tEncodeSClientHbKey(SEncoder* pEncoder, const SClientHbKey* pKey) {
if (tEncodeI64(pEncoder, pKey->tscRid) < 0) return -1; if (tEncodeI64(pEncoder, pKey->tscRid) < 0) return -1;
if (tEncodeI8(pEncoder, pKey->connType) < 0) return -1; if (tEncodeI8(pEncoder, pKey->connType) < 0) return -1;
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) { static FORCE_INLINE int32_t tDecodeSClientHbKey(SDecoder* pDecoder, SClientHbKey* pKey) {
if (tDecodeI64(pDecoder, &pKey->tscRid) < 0) return -1; if (tDecodeI64(pDecoder, &pKey->tscRid) < 0) return -1;
if (tDecodeI8(pDecoder, &pKey->connType) < 0) return -1; if (tDecodeI8(pDecoder, &pKey->connType) < 0) return -1;
return 0; return 0;
...@@ -2046,10 +2046,10 @@ typedef struct { ...@@ -2046,10 +2046,10 @@ typedef struct {
int32_t reserved; int32_t reserved;
} SMqCMCommitOffsetRsp; } SMqCMCommitOffsetRsp;
int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset); int32_t tEncodeSMqOffset(SEncoder* encoder, const SMqOffset* pOffset);
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset); int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMCommitOffsetReq(SCoder* encoder, const SMqCMCommitOffsetReq* pReq); int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq); int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq);
static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) { static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
...@@ -2089,7 +2089,7 @@ static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) { ...@@ -2089,7 +2089,7 @@ static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) {
return (void*)buf; return (void*)buf;
} }
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) { static FORCE_INLINE int32_t tEncodeSSchema(SEncoder* pEncoder, const SSchema* pSchema) {
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1; if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1; if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1;
if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1; if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1;
...@@ -2098,7 +2098,7 @@ static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSch ...@@ -2098,7 +2098,7 @@ static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSch
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) { static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) {
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1; if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1; if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1; if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1;
...@@ -2131,7 +2131,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp ...@@ -2131,7 +2131,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp
return (void*)buf; return (void*)buf;
} }
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) { static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1; if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1; if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1;
for (int32_t i = 0; i < pSW->nCols; i++) { for (int32_t i = 0; i < pSW->nCols; i++) {
...@@ -2141,7 +2141,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchem ...@@ -2141,7 +2141,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchem
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapper* pSW) { static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
...@@ -2610,8 +2610,8 @@ typedef struct { ...@@ -2610,8 +2610,8 @@ typedef struct {
}; };
} SVSubmitReq; } SVSubmitReq;
int32_t tEncodeSVSubmitReq(SCoder* pCoder, const SVSubmitReq* pReq); int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq);
int32_t tDecodeSVSubmitReq(SCoder* pCoder, SVSubmitReq* pReq); int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq);
#pragma pack(pop) #pragma pack(pop)
......
...@@ -154,8 +154,8 @@ typedef struct { ...@@ -154,8 +154,8 @@ typedef struct {
} SStreamTask; } SStreamTask;
SStreamTask* tNewSStreamTask(int64_t streamId); SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask); int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask); void tFreeSStreamTask(SStreamTask* pTask);
typedef struct { typedef struct {
......
此差异已折叠。
...@@ -547,21 +547,21 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -547,21 +547,21 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
req.offsets = (SMqOffset*)offsets->container.pData; req.offsets = (SMqOffset*)offsets->container.pData;
} }
SCoder encoder; SEncoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tEncoderInit(&encoder, NULL, 0);
tEncodeSMqCMCommitOffsetReq(&encoder, &req); tEncodeSMqCMCommitOffsetReq(&encoder, &req);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
void* buf = taosMemoryMalloc(tlen); void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) { if (buf == NULL) {
tCoderClear(&encoder); tEncoderClear(&encoder);
return -1; return -1;
} }
tCoderClear(&encoder); tEncoderClear(&encoder);
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER); tEncoderInit(&encoder, buf, tlen);
tEncodeSMqCMCommitOffsetReq(&encoder, &req); tEncodeSMqCMCommitOffsetReq(&encoder, &req);
tCoderClear(&encoder); tEncoderClear(&encoder);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET); pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
if (pRequest == NULL) { if (pRequest == NULL) {
......
此差异已折叠。
...@@ -594,8 +594,8 @@ typedef struct { ...@@ -594,8 +594,8 @@ typedef struct {
SSchemaWrapper outputSchema; SSchemaWrapper outputSchema;
} SStreamObj; } SStreamObj;
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SCoder* pDecoder, SStreamObj* pObj); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -411,7 +411,7 @@ void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) { ...@@ -411,7 +411,7 @@ void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
return (void *)buf; return (void *)buf;
} }
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
int32_t sz = 0; int32_t sz = 0;
/*int32_t outputNameSz = 0;*/ /*int32_t outputNameSz = 0;*/
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
...@@ -462,7 +462,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { ...@@ -462,7 +462,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
......
...@@ -157,8 +157,8 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) { ...@@ -157,8 +157,8 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode; SMnode *pMnode = pMsg->pNode;
char *msgStr = pMsg->rpcMsg.pCont; char *msgStr = pMsg->rpcMsg.pCont;
SMqCMCommitOffsetReq commitOffsetReq; SMqCMCommitOffsetReq commitOffsetReq;
SCoder decoder; SDecoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, msgStr, pMsg->rpcMsg.contLen, TD_DECODER); tDecoderInit(&decoder, msgStr, pMsg->rpcMsg.contLen);
tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq); tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq);
......
...@@ -83,12 +83,12 @@ END: ...@@ -83,12 +83,12 @@ END:
} }
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) { int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
SCoder encoder; SEncoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tEncoderInit(&encoder, NULL, 0);
tEncodeSStreamTask(&encoder, pTask); tEncodeSStreamTask(&encoder, pTask);
int32_t size = encoder.pos; int32_t size = encoder.pos;
int32_t tlen = sizeof(SMsgHead) + size; int32_t tlen = sizeof(SMsgHead) + size;
tCoderClear(&encoder); tEncoderClear(&encoder);
void* buf = taosMemoryMalloc(tlen); void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -96,9 +96,9 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet ...@@ -96,9 +96,9 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
} }
((SMsgHead*)buf)->vgId = htonl(nodeId); ((SMsgHead*)buf)->vgId = htonl(nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER); tEncoderInit(&encoder, abuf, size);
tEncodeSStreamTask(&encoder, pTask); tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder); tEncoderClear(&encoder);
STransAction action = {0}; STransAction action = {0};
memcpy(&action.epSet, pEpSet, sizeof(SEpSet)); memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
......
...@@ -369,7 +369,7 @@ static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSch ...@@ -369,7 +369,7 @@ static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSch
} }
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SCoder coder = {0}; SEncoder encoder = {0};
int32_t contLen; int32_t contLen;
SName name = {0}; SName name = {0};
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
...@@ -422,11 +422,11 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt ...@@ -422,11 +422,11 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
pHead->vgId = htonl(pVgroup->vgId); pHead->vgId = htonl(pVgroup->vgId);
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead)); void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, contLen - sizeof(SMsgHead), TD_ENCODER); tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
if (tEncodeSVCreateStbReq(&coder, &req) < 0) { if (tEncodeSVCreateStbReq(&encoder, &req) < 0) {
return NULL; return NULL;
} }
tCoderClear(&coder); tEncoderClear(&encoder);
*pContLen = contLen; *pContLen = contLen;
taosMemoryFreeClear(req.pRSmaParam.qmsg1); taosMemoryFreeClear(req.pRSmaParam.qmsg1);
...@@ -440,7 +440,7 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, ...@@ -440,7 +440,7 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
int32_t contLen = 0; int32_t contLen = 0;
int32_t ret = 0; int32_t ret = 0;
SMsgHead *pHead = NULL; SMsgHead *pHead = NULL;
SCoder coder = {0}; SEncoder encoder = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
...@@ -462,9 +462,9 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, ...@@ -462,9 +462,9 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead)); void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, contLen - sizeof(SMsgHead), TD_ENCODER); tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
tEncodeSVDropStbReq(&coder, &req); tEncodeSVDropStbReq(&encoder, &req);
tCoderClear(&coder); tEncoderClear(&encoder);
*pContLen = contLen; *pContLen = contLen;
return pHead; return pHead;
......
...@@ -70,14 +70,14 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { ...@@ -70,14 +70,14 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL; void *buf = NULL;
SCoder encoder; SEncoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tEncoderInit(&encoder, NULL, 0);
if (tEncodeSStreamObj(&encoder, pStream) < 0) { if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tCoderClear(&encoder); tEncoderClear(&encoder);
goto STREAM_ENCODE_OVER; goto STREAM_ENCODE_OVER;
} }
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tCoderClear(&encoder); tEncoderClear(&encoder);
int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE; int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
...@@ -86,12 +86,12 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { ...@@ -86,12 +86,12 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
buf = taosMemoryMalloc(tlen); buf = taosMemoryMalloc(tlen);
if (buf == NULL) goto STREAM_ENCODE_OVER; if (buf == NULL) goto STREAM_ENCODE_OVER;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER); tEncoderInit(&encoder, buf, tlen);
if (tEncodeSStreamObj(&encoder, pStream) < 0) { if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tCoderClear(&encoder); tEncoderClear(&encoder);
goto STREAM_ENCODE_OVER; goto STREAM_ENCODE_OVER;
} }
tCoderClear(&encoder); tEncoderClear(&encoder);
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER);
...@@ -138,8 +138,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { ...@@ -138,8 +138,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
if (buf == NULL) goto STREAM_DECODE_OVER; if (buf == NULL) goto STREAM_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER);
SCoder decoder; SDecoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, tlen + 1, TD_DECODER); tDecoderInit(&decoder, buf, tlen + 1);
if (tDecodeSStreamObj(&decoder, pStream) < 0) { if (tDecodeSStreamObj(&decoder, pStream) < 0) {
goto STREAM_DECODE_OVER; goto STREAM_DECODE_OVER;
} }
......
...@@ -99,10 +99,10 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -99,10 +99,10 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
ASSERT(0); ASSERT(0);
return; return;
} }
SCoder decoder; SDecoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, msg, pMsg->contLen - sizeof(SMsgHead), TD_DECODER); tDecoderInit(&decoder, msg, pMsg->contLen - sizeof(SMsgHead));
tDecodeSStreamTask(&decoder, pTask); tDecodeSStreamTask(&decoder, pTask);
tCoderClear(&decoder); tDecoderClear(&decoder);
sndMetaDeployTask(pSnode->pMeta, pTask); sndMetaDeployTask(pSnode->pMeta, pTask);
} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) { } else if (pMsg->msgType == TDMT_SND_TASK_EXEC) {
......
...@@ -205,7 +205,7 @@ struct SMetaEntry { ...@@ -205,7 +205,7 @@ struct SMetaEntry {
struct SMetaReader { struct SMetaReader {
int32_t flags; int32_t flags;
SMeta *pMeta; SMeta *pMeta;
SCoder coder; SDecoder coder;
SMetaEntry me; SMetaEntry me;
void *pBuf; void *pBuf;
int szBuf; int szBuf;
......
...@@ -39,8 +39,8 @@ typedef struct SMSmaCursor SMSmaCursor; ...@@ -39,8 +39,8 @@ typedef struct SMSmaCursor SMSmaCursor;
// metaOpen ================== // metaOpen ==================
// metaEntry ================== // metaEntry ==================
int metaEncodeEntry(SCoder* pCoder, const SMetaEntry* pME); int metaEncodeEntry(SEncoder* pCoder, const SMetaEntry* pME);
int metaDecodeEntry(SCoder* pCoder, SMetaEntry* pME); int metaDecodeEntry(SDecoder* pCoder, SMetaEntry* pME);
// metaTable ================== // metaTable ==================
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "meta.h" #include "meta.h"
int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) { int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tStartEncode(pCoder) < 0) return -1; if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pME->version) < 0) return -1; if (tEncodeI64(pCoder, pME->version) < 0) return -1;
...@@ -43,7 +43,7 @@ int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) { ...@@ -43,7 +43,7 @@ int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) {
return 0; return 0;
} }
int metaDecodeEntry(SCoder *pCoder, SMetaEntry *pME) { int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
uint32_t len; uint32_t len;
if (tStartDecode(pCoder) < 0) return -1; if (tStartDecode(pCoder) < 0) return -1;
......
...@@ -22,7 +22,7 @@ void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) { ...@@ -22,7 +22,7 @@ void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
} }
void metaReaderClear(SMetaReader *pReader) { void metaReaderClear(SMetaReader *pReader) {
tCoderClear(&pReader->coder); tDecoderClear(&pReader->coder);
tdbFree(pReader->pBuf); tdbFree(pReader->pBuf);
} }
...@@ -37,7 +37,7 @@ int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t u ...@@ -37,7 +37,7 @@ int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t u
} }
// decode the entry // decode the entry
tCoderInit(&pReader->coder, TD_LITTLE_ENDIAN, pReader->pBuf, pReader->szBuf, TD_DECODER); tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) { if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
goto _err; goto _err;
...@@ -147,7 +147,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo ...@@ -147,7 +147,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
SSchemaWrapper *pSW = NULL; SSchemaWrapper *pSW = NULL;
SSchema *pSchema = NULL; SSchema *pSchema = NULL;
void *pBuf; void *pBuf;
SCoder coder = {0}; SDecoder coder = {0};
// fetch // fetch
skmDbKey.uid = uid; skmDbKey.uid = uid;
...@@ -163,11 +163,11 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo ...@@ -163,11 +163,11 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
pBuf = pVal; pBuf = pVal;
pSW = taosMemoryMalloc(sizeof(SSchemaWrapper)); pSW = taosMemoryMalloc(sizeof(SSchemaWrapper));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_DECODER); tDecoderInit(&coder, pVal, vLen);
tDecodeSSchemaWrapper(&coder, pSW); tDecodeSSchemaWrapper(&coder, pSW);
pSchema = taosMemoryMalloc(sizeof(SSchema) * pSW->nCols); pSchema = taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
memcpy(pSchema, pSW->pSchema, sizeof(SSchema) * pSW->nCols); memcpy(pSchema, pSW->pSchema, sizeof(SSchema) * pSW->nCols);
tCoderClear(&coder); tDecoderClear(&coder);
pSW->pSchema = pSchema; pSW->pSchema = pSchema;
......
...@@ -33,7 +33,6 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -33,7 +33,6 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
void *pBuf = NULL; void *pBuf = NULL;
int32_t szBuf = 0; int32_t szBuf = 0;
void *p = NULL; void *p = NULL;
SCoder coder = {0};
SMetaReader mr = {0}; SMetaReader mr = {0};
// validate req // validate req
...@@ -192,7 +191,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { ...@@ -192,7 +191,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
tb_uid_t uid; tb_uid_t uid;
int64_t tver; int64_t tver;
SMetaEntry me = {0}; SMetaEntry me = {0};
SCoder coder = {0}; SDecoder coder = {0};
int8_t type; int8_t type;
int64_t ctime; int64_t ctime;
tb_uid_t suid; tb_uid_t suid;
...@@ -253,7 +252,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { ...@@ -253,7 +252,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
// decode entry // decode entry
void *pDataCopy = taosMemoryMalloc(nData); // remove the copy (todo) void *pDataCopy = taosMemoryMalloc(nData); // remove the copy (todo)
memcpy(pDataCopy, pData, nData); memcpy(pDataCopy, pData, nData);
tCoderInit(&coder, TD_LITTLE_ENDIAN, pDataCopy, nData, TD_DECODER); tDecoderInit(&coder, pDataCopy, nData);
ret = metaDecodeEntry(&coder, &me); ret = metaDecodeEntry(&coder, &me);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
...@@ -272,7 +271,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { ...@@ -272,7 +271,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
} }
taosMemoryFree(pDataCopy); taosMemoryFree(pDataCopy);
tCoderClear(&coder); tDecoderClear(&coder);
tdbDbcClose(pTbDbc); tdbDbcClose(pTbDbc);
if (type == TSDB_CHILD_TABLE) { if (type == TSDB_CHILD_TABLE) {
...@@ -309,7 +308,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -309,7 +308,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
void *pVal = NULL; void *pVal = NULL;
int kLen = 0; int kLen = 0;
int vLen = 0; int vLen = 0;
SCoder coder = {0}; SEncoder coder = {0};
// set key and value // set key and value
tbDbKey.version = pME->version; tbDbKey.version = pME->version;
...@@ -330,13 +329,13 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -330,13 +329,13 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
goto _err; goto _err;
} }
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_ENCODER); tEncoderInit(&coder, pVal, vLen);
if (metaEncodeEntry(&coder, pME) < 0) { if (metaEncodeEntry(&coder, pME) < 0) {
goto _err; goto _err;
} }
tCoderClear(&coder); tEncoderClear(&coder);
// write to table.db // write to table.db
if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) { if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
...@@ -393,7 +392,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -393,7 +392,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pME) {
} }
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
SCoder coder = {0}; SEncoder coder = {0};
void *pVal = NULL; void *pVal = NULL;
int vLen = 0; int vLen = 0;
int rcode = 0; int rcode = 0;
...@@ -422,7 +421,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -422,7 +421,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
goto _exit; goto _exit;
} }
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_ENCODER); tEncoderInit(&coder, pVal, vLen);
tEncodeSSchemaWrapper(&coder, pSW); tEncodeSSchemaWrapper(&coder, pSW);
if (tdbDbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) { if (tdbDbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) {
...@@ -432,7 +431,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -432,7 +431,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
_exit: _exit:
taosMemoryFree(pVal); taosMemoryFree(pVal);
tCoderClear(&coder); tEncoderClear(&coder);
return rcode; return rcode;
} }
......
...@@ -910,12 +910,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -910,12 +910,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
SCoder decoder; SDecoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER); tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSStreamTask(&decoder, pTask) < 0) { if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0); ASSERT(0);
} }
tCoderClear(&decoder); tDecoderClear(&decoder);
// exec // exec
if (tqExpandTask(pTq, pTask, 4) < 0) { if (tqExpandTask(pTq, pTask, 4) < 0) {
......
...@@ -180,7 +180,7 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p ...@@ -180,7 +180,7 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
const uint8_t *pt; const uint8_t *pt;
const STSRow *pRow; const STSRow *pRow;
uint64_t szRow; uint64_t szRow;
SCoder coder = {0}; SDecoder decoder = {0};
// tCoderInit(&coder, TD_LITTLE_ENDIAN, pSubmitBlk->pData, pSubmitBlk->nData, TD_DECODER); // tCoderInit(&coder, TD_LITTLE_ENDIAN, pSubmitBlk->pData, pSubmitBlk->nData, TD_DECODER);
for (;;) { for (;;) {
......
...@@ -286,7 +286,7 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -286,7 +286,7 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) { static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
SCoder coder; SDecoder coder;
pRsp->msgType = TDMT_VND_CREATE_STB_RSP; pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
pRsp->code = TSDB_CODE_SUCCESS; pRsp->code = TSDB_CODE_SUCCESS;
...@@ -294,7 +294,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -294,7 +294,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq,
pRsp->contLen = 0; pRsp->contLen = 0;
// decode and process req // decode and process req
tCoderInit(&coder, TD_LITTLE_ENDIAN, pReq, len, TD_DECODER); tDecoderInit(&coder, pReq, len);
if (tDecodeSVCreateStbReq(&coder, &req) < 0) { if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
pRsp->code = terrno; pRsp->code = terrno;
...@@ -308,16 +308,16 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -308,16 +308,16 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq,
tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &req); tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &req);
tCoderClear(&coder); tDecoderClear(&coder);
return 0; return 0;
_err: _err:
tCoderClear(&coder); tDecoderClear(&coder);
return -1; return -1;
} }
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) { static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
SCoder coder = {0}; SDecoder decoder = {0};
int rcode = 0; int rcode = 0;
SVCreateTbBatchReq req = {0}; SVCreateTbBatchReq req = {0};
SVCreateTbReq *pCreateReq; SVCreateTbReq *pCreateReq;
...@@ -332,8 +332,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -332,8 +332,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
pRsp->contLen = 0; pRsp->contLen = 0;
// decode // decode
tCoderInit(&coder, TD_LITTLE_ENDIAN, pReq, len, TD_DECODER); tDecoderInit(&decoder, pReq, len);
if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) { if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
rcode = -1; rcode = -1;
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto _exit; goto _exit;
...@@ -373,13 +373,14 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -373,13 +373,14 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
taosArrayPush(rsp.pArray, &cRsp); taosArrayPush(rsp.pArray, &cRsp);
} }
tCoderClear(&coder); tDecoderClear(&decoder);
tsdbUpdateTbUidList(pVnode->pTsdb, pStore); tsdbUpdateTbUidList(pVnode->pTsdb, pStore);
tsdbUidStoreFree(pStore); tsdbUidStoreFree(pStore);
// prepare rsp // prepare rsp
int32_t ret = 0; SEncoder encoder = {0};
int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret); tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen); pRsp->pCont = rpcMallocCont(pRsp->contLen);
if (pRsp->pCont == NULL) { if (pRsp->pCont == NULL) {
...@@ -387,12 +388,14 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -387,12 +388,14 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
rcode = -1; rcode = -1;
goto _exit; goto _exit;
} }
tCoderInit(&coder, TD_LITTLE_ENDIAN, pRsp->pCont, pRsp->contLen, TD_ENCODER); tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
tEncodeSVCreateTbBatchRsp(&coder, &rsp); tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
tEncoderClear(&encoder);
_exit: _exit:
taosArrayClear(rsp.pArray); taosArrayClear(rsp.pArray);
tCoderClear(&coder); tDecoderClear(&decoder);
tEncoderClear(&encoder);
return rcode; return rcode;
} }
...@@ -416,15 +419,15 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpc ...@@ -416,15 +419,15 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpc
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVDropStbReq req = {0}; SVDropStbReq req = {0};
int rcode = TSDB_CODE_SUCCESS; int rcode = TSDB_CODE_SUCCESS;
SCoder coder = {0}; SDecoder decoder = {0};
pRsp->msgType = TDMT_VND_CREATE_STB_RSP; pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
pRsp->pCont = NULL; pRsp->pCont = NULL;
pRsp->contLen = 0; pRsp->contLen = 0;
// decode request // decode request
tCoderInit(&coder, TD_LITTLE_ENDIAN, pReq, len, TD_DECODER); tDecoderInit(&decoder, pReq, len);
if (tDecodeSVDropStbReq(&coder, &req) < 0) { if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
rcode = TSDB_CODE_INVALID_MSG; rcode = TSDB_CODE_INVALID_MSG;
goto _exit; goto _exit;
} }
...@@ -438,7 +441,7 @@ static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, i ...@@ -438,7 +441,7 @@ static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, i
// return rsp // return rsp
_exit: _exit:
pRsp->code = rcode; pRsp->code = rcode;
tCoderClear(&coder); tDecoderClear(&decoder);
return 0; return 0;
} }
...@@ -451,7 +454,7 @@ static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcM ...@@ -451,7 +454,7 @@ static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcM
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVDropTbBatchReq req = {0}; SVDropTbBatchReq req = {0};
SVDropTbBatchRsp rsp = {0}; SVDropTbBatchRsp rsp = {0};
SCoder coder = {0}; SDecoder decoder = {0};
int ret; int ret;
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP; pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
...@@ -460,8 +463,8 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -460,8 +463,8 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
pRsp->code = TSDB_CODE_SUCCESS; pRsp->code = TSDB_CODE_SUCCESS;
// decode req // decode req
tCoderInit(&coder, TD_LITTLE_ENDIAN, pReq, len, TD_DECODER); tDecoderInit(&decoder, pReq, len);
ret = tDecodeSVDropTbBatchReq(&coder, &req); ret = tDecodeSVDropTbBatchReq(&decoder, &req);
if (ret < 0) { if (ret < 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
pRsp->code = terrno; pRsp->code = terrno;
...@@ -490,7 +493,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -490,7 +493,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
} }
_exit: _exit:
tCoderClear(&coder); tDecoderClear(&decoder);
// encode rsp (TODO) // encode rsp (TODO)
return 0; return 0;
} }
...@@ -501,7 +504,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -501,7 +504,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
SSubmitBlk *pBlock; SSubmitBlk *pBlock;
SSubmitRsp rsp = {0}; SSubmitRsp rsp = {0};
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
SCoder coder = {0}; SDecoder decoder = {0};
int32_t nRows; int32_t nRows;
pRsp->code = 0; pRsp->code = 0;
...@@ -518,17 +521,17 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -518,17 +521,17 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
// create table for auto create table mode // create table for auto create table mode
if (msgIter.schemaLen > 0) { if (msgIter.schemaLen > 0) {
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBlock->data, msgIter.schemaLen, TD_DECODER); tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
if (tDecodeSVCreateTbReq(&coder, &createTbReq) < 0) { if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
pRsp->code = TSDB_CODE_INVALID_MSG; pRsp->code = TSDB_CODE_INVALID_MSG;
tCoderClear(&coder); tDecoderClear(&decoder);
goto _exit; goto _exit;
} }
if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) { if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
pRsp->code = terrno; pRsp->code = terrno;
tCoderClear(&coder); tDecoderClear(&decoder);
goto _exit; goto _exit;
} }
} }
...@@ -540,7 +543,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -540,7 +543,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
msgIter.suid = 0; msgIter.suid = 0;
} }
tCoderClear(&coder); tDecoderClear(&decoder);
} }
if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &nRows) < 0) { if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &nRows) < 0) {
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "tcoding.h" #include "tcoding.h"
#include "tencode.h" #include "tencode.h"
static int32_t tEncodeSMonSysInfo(SCoder *encoder, const SMonSysInfo *pInfo) { static int32_t tEncodeSMonSysInfo(SEncoder *encoder, const SMonSysInfo *pInfo) {
if (tEncodeDouble(encoder, pInfo->cpu_engine) < 0) return -1; if (tEncodeDouble(encoder, pInfo->cpu_engine) < 0) return -1;
if (tEncodeDouble(encoder, pInfo->cpu_system) < 0) return -1; if (tEncodeDouble(encoder, pInfo->cpu_system) < 0) return -1;
if (tEncodeFloat(encoder, pInfo->cpu_cores) < 0) return -1; if (tEncodeFloat(encoder, pInfo->cpu_cores) < 0) return -1;
...@@ -37,7 +37,7 @@ static int32_t tEncodeSMonSysInfo(SCoder *encoder, const SMonSysInfo *pInfo) { ...@@ -37,7 +37,7 @@ static int32_t tEncodeSMonSysInfo(SCoder *encoder, const SMonSysInfo *pInfo) {
return 0; return 0;
} }
static int32_t tDecodeSMonSysInfo(SCoder *decoder, SMonSysInfo *pInfo) { static int32_t tDecodeSMonSysInfo(SDecoder *decoder, SMonSysInfo *pInfo) {
if (tDecodeDouble(decoder, &pInfo->cpu_engine) < 0) return -1; if (tDecodeDouble(decoder, &pInfo->cpu_engine) < 0) return -1;
if (tDecodeDouble(decoder, &pInfo->cpu_system) < 0) return -1; if (tDecodeDouble(decoder, &pInfo->cpu_system) < 0) return -1;
if (tDecodeFloat(decoder, &pInfo->cpu_cores) < 0) return -1; if (tDecodeFloat(decoder, &pInfo->cpu_cores) < 0) return -1;
...@@ -56,7 +56,7 @@ static int32_t tDecodeSMonSysInfo(SCoder *decoder, SMonSysInfo *pInfo) { ...@@ -56,7 +56,7 @@ static int32_t tDecodeSMonSysInfo(SCoder *decoder, SMonSysInfo *pInfo) {
return 0; return 0;
} }
int32_t tEncodeSMonLogs(SCoder *encoder, const SMonLogs *pInfo) { int32_t tEncodeSMonLogs(SEncoder *encoder, const SMonLogs *pInfo) {
if (tEncodeI32(encoder, pInfo->numOfErrorLogs) < 0) return -1; if (tEncodeI32(encoder, pInfo->numOfErrorLogs) < 0) return -1;
if (tEncodeI32(encoder, pInfo->numOfInfoLogs) < 0) return -1; if (tEncodeI32(encoder, pInfo->numOfInfoLogs) < 0) return -1;
if (tEncodeI32(encoder, pInfo->numOfDebugLogs) < 0) return -1; if (tEncodeI32(encoder, pInfo->numOfDebugLogs) < 0) return -1;
...@@ -71,7 +71,7 @@ int32_t tEncodeSMonLogs(SCoder *encoder, const SMonLogs *pInfo) { ...@@ -71,7 +71,7 @@ int32_t tEncodeSMonLogs(SCoder *encoder, const SMonLogs *pInfo) {
return 0; return 0;
} }
static int32_t tDecodeSMonLogs(SCoder *decoder, SMonLogs *pInfo) { static int32_t tDecodeSMonLogs(SDecoder *decoder, SMonLogs *pInfo) {
if (tDecodeI32(decoder, &pInfo->numOfErrorLogs) < 0) return -1; if (tDecodeI32(decoder, &pInfo->numOfErrorLogs) < 0) return -1;
if (tDecodeI32(decoder, &pInfo->numOfInfoLogs) < 0) return -1; if (tDecodeI32(decoder, &pInfo->numOfInfoLogs) < 0) return -1;
if (tDecodeI32(decoder, &pInfo->numOfDebugLogs) < 0) return -1; if (tDecodeI32(decoder, &pInfo->numOfDebugLogs) < 0) return -1;
...@@ -96,7 +96,7 @@ static int32_t tDecodeSMonLogs(SCoder *decoder, SMonLogs *pInfo) { ...@@ -96,7 +96,7 @@ static int32_t tDecodeSMonLogs(SCoder *decoder, SMonLogs *pInfo) {
return 0; return 0;
} }
int32_t tEncodeSMonClusterInfo(SCoder *encoder, const SMonClusterInfo *pInfo) { int32_t tEncodeSMonClusterInfo(SEncoder *encoder, const SMonClusterInfo *pInfo) {
if (tEncodeCStr(encoder, pInfo->first_ep) < 0) return -1; if (tEncodeCStr(encoder, pInfo->first_ep) < 0) return -1;
if (tEncodeI32(encoder, pInfo->first_ep_dnode_id) < 0) return -1; if (tEncodeI32(encoder, pInfo->first_ep_dnode_id) < 0) return -1;
if (tEncodeCStr(encoder, pInfo->version) < 0) return -1; if (tEncodeCStr(encoder, pInfo->version) < 0) return -1;
...@@ -124,7 +124,7 @@ int32_t tEncodeSMonClusterInfo(SCoder *encoder, const SMonClusterInfo *pInfo) { ...@@ -124,7 +124,7 @@ int32_t tEncodeSMonClusterInfo(SCoder *encoder, const SMonClusterInfo *pInfo) {
return 0; return 0;
} }
int32_t tDecodeSMonClusterInfo(SCoder *decoder, SMonClusterInfo *pInfo) { int32_t tDecodeSMonClusterInfo(SDecoder *decoder, SMonClusterInfo *pInfo) {
if (tDecodeCStrTo(decoder, pInfo->first_ep) < 0) return -1; if (tDecodeCStrTo(decoder, pInfo->first_ep) < 0) return -1;
if (tDecodeI32(decoder, &pInfo->first_ep_dnode_id) < 0) return -1; if (tDecodeI32(decoder, &pInfo->first_ep_dnode_id) < 0) return -1;
if (tDecodeCStrTo(decoder, pInfo->version) < 0) return -1; if (tDecodeCStrTo(decoder, pInfo->version) < 0) return -1;
...@@ -163,7 +163,7 @@ int32_t tDecodeSMonClusterInfo(SCoder *decoder, SMonClusterInfo *pInfo) { ...@@ -163,7 +163,7 @@ int32_t tDecodeSMonClusterInfo(SCoder *decoder, SMonClusterInfo *pInfo) {
return 0; return 0;
} }
int32_t tEncodeSMonVgroupInfo(SCoder *encoder, const SMonVgroupInfo *pInfo) { int32_t tEncodeSMonVgroupInfo(SEncoder *encoder, const SMonVgroupInfo *pInfo) {
if (tEncodeI32(encoder, taosArrayGetSize(pInfo->vgroups)) < 0) return -1; if (tEncodeI32(encoder, taosArrayGetSize(pInfo->vgroups)) < 0) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->vgroups); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->vgroups); ++i) {
SMonVgroupDesc *pDesc = taosArrayGet(pInfo->vgroups, i); SMonVgroupDesc *pDesc = taosArrayGet(pInfo->vgroups, i);
...@@ -180,7 +180,7 @@ int32_t tEncodeSMonVgroupInfo(SCoder *encoder, const SMonVgroupInfo *pInfo) { ...@@ -180,7 +180,7 @@ int32_t tEncodeSMonVgroupInfo(SCoder *encoder, const SMonVgroupInfo *pInfo) {
return 0; return 0;
} }
int32_t tDecodeSMonVgroupInfo(SCoder *decoder, SMonVgroupInfo *pInfo) { int32_t tDecodeSMonVgroupInfo(SDecoder *decoder, SMonVgroupInfo *pInfo) {
int32_t arraySize = 0; int32_t arraySize = 0;
if (tDecodeI32(decoder, &arraySize) < 0) return -1; if (tDecodeI32(decoder, &arraySize) < 0) return -1;
...@@ -203,14 +203,14 @@ int32_t tDecodeSMonVgroupInfo(SCoder *decoder, SMonVgroupInfo *pInfo) { ...@@ -203,14 +203,14 @@ int32_t tDecodeSMonVgroupInfo(SCoder *decoder, SMonVgroupInfo *pInfo) {
return 0; return 0;
} }
int32_t tEncodeSMonGrantInfo(SCoder *encoder, const SMonGrantInfo *pInfo) { int32_t tEncodeSMonGrantInfo(SEncoder *encoder, const SMonGrantInfo *pInfo) {
if (tEncodeI32(encoder, pInfo->expire_time) < 0) return -1; if (tEncodeI32(encoder, pInfo->expire_time) < 0) return -1;
if (tEncodeI64(encoder, pInfo->timeseries_used) < 0) return -1; if (tEncodeI64(encoder, pInfo->timeseries_used) < 0) return -1;
if (tEncodeI64(encoder, pInfo->timeseries_total) < 0) return -1; if (tEncodeI64(encoder, pInfo->timeseries_total) < 0) return -1;
return 0; return 0;
} }
int32_t tDecodeSMonGrantInfo(SCoder *decoder, SMonGrantInfo *pInfo) { int32_t tDecodeSMonGrantInfo(SDecoder *decoder, SMonGrantInfo *pInfo) {
if (tDecodeI32(decoder, &pInfo->expire_time) < 0) return -1; if (tDecodeI32(decoder, &pInfo->expire_time) < 0) return -1;
if (tDecodeI64(decoder, &pInfo->timeseries_used) < 0) return -1; if (tDecodeI64(decoder, &pInfo->timeseries_used) < 0) return -1;
if (tDecodeI64(decoder, &pInfo->timeseries_total) < 0) return -1; if (tDecodeI64(decoder, &pInfo->timeseries_total) < 0) return -1;
...@@ -218,8 +218,8 @@ int32_t tDecodeSMonGrantInfo(SCoder *decoder, SMonGrantInfo *pInfo) { ...@@ -218,8 +218,8 @@ int32_t tDecodeSMonGrantInfo(SCoder *decoder, SMonGrantInfo *pInfo) {
} }
int32_t tSerializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo) { int32_t tSerializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo) {
SCoder encoder = {0}; SEncoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeSMonClusterInfo(&encoder, &pInfo->cluster) < 0) return -1; if (tEncodeSMonClusterInfo(&encoder, &pInfo->cluster) < 0) return -1;
...@@ -230,13 +230,13 @@ int32_t tSerializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo) { ...@@ -230,13 +230,13 @@ int32_t tSerializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo) {
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tCoderClear(&encoder); tEncoderClear(&encoder);
return tlen; return tlen;
} }
int32_t tDeserializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo) { int32_t tDeserializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo) {
SCoder decoder = {0}; SDecoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeSMonClusterInfo(&decoder, &pInfo->cluster) < 0) return -1; if (tDecodeSMonClusterInfo(&decoder, &pInfo->cluster) < 0) return -1;
...@@ -246,7 +246,7 @@ int32_t tDeserializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo) { ...@@ -246,7 +246,7 @@ int32_t tDeserializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo) {
if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1; if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
...@@ -261,7 +261,7 @@ void tFreeSMonMmInfo(SMonMmInfo *pInfo) { ...@@ -261,7 +261,7 @@ void tFreeSMonMmInfo(SMonMmInfo *pInfo) {
pInfo->log.logs = NULL; pInfo->log.logs = NULL;
} }
int32_t tEncodeSMonDiskDesc(SCoder *encoder, const SMonDiskDesc *pDesc) { int32_t tEncodeSMonDiskDesc(SEncoder *encoder, const SMonDiskDesc *pDesc) {
if (tEncodeCStr(encoder, pDesc->name) < 0) return -1; if (tEncodeCStr(encoder, pDesc->name) < 0) return -1;
if (tEncodeI8(encoder, pDesc->level) < 0) return -1; if (tEncodeI8(encoder, pDesc->level) < 0) return -1;
if (tEncodeI64(encoder, pDesc->size.total) < 0) return -1; if (tEncodeI64(encoder, pDesc->size.total) < 0) return -1;
...@@ -270,7 +270,7 @@ int32_t tEncodeSMonDiskDesc(SCoder *encoder, const SMonDiskDesc *pDesc) { ...@@ -270,7 +270,7 @@ int32_t tEncodeSMonDiskDesc(SCoder *encoder, const SMonDiskDesc *pDesc) {
return 0; return 0;
} }
static int32_t tDecodeSMonDiskDesc(SCoder *decoder, SMonDiskDesc *pDesc) { static int32_t tDecodeSMonDiskDesc(SDecoder *decoder, SMonDiskDesc *pDesc) {
if (tDecodeCStrTo(decoder, pDesc->name) < 0) return -1; if (tDecodeCStrTo(decoder, pDesc->name) < 0) return -1;
if (tDecodeI8(decoder, &pDesc->level) < 0) return -1; if (tDecodeI8(decoder, &pDesc->level) < 0) return -1;
if (tDecodeI64(decoder, &pDesc->size.total) < 0) return -1; if (tDecodeI64(decoder, &pDesc->size.total) < 0) return -1;
...@@ -279,7 +279,7 @@ static int32_t tDecodeSMonDiskDesc(SCoder *decoder, SMonDiskDesc *pDesc) { ...@@ -279,7 +279,7 @@ static int32_t tDecodeSMonDiskDesc(SCoder *decoder, SMonDiskDesc *pDesc) {
return 0; return 0;
} }
int32_t tEncodeSMonDiskInfo(SCoder *encoder, const SMonDiskInfo *pInfo) { int32_t tEncodeSMonDiskInfo(SEncoder *encoder, const SMonDiskInfo *pInfo) {
if (tEncodeI32(encoder, taosArrayGetSize(pInfo->datadirs)) < 0) return -1; if (tEncodeI32(encoder, taosArrayGetSize(pInfo->datadirs)) < 0) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->datadirs); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->datadirs); ++i) {
SMonDiskDesc *pDesc = taosArrayGet(pInfo->datadirs, i); SMonDiskDesc *pDesc = taosArrayGet(pInfo->datadirs, i);
...@@ -288,7 +288,7 @@ int32_t tEncodeSMonDiskInfo(SCoder *encoder, const SMonDiskInfo *pInfo) { ...@@ -288,7 +288,7 @@ int32_t tEncodeSMonDiskInfo(SCoder *encoder, const SMonDiskInfo *pInfo) {
return 0; return 0;
} }
static int32_t tDecodeSMonDiskInfo(SCoder *decoder, SMonDiskInfo *pInfo) { static int32_t tDecodeSMonDiskInfo(SDecoder *decoder, SMonDiskInfo *pInfo) {
int32_t arraySize = 0; int32_t arraySize = 0;
if (tDecodeI32(decoder, &arraySize) < 0) return -1; if (tDecodeI32(decoder, &arraySize) < 0) return -1;
...@@ -304,7 +304,7 @@ static int32_t tDecodeSMonDiskInfo(SCoder *decoder, SMonDiskInfo *pInfo) { ...@@ -304,7 +304,7 @@ static int32_t tDecodeSMonDiskInfo(SCoder *decoder, SMonDiskInfo *pInfo) {
return 0; return 0;
} }
int32_t tEncodeSVnodesStat(SCoder *encoder, const SVnodesStat *pStat) { int32_t tEncodeSVnodesStat(SEncoder *encoder, const SVnodesStat *pStat) {
if (tEncodeI32(encoder, pStat->openVnodes) < 0) return -1; if (tEncodeI32(encoder, pStat->openVnodes) < 0) return -1;
if (tEncodeI32(encoder, pStat->totalVnodes) < 0) return -1; if (tEncodeI32(encoder, pStat->totalVnodes) < 0) return -1;
if (tEncodeI32(encoder, pStat->masterNum) < 0) return -1; if (tEncodeI32(encoder, pStat->masterNum) < 0) return -1;
...@@ -317,7 +317,7 @@ int32_t tEncodeSVnodesStat(SCoder *encoder, const SVnodesStat *pStat) { ...@@ -317,7 +317,7 @@ int32_t tEncodeSVnodesStat(SCoder *encoder, const SVnodesStat *pStat) {
return 0; return 0;
} }
static int32_t tDecodeSVnodesStat(SCoder *decoder, SVnodesStat *pStat) { static int32_t tDecodeSVnodesStat(SDecoder *decoder, SVnodesStat *pStat) {
if (tDecodeI32(decoder, &pStat->openVnodes) < 0) return -1; if (tDecodeI32(decoder, &pStat->openVnodes) < 0) return -1;
if (tDecodeI32(decoder, &pStat->totalVnodes) < 0) return -1; if (tDecodeI32(decoder, &pStat->totalVnodes) < 0) return -1;
if (tDecodeI32(decoder, &pStat->masterNum) < 0) return -1; if (tDecodeI32(decoder, &pStat->masterNum) < 0) return -1;
...@@ -331,8 +331,8 @@ static int32_t tDecodeSVnodesStat(SCoder *decoder, SVnodesStat *pStat) { ...@@ -331,8 +331,8 @@ static int32_t tDecodeSVnodesStat(SCoder *decoder, SVnodesStat *pStat) {
} }
int32_t tSerializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo) { int32_t tSerializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo) {
SCoder encoder = {0}; SEncoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeSMonDiskInfo(&encoder, &pInfo->tfs) < 0) return -1; if (tEncodeSMonDiskInfo(&encoder, &pInfo->tfs) < 0) return -1;
...@@ -342,13 +342,13 @@ int32_t tSerializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo) { ...@@ -342,13 +342,13 @@ int32_t tSerializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo) {
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tCoderClear(&encoder); tEncoderClear(&encoder);
return tlen; return tlen;
} }
int32_t tDeserializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo) { int32_t tDeserializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo) {
SCoder decoder = {0}; SDecoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeSMonDiskInfo(&decoder, &pInfo->tfs) < 0) return -1; if (tDecodeSMonDiskInfo(&decoder, &pInfo->tfs) < 0) return -1;
...@@ -357,7 +357,7 @@ int32_t tDeserializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo) { ...@@ -357,7 +357,7 @@ int32_t tDeserializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo) {
if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1; if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
...@@ -369,8 +369,8 @@ void tFreeSMonVmInfo(SMonVmInfo *pInfo) { ...@@ -369,8 +369,8 @@ void tFreeSMonVmInfo(SMonVmInfo *pInfo) {
} }
int32_t tSerializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo) { int32_t tSerializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo) {
SCoder encoder = {0}; SEncoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeSMonSysInfo(&encoder, &pInfo->sys) < 0) return -1; if (tEncodeSMonSysInfo(&encoder, &pInfo->sys) < 0) return -1;
...@@ -378,20 +378,20 @@ int32_t tSerializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo) { ...@@ -378,20 +378,20 @@ int32_t tSerializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo) {
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tCoderClear(&encoder); tEncoderClear(&encoder);
return tlen; return tlen;
} }
int32_t tDeserializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo) { int32_t tDeserializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo) {
SCoder decoder = {0}; SDecoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeSMonSysInfo(&decoder, &pInfo->sys) < 0) return -1; if (tDecodeSMonSysInfo(&decoder, &pInfo->sys) < 0) return -1;
if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1; if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
...@@ -401,8 +401,8 @@ void tFreeSMonQmInfo(SMonQmInfo *pInfo) { ...@@ -401,8 +401,8 @@ void tFreeSMonQmInfo(SMonQmInfo *pInfo) {
} }
int32_t tSerializeSMonSmInfo(void *buf, int32_t bufLen, SMonSmInfo *pInfo) { int32_t tSerializeSMonSmInfo(void *buf, int32_t bufLen, SMonSmInfo *pInfo) {
SCoder encoder = {0}; SEncoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeSMonSysInfo(&encoder, &pInfo->sys) < 0) return -1; if (tEncodeSMonSysInfo(&encoder, &pInfo->sys) < 0) return -1;
...@@ -410,20 +410,20 @@ int32_t tSerializeSMonSmInfo(void *buf, int32_t bufLen, SMonSmInfo *pInfo) { ...@@ -410,20 +410,20 @@ int32_t tSerializeSMonSmInfo(void *buf, int32_t bufLen, SMonSmInfo *pInfo) {
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tCoderClear(&encoder); tEncoderClear(&encoder);
return tlen; return tlen;
} }
int32_t tDeserializeSMonSmInfo(void *buf, int32_t bufLen, SMonSmInfo *pInfo) { int32_t tDeserializeSMonSmInfo(void *buf, int32_t bufLen, SMonSmInfo *pInfo) {
SCoder decoder = {0}; SDecoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeSMonSysInfo(&decoder, &pInfo->sys) < 0) return -1; if (tDecodeSMonSysInfo(&decoder, &pInfo->sys) < 0) return -1;
if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1; if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
...@@ -433,8 +433,8 @@ void tFreeSMonSmInfo(SMonSmInfo *pInfo) { ...@@ -433,8 +433,8 @@ void tFreeSMonSmInfo(SMonSmInfo *pInfo) {
} }
int32_t tSerializeSMonBmInfo(void *buf, int32_t bufLen, SMonBmInfo *pInfo) { int32_t tSerializeSMonBmInfo(void *buf, int32_t bufLen, SMonBmInfo *pInfo) {
SCoder encoder = {0}; SEncoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeSMonSysInfo(&encoder, &pInfo->sys) < 0) return -1; if (tEncodeSMonSysInfo(&encoder, &pInfo->sys) < 0) return -1;
...@@ -442,20 +442,20 @@ int32_t tSerializeSMonBmInfo(void *buf, int32_t bufLen, SMonBmInfo *pInfo) { ...@@ -442,20 +442,20 @@ int32_t tSerializeSMonBmInfo(void *buf, int32_t bufLen, SMonBmInfo *pInfo) {
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tCoderClear(&encoder); tEncoderClear(&encoder);
return tlen; return tlen;
} }
int32_t tDeserializeSMonBmInfo(void *buf, int32_t bufLen, SMonBmInfo *pInfo) { int32_t tDeserializeSMonBmInfo(void *buf, int32_t bufLen, SMonBmInfo *pInfo) {
SCoder decoder = {0}; SDecoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeSMonSysInfo(&decoder, &pInfo->sys) < 0) return -1; if (tDecodeSMonSysInfo(&decoder, &pInfo->sys) < 0) return -1;
if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1; if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
...@@ -465,8 +465,8 @@ void tFreeSMonBmInfo(SMonBmInfo *pInfo) { ...@@ -465,8 +465,8 @@ void tFreeSMonBmInfo(SMonBmInfo *pInfo) {
} }
int32_t tSerializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo) { int32_t tSerializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo) {
SCoder encoder = {0}; SEncoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, taosArrayGetSize(pInfo->pVloads)) < 0) return -1; if (tEncodeI32(&encoder, taosArrayGetSize(pInfo->pVloads)) < 0) return -1;
...@@ -488,13 +488,13 @@ int32_t tSerializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo) ...@@ -488,13 +488,13 @@ int32_t tSerializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo)
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tCoderClear(&encoder); tEncoderClear(&encoder);
return tlen; return tlen;
} }
int32_t tDeserializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo) { int32_t tDeserializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo) {
SCoder decoder = {0}; SDecoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
...@@ -522,7 +522,7 @@ int32_t tDeserializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInf ...@@ -522,7 +522,7 @@ int32_t tDeserializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInf
} }
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
...@@ -532,8 +532,8 @@ void tFreeSMonVloadInfo(SMonVloadInfo *pInfo) { ...@@ -532,8 +532,8 @@ void tFreeSMonVloadInfo(SMonVloadInfo *pInfo) {
} }
int32_t tSerializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo) { int32_t tSerializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo) {
SCoder encoder = {0}; SEncoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI8(&encoder, pInfo->isMnode) < 0) return -1; if (tEncodeI8(&encoder, pInfo->isMnode) < 0) return -1;
...@@ -541,19 +541,19 @@ int32_t tSerializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo) ...@@ -541,19 +541,19 @@ int32_t tSerializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo)
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tCoderClear(&encoder); tEncoderClear(&encoder);
return tlen; return tlen;
} }
int32_t tDeserializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo) { int32_t tDeserializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo) {
SCoder decoder = {0}; SDecoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI8(&decoder, &pInfo->isMnode) < 0) return -1; if (tDecodeI8(&decoder, &pInfo->isMnode) < 0) return -1;
if (tDecodeI32(&decoder, &pInfo->load.syncState) < 0) return -1; if (tDecodeI32(&decoder, &pInfo->load.syncState) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
\ No newline at end of file
...@@ -156,7 +156,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star ...@@ -156,7 +156,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
} }
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) { int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
SCoder coder = {0}; SEncoder coder = {0};
char* pBuf; char* pBuf;
int32_t len; int32_t len;
...@@ -176,9 +176,9 @@ int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) ...@@ -176,9 +176,9 @@ int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq)
pBuf= pBlocks->pData + pBlocks->size; pBuf= pBlocks->pData + pBlocks->size;
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, len, TD_ENCODER); tEncoderInit(&coder, pBuf, len);
tEncodeSVCreateTbReq(&coder, pCreateTbReq); tEncodeSVCreateTbReq(&coder, pCreateTbReq);
tCoderClear(&coder); tEncoderClear(&coder);
pBlocks->size += len; pBlocks->size += len;
pBlocks->createTbReqLen = len; pBlocks->createTbReqLen = len;
......
...@@ -3557,8 +3557,8 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* ...@@ -3557,8 +3557,8 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
} }
static int32_t serializeVgroupCreateTableBatch(SVgroupCreateTableBatch* pTbBatch, SArray* pBufArray) { static int32_t serializeVgroupCreateTableBatch(SVgroupCreateTableBatch* pTbBatch, SArray* pBufArray) {
int tlen; int tlen;
SCoder coder = {0}; SEncoder coder = {0};
int32_t ret = 0; int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTbBatchReq, &pTbBatch->req, tlen, ret); tEncodeSize(tEncodeSVCreateTbBatchReq, &pTbBatch->req, tlen, ret);
...@@ -3571,9 +3571,9 @@ static int32_t serializeVgroupCreateTableBatch(SVgroupCreateTableBatch* pTbBatch ...@@ -3571,9 +3571,9 @@ static int32_t serializeVgroupCreateTableBatch(SVgroupCreateTableBatch* pTbBatch
((SMsgHead*)buf)->contLen = htonl(tlen); ((SMsgHead*)buf)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, tlen - sizeof(SMsgHead), TD_ENCODER); tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
tEncodeSVCreateTbBatchReq(&coder, &pTbBatch->req); tEncodeSVCreateTbBatchReq(&coder, &pTbBatch->req);
tCoderClear(&coder); tEncoderClear(&coder);
SVgDataBlocks* pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); SVgDataBlocks* pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == pVgData) { if (NULL == pVgData) {
...@@ -3957,8 +3957,8 @@ over: ...@@ -3957,8 +3957,8 @@ over:
static void destroyDropTbReqBatch(SVgroupDropTableBatch* pTbBatch) { taosArrayDestroy(pTbBatch->req.pArray); } static void destroyDropTbReqBatch(SVgroupDropTableBatch* pTbBatch) { taosArrayDestroy(pTbBatch->req.pArray); }
static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SArray* pBufArray) { static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SArray* pBufArray) {
int tlen; int tlen;
SCoder coder = {0}; SEncoder coder = {0};
int32_t ret = 0; int32_t ret = 0;
tEncodeSize(tEncodeSVDropTbBatchReq, &pTbBatch->req, tlen, ret); tEncodeSize(tEncodeSVDropTbBatchReq, &pTbBatch->req, tlen, ret);
...@@ -3971,9 +3971,9 @@ static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SA ...@@ -3971,9 +3971,9 @@ static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SA
((SMsgHead*)buf)->contLen = htonl(tlen); ((SMsgHead*)buf)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, tlen - sizeof(SMsgHead), TD_ENCODER); tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
tEncodeSVDropTbBatchReq(&coder, &pTbBatch->req); tEncodeSVDropTbBatchReq(&coder, &pTbBatch->req);
tCoderClear(&coder); tEncoderClear(&coder);
SVgDataBlocks* pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); SVgDataBlocks* pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == pVgData) { if (NULL == pVgData) {
......
...@@ -1085,21 +1085,21 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -1085,21 +1085,21 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
case TDMT_VND_CREATE_TABLE_RSP: { case TDMT_VND_CREATE_TABLE_RSP: {
SVCreateTbBatchRsp batchRsp = {0}; SVCreateTbBatchRsp batchRsp = {0};
if (msg) { if (msg) {
SCoder coder = {0}; SDecoder coder = {0};
tCoderInit(&coder, TD_LITTLE_ENDIAN, msg, msgSize, TD_DECODER); tDecoderInit(&coder, msg, msgSize);
code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp); code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp);
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) { if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
for (int32_t i = 0; i < batchRsp.nRsps; ++i) { for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
SVCreateTbRsp *rsp = batchRsp.pRsps + i; SVCreateTbRsp *rsp = batchRsp.pRsps + i;
if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) { if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
tCoderClear(&coder); tDecoderClear(&coder);
SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(rsp->code);
} else if (TSDB_CODE_SUCCESS != rsp->code) { } else if (TSDB_CODE_SUCCESS != rsp->code) {
code = rsp->code; code = rsp->code;
} }
} }
} }
tCoderClear(&coder); tDecoderClear(&coder);
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
} }
...@@ -1110,21 +1110,21 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -1110,21 +1110,21 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
case TDMT_VND_DROP_TABLE_RSP: { case TDMT_VND_DROP_TABLE_RSP: {
SVDropTbBatchRsp batchRsp = {0}; SVDropTbBatchRsp batchRsp = {0};
if (msg) { if (msg) {
SCoder coder = {0}; SDecoder coder = {0};
tCoderInit(&coder, TD_LITTLE_ENDIAN, msg, msgSize, TD_DECODER); tDecoderInit(&coder, msg, msgSize);
code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp); code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp);
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) { if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
for (int32_t i = 0; i < batchRsp.nRsps; ++i) { for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
SVDropTbRsp *rsp = batchRsp.pRsps + i; SVDropTbRsp *rsp = batchRsp.pRsps + i;
if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) { if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
tCoderClear(&coder); tDecoderClear(&coder);
SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(rsp->code);
} else if (TSDB_CODE_SUCCESS != rsp->code) { } else if (TSDB_CODE_SUCCESS != rsp->code) {
code = rsp->code; code = rsp->code;
} }
} }
} }
tCoderClear(&coder); tDecoderClear(&coder);
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
} }
......
...@@ -256,7 +256,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) { ...@@ -256,7 +256,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
return pTask; return pTask;
} }
int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/ /*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
...@@ -301,7 +301,7 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { ...@@ -301,7 +301,7 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
/*if (tStartDecode(pDecoder) < 0) return -1;*/ /*if (tStartDecode(pDecoder) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
......
...@@ -331,8 +331,8 @@ SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) { ...@@ -331,8 +331,8 @@ SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) {
} }
int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen) { int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen) {
SCoder encoder = {0}; SEncoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) { if (tStartEncode(&encoder) < 0) {
return -1; return -1;
} }
...@@ -367,13 +367,13 @@ int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen) { ...@@ -367,13 +367,13 @@ int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen) {
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tCoderClear(&encoder); tEncoderClear(&encoder);
return tlen; return tlen;
} }
SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) { SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) {
SCoder decoder = {0}; SDecoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) { if (tStartDecode(&decoder) < 0) {
return NULL; return NULL;
} }
...@@ -418,7 +418,7 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) { ...@@ -418,7 +418,7 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) {
memcpy(pMsg->data, data, len); memcpy(pMsg->data, data, len);
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tDecoderClear(&decoder);
return pMsg; return pMsg;
} }
...@@ -590,8 +590,8 @@ SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len) { ...@@ -590,8 +590,8 @@ SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len) {
} }
int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bufLen) { int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bufLen) {
SCoder encoder = {0}; SEncoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) { if (tStartEncode(&encoder) < 0) {
return -1; return -1;
} }
...@@ -626,13 +626,13 @@ int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bu ...@@ -626,13 +626,13 @@ int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bu
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tCoderClear(&encoder); tEncoderClear(&encoder);
return tlen; return tlen;
} }
SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) { SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) {
SCoder decoder = {0}; SDecoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) { if (tStartDecode(&decoder) < 0) {
return NULL; return NULL;
} }
...@@ -677,7 +677,7 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) { ...@@ -677,7 +677,7 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) {
memcpy(pMsg->data, data, len); memcpy(pMsg->data, data, len);
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tDecoderClear(&decoder);
return pMsg; return pMsg;
} }
......
...@@ -21,48 +21,60 @@ static_assert(sizeof(float) == sizeof(uint32_t), "sizeof(float) must equal to si ...@@ -21,48 +21,60 @@ static_assert(sizeof(float) == sizeof(uint32_t), "sizeof(float) must equal to si
static_assert(sizeof(double) == sizeof(uint64_t), "sizeof(double) must equal to sizeof(uint64_t)"); static_assert(sizeof(double) == sizeof(uint64_t), "sizeof(double) must equal to sizeof(uint64_t)");
#endif #endif
void tCoderInit(SCoder* pCoder, td_endian_t endian, uint8_t* data, int32_t size, td_coder_t type) { struct SEncoderNode {
if (type == TD_ENCODER) { SEncoderNode* pNext;
if (data == NULL) size = 0; uint8_t* data;
} else { uint32_t size;
ASSERT(data && size > 0); uint32_t pos;
} };
pCoder->type = type; struct SDecoderNode {
pCoder->endian = endian; SDecoderNode* pNext;
pCoder->data = data; const uint8_t* data;
pCoder->size = size; uint32_t size;
pCoder->pos = 0; uint32_t pos;
pCoder->mList = NULL; };
TD_SLIST_INIT(&(pCoder->stack));
void tEncoderInit(SEncoder* pEncoder, uint8_t* data, uint32_t size) {
if (data == NULL) size = 0;
pEncoder->data = data;
pEncoder->size = size;
pEncoder->pos = 0;
pEncoder->mList = NULL;
pEncoder->eStack = NULL;
} }
void tCoderClear(SCoder* pCoder) { void tEncoderClear(SEncoder* pCoder) {
SCoderMem* pMem; for (SCoderMem* pMem = pCoder->mList; pMem; pMem = pCoder->mList) {
// clear memory
for (pMem = pCoder->mList; pMem; pMem = pCoder->mList) {
pCoder->mList = pMem->next; pCoder->mList = pMem->next;
taosMemoryFree(pMem); taosMemoryFree(pMem);
} }
memset(pCoder, 0, sizeof(*pCoder));
}
void tDecoderInit(SDecoder* pDecoder, const uint8_t* data, uint32_t size) {
pDecoder->data = data;
pDecoder->size = size;
pDecoder->pos = 0;
pDecoder->mList = NULL;
pDecoder->dStack = NULL;
}
struct SCoderNode* pNode; void tDecoderClear(SDecoder* pCoder) {
for (;;) { for (SCoderMem* pMem = pCoder->mList; pMem; pMem = pCoder->mList) {
pNode = TD_SLIST_HEAD(&(pCoder->stack)); pCoder->mList = pMem->next;
if (pNode == NULL) break; taosMemoryFree(pMem);
TD_SLIST_POP(&(pCoder->stack));
taosMemoryFree(pNode);
} }
memset(pCoder, 0, sizeof(*pCoder));
} }
int32_t tStartEncode(SCoder* pCoder) { int32_t tStartEncode(SEncoder* pCoder) {
struct SCoderNode* pNode; SEncoderNode* pNode;
ASSERT(pCoder->type == TD_ENCODER);
if (pCoder->data) { if (pCoder->data) {
if (pCoder->size - pCoder->pos < sizeof(int32_t)) return -1; if (pCoder->size - pCoder->pos < sizeof(int32_t)) return -1;
pNode = taosMemoryMalloc(sizeof(*pNode)); pNode = tEncoderMalloc(pCoder, sizeof(*pNode));
if (pNode == NULL) return -1; if (pNode == NULL) return -1;
pNode->data = pCoder->data; pNode->data = pCoder->data;
...@@ -73,22 +85,23 @@ int32_t tStartEncode(SCoder* pCoder) { ...@@ -73,22 +85,23 @@ int32_t tStartEncode(SCoder* pCoder) {
pCoder->pos = 0; pCoder->pos = 0;
pCoder->size = pNode->size - pNode->pos - sizeof(int32_t); pCoder->size = pNode->size - pNode->pos - sizeof(int32_t);
TD_SLIST_PUSH(&(pCoder->stack), pNode); pNode->pNext = pCoder->eStack;
pCoder->eStack = pNode;
} else { } else {
pCoder->pos += sizeof(int32_t); pCoder->pos += sizeof(int32_t);
} }
return 0; return 0;
} }
void tEndEncode(SCoder* pCoder) { void tEndEncode(SEncoder* pCoder) {
struct SCoderNode* pNode; SEncoderNode* pNode;
int32_t len; int32_t len;
ASSERT(pCoder->type == TD_ENCODER);
if (pCoder->data) { if (pCoder->data) {
pNode = TD_SLIST_HEAD(&(pCoder->stack)); pNode = pCoder->eStack;
ASSERT(pNode); ASSERT(pNode);
TD_SLIST_POP(&(pCoder->stack)); pCoder->eStack = pNode->pNext;
len = pCoder->pos; len = pCoder->pos;
...@@ -99,19 +112,16 @@ void tEndEncode(SCoder* pCoder) { ...@@ -99,19 +112,16 @@ void tEndEncode(SCoder* pCoder) {
tEncodeI32(pCoder, len); tEncodeI32(pCoder, len);
TD_CODER_MOVE_POS(pCoder, len); TD_CODER_MOVE_POS(pCoder, len);
taosMemoryFree(pNode);
} }
} }
int32_t tStartDecode(SCoder* pCoder) { int32_t tStartDecode(SDecoder* pCoder) {
int32_t len; SDecoderNode* pNode;
struct SCoderNode* pNode; int32_t len;
ASSERT(pCoder->type == TD_DECODER);
if (tDecodeI32(pCoder, &len) < 0) return -1; if (tDecodeI32(pCoder, &len) < 0) return -1;
pNode = taosMemoryMalloc(sizeof(*pNode)); pNode = tDecoderMalloc(pCoder, sizeof(*pNode));
if (pNode == NULL) return -1; if (pNode == NULL) return -1;
pNode->data = pCoder->data; pNode->data = pCoder->data;
...@@ -122,23 +132,20 @@ int32_t tStartDecode(SCoder* pCoder) { ...@@ -122,23 +132,20 @@ int32_t tStartDecode(SCoder* pCoder) {
pCoder->size = len; pCoder->size = len;
pCoder->pos = 0; pCoder->pos = 0;
TD_SLIST_PUSH(&(pCoder->stack), pNode); pNode->pNext = pCoder->dStack;
pCoder->dStack = pNode;
return 0; return 0;
} }
void tEndDecode(SCoder* pCoder) { void tEndDecode(SDecoder* pCoder) {
struct SCoderNode* pNode; SDecoderNode* pNode;
ASSERT(pCoder->type == TD_DECODER);
pNode = TD_SLIST_HEAD(&(pCoder->stack)); pNode = pCoder->dStack;
ASSERT(pNode); ASSERT(pNode);
TD_SLIST_POP(&(pCoder->stack)); pCoder->dStack = pNode->pNext;
pCoder->data = pNode->data; pCoder->data = pNode->data;
pCoder->pos = pCoder->size + pNode->pos; pCoder->pos = pCoder->size + pNode->pos;
pCoder->size = pNode->size; pCoder->size = pNode->size;
taosMemoryFree(pNode);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册