提交 142d430e 编写于 作者: H Hongze Cheng

more code

上级 7c10f0bb
......@@ -3226,6 +3226,20 @@ int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2** ppReq);
void tDestroySSubmitTbData(SSubmitTbData* pTbData);
void tDestroySSubmitReq2(SSubmitReq2* pReq);
typedef struct {
int32_t code;
int32_t affectedRows;
SArray* aCreateTbRsp; // SArray<SVCreateTbRsp>
} SSubmitRsp2;
int32_t tCreateSSubmitRsp2(SSubmitRsp2** ppRsp);
void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag);
int32_t tEncodeSSubmitRsp2(SEncoder* pCoder, const SSubmitRsp2* pRsp);
int32_t tDecodeSSubmitRsp2(SDecoder* pCoder, SSubmitRsp2* pRsp);
#define TSDB_MSG_FLG_ENCODE 0x1
#define TSDB_MSG_FLG_DECODE 0x2
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -6857,3 +6857,104 @@ void tDestroySSubmitReq2(SSubmitReq2 *pReq) {
taosArrayDestroyEx(pReq->aSubmitTbData, (FDelete)destroySSubmitTbData);
taosMemoryFree(pReq);
}
int32_t tEncodeSSubmitRsp2(SEncoder *pCoder, const SSubmitRsp2 *pRsp) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32v(pCoder, pRsp->code) < 0) return -1;
if (tEncodeI32v(pCoder, pRsp->affectedRows) < 0) return -1;
if (tEncodeI32v(pCoder, taosArrayGetSize(pRsp->aCreateTbRsp)) < 0) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pRsp->aCreateTbRsp); ++i) {
if (tEncodeSVCreateTbRsp(pCoder, taosArrayGet(pRsp->aCreateTbRsp, i)) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeSSubmitRsp2(SDecoder *pCoder, SSubmitRsp2 *pRsp) {
int32_t code = 0;
memset(pRsp, 0, sizeof(SSubmitRsp2));
// decode
if (tStartDecode(pCoder) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
if (tDecodeI32v(pCoder, &pRsp->code) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
if (tDecodeI32v(pCoder, &pRsp->affectedRows) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
int32_t nCreateTbRsp;
if (tDecodeI32v(pCoder, &nCreateTbRsp) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
pRsp->aCreateTbRsp = taosArrayInit(nCreateTbRsp, sizeof(SVCreateTbRsp));
if (pRsp->aCreateTbRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (int32_t i = 0; i < nCreateTbRsp; ++i) {
SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pRsp->aCreateTbRsp, 1);
if (tDecodeSVCreateTbRsp(pCoder, pCreateTbRsp) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
}
tEndDecode(pCoder);
_exit:
if (code) {
if (pRsp->aCreateTbRsp) {
taosArrayDestroyEx(pRsp->aCreateTbRsp, NULL /* todo */);
}
}
return code;
}
int32_t tCreateSSubmitRsp2(SSubmitRsp2 **ppRsp) {
int32_t code = 0;
SSubmitRsp2 *pRsp = (SSubmitRsp2 *)taosMemoryCalloc(1, sizeof(*pRsp));
if (pRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pRsp->aCreateTbRsp = taosArrayInit(16, sizeof(SVCreateTbRsp));
if (pRsp->aCreateTbRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
_exit:
if (code) {
*ppRsp = NULL;
if (pRsp) {
if (pRsp->aCreateTbRsp) taosArrayDestroy(pRsp->aCreateTbRsp);
taosMemoryFree(pRsp);
}
} else {
*ppRsp = pRsp;
}
return code;
}
void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) {
if (pRsp) {
taosArrayDestroyEx(pRsp->aCreateTbRsp, NULL /* TODO: set according to flag */);
taosMemoryFree(pRsp);
}
}
......@@ -822,43 +822,39 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock,
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
ASSERT(pMsg != NULL);
SSubmitMsgIter msgIter = {0};
SMeta *pMeta = pVnode->pMeta;
SSubmitBlk *pBlock = NULL;
// static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
// ASSERT(pMsg != NULL);
// SSubmitMsgIter msgIter = {0};
// SMeta *pMeta = pVnode->pMeta;
// SSubmitBlk *pBlock = NULL;
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
// if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
// while (true) {
// if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
// if (pBlock == NULL) break;
vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags);
}
// vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags);
// }
return 0;
}
// return 0;
// }
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
#if 1
int32_t code = 0;
SDecoder dc = {0};
SSubmitRsp submitRsp = {0};
SSubmitRsp2 submitRsp = {0};
SSubmitReq2 *pSubmitReq = NULL;
SArray *newTbUids = NULL;
// decode
SDecoder dc = {0};
tDecoderInit(&dc, (char *)pReq + sizeof(SMsgHead), len - sizeof(SMsgHead));
if (tDecodeSSubmitReq2(&dc, &pSubmitReq) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
submitRsp.pArray = taosArrayInit(1, sizeof(SSubmitBlkRsp));
if (submitRsp.pArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _exit;
}
tDecoderClear(&dc);
// auto create table
if (pSubmitReq->flag & SUBMIT_REQ_AUTO_CREATE_TABLE) {
......@@ -874,6 +870,12 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
}
}
// check
for (int32_t i = 0; i < taosArrayGetSize(pSubmitReq->aSubmitTbData); ++i) {
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);
// TODO
}
// insert table data
for (int32_t iSubmitTbData = 0; iSubmitTbData < taosArrayGetSize(pSubmitReq->aSubmitTbData); iSubmitTbData++) {
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, iSubmitTbData);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册