diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9ec80293a838f9ee825dc77e97f31b12d621caf6..e6aaff211b26b2effa2368458e42cc9643db0107 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -252,21 +252,29 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema); typedef struct { - int32_t index; // index of failed block in submit blocks - int32_t vnode; // vnode index of failed block - int32_t sid; // table index of failed block - int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table -} SSubmitRspBlock; - -typedef struct { - int32_t code; // 0-success, > 0 error code - int32_t numOfRows; // number of records the client is trying to write - int32_t affectedRows; // number of records actually written - int32_t failedRows; // number of failed records (exclude duplicate records) - int32_t numOfFailedBlocks; - SSubmitRspBlock failedBlocks[]; + int8_t hashMeta; + int64_t uid; + union { + char* name; + const char* namec; + }; + int32_t numOfRows; + int32_t affectedRows; +} SSubmitBlkRsp; + +typedef struct { + int32_t numOfRows; + int32_t affectedRows; + int32_t nBlocks; + union { + SArray* pArray; + SSubmitBlkRsp* pBlocks; + }; } SSubmitRsp; +int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp); +int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp); + #define SCHEMA_SMA_ON 0x1 #define SCHEMA_IDX_ON 0x2 typedef struct SSchema { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 85b55611dcd41f8bc3efd2f536bc77a1606e2b6c..679ec5892dac34f8ce7fbe50df501245d449db8c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4013,3 +4013,65 @@ int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) { tEndDecode(pCoder); return 0; } + +static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBlock) { + if (tStartEncode(pEncoder) < 0) return -1; + + if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1; + if (pBlock->hashMeta) { + if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1; + if (tEncodeCStr(pEncoder, pBlock->name) < 0) return -1; + } + if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1; + if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1; + + tEndEncode(pEncoder); + return 0; +} + +static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) { + if (tStartDecode(pDecoder) < 0) return -1; + + if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1; + if (pBlock->hashMeta) { + if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1; + if (tDecodeCStr(pDecoder, &pBlock->namec) < 0) return -1; + } + if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1; + if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1; + + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeSSubmitRsp(SEncoder *pEncoder, const SSubmitRsp *pRsp) { + int32_t nBlocks = taosArrayGetSize(pRsp->pArray); + + if (tStartEncode(pEncoder) < 0) return -1; + + if (tEncodeI32v(pEncoder, pRsp->numOfRows) < 0) return -1; + if (tEncodeI32v(pEncoder, pRsp->affectedRows) < 0) return -1; + if (tEncodeI32v(pEncoder, nBlocks) < 0) return -1; + for (int32_t iBlock = 0; iBlock < nBlocks; iBlock++) { + if (tEncodeSSubmitBlkRsp(pEncoder, (SSubmitBlkRsp *)taosArrayGet(pRsp->pArray, iBlock)) < 0) return -1; + } + + tEndEncode(pEncoder); + return 0; +} + +int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) { + if (tStartDecode(pDecoder) < 0) return -1; + + if (tDecodeI32v(pDecoder, &pRsp->numOfRows) < 0) return -1; + if (tDecodeI32v(pDecoder, &pRsp->affectedRows) < 0) return -1; + if (tDecodeI32v(pDecoder, &pRsp->nBlocks) < 0) return -1; + pRsp->pBlocks = tDecoderMalloc(pDecoder, sizeof(*pRsp->pBlocks) * pRsp->nBlocks); + if (pRsp->pBlocks == NULL) return -1; + for (int32_t iBlock = 0; iBlock < pRsp->nBlocks; iBlock++) { + if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1; + } + + tEndDecode(pDecoder); + return 0; +} diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 986b2740f3a3d97787e6a6f8aa7c44ea590540a1..9a36fc6eaedc23098028658fb745c961c87b2e28 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -102,7 +102,7 @@ int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg); int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); -int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, int32_t* pAffectedRows); +int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, uint64_t taskId); tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index d514512881126c3677d837687a7c06b40e5aadff..95156c1e1cc8025d8be044bd423c2bf13ba47550 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -288,7 +288,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey return 0; } -int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, int32_t *pAffectedRows) { +int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) { SSubmitBlkIter blkIter = {0}; STsdbMemTable *pMemTable = pTsdb->mem; void *tptr; @@ -342,7 +342,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin; if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax; - (*pAffectedRows) = pMsgIter->numOfRows; + pRsp->numOfRows = pRsp->numOfRows; + pRsp->affectedRows = pRsp->affectedRows; return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 5a7892a7502e1cef4f665c702e985f61eba9e79a..3107c6f5c73d0d349c62568f46874af10c08bff4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -36,9 +36,10 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * // loop to insert tInitSubmitMsgIter(pMsg, &msgIter); while (true) { + SSubmitBlkRsp r = {0}; tGetSubmitMsgNext(&msgIter, &pBlock); if (pBlock == NULL) break; - if (tsdbInsertTableData(pTsdb, &msgIter, pBlock, &affectedrows) < 0) { + if (tsdbInsertTableData(pTsdb, &msgIter, pBlock, &r) < 0) { return -1; } @@ -46,8 +47,8 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * } if (pRsp != NULL) { - pRsp->affectedRows = affectedrows; - pRsp->numOfRows = numOfRows; + // pRsp->affectedRows = affectedrows; + // pRsp->numOfRows = numOfRows; } return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0082ca0802ec2ff1874a0b1df00088581ddcf157..7692b7084dfd33228fc42d87bf6c163ddfb591fe 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -497,7 +497,7 @@ _exit: return 0; } -static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char* tags) { +static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) { ASSERT(pMsg != NULL); SSubmitMsgIter msgIter = {0}; SMeta *pMeta = pVnode->pMeta; @@ -518,11 +518,11 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char taosMemoryFreeClear(pSchema); } pSchema = metaGetTbTSchema(pMeta, msgIter.suid, 0); // TODO: use the real schema - if(pSchema) { + if (pSchema) { suid = msgIter.suid; } } - if(!pSchema) { + if (!pSchema) { printf("%s:%d no valid schema\n", tags, __LINE__); continue; } @@ -540,12 +540,15 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; + SSubmitRsp submitRsp = {0}; SSubmitMsgIter msgIter = {0}; SSubmitBlk *pBlock; SSubmitRsp rsp = {0}; SVCreateTbReq createTbReq = {0}; SDecoder decoder = {0}; int32_t nRows; + int32_t tsize, ret; + SEncoder encoder = {0}; pRsp->code = 0; @@ -559,12 +562,17 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in goto _exit; } - for (;;) { + submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp)); + for (int i = 0;;) { tGetSubmitMsgNext(&msgIter, &pBlock); if (pBlock == NULL) break; + SSubmitBlkRsp submitBlkRsp = {0}; + // create table for auto create table mode if (msgIter.schemaLen > 0) { + submitBlkRsp.hashMeta = 1; + tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen); if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) { pRsp->code = TSDB_CODE_INVALID_MSG; @@ -580,6 +588,10 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in } } + submitBlkRsp.uid = createTbReq.uid; + submitBlkRsp.name = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); + sprintf(submitBlkRsp.name, "%s.%s", pVnode->config.dbname, createTbReq.name); + msgIter.uid = createTbReq.uid; if (createTbReq.type == TSDB_CHILD_TABLE) { msgIter.suid = createTbReq.ctb.suid; @@ -590,20 +602,29 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in tDecoderClear(&decoder); } - if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &nRows) < 0) { + if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) { pRsp->code = terrno; goto _exit; } - rsp.affectedRows += nRows; - + submitRsp.numOfRows += submitBlkRsp.numOfRows; + submitRsp.affectedRows += submitBlkRsp.affectedRows; + taosArrayPush(submitRsp.pArray, &submitBlkRsp); } _exit: - // encode the response (TODO) - pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp)); - memcpy(pRsp->pCont, &rsp, sizeof(rsp)); - pRsp->contLen = sizeof(SSubmitRsp); + tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret); + pRsp->pCont = rpcMallocCont(tsize); + pRsp->contLen = tsize; + tEncoderInit(&encoder, pRsp->pCont, tsize); + tEncodeSSubmitRsp(&encoder, &submitRsp); + tEncoderClear(&encoder); + + for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) { + taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].name); + } + + taosArrayDestroy(submitRsp.pArray); tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 10e42550222c18b99df5220c288eb2fea2b0ae11..a8b0f2e2b82ac88ed480fe5b01ed9e171a70428a 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1135,7 +1135,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch case TDMT_VND_SUBMIT_RSP: { if (msg) { SSubmitRsp *rsp = (SSubmitRsp *)msg; - SCH_ERR_JRET(rsp->code); + // SCH_ERR_JRET(rsp->code); } SCH_ERR_JRET(rspCode);