提交 8ca5e026 编写于 作者: H Hongze Cheng

adjust more code

上级 3e2f7980
...@@ -3202,29 +3202,30 @@ int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq); ...@@ -3202,29 +3202,30 @@ int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
typedef struct { typedef struct {
bool isColFmt; int32_t flags;
int64_t suid; SVCreateTbReq* pCreateTbReq;
int64_t uid; int64_t suid;
int32_t sver; int64_t uid;
int32_t sver;
union { union {
SArray* aRowP; SArray* aRowP;
SArray* aCol; SArray* aCol;
}; };
} SSubmitTbData; } SSubmitTbData;
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
typedef struct { typedef struct {
int32_t flag; SArray* aSubmitTbData; // SArray<SSubmitTbData>
SArray* aCreateTbReq;
SArray* aSubmitTbData;
} SSubmitReq2; } SSubmitReq2;
int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq); int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2** ppReq); int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq);
void tDestroySSubmitTbData(SSubmitTbData* pTbData);
void tDestroySSubmitReq2(SSubmitReq2* pReq); void tDestroySSubmitTbData(SSubmitTbData* pTbData);
void tDestroySSubmitReq2(SSubmitReq2* pReq);
typedef struct { typedef struct {
int32_t code; int32_t code;
......
...@@ -6651,20 +6651,28 @@ int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) { ...@@ -6651,20 +6651,28 @@ int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) {
return 0; return 0;
} }
static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData, int8_t colFmt) { static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData) {
if (tStartEncode(pCoder) < 0) return -1; if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32v(pCoder, pSubmitTbData->flags) < 0) return -1;
// auto create table
if (pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
ASSERT(pSubmitTbData->pCreateTbReq);
if (tEncodeSVCreateTbReq(pCoder, pSubmitTbData->pCreateTbReq) < 0) return -1;
}
// submit data
if (tEncodeI64(pCoder, pSubmitTbData->suid) < 0) return -1; if (tEncodeI64(pCoder, pSubmitTbData->suid) < 0) return -1;
if (tEncodeI64(pCoder, pSubmitTbData->uid) < 0) return -1; if (tEncodeI64(pCoder, pSubmitTbData->uid) < 0) return -1;
if (tEncodeI32v(pCoder, pSubmitTbData->sver) < 0) return -1; if (tEncodeI32v(pCoder, pSubmitTbData->sver) < 0) return -1;
if (colFmt) { if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
// todo ASSERT(0); // TODO
ASSERT(0);
} else { } else {
if (tEncodeI64v(pCoder, taosArrayGetSize(pSubmitTbData->aRowP)) < 0) return -1; if (tEncodeU64v(pCoder, taosArrayGetSize(pSubmitTbData->aRowP)) < 0) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pSubmitTbData->aRowP); i++) { for (int32_t iRow = 0; iRow < taosArrayGetSize(pSubmitTbData->aRowP); ++iRow) {
SRow *pRow = taosArrayGetP(pSubmitTbData->aRowP, i); SRow *pRow = taosArrayGetP(pSubmitTbData->aRowP, iRow);
if (pCoder->data) memcpy(pCoder->data + pCoder->pos, pRow, pRow->len); if (pCoder->data) memcpy(pCoder->data + pCoder->pos, pRow, pRow->len);
pCoder->pos += pRow->len; pCoder->pos += pRow->len;
} }
...@@ -6682,6 +6690,22 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa ...@@ -6682,6 +6690,22 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
goto _exit; goto _exit;
} }
if (tDecodeI32v(pCoder, &pSubmitTbData->flags) < 0) return -1;
if (pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
pSubmitTbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (pSubmitTbData->pCreateTbReq == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
if (tDecodeSVCreateTbReq(pCoder, pSubmitTbData->pCreateTbReq) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
}
// submit data
if (tDecodeI64(pCoder, &pSubmitTbData->suid) < 0) { if (tDecodeI64(pCoder, &pSubmitTbData->suid) < 0) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _exit; goto _exit;
...@@ -6695,24 +6719,24 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa ...@@ -6695,24 +6719,24 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
goto _exit; goto _exit;
} }
if (colFmt) { if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
// todo ASSERT(0); // TODO
ASSERT(0);
} else { } else {
int64_t nRows = 0; uint64_t nRow;
if (tDecodeI64v(pCoder, &nRows) < 0) { if (tDecodeU64v(pCoder, &nRow) < 0) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _exit; goto _exit;
} }
pSubmitTbData->aRowP = taosArrayInit(nRows, sizeof(SRow *)); pSubmitTbData->aRowP = taosArrayInit(nRow, sizeof(SRow *));
if (pSubmitTbData->aRowP == NULL) { if (pSubmitTbData->aRowP == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
for (int32_t i = 0; i < nRows; i++) { for (int32_t iRow = 0; iRow < nRow; ++iRow) {
SRow **ppRow = taosArrayReserve(pSubmitTbData->aRowP, 1); SRow **ppRow = taosArrayReserve(pSubmitTbData->aRowP, 1);
*ppRow = (SRow *)(pCoder->data + pCoder->pos); *ppRow = (SRow *)(pCoder->data + pCoder->pos);
pCoder->pos += (*ppRow)->len; pCoder->pos += (*ppRow)->len;
} }
...@@ -6722,7 +6746,7 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa ...@@ -6722,7 +6746,7 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
_exit: _exit:
if (code) { if (code) {
// todo: do clear // TODO: clear
} }
return 0; return 0;
} }
...@@ -6730,35 +6754,19 @@ _exit: ...@@ -6730,35 +6754,19 @@ _exit:
int32_t tEncodeSSubmitReq2(SEncoder *pCoder, const SSubmitReq2 *pReq) { int32_t tEncodeSSubmitReq2(SEncoder *pCoder, const SSubmitReq2 *pReq) {
if (tStartEncode(pCoder) < 0) return -1; if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32v(pCoder, pReq->flag) < 0) return -1; if (tEncodeU64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData)) < 0) return -1;
for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) {
if (pReq->flag & SUBMIT_REQ_AUTO_CREATE_TABLE) { if (tEncodeSSubmitTbData(pCoder, taosArrayGet(pReq->aSubmitTbData, i)) < 0) return -1;
if (tEncodeI64v(pCoder, taosArrayGetSize(pReq->aCreateTbReq)) < 0) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pReq->aCreateTbReq); ++i) {
SVCreateTbReq *pCreateTbReq = (SVCreateTbReq *)taosArrayGet(pReq->aCreateTbReq, i);
if (tEncodeSVCreateTbReq(pCoder, pCreateTbReq) < 0) return -1;
}
}
if (tEncodeI64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData)) < 0) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); ++i) {
SSubmitTbData *pSubmitTbData = (SSubmitTbData *)taosArrayGet(pReq->aSubmitTbData, i);
if (tEncodeSSubmitTbData(pCoder, pSubmitTbData, pReq->flag & SUBMIT_REQ_COLUMN_DATA_FORMAT) < 0) return -1;
} }
tEndEncode(pCoder); tEndEncode(pCoder);
return 0; return 0;
} }
int32_t tDecodeSSubmitReq2(SDecoder *pCoder, SSubmitReq2 **ppReq) { int32_t tDecodeSSubmitReq2(SDecoder *pCoder, SSubmitReq2 *pReq) {
int32_t code = 0; int32_t code = 0;
// alloc memset(pReq, 0, sizeof(*pReq));
SSubmitReq2 *pReq = (SSubmitReq2 *)taosMemoryCalloc(1, sizeof(SSubmitReq2));
if (pReq == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// decode // decode
if (tStartDecode(pCoder) < 0) { if (tStartDecode(pCoder) < 0) {
......
...@@ -822,29 +822,12 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, ...@@ -822,29 +822,12 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
// ASSERT(pMsg != NULL);
// SSubmitMsgIter msgIter = {0};
// SMeta *pMeta = pVnode->pMeta;
// SSubmitBlk *pBlock = NULL;
// if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
// while (true) {
// if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
// if (pBlock == NULL) break;
// vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags);
// }
// return 0;
// }
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
#if 1 #if 1
int32_t code = 0; int32_t code = 0;
SSubmitRsp2 submitRsp = {0};
SSubmitReq2 *pSubmitReq = NULL; SSubmitReq2 *pSubmitReq = NULL;
SSubmitRsp2 *pSubmitRsp = NULL;
SArray *newTbUids = NULL; SArray *newTbUids = NULL;
// decode // decode
...@@ -856,25 +839,39 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -856,25 +839,39 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
} }
tDecoderClear(&dc); tDecoderClear(&dc);
// init
code = tCreateSSubmitRsp2(&pSubmitRsp);
if (code) goto _exit;
// auto create table // auto create table
if (pSubmitReq->flag & SUBMIT_REQ_AUTO_CREATE_TABLE) { for (int32_t iCreateTbReq = 0; iCreateTbReq < taosArrayGetSize(pSubmitReq->aCreateTbReq); iCreateTbReq++) {
for (int32_t iCreateTbReq = 0; iCreateTbReq < taosArrayGetSize(pSubmitReq->aCreateTbReq); iCreateTbReq++) { SVCreateTbReq *pCreateTbReq = taosArrayGet(pSubmitReq->aCreateTbReq, iCreateTbReq);
SVCreateTbReq *pCreateTbReq = taosArrayGet(pSubmitReq->aCreateTbReq, iCreateTbReq);
if (metaCreateTable(pVnode->pMeta, version, pCreateTbReq, NULL /* todo */) < 0) { SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { if (pCreateTbRsp == NULL) {
// todo code = TSDB_CODE_TDB_OUT_OF_MEMORY;
} goto _exit;
} else { }
if (metaCreateTable(pVnode->pMeta, version, pCreateTbReq, &pCreateTbRsp->pMeta) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
// todo
} }
} else {
// todo
} }
} }
// check // // check
for (int32_t i = 0; i < taosArrayGetSize(pSubmitReq->aSubmitTbData); ++i) { // for (int32_t i = 0; i < taosArrayGetSize(pSubmitReq->aSubmitTbData); ++i) {
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i); // SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);
// TODO
} // SMetaInfo info = {0};
// code = metaGetInfo(pVnode->pMeta, pSubmitTbData->uid, &info, NULL);
// if (code) {
// // TODO
// }
// }
// insert table data // insert table data
for (int32_t iSubmitTbData = 0; iSubmitTbData < taosArrayGetSize(pSubmitReq->aSubmitTbData); iSubmitTbData++) { for (int32_t iSubmitTbData = 0; iSubmitTbData < taosArrayGetSize(pSubmitReq->aSubmitTbData); iSubmitTbData++) {
...@@ -882,7 +879,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -882,7 +879,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
SSubmitBlkRsp submitBlkRsp = {0}; SSubmitBlkRsp submitBlkRsp = {0};
tsdbInsertTableData(pVnode->pTsdb, version, pSubmitTbData, &submitBlkRsp); code = tsdbInsertTableData(pVnode->pTsdb, version, pSubmitTbData, &submitBlkRsp);
if (code) goto _exit;
pSubmitRsp->affectedRows += taosArrayGetSize(pSubmitTbData->aRowP);
} }
_exit: _exit:
...@@ -910,10 +910,6 @@ _exit: ...@@ -910,10 +910,6 @@ _exit:
pSubmitReq->version = version; pSubmitReq->version = version;
statis.nBatchInsert = 1; statis.nBatchInsert = 1;
#ifdef TD_DEBUG_PRINT_ROW
vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif
if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) { if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
pRsp->code = terrno; pRsp->code = terrno;
goto _exit; goto _exit;
......
...@@ -191,6 +191,8 @@ void* taosArrayReserve(SArray* pArray, int32_t num) { ...@@ -191,6 +191,8 @@ void* taosArrayReserve(SArray* pArray, int32_t num) {
void* dst = TARRAY_GET_ELEM(pArray, pArray->size); void* dst = TARRAY_GET_ELEM(pArray, pArray->size);
pArray->size += num; pArray->size += num;
memset(dst, 0, num * pArray->elemSize);
return dst; return dst;
} }
...@@ -333,9 +335,9 @@ SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) { ...@@ -333,9 +335,9 @@ SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) {
} else { } else {
ASSERT(pSrc->elemSize == sizeof(void*)); ASSERT(pSrc->elemSize == sizeof(void*));
for(int32_t i = 0; i < pSrc->size; ++i) { for (int32_t i = 0; i < pSrc->size; ++i) {
void* p = fn(taosArrayGetP(pSrc, i)); void* p = fn(taosArrayGetP(pSrc, i));
memcpy(((char*)dst->pData )+ i * dst->elemSize, &p, dst->elemSize); memcpy(((char*)dst->pData) + i * dst->elemSize, &p, dst->elemSize);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册