未验证 提交 a06bf6e5 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #12206 from taosdata/feature/vnode_refact1

refact: vnode
......@@ -338,8 +338,8 @@ typedef struct SEpSet {
SEp eps[TSDB_MAX_REPLICA];
} SEpSet;
int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp);
int32_t tEncodeSEpSet(SEncoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SDecoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(const void* buf, SEpSet* pEp);
......@@ -611,8 +611,8 @@ typedef struct {
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp);
int32_t tSerializeSUseDbRspImp(SEncoder* pEncoder, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRspImp(SDecoder* pDecoder, SUseDbRsp* pRsp);
void tFreeSUsedbRsp(SUseDbRsp* pRsp);
typedef struct {
......@@ -1527,8 +1527,8 @@ typedef struct {
char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2
} SRSmaParam;
int32_t tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam);
int32_t tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam);
int32_t tEncodeSRSmaParam(SEncoder* pCoder, const SRSmaParam* pRSmaParam);
int32_t tDecodeSRSmaParam(SDecoder* pCoder, SRSmaParam* pRSmaParam);
// TDMT_VND_CREATE_STB ==============
typedef struct SVCreateStbReq {
......@@ -1540,8 +1540,8 @@ typedef struct SVCreateStbReq {
SRSmaParam pRSmaParam;
} SVCreateStbReq;
int tEncodeSVCreateStbReq(SCoder* pCoder, const SVCreateStbReq* pReq);
int tDecodeSVCreateStbReq(SCoder* pCoder, SVCreateStbReq* pReq);
int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
int tDecodeSVCreateStbReq(SDecoder* pCoder, SVCreateStbReq* pReq);
// TDMT_VND_DROP_STB ==============
typedef struct SVDropStbReq {
......@@ -1549,8 +1549,8 @@ typedef struct SVDropStbReq {
tb_uid_t suid;
} SVDropStbReq;
int32_t tEncodeSVDropStbReq(SCoder* pCoder, const SVDropStbReq* pReq);
int32_t tDecodeSVDropStbReq(SCoder* pCoder, SVDropStbReq* pReq);
int32_t tEncodeSVDropStbReq(SEncoder* pCoder, const SVDropStbReq* pReq);
int32_t tDecodeSVDropStbReq(SDecoder* pCoder, SVDropStbReq* pReq);
#define TD_CREATE_IF_NOT_EXISTS 0x1
typedef struct SVCreateTbReq {
......@@ -1563,7 +1563,7 @@ typedef struct SVCreateTbReq {
union {
struct {
tb_uid_t suid;
const void* pTag;
const uint8_t* pTag;
} ctb;
struct {
SSchemaWrapper schema;
......@@ -1571,8 +1571,8 @@ typedef struct SVCreateTbReq {
};
} SVCreateTbReq;
int tEncodeSVCreateTbReq(SCoder* pCoder, const SVCreateTbReq* pReq);
int tDecodeSVCreateTbReq(SCoder* pCoder, SVCreateTbReq* pReq);
int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq);
typedef struct {
int32_t nReqs;
......@@ -1582,15 +1582,15 @@ typedef struct {
};
} SVCreateTbBatchReq;
int tEncodeSVCreateTbBatchReq(SCoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SCoder* pCoder, SVCreateTbBatchReq* pReq);
int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq);
typedef struct {
int32_t code;
} SVCreateTbRsp, SVUpdateTbRsp;
int tEncodeSVCreateTbRsp(SCoder* pCoder, const SVCreateTbRsp* pRsp);
int tDecodeSVCreateTbRsp(SCoder* pCoder, SVCreateTbRsp* pRsp);
int tEncodeSVCreateTbRsp(SEncoder* pCoder, const SVCreateTbRsp* pRsp);
int tDecodeSVCreateTbRsp(SDecoder* pCoder, SVCreateTbRsp* pRsp);
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
......@@ -1603,8 +1603,8 @@ typedef struct {
};
} SVCreateTbBatchRsp;
int tEncodeSVCreateTbBatchRsp(SCoder* pCoder, const SVCreateTbBatchRsp* pRsp);
int tDecodeSVCreateTbBatchRsp(SCoder* pCoder, SVCreateTbBatchRsp* pRsp);
int tEncodeSVCreateTbBatchRsp(SEncoder* pCoder, const SVCreateTbBatchRsp* pRsp);
int tDecodeSVCreateTbBatchRsp(SDecoder* pCoder, SVCreateTbBatchRsp* pRsp);
int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
......@@ -1627,8 +1627,8 @@ typedef struct {
};
} SVDropTbBatchReq;
int32_t tEncodeSVDropTbBatchReq(SCoder* pCoder, const SVDropTbBatchReq* pReq);
int32_t tDecodeSVDropTbBatchReq(SCoder* pCoder, SVDropTbBatchReq* pReq);
int32_t tEncodeSVDropTbBatchReq(SEncoder* pCoder, const SVDropTbBatchReq* pReq);
int32_t tDecodeSVDropTbBatchReq(SDecoder* pCoder, SVDropTbBatchReq* pReq);
typedef struct {
int32_t nRsps;
......@@ -1638,8 +1638,8 @@ typedef struct {
};
} SVDropTbBatchRsp;
int32_t tEncodeSVDropTbBatchRsp(SCoder* pCoder, const SVDropTbBatchRsp* pRsp);
int32_t tDecodeSVDropTbBatchRsp(SCoder* pCoder, SVDropTbBatchRsp* pRsp);
int32_t tEncodeSVDropTbBatchRsp(SEncoder* pCoder, const SVDropTbBatchRsp* pRsp);
int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp);
typedef struct {
SMsgHead head;
......@@ -1821,14 +1821,14 @@ static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp);
int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp);
static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) {
static FORCE_INLINE int32_t tEncodeSKv(SEncoder* pEncoder, const SKv* pKv) {
if (tEncodeI32(pEncoder, pKv->key) < 0) return -1;
if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1;
if (tEncodeBinary(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1;
if (tEncodeBinary(pEncoder, (uint8_t*)pKv->value, pKv->valueLen) < 0) return -1;
return 0;
}
static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) {
static FORCE_INLINE int32_t tDecodeSKv(SDecoder* pDecoder, SKv* pKv) {
if (tDecodeI32(pDecoder, &pKv->key) < 0) return -1;
if (tDecodeI32(pDecoder, &pKv->valueLen) < 0) return -1;
pKv->value = taosMemoryMalloc(pKv->valueLen + 1);
......@@ -1837,13 +1837,13 @@ static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) {
return 0;
}
static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) {
static FORCE_INLINE int32_t tEncodeSClientHbKey(SEncoder* pEncoder, const SClientHbKey* pKey) {
if (tEncodeI64(pEncoder, pKey->tscRid) < 0) return -1;
if (tEncodeI8(pEncoder, pKey->connType) < 0) return -1;
return 0;
}
static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) {
static FORCE_INLINE int32_t tDecodeSClientHbKey(SDecoder* pDecoder, SClientHbKey* pKey) {
if (tDecodeI64(pDecoder, &pKey->tscRid) < 0) return -1;
if (tDecodeI8(pDecoder, &pKey->connType) < 0) return -1;
return 0;
......@@ -2046,10 +2046,10 @@ typedef struct {
int32_t reserved;
} SMqCMCommitOffsetRsp;
int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset);
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMCommitOffsetReq(SCoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq);
int32_t tEncodeSMqOffset(SEncoder* encoder, const SMqOffset* pOffset);
int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq);
static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
......@@ -2089,7 +2089,7 @@ static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) {
return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
static FORCE_INLINE int32_t tEncodeSSchema(SEncoder* pEncoder, const SSchema* pSchema) {
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1;
if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1;
......@@ -2098,7 +2098,7 @@ static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSch
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) {
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1;
......@@ -2131,7 +2131,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp
return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) {
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1;
for (int32_t i = 0; i < pSW->nCols; i++) {
......@@ -2141,7 +2141,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchem
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapper* pSW) {
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
......@@ -2596,8 +2596,8 @@ typedef struct {
int64_t suid;
int64_t uid;
int32_t sver;
uint64_t nData;
const void* pData;
uint32_t nData;
const uint8_t* pData;
SVCreateTbReq cTbReq;
} SVSubmitBlk;
......@@ -2610,8 +2610,8 @@ typedef struct {
};
} SVSubmitReq;
int32_t tEncodeSVSubmitReq(SCoder* pCoder, const SVSubmitReq* pReq);
int32_t tDecodeSVSubmitReq(SCoder* pCoder, SVSubmitReq* pReq);
int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq);
int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq);
#pragma pack(pop)
......
......@@ -154,8 +154,8 @@ typedef struct {
} SStreamTask;
SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask);
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
typedef struct {
......
此差异已折叠。
......@@ -547,21 +547,21 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
req.offsets = (SMqOffset*)offsets->container.pData;
}
SCoder encoder;
SEncoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncoderInit(&encoder, NULL, 0);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
int32_t tlen = encoder.pos;
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
tCoderClear(&encoder);
tEncoderClear(&encoder);
return -1;
}
tCoderClear(&encoder);
tEncoderClear(&encoder);
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
tEncoderInit(&encoder, buf, tlen);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
tCoderClear(&encoder);
tEncoderClear(&encoder);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
if (pRequest == NULL) {
......
此差异已折叠。
......@@ -594,8 +594,8 @@ typedef struct {
SSchemaWrapper outputSchema;
} SStreamObj;
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SCoder* pDecoder, SStreamObj* pObj);
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj);
#ifdef __cplusplus
}
......
......@@ -411,7 +411,7 @@ void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
return (void *)buf;
}
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
int32_t sz = 0;
/*int32_t outputNameSz = 0;*/
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
......@@ -462,7 +462,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
return pEncoder->pos;
}
int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
......
......@@ -157,8 +157,8 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
char *msgStr = pMsg->rpcMsg.pCont;
SMqCMCommitOffsetReq commitOffsetReq;
SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, msgStr, pMsg->rpcMsg.contLen, TD_DECODER);
SDecoder decoder;
tDecoderInit(&decoder, msgStr, pMsg->rpcMsg.contLen);
tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq);
......
......@@ -83,12 +83,12 @@ END:
}
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
tEncodeSStreamTask(&encoder, pTask);
int32_t size = encoder.pos;
int32_t tlen = sizeof(SMsgHead) + size;
tCoderClear(&encoder);
tEncoderClear(&encoder);
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -96,9 +96,9 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
}
((SMsgHead*)buf)->vgId = htonl(nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER);
tEncoderInit(&encoder, abuf, size);
tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder);
tEncoderClear(&encoder);
STransAction action = {0};
memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
......
......@@ -369,7 +369,7 @@ static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSch
}
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SCoder coder = {0};
SEncoder encoder = {0};
int32_t contLen;
SName name = {0};
SVCreateStbReq req = {0};
......@@ -422,11 +422,11 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
pHead->vgId = htonl(pVgroup->vgId);
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, contLen - sizeof(SMsgHead), TD_ENCODER);
if (tEncodeSVCreateStbReq(&coder, &req) < 0) {
tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
if (tEncodeSVCreateStbReq(&encoder, &req) < 0) {
return NULL;
}
tCoderClear(&coder);
tEncoderClear(&encoder);
*pContLen = contLen;
taosMemoryFreeClear(req.pRSmaParam.qmsg1);
......@@ -440,7 +440,7 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
int32_t contLen = 0;
int32_t ret = 0;
SMsgHead *pHead = NULL;
SCoder coder = {0};
SEncoder encoder = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
......@@ -462,9 +462,9 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, contLen - sizeof(SMsgHead), TD_ENCODER);
tEncodeSVDropStbReq(&coder, &req);
tCoderClear(&coder);
tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
tEncodeSVDropStbReq(&encoder, &req);
tEncoderClear(&encoder);
*pContLen = contLen;
return pHead;
......
......@@ -70,14 +70,14 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL;
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tCoderClear(&encoder);
tEncoderClear(&encoder);
goto STREAM_ENCODE_OVER;
}
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
tEncoderClear(&encoder);
int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
......@@ -86,12 +86,12 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
buf = taosMemoryMalloc(tlen);
if (buf == NULL) goto STREAM_ENCODE_OVER;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
tEncoderInit(&encoder, buf, tlen);
if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tCoderClear(&encoder);
tEncoderClear(&encoder);
goto STREAM_ENCODE_OVER;
}
tCoderClear(&encoder);
tEncoderClear(&encoder);
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER);
......@@ -138,8 +138,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
if (buf == NULL) goto STREAM_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER);
SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, tlen + 1, TD_DECODER);
SDecoder decoder;
tDecoderInit(&decoder, buf, tlen + 1);
if (tDecodeSStreamObj(&decoder, pStream) < 0) {
goto STREAM_DECODE_OVER;
}
......
......@@ -99,10 +99,10 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
ASSERT(0);
return;
}
SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, msg, pMsg->contLen - sizeof(SMsgHead), TD_DECODER);
SDecoder decoder;
tDecoderInit(&decoder, msg, pMsg->contLen - sizeof(SMsgHead));
tDecodeSStreamTask(&decoder, pTask);
tCoderClear(&decoder);
tDecoderClear(&decoder);
sndMetaDeployTask(pSnode->pMeta, pTask);
} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) {
......
......@@ -108,7 +108,7 @@ int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, con
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool* allHave);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond);
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
......@@ -126,8 +126,8 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t* pUid, int32_t *pNumOfRows,
int16_t *pNumOfCols);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid,
int32_t *pNumOfRows, int16_t *pNumOfCols);
// need to reposition
......@@ -192,7 +192,7 @@ struct SMetaEntry {
int64_t ctime;
int32_t ttlDays;
tb_uid_t suid;
const void *pTags;
const uint8_t *pTags;
} ctbEntry;
struct {
int64_t ctime;
......@@ -205,7 +205,7 @@ struct SMetaEntry {
struct SMetaReader {
int32_t flags;
SMeta *pMeta;
SCoder coder;
SDecoder coder;
SMetaEntry me;
void *pBuf;
int szBuf;
......
......@@ -39,8 +39,8 @@ typedef struct SMSmaCursor SMSmaCursor;
// metaOpen ==================
// metaEntry ==================
int metaEncodeEntry(SCoder* pCoder, const SMetaEntry* pME);
int metaDecodeEntry(SCoder* pCoder, SMetaEntry* pME);
int metaEncodeEntry(SEncoder* pCoder, const SMetaEntry* pME);
int metaDecodeEntry(SDecoder* pCoder, SMetaEntry* pME);
// metaTable ==================
......
......@@ -15,7 +15,7 @@
#include "meta.h"
int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) {
int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pME->version) < 0) return -1;
......@@ -43,8 +43,8 @@ int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) {
return 0;
}
int metaDecodeEntry(SCoder *pCoder, SMetaEntry *pME) {
uint64_t len;
int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
uint32_t len;
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pME->version) < 0) return -1;
......
......@@ -22,7 +22,7 @@ void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
}
void metaReaderClear(SMetaReader *pReader) {
tCoderClear(&pReader->coder);
tDecoderClear(&pReader->coder);
tdbFree(pReader->pBuf);
}
......@@ -37,7 +37,7 @@ int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t u
}
// decode the entry
tCoderInit(&pReader->coder, TD_LITTLE_ENDIAN, pReader->pBuf, pReader->szBuf, TD_DECODER);
tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
goto _err;
......@@ -147,7 +147,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
SSchemaWrapper *pSW = NULL;
SSchema *pSchema = NULL;
void *pBuf;
SCoder coder = {0};
SDecoder coder = {0};
// fetch
skmDbKey.uid = uid;
......@@ -163,11 +163,11 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
pBuf = pVal;
pSW = taosMemoryMalloc(sizeof(SSchemaWrapper));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_DECODER);
tDecoderInit(&coder, pVal, vLen);
tDecodeSSchemaWrapper(&coder, pSW);
pSchema = taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
memcpy(pSchema, pSW->pSchema, sizeof(SSchema) * pSW->nCols);
tCoderClear(&coder);
tDecoderClear(&coder);
pSW->pSchema = pSchema;
......
此差异已折叠。
......@@ -33,7 +33,6 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
void *pBuf = NULL;
int32_t szBuf = 0;
void *p = NULL;
SCoder coder = {0};
SMetaReader mr = {0};
// validate req
......@@ -192,7 +191,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
tb_uid_t uid;
int64_t tver;
SMetaEntry me = {0};
SCoder coder = {0};
SDecoder coder = {0};
int8_t type;
int64_t ctime;
tb_uid_t suid;
......@@ -253,7 +252,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
// decode entry
void *pDataCopy = taosMemoryMalloc(nData); // remove the copy (todo)
memcpy(pDataCopy, pData, nData);
tCoderInit(&coder, TD_LITTLE_ENDIAN, pDataCopy, nData, TD_DECODER);
tDecoderInit(&coder, pDataCopy, nData);
ret = metaDecodeEntry(&coder, &me);
if (ret < 0) {
ASSERT(0);
......@@ -272,7 +271,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
}
taosMemoryFree(pDataCopy);
tCoderClear(&coder);
tDecoderClear(&coder);
tdbDbcClose(pTbDbc);
if (type == TSDB_CHILD_TABLE) {
......@@ -309,7 +308,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
void *pVal = NULL;
int kLen = 0;
int vLen = 0;
SCoder coder = {0};
SEncoder coder = {0};
// set key and value
tbDbKey.version = pME->version;
......@@ -330,13 +329,13 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
goto _err;
}
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_ENCODER);
tEncoderInit(&coder, pVal, vLen);
if (metaEncodeEntry(&coder, pME) < 0) {
goto _err;
}
tCoderClear(&coder);
tEncoderClear(&coder);
// write to table.db
if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
......@@ -393,7 +392,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pME) {
}
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
SCoder coder = {0};
SEncoder coder = {0};
void *pVal = NULL;
int vLen = 0;
int rcode = 0;
......@@ -422,7 +421,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
goto _exit;
}
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_ENCODER);
tEncoderInit(&coder, pVal, vLen);
tEncodeSSchemaWrapper(&coder, pSW);
if (tdbDbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) {
......@@ -432,7 +431,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
_exit:
taosMemoryFree(pVal);
tCoderClear(&coder);
tEncoderClear(&coder);
return rcode;
}
......
......@@ -910,12 +910,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
if (pTask == NULL) {
return -1;
}
SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER);
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0);
}
tCoderClear(&decoder);
tDecoderClear(&decoder);
// exec
if (tqExpandTask(pTq, pTask, 4) < 0) {
......
......@@ -63,6 +63,8 @@ struct SMemSkipListCurosr {
SMemSkipListNode *pNodeC;
};
#define HASH_BUCKET(SUID, UID, NBUCKET) (TABS((SUID) + (UID)) % (NBUCKET))
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
#define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l))
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
......@@ -71,6 +73,8 @@ struct SMemSkipListCurosr {
#define SL_HEAD_NODE(sl) ((sl)->pHead)
#define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel))
#define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l)
#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l)
// SMemTable
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
......@@ -115,19 +119,14 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
STsdb *pTsdb = pMemTb->pTsdb;
SVnode *pVnode = pTsdb->pVnode;
SVBufPool *pPool = pVnode->inUse;
int32_t hash;
int32_t tlen;
uint8_t buf[16];
int32_t rlen;
const uint8_t *p;
SMemSkipListNode *pSlNode;
const STSRow *pTSRow;
SMemSkipListCurosr slc = {0};
// search hash
hash = (pSubmitBlk->suid + pSubmitBlk->uid) % pMemTb->nBucket;
for (pMemData = pMemTb->pBuckets[hash]; pMemData; pMemData = pMemData->pHashNext) {
if (pMemData->suid == pSubmitBlk->suid && pMemData->uid == pSubmitBlk->uid) break;
tb_uid_t suid = pSubmitBlk->suid;
tb_uid_t uid = pSubmitBlk->uid;
int32_t iBucket;
// search SMemData by hash
iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
for (pMemData = pMemTb->pBuckets[iBucket]; pMemData; pMemData = pMemData->pHashNext) {
if (pMemData->suid == suid && pMemData->uid == uid) break;
}
// create pMemData if need
......@@ -143,8 +142,8 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
}
pMemData->pHashNext = NULL;
pMemData->suid = pSubmitBlk->suid;
pMemData->uid = pSubmitBlk->uid;
pMemData->suid = suid;
pMemData->uid = uid;
pMemData->minKey = TSKEY_MAX;
pMemData->maxKey = TSKEY_MIN;
pMemData->minVer = -1;
......@@ -159,55 +158,67 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
pHead->level = maxLevel;
pTail->level = maxLevel;
for (int iLevel = 0; iLevel < maxLevel; iLevel++) {
SL_NODE_FORWARD(pHead, iLevel) = pTail;
SL_NODE_FORWARD(pTail, iLevel) = pHead;
SL_HEAD_NODE_FORWARD(pHead, iLevel) = pTail;
SL_TAIL_NODE_BACKWARD(pTail, iLevel) = pHead;
}
// add to MemTable
hash = (pMemData->suid + pMemData->uid) % pMemTb->nBucket;
pMemData->pHashNext = pMemTb->pBuckets[hash];
pMemTb->pBuckets[hash] = pMemData;
// add to hash
if (pMemTb->nHash >= pMemTb->nBucket) {
// rehash (todo)
}
iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
pMemData->pHashNext = pMemTb->pBuckets[iBucket];
pMemTb->pBuckets[iBucket] = pMemData;
pMemTb->nHash++;
// sort organize (todo)
}
// loop to insert data to skiplist
#if 0
tsdbMemSkipListCursorOpen(&slc, &pMemData->sl);
p = pSubmitBlk->pData;
for (;;) {
if (p - (uint8_t *)pSubmitBlk->pData >= pSubmitBlk->nData) break;
// do insert data to SMemData
SMemSkipListCurosr slc = {0};
const uint8_t *p = pSubmitBlk->pData;
const uint8_t *pt;
const STSRow *pRow;
uint64_t szRow;
SDecoder decoder = {0};
const uint8_t *pt = p;
p = tGetBinary(p, &pTSRow, &rlen);
// tCoderInit(&coder, TD_LITTLE_ENDIAN, pSubmitBlk->pData, pSubmitBlk->nData, TD_DECODER);
for (;;) {
// if (tDecodeIsEnd(&coder)) break;
// if (tDecodeBinary(&coder, (const uint8_t **)&pRow, &szRow) < 0) {
// terrno = TSDB_CODE_INVALID_MSG;
// return -1;
// }
// check the row (todo)
// move the cursor to position to write (todo)
int32_t c;
tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c);
ASSERT(c);
// // move the cursor to position to write (todo)
// int32_t c;
// tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c);
// ASSERT(c);
// encode row
int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt);
pSlNode = vnodeBufPoolMalloc(pPool, tsize);
pSlNode->level = level;
// // encode row
// int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
// int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt);
// pSlNode = vnodeBufPoolMalloc(pPool, tsize);
// pSlNode->level = level;
uint8_t *pData = SL_NODE_DATA(pSlNode);
*(int64_t *)pData = version;
pData += sizeof(version);
memcpy(pData, pt, p - pt);
// uint8_t *pData = SL_NODE_DATA(pSlNode);
// *(int64_t *)pData = version;
// pData += sizeof(version);
// memcpy(pData, pt, p - pt);
// insert row
tsdbMemSkipListCursorPut(&slc, pSlNode);
// // insert row
// tsdbMemSkipListCursorPut(&slc, pSlNode);
// update status
if (pTSRow->ts < pMemData->minKey) pMemData->minKey = pTSRow->ts;
if (pTSRow->ts > pMemData->maxKey) pMemData->maxKey = pTSRow->ts;
if (pRow->ts < pMemData->minKey) pMemData->minKey = pRow->ts;
if (pRow->ts > pMemData->maxKey) pMemData->maxKey = pRow->ts;
}
tsdbMemSkipListCursorClose(&slc);
#endif
// tCoderClear(&coder);
// tsdbMemSkipListCursorClose(&slc);
// update status
if (pMemData->minVer == -1) pMemData->minVer = version;
if (pMemData->maxVer == -1 || pMemData->maxVer < version) pMemData->maxVer = version;
......@@ -218,7 +229,3 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
return 0;
}
\ No newline at end of file
// SMemData
// SMemSkipList
\ No newline at end of file
......@@ -17,6 +17,6 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
#if(${BUILD_TEST})
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
#endif(${BUILD_TEST})
endif(${BUILD_TEST})
此差异已折叠。
......@@ -156,7 +156,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
}
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
SCoder coder = {0};
SEncoder coder = {0};
char* pBuf;
int32_t len;
......@@ -176,9 +176,9 @@ int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq)
pBuf= pBlocks->pData + pBlocks->size;
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, len, TD_ENCODER);
tEncoderInit(&coder, pBuf, len);
tEncodeSVCreateTbReq(&coder, pCreateTbReq);
tCoderClear(&coder);
tEncoderClear(&coder);
pBlocks->size += len;
pBlocks->createTbReqLen = len;
......
......@@ -3558,7 +3558,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
static int32_t serializeVgroupCreateTableBatch(SVgroupCreateTableBatch* pTbBatch, SArray* pBufArray) {
int tlen;
SCoder coder = {0};
SEncoder coder = {0};
int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTbBatchReq, &pTbBatch->req, tlen, ret);
......@@ -3571,9 +3571,9 @@ static int32_t serializeVgroupCreateTableBatch(SVgroupCreateTableBatch* pTbBatch
((SMsgHead*)buf)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, tlen - sizeof(SMsgHead), TD_ENCODER);
tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
tEncodeSVCreateTbBatchReq(&coder, &pTbBatch->req);
tCoderClear(&coder);
tEncoderClear(&coder);
SVgDataBlocks* pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == pVgData) {
......@@ -3958,7 +3958,7 @@ static void destroyDropTbReqBatch(SVgroupDropTableBatch* pTbBatch) { taosArrayDe
static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SArray* pBufArray) {
int tlen;
SCoder coder = {0};
SEncoder coder = {0};
int32_t ret = 0;
tEncodeSize(tEncodeSVDropTbBatchReq, &pTbBatch->req, tlen, ret);
......@@ -3971,9 +3971,9 @@ static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SA
((SMsgHead*)buf)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, tlen - sizeof(SMsgHead), TD_ENCODER);
tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
tEncodeSVDropTbBatchReq(&coder, &pTbBatch->req);
tCoderClear(&coder);
tEncoderClear(&coder);
SVgDataBlocks* pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == pVgData) {
......
......@@ -1085,21 +1085,21 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
case TDMT_VND_CREATE_TABLE_RSP: {
SVCreateTbBatchRsp batchRsp = {0};
if (msg) {
SCoder coder = {0};
tCoderInit(&coder, TD_LITTLE_ENDIAN, msg, msgSize, TD_DECODER);
SDecoder coder = {0};
tDecoderInit(&coder, msg, msgSize);
code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp);
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
SVCreateTbRsp *rsp = batchRsp.pRsps + i;
if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
tCoderClear(&coder);
tDecoderClear(&coder);
SCH_ERR_JRET(rsp->code);
} else if (TSDB_CODE_SUCCESS != rsp->code) {
code = rsp->code;
}
}
}
tCoderClear(&coder);
tDecoderClear(&coder);
SCH_ERR_JRET(code);
}
......@@ -1110,21 +1110,21 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
case TDMT_VND_DROP_TABLE_RSP: {
SVDropTbBatchRsp batchRsp = {0};
if (msg) {
SCoder coder = {0};
tCoderInit(&coder, TD_LITTLE_ENDIAN, msg, msgSize, TD_DECODER);
SDecoder coder = {0};
tDecoderInit(&coder, msg, msgSize);
code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp);
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
SVDropTbRsp *rsp = batchRsp.pRsps + i;
if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
tCoderClear(&coder);
tDecoderClear(&coder);
SCH_ERR_JRET(rsp->code);
} else if (TSDB_CODE_SUCCESS != rsp->code) {
code = rsp->code;
}
}
}
tCoderClear(&coder);
tDecoderClear(&coder);
SCH_ERR_JRET(code);
}
......
......@@ -256,7 +256,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
return pTask;
}
int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
......@@ -301,7 +301,7 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
return pEncoder->pos;
}
int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
......
此差异已折叠。
此差异已折叠。
......@@ -41,9 +41,9 @@ target_sources(freelistTest
)
target_link_libraries(freelistTest os util gtest gtest_main)
# encodeTest
add_executable(encodeTest "encodeTest.cpp")
target_link_libraries(encodeTest os util gtest gtest_main)
# # encodeTest
# add_executable(encodeTest "encodeTest.cpp")
# target_link_libraries(encodeTest os util gtest gtest_main)
# queueTest
add_executable(procTest "procTest.cpp")
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册