diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ab90bd200a258f516e69e040b2db3c8d94378045..47bd0d0b029e48ea3e6b64107cc0b90291be63a5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -268,6 +268,35 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); // for debug int32_t tPrintFixedSchemaSubmitReq(SSubmitReq* pReq, STSchema* pSchema); +struct SSchema { + int8_t type; + int8_t flags; + col_id_t colId; + int32_t bytes; + char name[TSDB_COL_NAME_LEN]; +}; + + +typedef struct { + char tbName[TSDB_TABLE_NAME_LEN]; + char stbName[TSDB_TABLE_NAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; + int64_t dbId; + int32_t numOfTags; + int32_t numOfColumns; + int8_t precision; + int8_t tableType; + int32_t sversion; + int32_t tversion; + uint64_t suid; + uint64_t tuid; + int32_t vgId; + int8_t sysInfo; + SSchema* pSchemas; +} STableMetaRsp; + + + typedef struct { int32_t code; int8_t hashMeta; @@ -276,6 +305,7 @@ typedef struct { int32_t numOfRows; int32_t affectedRows; int64_t sver; + STableMetaRsp* pMeta; } SSubmitBlkRsp; typedef struct { @@ -290,6 +320,7 @@ typedef struct { int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp); int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp); +void tFreeSSubmitBlkRsp(void* param); void tFreeSSubmitRsp(SSubmitRsp* pRsp); #define COL_SMA_ON ((int8_t)0x1) @@ -297,13 +328,6 @@ void tFreeSSubmitRsp(SSubmitRsp* pRsp); #define COL_SET_NULL ((int8_t)0x10) #define COL_SET_VAL ((int8_t)0x20) #define COL_IS_SYSINFO ((int8_t)0x40) -struct SSchema { - int8_t type; - int8_t flags; - col_id_t colId; - int32_t bytes; - char name[TSDB_COL_NAME_LEN]; -}; #define COL_IS_SET(FLG) (((FLG) & (COL_SET_VAL | COL_SET_NULL)) != 0) #define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL))) @@ -442,26 +466,6 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW STSchema* tdGetSTSChemaFromSSChema(SSchema* pSchema, int32_t nCols, int32_t sver); - -typedef struct { - char tbName[TSDB_TABLE_NAME_LEN]; - char stbName[TSDB_TABLE_NAME_LEN]; - char dbFName[TSDB_DB_FNAME_LEN]; - int64_t dbId; - int32_t numOfTags; - int32_t numOfColumns; - int8_t precision; - int8_t tableType; - int32_t sversion; - int32_t tversion; - uint64_t suid; - uint64_t tuid; - int32_t vgId; - int8_t sysInfo; - SSchema* pSchemas; -} STableMetaRsp; - - typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igExists; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 640cd8ed4d6affc1db3f7c14bb2a5d73ff750cff..0480c49ca189bc9e00e665657ba4302f2fda8690 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -723,6 +723,12 @@ int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog for (int32_t i = 0; i < pRsp->nBlocks; ++i) { SSubmitBlkRsp* blk = pRsp->pBlocks + i; + if (blk->pMeta) { + handleCreateTbExecRes(blk->pMeta, pCatalog); + tFreeSTableMetaRsp(blk->pMeta); + taosMemoryFreeClear(blk->pMeta); + } + if (NULL == blk->tblFName || 0 == blk->tblFName[0]) { continue; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e86eb6ff62fff6dfbdf5a416cfb3652f47e9b94f..058f26d1454a20af7fe5ee480e9b9240ade999af 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5324,6 +5324,10 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1; if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1; if (tEncodeI64v(pEncoder, pBlock->sver) < 0) return -1; + if (tEncodeI32(pEncoder, pBlock->pMeta ? 1 : 0) < 0) return -1; + if (pBlock->pMeta) { + if (tEncodeSTableMetaRsp(pEncoder, pBlock->pMeta) < 0) return -1; + } tEndEncode(pEncoder); return 0; @@ -5341,6 +5345,16 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) { if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1; if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1; if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1; + + int32_t meta = 0; + if (tDecodeI32(pDecoder, &meta) < 0) return -1; + if (meta) { + pBlock->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp)); + if (NULL == pBlock->pMeta) return -1; + if (tDecodeSTableMetaRsp(pDecoder, pBlock->pMeta) < 0) return -1; + } else { + pBlock->pMeta = NULL; + } tEndDecode(pDecoder); return 0; @@ -5379,6 +5393,21 @@ int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) { return 0; } +void tFreeSSubmitBlkRsp(void* param) { + if (NULL == param) { + return; + } + + SSubmitBlkRsp* pRsp = (SSubmitBlkRsp*)param; + + taosMemoryFree(pRsp->tblFName); + if (pRsp->pMeta) { + taosMemoryFree(pRsp->pMeta->pSchemas); + taosMemoryFree(pRsp->pMeta); + } +} + + void tFreeSSubmitRsp(SSubmitRsp *pRsp) { if (NULL == pRsp) return; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 51a1347383e1267fd7626445f1cf6c3828a42a5a..85feecff1a0247790eec14b503b7dda6247a6c87 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -867,7 +867,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq goto _exit; } - if (metaCreateTable(pVnode->pMeta, version, &createTbReq, NULL) < 0) { + if (metaCreateTable(pVnode->pMeta, version, &createTbReq, &submitBlkRsp.pMeta) < 0) { if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { submitBlkRsp.code = terrno; pRsp->code = terrno; @@ -875,6 +875,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq taosArrayDestroy(createTbReq.ctb.tagName); goto _exit; } + } else { + if (NULL != submitBlkRsp.pMeta) { + vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta); + } } taosArrayPush(newTbUids, &createTbReq.uid); @@ -918,11 +922,7 @@ _exit: tEncodeSSubmitRsp(&encoder, &submitRsp); tEncoderClear(&encoder); - for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) { - taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName); - } - - taosArrayDestroy(submitRsp.pArray); + taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp); // TODO: the partial success scenario and the error case // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level