diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 313b9da9a5caf8aa72f7d102f81abacde81c458a..29f6cd872e5dfbc3f949833f2d70357dfa775d6a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -30,158 +30,186 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { - int32_t code = 0; +static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; + int32_t lino = 0; + + int64_t ctime = taosGetTimestampMs(); SDecoder dc = {0}; + int32_t nReqs; - switch (pMsg->msgType) { - case TDMT_VND_CREATE_TABLE: { - int64_t ctime = taosGetTimestampMs(); - int32_t nReqs; + tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); + if (tStartDecode(&dc) < 0) { + code = TSDB_CODE_INVALID_MSG; + return code; + } + + if (tDecodeI32v(&dc, &nReqs) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + for (int32_t iReq = 0; iReq < nReqs; iReq++) { + tb_uid_t uid = tGenIdPI64(); + char *name = NULL; + if (tStartDecode(&dc) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (tDecodeI32v(&dc, NULL) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (tDecodeCStr(&dc, &name) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + *(int64_t *)(dc.data + dc.pos) = uid; + *(int64_t *)(dc.data + dc.pos + 8) = ctime; + + vTrace("vgId:%d table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid); + tEndDecode(&dc); + } + + tEndDecode(&dc); + +_exit: + tDecoderClear(&dc); + return code; +} + +static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; + int32_t lino = 0; + + int64_t ctime = taosGetTimestampMs(); + SDecoder dc = {0}; + + tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); + tStartDecode(&dc); + + uint64_t nSubmitTbData; + if (tDecodeU64v(&dc, &nSubmitTbData) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + for (int32_t i = 0; i < nSubmitTbData; i++) { + if (tStartDecode(&dc) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + int32_t flags; + if (tDecodeI32v(&dc, &flags) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } - tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); + if (flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { + // SVCreateTbReq if (tStartDecode(&dc) < 0) { code = TSDB_CODE_INVALID_MSG; - return code; + TSDB_CHECK_CODE(code, lino, _exit); } - if (tDecodeI32v(&dc, &nReqs) < 0) { + if (tDecodeI32v(&dc, NULL) < 0) { code = TSDB_CODE_INVALID_MSG; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - for (int32_t iReq = 0; iReq < nReqs; iReq++) { - tb_uid_t uid = tGenIdPI64(); - char *name = NULL; - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - if (tDecodeI32v(&dc, NULL) < 0) { - code = TSDB_CODE_INVALID_MSG; - return code; - } - if (tDecodeCStr(&dc, &name) < 0) { - code = TSDB_CODE_INVALID_MSG; - return code; - } - *(int64_t *)(dc.data + dc.pos) = uid; - *(int64_t *)(dc.data + dc.pos + 8) = ctime; + char *name = NULL; + if (tDecodeCStr(&dc, &name) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } - vTrace("vgId:%d, table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid); - tEndDecode(&dc); + int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name); + if (uid == 0) { + uid = tGenIdPI64(); } - tEndDecode(&dc); - tDecoderClear(&dc); - } break; - case TDMT_VND_SUBMIT: { - int64_t ctime = taosGetTimestampMs(); + *(int64_t *)(dc.data + dc.pos) = uid; + *(int64_t *)(dc.data + dc.pos + 8) = ctime; - tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); - tStartDecode(&dc); + tEndDecode(&dc); - uint64_t nSubmitTbData; - if (tDecodeU64v(&dc, &nSubmitTbData) < 0) { + // SSubmitTbData + int64_t suid; + if (tDecodeI64(&dc, &suid) < 0) { code = TSDB_CODE_INVALID_MSG; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - for (int32_t i = 0; i < nSubmitTbData; i++) { - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } + *(int64_t *)(dc.data + dc.pos) = uid; + } - int32_t flags; - if (tDecodeI32v(&dc, &flags) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } + tEndDecode(&dc); + } - if (flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { - // SVCreateTbReq - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - - if (tDecodeI32v(&dc, NULL) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - - char *name = NULL; - if (tDecodeCStr(&dc, &name) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - - int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name); - if (uid == 0) { - uid = tGenIdPI64(); - } - - *(int64_t *)(dc.data + dc.pos) = uid; - *(int64_t *)(dc.data + dc.pos + 8) = ctime; - - tEndDecode(&dc); - - // SSubmitTbData - int64_t suid; - if (tDecodeI64(&dc, &suid) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - - *(int64_t *)(dc.data + dc.pos) = uid; - } + tEndDecode(&dc); + tDecoderClear(&dc); - tEndDecode(&dc); - } +_exit: + return code; +} - tEndDecode(&dc); - tDecoderClear(&dc); - } break; - case TDMT_VND_DELETE: { - int32_t size; - int32_t ret; - uint8_t *pCont; - SEncoder *pCoder = &(SEncoder){0}; - SDeleteRes res = {0}; - SReadHandle handle = { - .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; - - code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); - if (code) { - goto _err; - } +static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; - // malloc and encode - tEncodeSize(tEncodeDeleteRes, &res, size, ret); - pCont = rpcMallocCont(size + sizeof(SMsgHead)); + int32_t size; + int32_t ret; + uint8_t *pCont; + SEncoder *pCoder = &(SEncoder){0}; + SDeleteRes res = {0}; + SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; - ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead); - ((SMsgHead *)pCont)->vgId = TD_VID(pVnode); + code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); + if (code) goto _exit; - tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size); - tEncodeDeleteRes(pCoder, &res); - tEncoderClear(pCoder); + // malloc and encode + tEncodeSize(tEncodeDeleteRes, &res, size, ret); + pCont = rpcMallocCont(size + sizeof(SMsgHead)); + + ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead); + ((SMsgHead *)pCont)->vgId = TD_VID(pVnode); + + tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size); + tEncodeDeleteRes(pCoder, &res); + tEncoderClear(pCoder); + + rpcFreeCont(pMsg->pCont); + pMsg->pCont = pCont; + pMsg->contLen = size + sizeof(SMsgHead); + + taosArrayDestroy(res.uidList); + +_exit: + return code; +} - rpcFreeCont(pMsg->pCont); - pMsg->pCont = pCont; - pMsg->contLen = size + sizeof(SMsgHead); +int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; - taosArrayDestroy(res.uidList); + switch (pMsg->msgType) { + case TDMT_VND_CREATE_TABLE: { + code = vnodePreProcessCreateTableMsg(pVnode, pMsg); + } break; + case TDMT_VND_SUBMIT: { + code = vnodePreProcessSubmitMsg(pVnode, pMsg); + } break; + case TDMT_VND_DELETE: { + code = vnodePreProcessDeleteMsg(pVnode, pMsg); } break; default: break; } - return code; - -_err: - vError("vgId%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code)); +_exit: + if (code) { + vError("vgId%d failed to preprocess write request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code), + pMsg->msgType); + } return code; } @@ -871,7 +899,6 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, } static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { -#if 1 int32_t code = 0; terrno = 0; @@ -1042,145 +1069,6 @@ _exit: if (code) terrno = code; return code; - -#else - SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; - SSubmitRsp submitRsp = {0}; - int32_t nRows = 0; - int32_t tsize, ret; - SEncoder encoder = {0}; - SArray *newTbUids = NULL; - SVStatis statis = {0}; - bool tbCreated = false; - terrno = TSDB_CODE_SUCCESS; - - pRsp->code = 0; - pSubmitReq->version = version; - statis.nBatchInsert = 1; - - if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) { - pRsp->code = terrno; - goto _exit; - } - - submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp)); - newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t)); - if (!submitRsp.pArray || !newTbUids) { - pRsp->code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - for (;;) { - tGetSubmitMsgNext(&msgIter, &pBlock); - if (pBlock == NULL) break; - - SSubmitBlkRsp submitBlkRsp = {0}; - tbCreated = false; - - // create table for auto create table mode - if (msgIter.schemaLen > 0) { - // tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen); - // if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) { - // pRsp->code = TSDB_CODE_INVALID_MSG; - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - // goto _exit; - // } - - // if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) { - // pRsp->code = terrno; - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - // goto _exit; - // } - - // if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) { - // pRsp->code = terrno; - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - // goto _exit; - // } - - if (metaCreateTable(pVnode->pMeta, version, &createTbReq, &submitBlkRsp.pMeta) < 0) { - // if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { - // submitBlkRsp.code = terrno; - // pRsp->code = terrno; - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - // goto _exit; - // } - } else { - if (NULL != submitBlkRsp.pMeta) { - vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta); - } - - // taosArrayPush(newTbUids, &createTbReq.uid); - - submitBlkRsp.uid = createTbReq.uid; - submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); - sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name); - tbCreated = true; - } - - // msgIter.uid = createTbReq.uid; - // if (createTbReq.type == TSDB_CHILD_TABLE) { - // msgIter.suid = createTbReq.ctb.suid; - // } else { - // msgIter.suid = 0; - // } - - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - } - - if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) { - submitBlkRsp.code = terrno; - } - - submitRsp.numOfRows += submitBlkRsp.numOfRows; - submitRsp.affectedRows += submitBlkRsp.affectedRows; - if (tbCreated || submitBlkRsp.code) { - taosArrayPush(submitRsp.pArray, &submitBlkRsp); - } - } - - // if (taosArrayGetSize(newTbUids) > 0) { - // vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), - // (int32_t)taosArrayGetSize(newTbUids)); - // } - - // tqUpdateTbUidList(pVnode->pTq, newTbUids, true); - -_exit: - taosArrayDestroy(newTbUids); - // tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret); - // pRsp->pCont = rpcMallocCont(tsize); - // pRsp->contLen = tsize; - // tEncoderInit(&encoder, pRsp->pCont, tsize); - // tEncodeSSubmitRsp(&encoder, &submitRsp); - // tEncoderClear(&encoder); - - taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp); - - // TODO: the partial success scenario and the error case - // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level - // 1/level 2. - // TODO: refactor - if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) { - statis.nBatchInsertSuccess = 1; - tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT); - } - - // N.B. not strict as the following procedure is not atomic - atomic_add_fetch_64(&pVnode->statis.nInsert, submitRsp.numOfRows); - atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows); - atomic_add_fetch_64(&pVnode->statis.nBatchInsert, statis.nBatchInsert); - atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, statis.nBatchInsertSuccess); - - vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version); - return 0; -#endif - return 0; } static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {