提交 a0ae0c78 编写于 作者: D dapan1121

enh: update table meta after auto creating table

上级 c2b16acc
......@@ -268,6 +268,35 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
// for debug
int32_t tPrintFixedSchemaSubmitReq(SSubmitReq* pReq, STSchema* pSchema);
struct SSchema {
int8_t type;
int8_t flags;
col_id_t colId;
int32_t bytes;
char name[TSDB_COL_NAME_LEN];
};
typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t numOfTags;
int32_t numOfColumns;
int8_t precision;
int8_t tableType;
int32_t sversion;
int32_t tversion;
uint64_t suid;
uint64_t tuid;
int32_t vgId;
int8_t sysInfo;
SSchema* pSchemas;
} STableMetaRsp;
typedef struct {
int32_t code;
int8_t hashMeta;
......@@ -276,6 +305,7 @@ typedef struct {
int32_t numOfRows;
int32_t affectedRows;
int64_t sver;
STableMetaRsp* pMeta;
} SSubmitBlkRsp;
typedef struct {
......@@ -290,6 +320,7 @@ typedef struct {
int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
void tFreeSSubmitBlkRsp(void* param);
void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_SMA_ON ((int8_t)0x1)
......@@ -297,13 +328,6 @@ void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_SET_NULL ((int8_t)0x10)
#define COL_SET_VAL ((int8_t)0x20)
#define COL_IS_SYSINFO ((int8_t)0x40)
struct SSchema {
int8_t type;
int8_t flags;
col_id_t colId;
int32_t bytes;
char name[TSDB_COL_NAME_LEN];
};
#define COL_IS_SET(FLG) (((FLG) & (COL_SET_VAL | COL_SET_NULL)) != 0)
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
......@@ -442,26 +466,6 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW
STSchema* tdGetSTSChemaFromSSChema(SSchema* pSchema, int32_t nCols, int32_t sver);
typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t numOfTags;
int32_t numOfColumns;
int8_t precision;
int8_t tableType;
int32_t sversion;
int32_t tversion;
uint64_t suid;
uint64_t tuid;
int32_t vgId;
int8_t sysInfo;
SSchema* pSchemas;
} STableMetaRsp;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
......
......@@ -723,6 +723,12 @@ int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog
for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
SSubmitBlkRsp* blk = pRsp->pBlocks + i;
if (blk->pMeta) {
handleCreateTbExecRes(blk->pMeta, pCatalog);
tFreeSTableMetaRsp(blk->pMeta);
taosMemoryFreeClear(blk->pMeta);
}
if (NULL == blk->tblFName || 0 == blk->tblFName[0]) {
continue;
}
......
......@@ -5324,6 +5324,10 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl
if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1;
if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1;
if (tEncodeI64v(pEncoder, pBlock->sver) < 0) return -1;
if (tEncodeI32(pEncoder, pBlock->pMeta ? 1 : 0) < 0) return -1;
if (pBlock->pMeta) {
if (tEncodeSTableMetaRsp(pEncoder, pBlock->pMeta) < 0) return -1;
}
tEndEncode(pEncoder);
return 0;
......@@ -5341,6 +5345,16 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1;
if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1;
if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1;
int32_t meta = 0;
if (tDecodeI32(pDecoder, &meta) < 0) return -1;
if (meta) {
pBlock->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == pBlock->pMeta) return -1;
if (tDecodeSTableMetaRsp(pDecoder, pBlock->pMeta) < 0) return -1;
} else {
pBlock->pMeta = NULL;
}
tEndDecode(pDecoder);
return 0;
......@@ -5379,6 +5393,21 @@ int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) {
return 0;
}
void tFreeSSubmitBlkRsp(void* param) {
if (NULL == param) {
return;
}
SSubmitBlkRsp* pRsp = (SSubmitBlkRsp*)param;
taosMemoryFree(pRsp->tblFName);
if (pRsp->pMeta) {
taosMemoryFree(pRsp->pMeta->pSchemas);
taosMemoryFree(pRsp->pMeta);
}
}
void tFreeSSubmitRsp(SSubmitRsp *pRsp) {
if (NULL == pRsp) return;
......
......@@ -867,7 +867,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
goto _exit;
}
if (metaCreateTable(pVnode->pMeta, version, &createTbReq, NULL) < 0) {
if (metaCreateTable(pVnode->pMeta, version, &createTbReq, &submitBlkRsp.pMeta) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
submitBlkRsp.code = terrno;
pRsp->code = terrno;
......@@ -875,6 +875,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
taosArrayDestroy(createTbReq.ctb.tagName);
goto _exit;
}
} else {
if (NULL != submitBlkRsp.pMeta) {
vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta);
}
}
taosArrayPush(newTbUids, &createTbReq.uid);
......@@ -918,11 +922,7 @@ _exit:
tEncodeSSubmitRsp(&encoder, &submitRsp);
tEncoderClear(&encoder);
for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
}
taosArrayDestroy(submitRsp.pArray);
taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp);
// TODO: the partial success scenario and the error case
// => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册