diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 300ab13f93dc779ceb96719e0e35ec285ba15a40..07f9b6b01a4c2c05697df1c03116d971867029c5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1558,13 +1558,23 @@ typedef struct { int32_t code; } SVCreateTbRsp, SVUpdateTbRsp; +int tEncodeSVCreateTbRsp(SCoder* pCoder, const SVCreateTbRsp* pRsp); +int tDecodeSVCreateTbRsp(SCoder* pCoder, SVCreateTbRsp* pRsp); + int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); typedef struct { - SArray* rspList; // SArray + int32_t nRsps; + union { + SVCreateTbRsp* pRsps; + SArray* pArray; + }; } SVCreateTbBatchRsp; +int tEncodeSVCreateTbBatchRsp(SCoder* pCoder, const SVCreateTbBatchRsp* pRsp); +int tDecodeSVCreateTbBatchRsp(SCoder* pCoder, SVCreateTbBatchRsp* pRsp); + int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index bc62ac4c7739f7f5e50693aae9f784aa58993505..4adb5c503165d56efcd363e8aa848c098a8153e2 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3418,6 +3418,36 @@ int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatc return 0; } +int tEncodeSVCreateTbBatchRsp(SCoder *pCoder, const SVCreateTbBatchRsp *pRsp) { + int32_t nRsps = taosArrayGetSize(pRsp->pArray); + SVCreateTbRsp *pCreateRsp; + + if (tStartEncode(pCoder) < 0) return -1; + + if (tEncodeI32v(pCoder, nRsps) < 0) return -1; + for (int32_t i = 0; i < nRsps; i++) { + pCreateRsp = taosArrayGet(pRsp->pArray, i); + if (tEncodeSVCreateTbRsp(pCoder, pCreateRsp) < 0) return -1; + } + + tEndEncode(pCoder); + + return 0; +} + +int tDecodeSVCreateTbBatchRsp(SCoder *pCoder, SVCreateTbBatchRsp *pRsp) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeI32v(pCoder, &pRsp->nRsps) < 0) return -1; + pRsp->pRsps = (SVCreateTbRsp *)TCODER_MALLOC(pCoder, sizeof(*pRsp->pRsps) * pRsp->nRsps); + for (int32_t i = 0; i < pRsp->nRsps; i++) { + if (tDecodeSVCreateTbRsp(pCoder, pRsp->pRsps + i) < 0) return -1; + } + + tEndDecode(pCoder); + return 0; +} + int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) { int32_t tlen = 0; @@ -3642,6 +3672,24 @@ int tDecodeSVCreateTbBatchReq(SCoder *pCoder, SVCreateTbBatchReq *pReq) { if (tDecodeSVCreateTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1; } + tEndDecode(pCoder); + return 0; +} + +int tEncodeSVCreateTbRsp(SCoder *pCoder, const SVCreateTbRsp *pRsp) { + if (tStartEncode(pCoder) < 0) return -1; + + if (tEncodeI32(pCoder, pRsp->code) < 0) return -1; + + tEndEncode(pCoder); + return 0; +} + +int tDecodeSVCreateTbRsp(SCoder *pCoder, SVCreateTbRsp *pRsp) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeI32(pCoder, &pRsp->code) < 0) return -1; + tEndDecode(pCoder); return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 68f6ed773ebf41c92a5c96852bb9e7bb5046f36c..5119794afffff90a61f9831a1653da3710d3ff5d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -58,7 +58,7 @@ int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); -int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); +int vnodeValidateTableHash(SVnode *pVnode, char *tableFName); // meta typedef struct SMeta SMeta; // todo: remove diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 4f9631979cf5623016aeab8d99146f8f712c7235..002ee2aa5161d12201ac2fa464eb31bc4370331d 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -126,10 +126,10 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { return 0; } -int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) { +int vnodeValidateTableHash(SVnode *pVnode, char *tableFName) { uint32_t hashValue = 0; - switch (pVnodeOptions->hashMethod) { + switch (pVnode->config.hashMethod) { default: hashValue = MurmurHash3_32(tableFName, strlen(tableFName)); break; @@ -143,5 +143,5 @@ int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) { } #endif - return TSDB_CODE_SUCCESS; + return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 0d6bae73a9f2003d1fca487c901c46241a7f9956..83591a4e00e014d22fdb315194a56d5678fa5c8f 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -42,7 +42,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName)); metaRsp.dbId = pVnode->config.dbId; sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName); - code = vnodeValidateTableHash(&pVnode->config, tableFName); + code = vnodeValidateTableHash(pVnode, tableFName); if (code) { goto _exit; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ee8e7161ec8cdd3a261ceeca1d67819023625208..68c96f1be2fe717de1055ea6dcc70d3280f740ab 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -249,8 +249,14 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int rcode = 0; SVCreateTbBatchReq req = {0}; SVCreateTbReq *pCreateReq; + SVCreateTbBatchRsp rsp = {0}; + SVCreateTbRsp cRsp = {0}; + char tbName[TSDB_TABLE_FNAME_LEN]; pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; + pRsp->code = TSDB_CODE_SUCCESS; + pRsp->pCont = NULL; + pRsp->contLen = 0; // decode tCoderInit(&coder, TD_LITTLE_ENDIAN, pReq, len, TD_DECODER); @@ -260,17 +266,47 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, goto _exit; } + rsp.pArray = taosArrayInit(sizeof(cRsp), req.nReqs); + if (rsp.pArray == NULL) { + rcode = -1; + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + // loop to create table for (int iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; + + // validate hash + sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name); + if (vnodeValidateTableHash(pVnode, tbName) < 0) { + cRsp.code = TSDB_CODE_VND_HASH_MISMATCH; + taosArrayPush(rsp.pArray, &cRsp); + continue; + } + + // do create table if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) { - // TODO: fill request + cRsp.code = terrno; } else { - // TODO + cRsp.code = TSDB_CODE_SUCCESS; } + + taosArrayPush(rsp.pArray, &cRsp); } + tCoderClear(&coder); + // prepare rsp + tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen); + pRsp->pCont = rpcMallocCont(pRsp->contLen); + if (pRsp->pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + rcode = -1; + goto _exit; + } + tCoderInit(&coder, TD_LITTLE_ENDIAN, pRsp->pCont, pRsp->contLen, TD_ENCODER); + tEncodeSVCreateTbBatchRsp(&coder, &rsp); _exit: tCoderClear(&coder); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index c0b3ae7055d6b0d72784c40f7345476dc5126de7..c522e9598fa7dd1eec36704f67d34489f126640f 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1076,17 +1076,17 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SVCreateTbBatchRsp batchRsp = {0}; if (msg) { SCH_ERR_JRET(tDeserializeSVCreateTbBatchRsp(msg, msgSize, &batchRsp)); - if (batchRsp.rspList) { - int32_t num = taosArrayGetSize(batchRsp.rspList); + if (batchRsp.pArray) { + int32_t num = taosArrayGetSize(batchRsp.pArray); for (int32_t i = 0; i < num; ++i) { - SVCreateTbRsp *rsp = taosArrayGet(batchRsp.rspList, i); + SVCreateTbRsp *rsp = taosArrayGet(batchRsp.pArray, i); if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) { - taosArrayDestroy(batchRsp.rspList); + taosArrayDestroy(batchRsp.pArray); SCH_ERR_JRET(rsp->code); } } - taosArrayDestroy(batchRsp.rspList); + taosArrayDestroy(batchRsp.pArray); } } @@ -2734,7 +2734,7 @@ void schedulerFreeTaskList(SArray *taskList) { void schedulerDestroy(void) { if (schMgmt.jobRef) { SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0); - int64_t refId = 0; + int64_t refId = 0; while (pJob) { refId = pJob->refId; @@ -2751,12 +2751,12 @@ void schedulerDestroy(void) { } if (schMgmt.hbConnections) { - void *pIter = taosHashIterate(schMgmt.hbConnections, NULL); + void *pIter = taosHashIterate(schMgmt.hbConnections, NULL); while (pIter != NULL) { SSchHbTrans *hb = pIter; schFreeRpcCtx(&hb->rpcCtx); pIter = taosHashIterate(schMgmt.hbConnections, pIter); - } + } taosHashCleanup(schMgmt.hbConnections); schMgmt.hbConnections = NULL; }