提交 0ebd3281 编写于 作者: D dapan1121

enh: update table meta after creating table

上级 3e2f1d2c
......@@ -441,6 +441,25 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW
STSchema* tdGetSTSChemaFromSSChema(SSchema* pSchema, int32_t nCols, int32_t sver);
typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t numOfTags;
int32_t numOfColumns;
int8_t precision;
int8_t tableType;
int32_t sversion;
int32_t tversion;
uint64_t suid;
uint64_t tuid;
int32_t vgId;
SSchema* pSchemas;
} STableMetaRsp;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
......@@ -472,6 +491,14 @@ int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq
int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
void tFreeSMCreateStbReq(SMCreateStbReq* pReq);
typedef struct {
STableMetaRsp* pMeta;
} SMCreateStbRsp;
int32_t tEncodeSMCreateStbRsp(SEncoder* pEncoder, const SMCreateStbRsp* pRsp);
int32_t tDecodeSMCreateStbRsp(SDecoder* pDecoder, SMCreateStbRsp* pRsp);
void tFreeSMCreateStbRsp(SMCreateStbRsp* pRsp);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists;
......@@ -1239,23 +1266,6 @@ typedef struct {
SVgroupInfo vgroups[];
} SVgroupsInfo;
typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t numOfTags;
int32_t numOfColumns;
int8_t precision;
int8_t tableType;
int32_t sversion;
int32_t tversion;
uint64_t suid;
uint64_t tuid;
int32_t vgId;
SSchema* pSchemas;
} STableMetaRsp;
typedef struct {
STableMetaRsp* pMeta;
} SMAlterStbRsp;
......@@ -2028,11 +2038,13 @@ int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq);
typedef struct {
int32_t code;
int32_t code;
STableMetaRsp* pMeta;
} SVCreateTbRsp, SVUpdateTbRsp;
int tEncodeSVCreateTbRsp(SEncoder* pCoder, const SVCreateTbRsp* pRsp);
int tDecodeSVCreateTbRsp(SDecoder* pCoder, SVCreateTbRsp* pRsp);
void tFreeSVCreateTbRsp(void* param);
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
......
......@@ -363,8 +363,9 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData*
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta);
int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clientImpl.c and become a static function
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx
int32_t removeMeta(STscObj* pTscObj, SArray* tbList);
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog);
bool qnodeRequired(SRequestObj* pRequest);
#ifdef __cplusplus
......
......@@ -780,6 +780,10 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
}
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
}
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
if (NULL == pRequest->body.resInfo.execRes.res) {
return TSDB_CODE_SUCCESS;
......@@ -802,6 +806,19 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
code = handleAlterTbExecRes(pRes->res, pCatalog);
break;
}
case TDMT_VND_CREATE_TABLE: {
SArray* pList = (SArray*)pRes->res;
int32_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {
void* res = taosArrayGetP(pList, i);
code = handleCreateTbExecRes(res, pCatalog);
}
break;
}
case TDMT_MND_CREATE_STB: {
code = handleCreateTbExecRes(pRes->res, pCatalog);
break;
}
case TDMT_VND_SUBMIT: {
atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
......@@ -859,17 +876,13 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
return;
}
if (code == TSDB_CODE_SUCCESS) {
code = handleQueryExecRsp(pRequest);
ASSERT(pRequest->code == TSDB_CODE_SUCCESS);
pRequest->code = code;
}
tscDebug("schedulerExecCb request type %s", TMSG_INFO(pRequest->type));
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
removeMeta(pTscObj, pRequest->targetTableList);
}
handleQueryExecRsp(pRequest);
// return to client
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
......@@ -930,6 +943,10 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
qDestroyQuery(pQuery);
}
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
removeMeta(pRequest->pTscObj, pRequest->targetTableList);
}
handleQueryExecRsp(pRequest);
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
......@@ -1127,10 +1144,6 @@ SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool valida
inRetry = true;
} while (retryNum++ < REQUEST_TOTAL_EXEC_TIMES);
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pRequest->pTscObj, pRequest->targetTableList);
}
return pRequest;
}
......
......@@ -232,13 +232,36 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
assert(pMsg != NULL && param != NULL);
SRequestObj* pRequest = param;
taosMemoryFree(pMsg->pData);
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
} else {
SMCreateStbRsp createRsp = {0};
SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len);
tDecodeSMCreateStbRsp(&coder, &createRsp);
tDecoderClear(&coder);
pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB;
pRequest->body.resInfo.execRes.res = createRsp.pMeta;
}
taosMemoryFree(pMsg->pData);
if (pRequest->body.queryFp != NULL) {
removeMeta(pRequest->pTscObj, pRequest->tableList);
SExecResult* pRes = &pRequest->body.resInfo.execRes;
if (code == TSDB_CODE_SUCCESS) {
SCatalog* pCatalog = NULL;
int32_t ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (pRes->res != NULL) {
ret = handleCreateTbExecRes(pRes->res, pCatalog);
}
if (ret != TSDB_CODE_SUCCESS) {
code = ret;
}
}
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} else {
tsem_post(&pRequest->body.rspSem);
......
......@@ -3196,12 +3196,16 @@ static int32_t tDecodeSTableMetaRsp(SDecoder *pDecoder, STableMetaRsp *pRsp) {
if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1;
int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns;
pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols);
if (pRsp->pSchemas == NULL) return -1;
if (totalCols > 0) {
pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols);
if (pRsp->pSchemas == NULL) return -1;
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pRsp->pSchemas[i];
if (tDecodeSSchema(pDecoder, pSchema) < 0) return -1;
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pRsp->pSchemas[i];
if (tDecodeSSchema(pDecoder, pSchema) < 0) return -1;
}
} else {
pRsp->pSchemas = NULL;
}
return 0;
......@@ -5090,6 +5094,10 @@ int tEncodeSVCreateTbRsp(SEncoder *pCoder, const SVCreateTbRsp *pRsp) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32(pCoder, pRsp->code) < 0) return -1;
if (tEncodeI32(pCoder, pRsp->pMeta ? 1 : 0) < 0) return -1;
if (pRsp->pMeta) {
if (tEncodeSTableMetaRsp(pCoder, pRsp->pMeta) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
......@@ -5100,10 +5108,25 @@ int tDecodeSVCreateTbRsp(SDecoder *pCoder, SVCreateTbRsp *pRsp) {
if (tDecodeI32(pCoder, &pRsp->code) < 0) return -1;
int32_t meta = 0;
if (tDecodeI32(pCoder, &meta) < 0) return -1;
if (meta) {
pRsp->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == pRsp->pMeta) return -1;
if (tDecodeSTableMetaRsp(pCoder, pRsp->pMeta) < 0) return -1;
} else {
pRsp->pMeta = NULL;
}
tEndDecode(pCoder);
return 0;
}
void tFreeSVCreateTbRsp(void* param) {
SVCreateTbRsp* pRsp = (SVCreateTbRsp*)param;
taosMemoryFree(pRsp->pMeta);
}
// TDMT_VND_DROP_TABLE =================
static int32_t tEncodeSVDropTbReq(SEncoder *pCoder, const SVDropTbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
......@@ -5558,6 +5581,60 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) {
}
}
int32_t tEncodeSMCreateStbRsp(SEncoder *pEncoder, const SMCreateStbRsp *pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->pMeta->pSchemas ? 1 : 0) < 0) return -1;
if (pRsp->pMeta->pSchemas) {
if (tEncodeSTableMetaRsp(pEncoder, pRsp->pMeta) < 0) return -1;
}
tEndEncode(pEncoder);
return 0;
}
int32_t tDecodeSMCreateStbRsp(SDecoder *pDecoder, SMCreateStbRsp *pRsp) {
int32_t meta = 0;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32(pDecoder, &meta) < 0) return -1;
if (meta) {
pRsp->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == pRsp->pMeta) return -1;
if (tDecodeSTableMetaRsp(pDecoder, pRsp->pMeta) < 0) return -1;
}
tEndDecode(pDecoder);
return 0;
}
int32_t tDeserializeSMCreateStbRsp(void *buf, int32_t bufLen, SMCreateStbRsp *pRsp) {
int32_t meta = 0;
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &meta) < 0) return -1;
if (meta) {
pRsp->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == pRsp->pMeta) return -1;
if (tDecodeSTableMetaRsp(&decoder, pRsp->pMeta) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSMCreateStbRsp(SMCreateStbRsp *pRsp) {
if (NULL == pRsp) {
return;
}
if (pRsp->pMeta) {
taosMemoryFree(pRsp->pMeta->pSchemas);
taosMemoryFree(pRsp->pMeta);
}
}
int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) {
if (tEncodeI8(pEncoder, pOffsetVal->type) < 0) return -1;
if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {
......
......@@ -35,6 +35,7 @@ SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName);
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
void mndFreeStb(SStbObj *pStb);
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char* dbFName, char* stbFName, void **pCont, int32_t *pLen);
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
......
......@@ -1774,6 +1774,67 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, i
return 0;
}
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char* dbFName, char* stbFName, void **pCont, int32_t *pLen) {
int32_t ret = -1;
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
if (NULL == pDb) {
return -1;
}
SStbObj *pObj = mndAcquireStb(pMnode, stbFName);
if (NULL == pObj) {
goto _OVER;
}
SEncoder ec = {0};
uint32_t contLen = 0;
SMCreateStbRsp stbRsp = {0};
SName name = {0};
tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
stbRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == stbRsp.pMeta) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
ret = mndBuildStbSchemaImp(pDb, pObj, name.tname, stbRsp.pMeta);
if (ret) {
tFreeSMCreateStbRsp(&stbRsp);
goto _OVER;
}
tEncodeSize(tEncodeSMCreateStbRsp, &stbRsp, contLen, ret);
if (ret) {
tFreeSMCreateStbRsp(&stbRsp);
goto _OVER;
}
void *cont = taosMemoryMalloc(contLen);
tEncoderInit(&ec, cont, contLen);
tEncodeSMCreateStbRsp(&ec, &stbRsp);
tEncoderClear(&ec);
tFreeSMCreateStbRsp(&stbRsp);
*pCont = cont;
*pLen = contLen;
ret = 0;
_OVER:
if (pObj) {
mndReleaseStb(pMnode, pObj);
}
if (pDb) {
mndReleaseDb(pMnode, pDb);
}
return ret;
}
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
void *alterOriData, int32_t alterOriDataLen) {
int32_t code = -1;
......
......@@ -17,6 +17,7 @@
#include "mndTrans.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndStb.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndSync.h"
......@@ -900,15 +901,6 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
SRpcMsg rspMsg = {.code = code, .info = *pInfo};
if (pTrans->rpcRspLen != 0) {
void *rpcCont = rpcMallocCont(pTrans->rpcRspLen);
if (rpcCont != NULL) {
memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen);
rspMsg.pCont = rpcCont;
rspMsg.contLen = pTrans->rpcRspLen;
}
}
if (pTrans->originRpcType == TDMT_MND_CREATE_DB) {
mDebug("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType));
SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname1);
......@@ -924,6 +916,21 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
}
mndReleaseDb(pMnode, pDb);
} else if (pTrans->originRpcType == TDMT_MND_CREATE_STB) {
void *pCont = NULL;
int32_t contLen = 0;
if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname1, pTrans->dbname2, &pCont, &contLen) != 0) {
mndTransSetRpcRsp(pTrans, pCont, contLen);
}
}
if (pTrans->rpcRspLen != 0) {
void *rpcCont = rpcMallocCont(pTrans->rpcRspLen);
if (rpcCont != NULL) {
memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen);
rspMsg.pCont = rpcCont;
rspMsg.contLen = pTrans->rpcRspLen;
}
}
tmsgSendRsp(&rspMsg);
......
......@@ -102,7 +102,7 @@ int metaCommit(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp **pMetaRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
......
......@@ -367,7 +367,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
return 0;
}
int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) {
SMetaEntry me = {0};
SMetaReader mr = {0};
......@@ -427,6 +427,21 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
if (pMetaRsp) {
*pMetaRsp = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (*pMetaRsp) {
if (me.type == TSDB_CHILD_TABLE) {
(*pMetaRsp)->tableType = TSDB_CHILD_TABLE;
(*pMetaRsp)->tuid = pReq->uid;
(*pMetaRsp)->suid = pReq->ctb.suid;
strcpy((*pMetaRsp)->tbName, pReq->name);
} else {
metaUpdateMetaRsp(pReq->uid, pReq->name, pReq->ntb.schemaRow, *pMetaRsp);
}
}
}
metaDebug("vgId:%d, table:%s uid %" PRId64 " is created, type:%" PRId8, TD_VID(pMeta->pVnode), pReq->name, pReq->uid,
pReq->type);
return 0;
......
......@@ -370,6 +370,10 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
}
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
if (NULL == pMetaRsp) {
return;
}
strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
pMetaRsp->dbId = pVnode->config.dbId;
pMetaRsp->vgId = TD_VID(pVnode);
......@@ -514,7 +518,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
}
// do create table
if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
if (metaCreateTable(pVnode->pMeta, version, pCreateReq, &cRsp.pMeta) < 0) {
if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
cRsp.code = TSDB_CODE_SUCCESS;
} else {
......@@ -524,6 +528,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
cRsp.code = TSDB_CODE_SUCCESS;
tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
taosArrayPush(tbUids, &pCreateReq->uid);
vnodeUpdateMetaRsp(pVnode, cRsp.pMeta);
}
taosArrayPush(rsp.pArray, &cRsp);
......@@ -552,7 +557,7 @@ _exit:
pCreateReq = req.pReqs + iReq;
taosArrayDestroy(pCreateReq->ctb.tagName);
}
taosArrayDestroy(rsp.pArray);
taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
taosArrayDestroy(tbUids);
tDecoderClear(&decoder);
tEncoderClear(&encoder);
......@@ -864,7 +869,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
goto _exit;
}
if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
if (metaCreateTable(pVnode->pMeta, version, &createTbReq, NULL) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
submitBlkRsp.code = terrno;
pRsp->code = terrno;
......
......@@ -219,6 +219,11 @@ void destroyQueryExecRes(SExecResult* pRes) {
}
switch (pRes->msgType) {
case TDMT_VND_CREATE_TABLE: {
taosArrayDestroyEx((SArray*)pRes->res, tFreeSTableMetaRsp);
break;
}
case TDMT_MND_CREATE_STB:
case TDMT_VND_ALTER_TABLE:
case TDMT_MND_ALTER_STB: {
tFreeSTableMetaRsp((STableMetaRsp*)pRes->res);
......
......@@ -102,15 +102,26 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
tDecoderInit(&coder, msg, msgSize);
code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp);
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
pJob->execRes.res = taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
pJob->execRes.msgType = TDMT_VND_CREATE_TABLE;
for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
SVCreateTbRsp *rsp = batchRsp.pRsps + i;
if (rsp->pMeta) {
taosArrayPush((SArray*)pJob->execRes.res, &rsp->pMeta);
}
if (TSDB_CODE_SUCCESS != rsp->code) {
code = rsp->code;
tDecoderClear(&coder);
SCH_ERR_JRET(code);
}
}
if (taosArrayGetSize((SArray*)pJob->execRes.res) <= 0) {
taosArrayDestroy((SArray*)pJob->execRes.res);
pJob->execRes.res = NULL;
}
}
tDecoderClear(&coder);
SCH_ERR_JRET(code);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册