提交 21c8d981 编写于 作者: H Hongze Cheng

more code

上级 49c78ed5
......@@ -3226,6 +3226,28 @@ int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
typedef struct {
int64_t suid;
int64_t uid;
int32_t sver;
union {
SArray* aRowP;
SArray* aCol;
};
} SSubmitTbData;
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
typedef struct {
int32_t flag;
SArray* aCreateTbReq;
SArray* aSubmitTbData;
} SSubmitReq2;
int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2** ppReq);
void tDestroySSubmitReq2(SSubmitReq2* pReq);
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -116,6 +116,7 @@ static int32_t tEncodeI64v(SEncoder* pCoder, int64_t val);
static int32_t tEncodeFloat(SEncoder* pCoder, float val);
static int32_t tEncodeDouble(SEncoder* pCoder, double val);
static int32_t tEncodeBinary(SEncoder* pCoder, const uint8_t* val, uint32_t len);
static int32_t tEncodeBinaryEx(SEncoder* pCoder, const uint8_t* val, uint32_t len);
static int32_t tEncodeCStrWithLen(SEncoder* pCoder, const char* val, uint32_t len);
static int32_t tEncodeCStr(SEncoder* pCoder, const char* val);
......
......@@ -6652,3 +6652,194 @@ int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) {
}
return 0;
}
static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData, int8_t colFmt) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pSubmitTbData->suid) < 0) return -1;
if (tEncodeI64(pCoder, pSubmitTbData->uid) < 0) return -1;
if (tEncodeI32v(pCoder, pSubmitTbData->sver) < 0) return -1;
if (colFmt) {
// todo
ASSERT(0);
} else {
if (tEncodeI64v(pCoder, taosArrayGetSize(pSubmitTbData->aRowP)) < 0) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pSubmitTbData->aRowP); i++) {
SRow *pRow = taosArrayGetP(pSubmitTbData->aRowP, i);
if (tEncodeBinary(pCoder, (uint8_t *)pRow, pRow->len) < 0) return -1; // todo
}
}
tEndEncode(pCoder);
return 0;
}
static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbData, int8_t colFmt) {
int32_t code = 0;
if (tStartDecode(pCoder) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
if (tDecodeI64(pCoder, &pSubmitTbData->suid) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
if (tDecodeI64(pCoder, &pSubmitTbData->uid) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
if (tDecodeI32v(pCoder, &pSubmitTbData->sver) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
if (colFmt) {
// todo
ASSERT(0);
} else {
int64_t nRows = 0;
if (tDecodeI64v(pCoder, &nRows) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
pSubmitTbData->aRowP = taosArrayInit(nRows, sizeof(SRow *));
if (pSubmitTbData->aRowP == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (int32_t i = 0; i < nRows; i++) {
SRow **ppRow = taosArrayReserve(pSubmitTbData->aRowP, 1);
if (tDecodeBinary(pCoder, (uint8_t **)ppRow, NULL) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
}
}
tEndDecode(pCoder);
_exit:
if (code) {
// todo: do clear
}
return 0;
}
int32_t tEncodeSSubmitReq2(SEncoder *pCoder, const SSubmitReq2 *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32v(pCoder, pReq->flag) < 0) return -1;
if (pReq->flag & SUBMIT_REQ_AUTO_CREATE_TABLE) {
if (tEncodeI64v(pCoder, taosArrayGetSize(pReq->aCreateTbReq)) < 0) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pReq->aCreateTbReq); ++i) {
SVCreateTbReq *pCreateTbReq = (SVCreateTbReq *)taosArrayGet(pReq->aCreateTbReq, i);
if (tEncodeSVCreateTbReq(pCoder, pCreateTbReq) < 0) return -1;
}
}
if (tEncodeI64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData)) < 0) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); ++i) {
SSubmitTbData *pSubmitTbData = (SSubmitTbData *)taosArrayGet(pReq->aSubmitTbData, i);
if (tEncodeSSubmitTbData(pCoder, pSubmitTbData, pReq->flag & SUBMIT_REQ_COLUMN_DATA_FORMAT) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeSSubmitReq2(SDecoder *pCoder, SSubmitReq2 **ppReq) {
int32_t code = 0;
// alloc
SSubmitReq2 *pReq = (SSubmitReq2 *)taosMemoryCalloc(1, sizeof(SSubmitReq2));
if (pReq == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// decode
if (tStartDecode(pCoder) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
if (tDecodeI32v(pCoder, &pReq->flag) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
if (pReq->flag & SUBMIT_REQ_AUTO_CREATE_TABLE) {
int64_t nCreateTbReq = 0;
if (tDecodeI64v(pCoder, &nCreateTbReq) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
pReq->aCreateTbReq = taosArrayInit(nCreateTbReq, sizeof(SVCreateTbReq));
if (pReq->aCreateTbReq == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (int64_t i = 0; i < nCreateTbReq; ++i) {
SVCreateTbReq *pCreateTbReq = taosArrayReserve(pReq->aCreateTbReq, 1);
if (tDecodeSVCreateTbReq(pCoder, pCreateTbReq) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
}
}
int64_t nSubmitTbData = 0;
if (tDecodeI64v(pCoder, &nSubmitTbData) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
pReq->aSubmitTbData = taosArrayInit(nSubmitTbData, sizeof(SSubmitTbData));
if (pReq->aSubmitTbData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (int64_t i = 0; i < nSubmitTbData; ++i) {
SSubmitTbData *pSubmitTbData = taosArrayReserve(pReq->aSubmitTbData, 1);
if (tDecodeSSubmitTbData(pCoder, pSubmitTbData, pReq->flag & SUBMIT_REQ_COLUMN_DATA_FORMAT) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
}
tEndDecode(pCoder);
_exit:
if (code) {
*ppReq = NULL;
if (pReq) {
// todo: do other clear
taosMemoryFree(pReq);
}
} else {
*ppReq = pReq;
}
return 0;
}
void tDestroySSubmitReq2(SSubmitReq2 *pReq) {
if (NULL == pReq) return;
if (pReq->flag & SUBMIT_REQ_AUTO_CREATE_TABLE) {
taosArrayDestroyEx(pReq->aCreateTbReq, NULL /* todo */);
}
taosArrayDestroyEx(pReq->aSubmitTbData, NULL /* todo */);
taosMemoryFree(pReq);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册